mirror of
https://github.com/serty2005/rmser.git
synced 2026-02-04 19:02:33 -06:00
30 lines
30 KiB
Python
30 lines
30 KiB
Python
# -*- coding: utf-8 -*-
|
||
# PYTHON PROJECT DUMP
|
||
# GENERATED BY pack_python_project.py
|
||
|
||
project_tree = '''
|
||
ocr-service/
|
||
├── .dockerignore
|
||
├── Dockerfile
|
||
├── imgproc.py
|
||
├── llm_parser.py
|
||
├── main.py
|
||
├── ocr.py
|
||
├── parser.py
|
||
├── qr_manager.py
|
||
├── requirements.txt
|
||
├── system-prompt.md
|
||
└── yandex_ocr.py
|
||
'''
|
||
|
||
project_files = {
|
||
"imgproc.py": "import cv2\nimport numpy as np\nimport logging\n\nlogger = logging.getLogger(__name__)\n\ndef order_points(pts):\n rect = np.zeros((4, 2), dtype=\"float32\")\n s = pts.sum(axis=1)\n rect[0] = pts[np.argmin(s)]\n rect[2] = pts[np.argmax(s)]\n diff = np.diff(pts, axis=1)\n rect[1] = pts[np.argmin(diff)]\n rect[3] = pts[np.argmax(diff)]\n return rect\n\ndef four_point_transform(image, pts):\n rect = order_points(pts)\n (tl, tr, br, bl) = rect\n\n widthA = np.sqrt(((br[0] - bl[0]) ** 2) + ((br[1] - bl[1]) ** 2))\n widthB = np.sqrt(((tr[0] - tl[0]) ** 2) + ((tr[1] - tl[1]) ** 2))\n maxWidth = max(int(widthA), int(widthB))\n\n heightA = np.sqrt(((tr[0] - br[0]) ** 2) + ((tr[1] - br[1]) ** 2))\n heightB = np.sqrt(((tl[0] - bl[0]) ** 2) + ((tl[1] - bl[1]) ** 2))\n maxHeight = max(int(heightA), int(heightB))\n\n dst = np.array([\n [0, 0],\n [maxWidth - 1, 0],\n [maxWidth - 1, maxHeight - 1],\n [0, maxHeight - 1]], dtype=\"float32\")\n\n M = cv2.getPerspectiveTransform(rect, dst)\n warped = cv2.warpPerspective(image, M, (maxWidth, maxHeight))\n return warped\n\ndef preprocess_image(image_bytes: bytes) -> np.ndarray:\n \"\"\"\n Возвращает БИНАРНОЕ (Ч/Б) изображение для Tesseract.\n \"\"\"\n nparr = np.frombuffer(image_bytes, np.uint8)\n image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)\n \n if image is None:\n raise ValueError(\"Could not decode image\")\n\n # Ресайз для поиска контуров\n ratio = image.shape[0] / 500.0\n orig = image.copy()\n image_small = cv2.resize(image, (int(image.shape[1] / ratio), 500))\n\n gray = cv2.cvtColor(image_small, cv2.COLOR_BGR2GRAY)\n gray = cv2.GaussianBlur(gray, (5, 5), 0)\n edged = cv2.Canny(gray, 75, 200)\n\n cnts, _ = cv2.findContours(edged.copy(), cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)\n cnts = sorted(cnts, key=cv2.contourArea, reverse=True)[:5]\n\n screenCnt = None\n found = False\n\n for c in cnts:\n peri = cv2.arcLength(c, True)\n approx = cv2.approxPolyDP(c, 0.02 * peri, True)\n if len(approx) == 4:\n screenCnt = approx\n found = True\n break\n\n # Изображение, с которым будем работать дальше\n target_img = None\n\n if found:\n logger.info(\"Receipt contour found (Tesseract mode).\")\n target_img = four_point_transform(orig, screenCnt.reshape(4, 2) * ratio)\n else:\n logger.warning(\"Receipt contour NOT found. Using full image.\")\n target_img = orig\n\n # --- Подготовка для Tesseract (Бинаризация) ---\n # Переводим в Gray\n gray_final = cv2.cvtColor(target_img, cv2.COLOR_BGR2GRAY)\n \n # Адаптивный порог (превращаем в чисто черное и белое)\n # block_size=11, C=2 - классические параметры для текста\n thresh = cv2.adaptiveThreshold(\n gray_final, 255, \n cv2.ADAPTIVE_THRESH_GAUSSIAN_C, \n cv2.THRESH_BINARY, 11, 2\n )\n \n # Немного убираем шум\n # thresh = cv2.medianBlur(thresh, 3) \n \n return thresh",
|
||
"llm_parser.py": "import os\nimport requests\nimport logging\nimport json\nfrom typing import List\nfrom parser import ParsedItem\n\nlogger = logging.getLogger(__name__)\n\nYANDEX_GPT_URL = \"https://llm.api.cloud.yandex.net/foundationModels/v1/completion\"\n\nclass YandexGPTParser:\n def __init__(self):\n self.folder_id = os.getenv(\"YANDEX_FOLDER_ID\")\n self.api_key = os.getenv(\"YANDEX_OAUTH_TOKEN\") # Используем тот же доступ\n\n def parse_with_llm(self, raw_text: str, iam_token: str) -> List[ParsedItem]:\n \"\"\"\n Отправляет текст в YandexGPT для структурирования.\n \"\"\"\n if not iam_token:\n return []\n\n prompt = {\n \"modelUri\": f\"gpt://{self.folder_id}/yandexgpt/latest\",\n \"completionOptions\": {\n \"stream\": False,\n \"temperature\": 0.1, # Низкая температура для точности\n \"maxTokens\": \"2000\"\n },\n \"messages\": [\n {\n \"role\": \"system\",\n \"text\": (\n \"Ты — помощник по бухгалтерии. Извлеки список товаров из текста документа. \"\n \"Верни ответ строго в формате JSON: \"\n '[{\"raw_name\": string, \"amount\": float, \"price\": float, \"sum\": float}]. '\n \"Если количество не указано, считай 1.0. Не пиши ничего, кроме JSON.\"\n )\n },\n {\n \"role\": \"user\",\n \"text\": raw_text\n }\n ]\n }\n\n headers = {\n \"Content-Type\": \"application/json\",\n \"Authorization\": f\"Bearer {iam_token}\",\n \"x-folder-id\": self.folder_id\n }\n\n try:\n response = requests.post(YANDEX_GPT_URL, headers=headers, json=prompt, timeout=30)\n response.raise_for_status()\n result = response.json()\n \n # Извлекаем текст ответа\n content = result['result']['alternatives'][0]['message']['text']\n \n # Очищаем от возможных markdown-оберток ```json ... ```\n clean_json = content.replace(\"```json\", \"\").replace(\"```\", \"\").strip()\n \n items_raw = json.loads(clean_json)\n \n parsed_items = [ParsedItem(**item) for item in items_raw]\n return parsed_items\n \n except Exception as e:\n logger.error(f\"LLM Parsing error: {e}\")\n return []\n\nllm_parser = YandexGPTParser()",
|
||
"main.py": "import logging\nimport os\nfrom typing import List\n\nfrom fastapi import FastAPI, File, UploadFile, HTTPException\nfrom pydantic import BaseModel\nimport cv2\nimport numpy as np\n\n# Импортируем модули\nfrom imgproc import preprocess_image\nfrom parser import parse_receipt_text, ParsedItem\nfrom ocr import ocr_engine\nfrom qr_manager import detect_and_decode_qr, fetch_data_from_api\n# Импортируем новый модуль\nfrom yandex_ocr import yandex_engine\nfrom llm_parser import llm_parser\n\nlogging.basicConfig(\n level=logging.INFO,\n format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'\n)\nlogger = logging.getLogger(__name__)\n\napp = FastAPI(title=\"RMSER OCR Service (Hybrid: QR + Yandex + Tesseract)\")\n\nclass RecognitionResult(BaseModel):\n source: str # 'qr_api', 'yandex_vision', 'tesseract_ocr'\n items: List[ParsedItem]\n raw_text: str = \"\"\n\n@app.get(\"/health\")\ndef health_check():\n return {\"status\": \"ok\"}\n\n@app.post(\"/recognize\", response_model=RecognitionResult)\nasync def recognize_receipt(image: UploadFile = File(...)):\n \"\"\"\n Стратегия:\n 1. QR Code + FNS API (Приоритет 1 - Идеальная точность)\n 2. Yandex Vision OCR (Приоритет 2 - Высокая точность, если настроен)\n 3. Tesseract OCR (Приоритет 3 - Локальный фолбэк)\n \"\"\"\n logger.info(f\"Received file: {image.filename}, content_type: {image.content_type}\")\n\n if not image.content_type.startswith(\"image/\"):\n raise HTTPException(status_code=400, detail=\"File must be an image\")\n\n try:\n # Читаем сырые байты\n content = await image.read()\n \n # Конвертируем в numpy для QR и локального препроцессинга\n nparr = np.frombuffer(content, np.uint8)\n original_cv_image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)\n\n if original_cv_image is None:\n raise HTTPException(status_code=400, detail=\"Invalid image data\")\n\n # --- ЭТАП 1: QR Code Strategy ---\n logger.info(\"--- Stage 1: QR Code Detection ---\")\n qr_raw = detect_and_decode_qr(original_cv_image)\n \n if qr_raw:\n logger.info(\"QR found! Fetching data from API...\")\n api_items = fetch_data_from_api(qr_raw)\n \n if api_items:\n logger.info(f\"Success: Retrieved {len(api_items)} items via QR API.\")\n return RecognitionResult(\n source=\"qr_api\",\n items=api_items,\n raw_text=f\"QR Content: {qr_raw}\"\n )\n else:\n logger.warning(\"QR found but API failed. Falling back to OCR.\")\n else:\n logger.info(\"QR code not found. Proceeding to OCR.\")\n\n # --- ЭТАП 2: Yandex Vision Strategy (Cloud OCR) ---\n # Проверяем, настроен ли Яндекс\n if yandex_engine.oauth_token and yandex_engine.folder_id:\n logger.info(\"--- Stage 2: Yandex Vision OCR ---\")\n \n # Яндекс принимает сырые байты картинки (Base64), ему не нужен наш препроцессинг\n yandex_text = yandex_engine.recognize(content)\n \n if yandex_text and len(yandex_text) > 10:\n logger.info(f\"Yandex OCR success. Text length: {len(yandex_text)}\")\n logger.info(f\"Yandex RAW OUTPUT:\\n{yandex_text}\") \n yandex_items = parse_receipt_text(yandex_text)\n logger.info(f\"Parsed items preview: {yandex_items[:3]}...\") \n # Если Regex не нашел позиций (как в нашем случае со счетом)\n if not yandex_items:\n logger.info(\"Regex found nothing. Calling YandexGPT for semantic parsing...\")\n iam_token = yandex_engine._get_iam_token()\n yandex_items = llm_parser.parse_with_llm(yandex_text, iam_token)\n logger.info(f\"Semantic parsed items preview: {yandex_items[:3]}...\")\n \n return RecognitionResult(\n source=\"yandex_vision\",\n items=yandex_items,\n raw_text=yandex_text\n )\n else:\n logger.warning(\"Yandex Vision returned empty text or failed. Falling back to Tesseract.\")\n else:\n logger.info(\"Yandex Vision credentials not set. Skipping Stage 2.\")\n\n # --- ЭТАП 3: Tesseract Strategy (Local Fallback) ---\n logger.info(\"--- Stage 3: Tesseract OCR (Local) ---\")\n \n # 1. Image Processing (бинаризация, выравнивание)\n processed_img = preprocess_image(content)\n \n # 2. OCR\n tesseract_text = ocr_engine.recognize(processed_img)\n \n # 3. Parsing\n ocr_items = parse_receipt_text(tesseract_text)\n \n return RecognitionResult(\n source=\"tesseract_ocr\",\n items=ocr_items,\n raw_text=tesseract_text\n )\n\n except Exception as e:\n logger.error(f\"Error processing request: {e}\", exc_info=True)\n raise HTTPException(status_code=500, detail=str(e))\n\nif __name__ == \"__main__\":\n import uvicorn\n uvicorn.run(app, host=\"0.0.0.0\", port=5000)",
|
||
"ocr.py": "import logging\nimport pytesseract\nfrom PIL import Image\nimport numpy as np\n\nlogger = logging.getLogger(__name__)\n\n# Если tesseract не в PATH, раскомментируй и укажи путь:\n# pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'\n\nclass OCREngine:\n def __init__(self):\n logger.info(\"Initializing Tesseract OCR wrapper...\")\n # Tesseract не требует загрузки моделей в память, \n # проверка версии просто чтобы убедиться, что он установлен\n try:\n version = pytesseract.get_tesseract_version()\n logger.info(f\"Tesseract version found: {version}\")\n except Exception as e:\n logger.error(\"Tesseract not found! Make sure it is installed (apt install tesseract-ocr).\")\n raise e\n\n def recognize(self, image: np.ndarray) -> str:\n \"\"\"\n Принимает бинарное изображение (numpy array).\n \"\"\"\n # Tesseract работает лучше с PIL Image\n pil_img = Image.fromarray(image)\n \n # Конфигурация:\n # -l rus+eng: русский и английский\n # --psm 6: Assume a single uniform block of text (хорошо для чеков)\n custom_config = r'--oem 3 --psm 6'\n \n text = pytesseract.image_to_string(pil_img, lang='rus+eng', config=custom_config)\n return text\n\nocr_engine = OCREngine()",
|
||
"parser.py": "import re\nfrom typing import List\nfrom pydantic import BaseModel\n\nclass ParsedItem(BaseModel):\n raw_name: str\n amount: float\n price: float\n sum: float\n\n# Регулярка для поиска чисел с плавающей точкой: 123.00, 123,00, 10.5\nFLOAT_RE = r'\\d+[.,]\\d{2}'\n\ndef clean_text(text: str) -> str:\n \"\"\"Удаляет лишние символы из названия товара.\"\"\"\n # Оставляем буквы, цифры, пробелы и базовые знаки\n return re.sub(r'[^\\w\\s.,%/-]', '', text).strip()\n\ndef parse_float(val: str) -> float:\n \"\"\"Преобразует строку '123,45' или '123.45' в float.\"\"\"\n if not val:\n return 0.0\n # Заменяем запятую на точку и убираем возможные пробелы\n return float(val.replace(',', '.').replace(' ', ''))\n\ndef parse_receipt_text(text: str) -> List[ParsedItem]:\n \"\"\"\n Парсит текст чека построчно.\n Логика: накапливаем строки названия, пока не встретим строку с математикой (цена/сумма).\n \"\"\"\n lines = text.split('\\n')\n items = []\n name_buffer = []\n\n for line in lines:\n line = line.strip()\n if not line:\n continue\n\n # Ищем все числа, похожие на цену (с двумя знаками после запятой/точки)\n # Пример: 129.99\n floats = re.findall(FLOAT_RE, line)\n \n # Эвристика: строка считается \"товарной позицией с ценой\", если в ней есть минимум 2 числа\n # (обычно Цена и Сумма, или Кол-во и Сумма)\n # ИЛИ одно число, если это итоговая сумма, но мы ищем товары.\n \n is_price_line = False\n \n if len(floats) >= 2:\n is_price_line = True\n \n vals = [parse_float(f) for f in floats]\n \n # Попытка определить структуру: Цена x Кол-во = Сумма\n price = 0.0\n amount = 1.0\n total = 0.0\n \n # Обычно последнее число - это сумма (итог по строке)\n total = vals[-1]\n \n if len(vals) == 2:\n # Скорее всего: Цена ... Сумма (кол-во = 1)\n # Или: Кол-во ... Сумма (если цена не распозналась)\n # Предположим amount=1, тогда первое число - цена\n price = vals[0]\n amount = 1.0\n \n # Проверка на адекватность: сумма обычно >= цены\n # Если total < price, возможно порядок перепутан или это скидка\n if total < price and total != 0:\n # Если total сильно меньше, возможно это не сумма\n pass \n elif total > price and price > 0:\n # Пытаемся вычислить кол-во\n calc_amount = total / price\n # Если результат близок к целому (например 1.999 -> 2), то ок\n if abs(round(calc_amount) - calc_amount) < 0.05:\n amount = float(round(calc_amount))\n\n elif len(vals) >= 3:\n # Варианты: [Цена, Кол-во, Сумма] или [Кол-во, Цена, Сумма]\n # Проверяем математику: A * B = C\n v1, v2 = vals[-3], vals[-2]\n \n if abs(v1 * v2 - total) < 0.5: # Допуск 0.5 руб\n price = v1\n amount = v2\n elif abs(v2 * v1 - total) < 0.5:\n price = v2\n amount = v1\n else:\n # Если математика не сходится, берем предпоследнее как цену\n price = vals[-2]\n amount = 1.0\n\n # Сборка названия\n full_name = \" \".join(name_buffer).strip()\n \n # Если буфер пуст, возможно название в этой же строке слева\n if not full_name:\n # Удаляем найденные числа из строки, остаток считаем названием\n text_without_floats = re.sub(FLOAT_RE, '', line)\n full_name = clean_text(text_without_floats)\n\n # Фильтрация мусора (слишком короткие названия или нулевые суммы)\n if len(full_name) > 2 and total > 0:\n items.append(ParsedItem(\n raw_name=full_name,\n amount=amount,\n price=price,\n sum=total\n ))\n \n # Очищаем буфер, так как позиция закрыта\n name_buffer = []\n \n else:\n # Строка не похожа на цену.\n # Проверяем на стоп-слова (конец чека)\n upper_line = line.upper()\n if \"ИТОГ\" in upper_line or \"СУММА\" in upper_line or \"ПРИХОД\" in upper_line:\n # Считаем, что товары закончились\n name_buffer = [] \n # Можно здесь сделать break, если уверены, что ниже товаров нет\n continue\n \n # Добавляем в буфер названия\n name_buffer.append(line)\n\n return items",
|
||
"qr_manager.py": "import logging\nimport requests\nfrom typing import Optional, List\nfrom pyzbar.pyzbar import decode\nfrom PIL import Image\nimport numpy as np\n\n# Импортируем модель из parser.py\nfrom parser import ParsedItem \n\nAPI_TOKEN = \"36590.yqtiephCvvkYUKM2W\"\nAPI_URL = \"https://proverkacheka.com/api/v1/check/get\"\n\nlogger = logging.getLogger(__name__)\n\ndef is_valid_fiscal_qr(qr_string: str) -> bool:\n \"\"\"\n Проверяет, соответствует ли строка формату фискального чека ФНС.\n Ожидаемый формат: t=...&s=...&fn=...&i=...&fp=...&n=...\n Мы проверяем наличие хотя бы 3-х ключевых параметров.\n \"\"\"\n if not qr_string:\n return False\n \n # Ключевые параметры, которые обязаны быть в строке чека\n required_keys = [\"t=\", \"s=\", \"fn=\"]\n \n # Проверяем, что все ключевые параметры присутствуют\n # (порядок может отличаться, поэтому проверяем вхождение каждого)\n matches = [key in qr_string for key in required_keys]\n \n return all(matches)\n\ndef detect_and_decode_qr(image: np.ndarray) -> Optional[str]:\n \"\"\"\n Ищет ВСЕ QR-коды на изображении и возвращает только тот, \n который похож на фискальный чек.\n \"\"\"\n try:\n pil_img = Image.fromarray(image)\n \n # Декодируем все коды на картинке\n decoded_objects = decode(pil_img)\n \n if not decoded_objects:\n logger.info(\"No QR codes detected on the image.\")\n return None\n \n logger.info(f\"Detected {len(decoded_objects)} code(s). Scanning for fiscal data...\")\n\n for obj in decoded_objects:\n if obj.type == 'QRCODE':\n qr_data = obj.data.decode(\"utf-8\")\n \n # Логируем найденное (для отладки, если вдруг формат хитрый)\n # Обрезаем длинные строки, чтобы не засорять лог\n log_preview = (qr_data[:75] + '..') if len(qr_data) > 75 else qr_data\n logger.info(f\"Checking QR content: {log_preview}\")\n \n if is_valid_fiscal_qr(qr_data):\n logger.info(\"Valid fiscal QR found!\")\n return qr_data\n else:\n logger.info(\"QR skipped (not a fiscal receipt pattern).\")\n \n logger.warning(\"QR codes were found, but none matched the fiscal receipt format.\")\n return None\n\n except Exception as e:\n logger.error(f\"Error during QR detection: {e}\")\n return None\n\ndef fetch_data_from_api(qr_raw: str) -> List[ParsedItem]:\n \"\"\"\n Отправляет данные QR-кода в API proverkacheka.com и парсит JSON-ответ.\n \"\"\"\n try:\n payload = {\n 'qrraw': qr_raw,\n 'token': API_TOKEN\n }\n \n logger.info(\"Sending request to Check API...\")\n response = requests.post(API_URL, data=payload, timeout=10)\n \n if response.status_code != 200:\n logger.error(f\"API Error: Status {response.status_code}\")\n return []\n\n data = response.json()\n \n if data.get('code') != 1:\n logger.warning(f\"API returned non-success code: {data.get('code')}\")\n return []\n \n json_data = data.get('data', {}).get('json', {})\n items_data = json_data.get('items', [])\n \n parsed_items = []\n \n for item in items_data:\n price = float(item.get('price', 0)) / 100.0\n total_sum = float(item.get('sum', 0)) / 100.0\n quantity = float(item.get('quantity', 0))\n name = item.get('name', 'Unknown')\n \n parsed_items.append(ParsedItem(\n raw_name=name,\n amount=quantity,\n price=price,\n sum=total_sum\n ))\n \n return parsed_items\n\n except Exception as e:\n logger.error(f\"Error fetching/parsing API data: {e}\")\n return []",
|
||
"requirements.txt": "fastapi\nuvicorn\npython-multipart\npydantic\nnumpy\nopencv-python-headless\npytesseract\nrequests\npyzbar\npillow",
|
||
"yandex_ocr.py": "import os\nimport time\nimport json\nimport base64\nimport logging\nimport requests\nfrom typing import Optional\n\nlogger = logging.getLogger(__name__)\n\nIAM_TOKEN_URL = \"https://iam.api.cloud.yandex.net/iam/v1/tokens\"\nVISION_URL = \"https://ocr.api.cloud.yandex.net/ocr/v1/recognizeText\"\n\nclass YandexOCREngine:\n def __init__(self):\n self.oauth_token = os.getenv(\"YANDEX_OAUTH_TOKEN\")\n self.folder_id = os.getenv(\"YANDEX_FOLDER_ID\")\n \n # Кэширование IAM токена\n self._iam_token = None\n self._token_expire_time = 0\n\n if not self.oauth_token or not self.folder_id:\n logger.warning(\"Yandex OCR credentials (YANDEX_OAUTH_TOKEN, YANDEX_FOLDER_ID) not set. Yandex OCR will be unavailable.\")\n\n def _get_iam_token(self) -> Optional[str]:\n \"\"\"\n Получает IAM-токен. Если есть живой кэшированный — возвращает его.\n Если нет — обменивает OAuth на IAM.\n \"\"\"\n current_time = time.time()\n \n # Если токен есть и он \"свежий\" (с запасом в 5 минут)\n if self._iam_token and current_time < self._token_expire_time - 300:\n return self._iam_token\n\n logger.info(\"Obtaining new IAM token from Yandex...\")\n try:\n response = requests.post(\n IAM_TOKEN_URL,\n json={\"yandexPassportOauthToken\": self.oauth_token},\n timeout=10\n )\n response.raise_for_status()\n data = response.json()\n \n self._iam_token = data[\"iamToken\"]\n \n # Токен обычно живет 12 часов, но мы будем ориентироваться на поле expiresAt если нужно,\n # или просто поставим таймер. Для простоты берем 1 час жизни кэша.\n self._token_expire_time = current_time + 3600 \n \n logger.info(\"IAM token received successfully.\")\n return self._iam_token\n except Exception as e:\n logger.error(f\"Failed to get IAM token: {e}\")\n return None\n\n def recognize(self, image_bytes: bytes) -> str:\n \"\"\"\n Отправляет изображение в Yandex Vision и возвращает полный текст.\n \"\"\"\n if not self.oauth_token or not self.folder_id:\n logger.error(\"Yandex credentials missing.\")\n return \"\"\n\n iam_token = self._get_iam_token()\n if not iam_token:\n return \"\"\n\n # 1. Кодируем в Base64\n b64_image = base64.b64encode(image_bytes).decode(\"utf-8\")\n\n # 2. Формируем тело запроса\n # Используем модель 'page' (для документов) и '*' для автоопределения языка\n payload = {\n \"mimeType\": \"JPEG\", # Yandex переваривает и PNG под видом JPEG часто, но лучше быть аккуратным.\n # В идеале определять mime-type из файла, но JPEG - безопасный дефолт для фото.\n \"languageCodes\": [\"*\"],\n \"model\": \"page\",\n \"content\": b64_image\n }\n\n headers = {\n \"Content-Type\": \"application/json\",\n \"Authorization\": f\"Bearer {iam_token}\",\n \"x-folder-id\": self.folder_id,\n \"x-data-logging-enabled\": \"true\"\n }\n\n # 3. Отправляем запрос\n try:\n logger.info(\"Sending request to Yandex Vision OCR...\")\n response = requests.post(VISION_URL, headers=headers, json=payload, timeout=20)\n \n # Если 401 Unauthorized, возможно токен протух раньше времени (редко, но бывает)\n if response.status_code == 401:\n logger.warning(\"Got 401 from Yandex. Retrying with fresh token...\")\n self._iam_token = None # сброс кэша\n iam_token = self._get_iam_token()\n if iam_token:\n headers[\"Authorization\"] = f\"Bearer {iam_token}\"\n response = requests.post(VISION_URL, headers=headers, json=payload, timeout=20)\n \n response.raise_for_status()\n result_json = response.json()\n \n # 4. Парсим ответ\n # Структура: result -> textAnnotation -> fullText\n # Или (если fullText нет) blocks -> lines -> text\n \n text_annotation = result_json.get(\"result\", {}).get(\"textAnnotation\", {})\n \n if not text_annotation:\n logger.warning(\"Yandex returned success but no textAnnotation found.\")\n return \"\"\n \n # Самый простой способ - взять fullText, он обычно склеен с \\n\n full_text = text_annotation.get(\"fullText\", \"\")\n \n if not full_text:\n # Фолбэк: если fullText пуст, собираем вручную по блокам\n logger.info(\"fullText empty, assembling from blocks...\")\n lines_text = []\n for block in text_annotation.get(\"blocks\", []):\n for line in block.get(\"lines\", []):\n lines_text.append(line.get(\"text\", \"\"))\n full_text = \"\\n\".join(lines_text)\n\n return full_text\n\n except Exception as e:\n logger.error(f\"Error during Yandex Vision request: {e}\")\n return \"\"\n\n# Глобальный инстанс\nyandex_engine = YandexOCREngine()",
|
||
}
|