Files
rmser/ocr-service/python_project_dump.py

29 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# PYTHON PROJECT DUMP
# GENERATED BY pack_python_project.py
project_tree = '''
ocr-service/
├── .dockerignore
├── Dockerfile
├── imgproc.py
├── llm_parser.py
├── main.py
├── ocr.py
├── parser.py
├── qr_manager.py
├── requirements.txt
└── yandex_ocr.py
'''
project_files = {
"imgproc.py": "import cv2\nimport numpy as np\nimport logging\n\nlogger = logging.getLogger(__name__)\n\ndef order_points(pts):\n rect = np.zeros((4, 2), dtype=\"float32\")\n s = pts.sum(axis=1)\n rect[0] = pts[np.argmin(s)]\n rect[2] = pts[np.argmax(s)]\n diff = np.diff(pts, axis=1)\n rect[1] = pts[np.argmin(diff)]\n rect[3] = pts[np.argmax(diff)]\n return rect\n\ndef four_point_transform(image, pts):\n rect = order_points(pts)\n (tl, tr, br, bl) = rect\n\n widthA = np.sqrt(((br[0] - bl[0]) ** 2) + ((br[1] - bl[1]) ** 2))\n widthB = np.sqrt(((tr[0] - tl[0]) ** 2) + ((tr[1] - tl[1]) ** 2))\n maxWidth = max(int(widthA), int(widthB))\n\n heightA = np.sqrt(((tr[0] - br[0]) ** 2) + ((tr[1] - br[1]) ** 2))\n heightB = np.sqrt(((tl[0] - bl[0]) ** 2) + ((tl[1] - bl[1]) ** 2))\n maxHeight = max(int(heightA), int(heightB))\n\n dst = np.array([\n [0, 0],\n [maxWidth - 1, 0],\n [maxWidth - 1, maxHeight - 1],\n [0, maxHeight - 1]], dtype=\"float32\")\n\n M = cv2.getPerspectiveTransform(rect, dst)\n warped = cv2.warpPerspective(image, M, (maxWidth, maxHeight))\n return warped\n\ndef preprocess_image(image_bytes: bytes) -> np.ndarray:\n \"\"\"\n Возвращает БИНАРНОЕ (Ч/Б) изображение для Tesseract.\n \"\"\"\n nparr = np.frombuffer(image_bytes, np.uint8)\n image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)\n \n if image is None:\n raise ValueError(\"Could not decode image\")\n\n # Ресайз для поиска контуров\n ratio = image.shape[0] / 500.0\n orig = image.copy()\n image_small = cv2.resize(image, (int(image.shape[1] / ratio), 500))\n\n gray = cv2.cvtColor(image_small, cv2.COLOR_BGR2GRAY)\n gray = cv2.GaussianBlur(gray, (5, 5), 0)\n edged = cv2.Canny(gray, 75, 200)\n\n cnts, _ = cv2.findContours(edged.copy(), cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)\n cnts = sorted(cnts, key=cv2.contourArea, reverse=True)[:5]\n\n screenCnt = None\n found = False\n\n for c in cnts:\n peri = cv2.arcLength(c, True)\n approx = cv2.approxPolyDP(c, 0.02 * peri, True)\n if len(approx) == 4:\n screenCnt = approx\n found = True\n break\n\n # Изображение, с которым будем работать дальше\n target_img = None\n\n if found:\n logger.info(\"Receipt contour found (Tesseract mode).\")\n target_img = four_point_transform(orig, screenCnt.reshape(4, 2) * ratio)\n else:\n logger.warning(\"Receipt contour NOT found. Using full image.\")\n target_img = orig\n\n # --- Подготовка для Tesseract (Бинаризация) ---\n # Переводим в Gray\n gray_final = cv2.cvtColor(target_img, cv2.COLOR_BGR2GRAY)\n \n # Адаптивный порог (превращаем в чисто черное и белое)\n # block_size=11, C=2 - классические параметры для текста\n thresh = cv2.adaptiveThreshold(\n gray_final, 255, \n cv2.ADAPTIVE_THRESH_GAUSSIAN_C, \n cv2.THRESH_BINARY, 11, 2\n )\n \n # Немного убираем шум\n # thresh = cv2.medianBlur(thresh, 3) \n \n return thresh",
"llm_parser.py": "import os\nimport requests\nimport logging\nimport json\nimport uuid\nimport time\nfrom typing import List, Optional\nfrom parser import ParsedItem\n\nlogger = logging.getLogger(__name__)\n\nYANDEX_GPT_URL = \"https://llm.api.cloud.yandex.net/foundationModels/v1/completion\"\nGIGACHAT_OAUTH_URL = \"https://ngw.devices.sberbank.ru:9443/api/v2/oauth\"\nGIGACHAT_COMPLETION_URL = \"https://gigachat.devices.sberbank.ru/api/v1/chat/completions\"\n\nclass YandexGPTParser:\n def __init__(self):\n self.folder_id = os.getenv(\"YANDEX_FOLDER_ID\")\n self.api_key = os.getenv(\"YANDEX_OAUTH_TOKEN\")\n\n def parse(self, raw_text: str, iam_token: str) -> List[ParsedItem]:\n if not iam_token:\n return []\n \n prompt = {\n \"modelUri\": f\"gpt://{self.folder_id}/yandexgpt/latest\",\n \"completionOptions\": {\"stream\": False, \"temperature\": 0.1, \"maxTokens\": \"2000\"},\n \"messages\": [\n {\n \"role\": \"system\",\n \"text\": (\n \"Ты — помощник по бухгалтерии. Извлеки список товаров из текста документа. \"\n \"Верни ответ строго в формате JSON: \"\n '[{\"raw_name\": string, \"amount\": float, \"price\": float, \"sum\": float}]. '\n \"Если количество не указано, считай 1.0. Не пиши ничего, кроме JSON.\"\n )\n },\n {\"role\": \"user\", \"text\": raw_text}\n ]\n }\n\n headers = {\n \"Content-Type\": \"application/json\",\n \"Authorization\": f\"Bearer {iam_token}\",\n \"x-folder-id\": self.folder_id\n }\n\n try:\n response = requests.post(YANDEX_GPT_URL, headers=headers, json=prompt, timeout=30)\n response.raise_for_status()\n content = response.json()['result']['alternatives'][0]['message']['text']\n clean_json = content.replace(\"```json\", \"\").replace(\"```\", \"\").strip()\n return [ParsedItem(**item) for item in json.loads(clean_json)]\n except Exception as e:\n logger.error(f\"YandexGPT Parsing error: {e}\")\n return []\n\nclass GigaChatParser:\n def __init__(self):\n self.auth_key = os.getenv(\"GIGACHAT_AUTH_KEY\")\n self._access_token = None\n self._expires_at = 0\n\n def _get_token(self) -> Optional[str]:\n if self._access_token and time.time() < self._expires_at:\n return self._access_token\n\n logger.info(\"Obtaining GigaChat access token...\")\n headers = {\n 'Content-Type': 'application/x-www-form-urlencoded',\n 'Accept': 'application/json',\n 'RqUID': str(uuid.uuid4()),\n 'Authorization': f'Basic {self.auth_key}'\n }\n payload = {'scope': 'GIGACHAT_API_PERS'}\n\n try:\n # verify=False может понадобиться, если сертификаты Минцифры не в системном хранилище, \n # но вы указали, что установите их в контейнер.\n response = requests.post(GIGACHAT_OAUTH_URL, headers=headers, data=payload, timeout=10)\n response.raise_for_status()\n data = response.json()\n self._access_token = data['access_token']\n self._expires_at = data['expires_at'] / 1000 # Переводим мс в сек\n return self._access_token\n except Exception as e:\n logger.error(f\"GigaChat Auth error: {e}\")\n return None\n\n def parse(self, raw_text: str) -> List[ParsedItem]:\n token = self._get_token()\n if not token:\n return []\n\n headers = {\n 'Content-Type': 'application/json',\n 'Accept': 'application/json',\n 'Authorization': f'Bearer {token}'\n }\n\n payload = {\n \"model\": \"GigaChat\",\n \"messages\": [\n {\n \"role\": \"system\",\n \"content\": (\n \"Ты — эксперт по распознаванию чеков. Извлеки товары из текста. \"\n \"Верни ТОЛЬКО JSON массив объектов с полями: raw_name (строка), \"\n \"amount (число), price (число), sum (число). \"\n \"Если данных нет, верни []. Никаких пояснений.\"\n )\n },\n {\"role\": \"user\", \"content\": raw_text}\n ],\n \"temperature\": 0.1\n }\n\n try:\n response = requests.post(GIGACHAT_COMPLETION_URL, headers=headers, json=payload, timeout=30)\n response.raise_for_status()\n content = response.json()['choices'][0]['message']['content']\n clean_json = content.replace(\"```json\", \"\").replace(\"```\", \"\").strip()\n return [ParsedItem(**item) for item in json.loads(clean_json)]\n except Exception as e:\n logger.error(f\"GigaChat Parsing error: {e}\")\n return []\n\nclass LLMManager:\n def __init__(self):\n self.yandex = YandexGPTParser()\n self.giga = GigaChatParser()\n self.engine = os.getenv(\"LLM_ENGINE\", \"yandex\").lower()\n\n def parse_with_priority(self, raw_text: str, yandex_iam_token: Optional[str] = None) -> List[ParsedItem]:\n if self.engine == \"gigachat\":\n logger.info(\"Using GigaChat as primary LLM\")\n items = self.giga.parse(raw_text)\n if not items and yandex_iam_token:\n logger.info(\"GigaChat failed, falling back to YandexGPT\")\n items = self.yandex.parse(raw_text, yandex_iam_token)\n return items\n else:\n logger.info(\"Using YandexGPT as primary LLM\")\n items = self.yandex.parse(raw_text, yandex_iam_token) if yandex_iam_token else []\n if not items:\n logger.info(\"YandexGPT failed, falling back to GigaChat\")\n items = self.giga.parse(raw_text)\n return items\n\nllm_parser = LLMManager()",
"main.py": "import logging\nimport os\nfrom typing import List\n\nfrom fastapi import FastAPI, File, UploadFile, HTTPException\nfrom pydantic import BaseModel\nimport cv2\nimport numpy as np\n\n# Импортируем модули\nfrom imgproc import preprocess_image\nfrom parser import parse_receipt_text, ParsedItem, extract_fiscal_data\nfrom ocr import ocr_engine\nfrom qr_manager import detect_and_decode_qr, fetch_data_from_api\n# Импортируем новый модуль\nfrom yandex_ocr import yandex_engine\nfrom llm_parser import llm_parser\n\nlogging.basicConfig(\n level=logging.INFO,\n format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'\n)\nlogger = logging.getLogger(__name__)\n\napp = FastAPI(title=\"RMSER OCR Service (Hybrid: QR + Yandex + Tesseract)\")\n\nclass RecognitionResult(BaseModel):\n source: str # 'qr_api', 'yandex_vision', 'tesseract_ocr'\n items: List[ParsedItem]\n raw_text: str = \"\"\n\n@app.get(\"/health\")\ndef health_check():\n return {\"status\": \"ok\"}\n\n@app.post(\"/recognize\", response_model=RecognitionResult)\nasync def recognize_receipt(image: UploadFile = File(...)):\n \"\"\"\n Стратегия:\n 1. QR Code + FNS API (Приоритет 1 - Идеальная точность)\n 2. Yandex Vision OCR (Приоритет 2 - Высокая точность, если настроен)\n 3. Tesseract OCR (Приоритет 3 - Локальный фолбэк)\n \"\"\"\n logger.info(f\"Received file: {image.filename}, content_type: {image.content_type}\")\n\n if not image.content_type.startswith(\"image/\"):\n raise HTTPException(status_code=400, detail=\"File must be an image\")\n\n try:\n # Читаем сырые байты\n content = await image.read()\n \n # Конвертируем в numpy для QR и локального препроцессинга\n nparr = np.frombuffer(content, np.uint8)\n original_cv_image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)\n\n if original_cv_image is None:\n raise HTTPException(status_code=400, detail=\"Invalid image data\")\n\n # --- ЭТАП 1: QR Code Strategy ---\n logger.info(\"--- Stage 1: QR Code Detection ---\")\n qr_raw = detect_and_decode_qr(original_cv_image)\n \n if qr_raw:\n logger.info(\"QR found! Fetching data from API...\")\n api_items = fetch_data_from_api(qr_raw)\n \n if api_items:\n logger.info(f\"Success: Retrieved {len(api_items)} items via QR API.\")\n return RecognitionResult(\n source=\"qr_api\",\n items=api_items,\n raw_text=f\"QR Content: {qr_raw}\"\n )\n else:\n logger.warning(\"QR found but API failed. Falling back to OCR.\")\n else:\n logger.info(\"QR code not found. Proceeding to OCR.\")\n\n # --- ЭТАП 2: OCR + Virtual QR Strategy ---\n if yandex_engine.oauth_token and yandex_engine.folder_id:\n logger.info(\"--- Stage 2: Yandex Vision OCR + Virtual QR ---\")\n yandex_text = yandex_engine.recognize(content)\n \n if yandex_text and len(yandex_text) > 10:\n logger.info(f\"OCR success. Raw text length: {len(yandex_text)}\")\n \n # Попытка собрать виртуальный QR из текста\n virtual_qr = extract_fiscal_data(yandex_text)\n if virtual_qr:\n logger.info(f\"Virtual QR constructed: {virtual_qr}\")\n api_items = fetch_data_from_api(virtual_qr)\n if api_items:\n logger.info(f\"Success: Retrieved {len(api_items)} items via Virtual QR API.\")\n return RecognitionResult(\n source=\"virtual_qr_api\",\n items=api_items,\n raw_text=yandex_text\n )\n \n # Если виртуальный QR не сработал, пробуем Regex\n yandex_items = parse_receipt_text(yandex_text)\n \n # Если Regex пуст — вызываем LLM (GigaChat / YandexGPT)\n if not yandex_items:\n logger.info(\"Regex found nothing. Calling LLM Manager...\")\n iam_token = yandex_engine._get_iam_token()\n yandex_items = llm_parser.parse_with_priority(yandex_text, iam_token)\n \n return RecognitionResult(\n source=\"yandex_vision_llm\",\n items=yandex_items,\n raw_text=yandex_text\n )\n else:\n logger.warning(\"Yandex Vision returned empty text or failed. Falling back to Tesseract.\")\n else:\n logger.info(\"Yandex Vision credentials not set. Skipping Stage 2.\")\n\n # --- ЭТАП 3: Tesseract Strategy (Local Fallback) ---\n logger.info(\"--- Stage 3: Tesseract OCR (Local) ---\")\n \n # 1. Image Processing (бинаризация, выравнивание)\n processed_img = preprocess_image(content)\n \n # 2. OCR\n tesseract_text = ocr_engine.recognize(processed_img)\n \n # 3. Parsing\n ocr_items = parse_receipt_text(tesseract_text)\n \n return RecognitionResult(\n source=\"tesseract_ocr\",\n items=ocr_items,\n raw_text=tesseract_text\n )\n\n except Exception as e:\n logger.error(f\"Error processing request: {e}\", exc_info=True)\n raise HTTPException(status_code=500, detail=str(e))\n\nif __name__ == \"__main__\":\n import uvicorn\n uvicorn.run(app, host=\"0.0.0.0\", port=5000)",
"ocr.py": "import logging\nimport pytesseract\nfrom PIL import Image\nimport numpy as np\n\nlogger = logging.getLogger(__name__)\n\n# Если tesseract не в PATH, раскомментируй и укажи путь:\n# pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'\n\nclass OCREngine:\n def __init__(self):\n logger.info(\"Initializing Tesseract OCR wrapper...\")\n # Tesseract не требует загрузки моделей в память, \n # проверка версии просто чтобы убедиться, что он установлен\n try:\n version = pytesseract.get_tesseract_version()\n logger.info(f\"Tesseract version found: {version}\")\n except Exception as e:\n logger.error(\"Tesseract not found! Make sure it is installed (apt install tesseract-ocr).\")\n raise e\n\n def recognize(self, image: np.ndarray) -> str:\n \"\"\"\n Принимает бинарное изображение (numpy array).\n \"\"\"\n # Tesseract работает лучше с PIL Image\n pil_img = Image.fromarray(image)\n \n # Конфигурация:\n # -l rus+eng: русский и английский\n # --psm 6: Assume a single uniform block of text (хорошо для чеков)\n custom_config = r'--oem 3 --psm 6'\n \n text = pytesseract.image_to_string(pil_img, lang='rus+eng', config=custom_config)\n return text\n\nocr_engine = OCREngine()",
"parser.py": "import re\nfrom typing import List, Optional\nfrom pydantic import BaseModel\nfrom datetime import datetime\n\nclass ParsedItem(BaseModel):\n raw_name: str\n amount: float\n price: float\n sum: float\n\n# Регулярка для поиска чисел с плавающей точкой: 123.00, 123,00, 10.5\nFLOAT_RE = r'\\d+[.,]\\d{2}'\n\ndef clean_text(text: str) -> str:\n \"\"\"Удаляет лишние символы из названия товара.\"\"\"\n return re.sub(r'[^\\w\\s.,%/-]', '', text).strip()\n\ndef parse_float(val: str) -> float:\n \"\"\"Преобразует строку '123,45' или '123.45' в float.\"\"\"\n if not val:\n return 0.0\n return float(val.replace(',', '.').replace(' ', ''))\n\ndef extract_fiscal_data(text: str) -> Optional[str]:\n \"\"\"\n Ищет в тексте:\n - Дата и время (t): 19.12.25 12:16 -> 20251219T1216\n - Сумма (s): ИТОГ: 770.00\n - ФН (fn): 16 цифр, начинается на 73\n - ФД (i): до 8 цифр (ищем после Д: или ФД:)\n - ФП (fp): 10 цифр (ищем после П: или ФП:)\n \n Возвращает строку формата: t=20251219T1216&s=770.00&fn=7384440800514469&i=11194&fp=3334166168&n=1\n \"\"\"\n # 1. Поиск даты и времени\n # Ищем форматы DD.MM.YY HH:MM или DD.MM.YYYY HH:MM\n date_match = re.search(r'(\\d{2})\\.(\\d{2})\\.(\\d{2,4})\\s+(\\d{2}):(\\d{2})', text)\n t_param = \"\"\n if date_match:\n d, m, y, hh, mm = date_match.groups()\n if len(y) == 2: y = \"20\" + y\n t_param = f\"{y}{m}{d}T{hh}{mm}\"\n\n # 2. Поиск суммы (Итог)\n # Ищем слово ИТОГ и число после него\n sum_match = re.search(r'(?:ИТОГ|СУММА|СУММА:)\\s*[:]*\\s*(\\d+[.,]\\d{2})', text, re.IGNORECASE)\n s_param = \"\"\n if sum_match:\n s_param = sum_match.group(1).replace(',', '.')\n\n # 3. Поиск ФН (16 цифр, начинается с 73)\n fn_match = re.search(r'\\b(73\\d{14})\\b', text)\n fn_param = fn_match.group(1) if fn_match else \"\"\n\n # 4. Поиск ФД (i) - ищем после маркеров Д: или ФД:\n # Берем набор цифр до 8 знаков\n fd_match = re.search(r'(?:ФД|Д)[:\\s]+(\\d{1,8})\\b', text)\n i_param = fd_match.group(1) if fd_match else \"\"\n\n # 5. Поиск ФП (fp) - ищем после маркеров П: или ФП:\n # Строго 10 цифр\n fp_match = re.search(r'(?:ФП|П)[:\\s]+(\\d{10})\\b', text)\n fp_param = fp_match.group(1) if fp_match else \"\"\n\n # Валидация: для формирования запроса к API нам критически важны все параметры\n if all([t_param, s_param, fn_param, i_param, fp_param]):\n return f\"t={t_param}&s={s_param}&fn={fn_param}&i={i_param}&fp={fp_param}&n=1\"\n \n return None\n\ndef parse_receipt_text(text: str) -> List[ParsedItem]:\n \"\"\"\n Парсит текст чека построчно (Regex-метод).\n \"\"\"\n lines = text.split('\\n')\n items = []\n name_buffer = []\n\n for line in lines:\n line = line.strip()\n if not line:\n continue\n\n floats = re.findall(FLOAT_RE, line)\n is_price_line = False\n \n if len(floats) >= 2:\n is_price_line = True\n vals = [parse_float(f) for f in floats]\n \n price = 0.0\n amount = 1.0\n total = vals[-1]\n \n if len(vals) == 2:\n price = vals[0]\n amount = 1.0\n if total > price and price > 0:\n calc_amount = total / price\n if abs(round(calc_amount) - calc_amount) < 0.05:\n amount = float(round(calc_amount))\n elif len(vals) >= 3:\n v1, v2 = vals[-3], vals[-2]\n if abs(v1 * v2 - total) < 0.5:\n price, amount = v1, v2\n elif abs(v2 * v1 - total) < 0.5:\n price, amount = v2, v1\n else:\n price, amount = vals[-2], 1.0\n\n full_name = \" \".join(name_buffer).strip()\n if not full_name:\n text_without_floats = re.sub(FLOAT_RE, '', line)\n full_name = clean_text(text_without_floats)\n\n if len(full_name) > 2 and total > 0:\n items.append(ParsedItem(\n raw_name=full_name,\n amount=amount,\n price=price,\n sum=total\n ))\n name_buffer = []\n else:\n upper_line = line.upper()\n if any(stop in upper_line for stop in [\"ИТОГ\", \"СУММА\", \"ПРИХОД\"]):\n name_buffer = [] \n continue\n name_buffer.append(line)\n\n return items",
"qr_manager.py": "import logging\nimport requests\nfrom typing import Optional, List\nfrom pyzbar.pyzbar import decode\nfrom PIL import Image\nimport numpy as np\n\n# Импортируем модель из parser.py\nfrom parser import ParsedItem \n\nAPI_TOKEN = \"36590.yqtiephCvvkYUKM2W\"\nAPI_URL = \"https://proverkacheka.com/api/v1/check/get\"\n\nlogger = logging.getLogger(__name__)\n\ndef is_valid_fiscal_qr(qr_string: str) -> bool:\n \"\"\"\n Проверяет, соответствует ли строка формату фискального чека ФНС.\n Ожидаемый формат: t=...&s=...&fn=...&i=...&fp=...&n=...\n Мы проверяем наличие хотя бы 3-х ключевых параметров.\n \"\"\"\n if not qr_string:\n return False\n \n # Ключевые параметры, которые обязаны быть в строке чека\n required_keys = [\"t=\", \"s=\", \"fn=\"]\n \n # Проверяем, что все ключевые параметры присутствуют\n # (порядок может отличаться, поэтому проверяем вхождение каждого)\n matches = [key in qr_string for key in required_keys]\n \n return all(matches)\n\ndef detect_and_decode_qr(image: np.ndarray) -> Optional[str]:\n \"\"\"\n Ищет ВСЕ QR-коды на изображении и возвращает только тот, \n который похож на фискальный чек.\n \"\"\"\n try:\n pil_img = Image.fromarray(image)\n \n # Декодируем все коды на картинке\n decoded_objects = decode(pil_img)\n \n if not decoded_objects:\n logger.info(\"No QR codes detected on the image.\")\n return None\n \n logger.info(f\"Detected {len(decoded_objects)} code(s). Scanning for fiscal data...\")\n\n for obj in decoded_objects:\n if obj.type == 'QRCODE':\n qr_data = obj.data.decode(\"utf-8\")\n \n # Логируем найденное (для отладки, если вдруг формат хитрый)\n # Обрезаем длинные строки, чтобы не засорять лог\n log_preview = (qr_data[:75] + '..') if len(qr_data) > 75 else qr_data\n logger.info(f\"Checking QR content: {log_preview}\")\n \n if is_valid_fiscal_qr(qr_data):\n logger.info(\"Valid fiscal QR found!\")\n return qr_data\n else:\n logger.info(\"QR skipped (not a fiscal receipt pattern).\")\n \n logger.warning(\"QR codes were found, but none matched the fiscal receipt format.\")\n return None\n\n except Exception as e:\n logger.error(f\"Error during QR detection: {e}\")\n return None\n\ndef fetch_data_from_api(qr_raw: str) -> List[ParsedItem]:\n \"\"\"\n Отправляет данные QR-кода в API proverkacheka.com и парсит JSON-ответ.\n \"\"\"\n try:\n payload = {\n 'qrraw': qr_raw,\n 'token': API_TOKEN\n }\n \n logger.info(\"Sending request to Check API...\")\n response = requests.post(API_URL, data=payload, timeout=10)\n \n if response.status_code != 200:\n logger.error(f\"API Error: Status {response.status_code}\")\n return []\n\n data = response.json()\n \n if data.get('code') != 1:\n logger.warning(f\"API returned non-success code: {data.get('code')}\")\n return []\n \n json_data = data.get('data', {}).get('json', {})\n items_data = json_data.get('items', [])\n \n parsed_items = []\n \n for item in items_data:\n price = float(item.get('price', 0)) / 100.0\n total_sum = float(item.get('sum', 0)) / 100.0\n quantity = float(item.get('quantity', 0))\n name = item.get('name', 'Unknown')\n \n parsed_items.append(ParsedItem(\n raw_name=name,\n amount=quantity,\n price=price,\n sum=total_sum\n ))\n \n return parsed_items\n\n except Exception as e:\n logger.error(f\"Error fetching/parsing API data: {e}\")\n return []",
"requirements.txt": "fastapi\nuvicorn\npython-multipart\npydantic\nnumpy\nopencv-python-headless\npytesseract\nrequests\npyzbar\npillow\ncertifi",
"yandex_ocr.py": "import os\nimport time\nimport json\nimport base64\nimport logging\nimport requests\nfrom typing import Optional\n\nlogger = logging.getLogger(__name__)\n\nIAM_TOKEN_URL = \"https://iam.api.cloud.yandex.net/iam/v1/tokens\"\nVISION_URL = \"https://ocr.api.cloud.yandex.net/ocr/v1/recognizeText\"\n\nclass YandexOCREngine:\n def __init__(self):\n self.oauth_token = os.getenv(\"YANDEX_OAUTH_TOKEN\")\n self.folder_id = os.getenv(\"YANDEX_FOLDER_ID\")\n \n # Кэширование IAM токена\n self._iam_token = None\n self._token_expire_time = 0\n\n if not self.oauth_token or not self.folder_id:\n logger.warning(\"Yandex OCR credentials (YANDEX_OAUTH_TOKEN, YANDEX_FOLDER_ID) not set. Yandex OCR will be unavailable.\")\n\n def _get_iam_token(self) -> Optional[str]:\n \"\"\"\n Получает IAM-токен. Если есть живой кэшированный — возвращает его.\n Если нет — обменивает OAuth на IAM.\n \"\"\"\n current_time = time.time()\n \n # Если токен есть и он \"свежий\" (с запасом в 5 минут)\n if self._iam_token and current_time < self._token_expire_time - 300:\n return self._iam_token\n\n logger.info(\"Obtaining new IAM token from Yandex...\")\n try:\n response = requests.post(\n IAM_TOKEN_URL,\n json={\"yandexPassportOauthToken\": self.oauth_token},\n timeout=10\n )\n response.raise_for_status()\n data = response.json()\n \n self._iam_token = data[\"iamToken\"]\n \n # Токен обычно живет 12 часов, но мы будем ориентироваться на поле expiresAt если нужно,\n # или просто поставим таймер. Для простоты берем 1 час жизни кэша.\n self._token_expire_time = current_time + 3600 \n \n logger.info(\"IAM token received successfully.\")\n return self._iam_token\n except Exception as e:\n logger.error(f\"Failed to get IAM token: {e}\")\n return None\n\n def recognize(self, image_bytes: bytes) -> str:\n \"\"\"\n Отправляет изображение в Yandex Vision и возвращает полный текст.\n \"\"\"\n if not self.oauth_token or not self.folder_id:\n logger.error(\"Yandex credentials missing.\")\n return \"\"\n\n iam_token = self._get_iam_token()\n if not iam_token:\n return \"\"\n\n # 1. Кодируем в Base64\n b64_image = base64.b64encode(image_bytes).decode(\"utf-8\")\n\n # 2. Формируем тело запроса\n # Используем модель 'page' (для документов) и '*' для автоопределения языка\n payload = {\n \"mimeType\": \"JPEG\", # Yandex переваривает и PNG под видом JPEG часто, но лучше быть аккуратным.\n # В идеале определять mime-type из файла, но JPEG - безопасный дефолт для фото.\n \"languageCodes\": [\"*\"],\n \"model\": \"page\",\n \"content\": b64_image\n }\n\n headers = {\n \"Content-Type\": \"application/json\",\n \"Authorization\": f\"Bearer {iam_token}\",\n \"x-folder-id\": self.folder_id,\n \"x-data-logging-enabled\": \"true\"\n }\n\n # 3. Отправляем запрос\n try:\n logger.info(\"Sending request to Yandex Vision OCR...\")\n response = requests.post(VISION_URL, headers=headers, json=payload, timeout=20)\n \n # Если 401 Unauthorized, возможно токен протух раньше времени (редко, но бывает)\n if response.status_code == 401:\n logger.warning(\"Got 401 from Yandex. Retrying with fresh token...\")\n self._iam_token = None # сброс кэша\n iam_token = self._get_iam_token()\n if iam_token:\n headers[\"Authorization\"] = f\"Bearer {iam_token}\"\n response = requests.post(VISION_URL, headers=headers, json=payload, timeout=20)\n \n response.raise_for_status()\n result_json = response.json()\n \n # 4. Парсим ответ\n # Структура: result -> textAnnotation -> fullText\n # Или (если fullText нет) blocks -> lines -> text\n \n text_annotation = result_json.get(\"result\", {}).get(\"textAnnotation\", {})\n \n if not text_annotation:\n logger.warning(\"Yandex returned success but no textAnnotation found.\")\n return \"\"\n \n # Самый простой способ - взять fullText, он обычно склеен с \\n\n full_text = text_annotation.get(\"fullText\", \"\")\n \n if not full_text:\n # Фолбэк: если fullText пуст, собираем вручную по блокам\n logger.info(\"fullText empty, assembling from blocks...\")\n lines_text = []\n for block in text_annotation.get(\"blocks\", []):\n for line in block.get(\"lines\", []):\n lines_text.append(line.get(\"text\", \"\"))\n full_text = \"\\n\".join(lines_text)\n\n return full_text\n\n except Exception as e:\n logger.error(f\"Error during Yandex Vision request: {e}\")\n return \"\"\n\n# Глобальный инстанс\nyandex_engine = YandexOCREngine()",
}