Files
rmser/ocr-service/python_project_dump.py

36 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# PYTHON PROJECT DUMP
# GENERATED BY pack_python_project.py
project_tree = '''
ocr-service/
├── .dockerignore
├── .env
├── Dockerfile
├── app
│ ├── main.py
│ ├── schemas
│ │ └── models.py
│ └── services
│ ├── auth.py
│ ├── llm.py
│ ├── ocr.py
│ └── qr.py
├── requirements.txt
└── scripts
├── collect_data_raw.py
└── test_parsing_quality.py
'''
project_files = {
"app/main.py": "import logging\nimport os\nfrom typing import List\n\nfrom fastapi import FastAPI, File, UploadFile, HTTPException\nfrom pydantic import BaseModel\nimport cv2\nimport numpy as np\n\n# Импортируем модули\nfrom app.schemas.models import ParsedItem, RecognitionResult\nfrom app.services.qr import detect_and_decode_qr, fetch_data_from_api, extract_fiscal_data\n# Импортируем новый модуль\nfrom app.services.ocr import yandex_engine\nfrom app.services.llm import llm_parser\n\nlogging.basicConfig(\n level=logging.INFO,\n format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'\n)\nlogger = logging.getLogger(__name__)\n\napp = FastAPI(title=\"RMSER OCR Service (Cloud-only: QR + Yandex + GigaChat)\")\n\n@app.get(\"/health\")\ndef health_check():\n return {\"status\": \"ok\"}\n\n@app.post(\"/recognize\", response_model=RecognitionResult)\nasync def recognize_receipt(image: UploadFile = File(...)):\n \"\"\"\n Стратегия:\n 1. QR Code + FNS API (Приоритет 1 - Идеальная точность)\n 2. Yandex Vision OCR + LLM (Приоритет 2 - Высокая точность, если настроен)\n Если ничего не найдено, возвращает пустой результат.\n \"\"\"\n logger.info(f\"Received file: {image.filename}, content_type: {image.content_type}\")\n\n if not image.content_type.startswith(\"image/\"):\n raise HTTPException(status_code=400, detail=\"File must be an image\")\n\n try:\n # Читаем сырые байты\n content = await image.read()\n \n # Конвертируем в numpy для QR\n nparr = np.frombuffer(content, np.uint8)\n original_cv_image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)\n\n if original_cv_image is None:\n raise HTTPException(status_code=400, detail=\"Invalid image data\")\n\n # --- ЭТАП 1: QR Code Strategy ---\n logger.info(\"--- Stage 1: QR Code Detection ---\")\n qr_raw = detect_and_decode_qr(original_cv_image)\n \n if qr_raw:\n logger.info(\"QR found! Fetching data from API...\")\n api_items = fetch_data_from_api(qr_raw)\n \n if api_items:\n logger.info(f\"Success: Retrieved {len(api_items)} items via QR API.\")\n return RecognitionResult(\n source=\"qr_api\",\n items=api_items,\n raw_text=f\"QR Content: {qr_raw}\"\n )\n else:\n logger.warning(\"QR found but API failed. Falling back to OCR.\")\n else:\n logger.info(\"QR code not found. Proceeding to OCR.\")\n\n # --- ЭТАП 2: OCR + Virtual QR Strategy ---\n if yandex_engine.is_configured():\n logger.info(\"--- Stage 2: Yandex Vision OCR + Virtual QR ---\")\n yandex_text = yandex_engine.recognize(content)\n\n if yandex_text and len(yandex_text) > 10:\n logger.info(f\"OCR success. Raw text length: {len(yandex_text)}\")\n\n # Попытка собрать виртуальный QR из текста\n virtual_qr = extract_fiscal_data(yandex_text)\n if virtual_qr:\n logger.info(f\"Virtual QR constructed: {virtual_qr}\")\n api_items = fetch_data_from_api(virtual_qr)\n if api_items:\n logger.info(f\"Success: Retrieved {len(api_items)} items via Virtual QR API.\")\n return RecognitionResult(\n source=\"virtual_qr_api\",\n items=api_items,\n raw_text=yandex_text\n )\n\n # Вызываем LLM для парсинга текста\n logger.info(\"Calling LLM Manager to parse text...\")\n yandex_items = llm_parser.parse_receipt(yandex_text)\n\n return RecognitionResult(\n source=\"yandex_vision_llm\",\n items=yandex_items,\n raw_text=yandex_text\n )\n else:\n logger.warning(\"Yandex Vision returned empty text or failed. No fallback available.\")\n return RecognitionResult(\n source=\"none\",\n items=[],\n raw_text=\"\"\n )\n else:\n logger.info(\"Yandex Vision credentials not set. No OCR available.\")\n return RecognitionResult(\n source=\"none\",\n items=[],\n raw_text=\"\"\n )\n\n except Exception as e:\n logger.error(f\"Error processing request: {e}\", exc_info=True)\n raise HTTPException(status_code=500, detail=str(e))\n\nif __name__ == \"__main__\":\n import uvicorn\n uvicorn.run(app, host=\"0.0.0.0\", port=5000)",
"app/schemas/models.py": "from typing import List\nfrom pydantic import BaseModel\n\nclass ParsedItem(BaseModel):\n raw_name: str\n amount: float\n price: float\n sum: float\n\nclass RecognitionResult(BaseModel):\n source: str # 'qr_api', 'virtual_qr_api', 'yandex_vision_llm', 'none'\n items: List[ParsedItem]\n raw_text: str = \"\"",
"app/services/auth.py": "import os\nimport time\nimport logging\nimport requests\nfrom typing import Optional\n\nlogger = logging.getLogger(__name__)\n\nIAM_TOKEN_URL = \"https://iam.api.cloud.yandex.net/iam/v1/tokens\"\n\nclass YandexAuthManager:\n def __init__(self):\n self.oauth_token = os.getenv(\"YANDEX_OAUTH_TOKEN\")\n \n # Кэширование IAM токена\n self._iam_token = None\n self._token_expire_time = 0\n\n if not self.oauth_token:\n logger.warning(\"YANDEX_OAUTH_TOKEN not set. Yandex services will be unavailable.\")\n\n def is_configured(self) -> bool:\n return bool(self.oauth_token)\n\n def reset_token(self):\n \"\"\"Сбрасывает кэшированный токен, заставляя получить новый при следующем вызове.\"\"\"\n self._iam_token = None\n self._token_expire_time = 0\n\n def get_iam_token(self) -> Optional[str]:\n \"\"\"\n Получает IAM-токен. Если есть живой кэшированный — возвращает его.\n Если нет — обменивает OAuth на IAM.\n \"\"\"\n current_time = time.time()\n \n # Если токен есть и он \"свежий\" (с запасом в 5 минут)\n if self._iam_token and current_time < self._token_expire_time - 300:\n return self._iam_token\n\n if not self.oauth_token:\n logger.error(\"OAuth token not available.\")\n return None\n\n logger.info(\"Obtaining new IAM token from Yandex...\")\n try:\n response = requests.post(\n IAM_TOKEN_URL,\n json={\"yandexPassportOauthToken\": self.oauth_token},\n timeout=10\n )\n response.raise_for_status()\n data = response.json()\n \n self._iam_token = data[\"iamToken\"]\n \n # Токен обычно живет 12 часов, но мы будем ориентироваться на поле expiresAt если нужно,\n # или просто поставим таймер. Для простоты берем 1 час жизни кэша.\n self._token_expire_time = current_time + 3600 \n \n logger.info(\"IAM token received successfully.\")\n return self._iam_token\n except Exception as e:\n logger.error(f\"Failed to get IAM token: {e}\")\n return None\n\n# Глобальный инстанс\nyandex_auth = YandexAuthManager()",
"app/services/llm.py": "import os\nimport requests\nfrom requests.exceptions import SSLError\nimport logging\nimport json\nimport uuid\nimport time\nimport re\nfrom abc import ABC, abstractmethod\nfrom typing import List, Optional\n\nfrom app.schemas.models import ParsedItem\nfrom app.services.auth import yandex_auth\n\nlogger = logging.getLogger(__name__)\n\nYANDEX_GPT_URL = \"https://llm.api.cloud.yandex.net/foundationModels/v1/completion\"\nGIGACHAT_OAUTH_URL = \"https://ngw.devices.sberbank.ru:9443/api/v2/oauth\"\nGIGACHAT_COMPLETION_URL = \"https://gigachat.devices.sberbank.ru/api/v1/chat/completions\"\n\nclass LLMProvider(ABC):\n @abstractmethod\n def generate(self, system_prompt: str, user_text: str) -> str:\n pass\n\nclass YandexGPTProvider(LLMProvider):\n def __init__(self):\n self.folder_id = os.getenv(\"YANDEX_FOLDER_ID\")\n\n def generate(self, system_prompt: str, user_text: str) -> str:\n iam_token = yandex_auth.get_iam_token()\n if not iam_token:\n raise Exception(\"Failed to get IAM token\")\n\n prompt = {\n \"modelUri\": f\"gpt://{self.folder_id}/yandexgpt/latest\",\n \"completionOptions\": {\"stream\": False, \"temperature\": 0.1, \"maxTokens\": \"2000\"},\n \"messages\": [\n {\"role\": \"system\", \"text\": system_prompt},\n {\"role\": \"user\", \"text\": user_text}\n ]\n }\n\n headers = {\n \"Content-Type\": \"application/json\",\n \"Authorization\": f\"Bearer {iam_token}\",\n \"x-folder-id\": self.folder_id\n }\n\n response = requests.post(YANDEX_GPT_URL, headers=headers, json=prompt, timeout=30)\n response.raise_for_status()\n content = response.json()['result']['alternatives'][0]['message']['text']\n return content\n\nclass GigaChatProvider(LLMProvider):\n def __init__(self):\n self.auth_key = os.getenv(\"GIGACHAT_AUTH_KEY\")\n self._access_token = None\n self._expires_at = 0\n\n def _get_token(self) -> Optional[str]:\n if self._access_token and time.time() < self._expires_at:\n return self._access_token\n\n logger.info(\"Obtaining GigaChat access token...\")\n headers = {\n 'Content-Type': 'application/x-www-form-urlencoded',\n 'Accept': 'application/json',\n 'RqUID': str(uuid.uuid4()),\n 'Authorization': f'Basic {self.auth_key}'\n }\n payload = {'scope': 'GIGACHAT_API_PERS'}\n\n response = requests.post(GIGACHAT_OAUTH_URL, headers=headers, data=payload, timeout=10)\n response.raise_for_status()\n data = response.json()\n self._access_token = data['access_token']\n self._expires_at = data['expires_at'] / 1000 # Переводим мс в сек\n return self._access_token\n\n def generate(self, system_prompt: str, user_text: str) -> str:\n token = self._get_token()\n if not token:\n raise Exception(\"Failed to get GigaChat token\")\n\n headers = {\n 'Content-Type': 'application/json',\n 'Accept': 'application/json',\n 'Authorization': f'Bearer {token}'\n }\n\n payload = {\n \"model\": \"GigaChat\",\n \"messages\": [\n {\"role\": \"system\", \"content\": system_prompt},\n {\"role\": \"user\", \"content\": user_text}\n ],\n \"temperature\": 0.1\n }\n\n try:\n response = requests.post(GIGACHAT_COMPLETION_URL, headers=headers, json=payload, timeout=30)\n response.raise_for_status()\n content = response.json()['choices'][0]['message']['content']\n return content\n except SSLError as e:\n logger.error(\"SSL Error with GigaChat. Check certificates\")\n raise e\n\nclass LLMManager:\n def __init__(self):\n engine = os.getenv(\"LLM_ENGINE\", \"yandex\").lower()\n self.engine = engine\n if engine == \"gigachat\":\n self.provider = GigaChatProvider()\n else:\n self.provider = YandexGPTProvider()\n logger.info(f\"LLM Engine initialized: {self.engine}\")\n\n def parse_receipt(self, raw_text: str) -> List[ParsedItem]:\n system_prompt = \"\"\"\nТы — профессиональный бухгалтер. Твоя задача — извлечь из \"сырого\" текста (OCR) структурированные данные о товарах.\n\nВХОДНЫЕ ДАННЫЕ:\nТекст чека или накладной. Может содержать мусор, заголовки, итоги, служебную информацию.\n\nФОРМАТ ОТВЕТА:\nТолько валидный JSON массив.\n[\n {\n \"raw_name\": \"Название товара\",\n \"amount\": 1.0, // Количество\n \"price\": 100.0, // Цена за единицу (с учетом скидок)\n \"sum\": 100.0 // Стоимость позиции\n }\n]\n\nПРАВИЛА:\n1. Игнорируй строки: \"ИТОГ\", \"СУММА\", \"НДС\", \"Кассир\", \"Сдача\", \"Безналичными\", телефоны, адреса, ссылки.\n2. Очищай числа от пробелов (например, \"1 630,00\" -> 1630.0).\n3. Если количество (amount) явно не указано, считай его равным 1.\n4. **КРИТИЧНО:** Один товар ЧАСТО занимает НЕСКОЛЬКО СТРОК. Название товара в первой строке, цифры (цена, количество, сумма) в последующих. **НИКОГДА НЕ ОБЪЕДИНЯЙ РАЗНЫЕ ТОВАРЫ В ОДНУ ЗАПИСЬ.** Анализируй блоки строк внимательно.\n **КРИТИЧНО:** Если в тексте есть строки вида `66.99 66.99 3шт. 200.97`, это значит: Цена=66.99, Кол-во=3, Сумма=200.97. Название товара находится СТРОГО В ПРЕДЫДУЩЕЙ СТРОКЕ.\n5. Название товара (raw_name) должно быть полным, но без цены и количества.\n\nВАЖНОЕ ПРАВИЛО ДЛЯ ЧЕКОВ ТИПА \"МАГНИТ\":\nЧасто информация о товаре разбита на две строки:\n1. Строка с названием: \"Яйцо столовое СО 10шт бокс:20\"\n2. Строка с цифрами: \"66.99 66.99 3шт. 200.97\"\n\nЕсли ты видишь строку с цифрами, ищи название в ПРЕДЫДУЩЕЙ строке. Объединяй их в один объект.\nНе создавай отдельный товар из строки с цифрами.\n\nПРИМЕР 1 (Простой):\nВход:\nХЛЕБ БОРОДИН 1 Х 45.00 45.00\nМОЛОКО 3.2% 2 Х 80.00 160.00\nВыход:\n[\n {\"raw_name\": \"ХЛЕБ БОРОДИН\", \"amount\": 1.0, \"price\": 45.0, \"sum\": 45.0},\n {\"raw_name\": \"МОЛОКО 3.2%\", \"amount\": 2.0, \"price\": 80.0, \"sum\": 160.0}\n]\n\nПРИМЕР 2 (Сложный, многострочный):\nВход:\nЯйцо столовое СО 10шт бокс:20\n66.99\n66.99\n3шт.\n200.97\nМАГНИТ Пакет-майка большой 15кг\n9.99\n9.99\n1шт.\n9.99\nСВЕКЛА 1кг\n32.99 32.99 1.292кг 42.62\nВыход:\n[\n {\"raw_name\": \"Яйцо столовое СО 10шт бокс:20\", \"amount\": 3.0, \"price\": 66.99, \"sum\": 200.97},\n {\"raw_name\": \"МАГНИТ Пакет-майка большой 15кг\", \"amount\": 1.0, \"price\": 9.99, \"sum\": 9.99},\n {\"raw_name\": \"СВЕКЛА 1кг\", \"amount\": 1.292, \"price\": 32.99, \"sum\": 42.62}\n]\n\"\"\"\n\n try:\n logger.info(f\"Parsing receipt using engine: {self.engine}\")\n response = self.provider.generate(system_prompt, raw_text)\n # Очистка от Markdown\n clean_json = response.replace(\"```json\", \"\").replace(\"```\", \"\").strip()\n # Парсинг JSON\n data = json.loads(clean_json)\n # Пост-обработка: исправление чисел\n for item in data:\n for key in ['amount', 'price', 'sum']:\n if isinstance(item[key], str):\n # Удаляем пробелы внутри чисел\n item[key] = float(re.sub(r'\\s+', '', item[key]).replace(',', '.'))\n elif isinstance(item[key], (int, float)):\n item[key] = float(item[key])\n return [ParsedItem(**item) for item in data]\n except Exception as e:\n logger.error(f\"LLM Parsing error: {e}\")\n return []\n\nllm_parser = LLMManager()",
"app/services/ocr.py": "from abc import ABC, abstractmethod\nfrom typing import Optional\nimport os\nimport json\nimport base64\nimport logging\nimport requests\n\nfrom app.services.auth import yandex_auth\n\nlogger = logging.getLogger(__name__)\n\nVISION_URL = \"https://ocr.api.cloud.yandex.net/ocr/v1/recognizeText\"\n\nclass OCREngine(ABC):\n @abstractmethod\n def recognize(self, image_bytes: bytes) -> str:\n \"\"\"Распознает текст из изображения и возвращает строку.\"\"\"\n pass\n\n @abstractmethod\n def is_configured(self) -> bool:\n \"\"\"Проверяет, настроен ли движок (наличие ключей/настроек).\"\"\"\n pass\n\nclass YandexOCREngine(OCREngine):\n def __init__(self):\n self.folder_id = os.getenv(\"YANDEX_FOLDER_ID\")\n\n if not yandex_auth.is_configured() or not self.folder_id:\n logger.warning(\"Yandex OCR credentials (YANDEX_OAUTH_TOKEN, YANDEX_FOLDER_ID) not set. Yandex OCR will be unavailable.\")\n\n def is_configured(self) -> bool:\n return yandex_auth.is_configured() and bool(self.folder_id)\n\n def recognize(self, image_bytes: bytes) -> str:\n \"\"\"\n Отправляет изображение в Yandex Vision и возвращает полный текст.\n \"\"\"\n if not self.is_configured():\n logger.error(\"Yandex credentials missing.\")\n return \"\"\n\n iam_token = yandex_auth.get_iam_token()\n if not iam_token:\n return \"\"\n\n # 1. Кодируем в Base64\n b64_image = base64.b64encode(image_bytes).decode(\"utf-8\")\n\n # 2. Формируем тело запроса\n # Используем модель 'page' (для документов) и '*' для автоопределения языка\n payload = {\n \"mimeType\": \"JPEG\", # Yandex переваривает и PNG под видом JPEG часто, но лучше быть аккуратным.\n # В идеале определять mime-type из файла, но JPEG - безопасный дефолт для фото.\n \"languageCodes\": [\"*\"],\n \"model\": \"page\",\n \"content\": b64_image\n }\n\n headers = {\n \"Content-Type\": \"application/json\",\n \"Authorization\": f\"Bearer {iam_token}\",\n \"x-folder-id\": self.folder_id,\n \"x-data-logging-enabled\": \"true\"\n }\n\n # 3. Отправляем запрос\n try:\n logger.info(\"Sending request to Yandex Vision OCR...\")\n response = requests.post(VISION_URL, headers=headers, json=payload, timeout=20)\n \n # Если 401 Unauthorized, возможно токен протух раньше времени (редко, но бывает)\n if response.status_code == 401:\n logger.warning(\"Got 401 from Yandex. Retrying with fresh token...\")\n yandex_auth.reset_token() # сброс кэша\n iam_token = yandex_auth.get_iam_token()\n if iam_token:\n headers[\"Authorization\"] = f\"Bearer {iam_token}\"\n response = requests.post(VISION_URL, headers=headers, json=payload, timeout=20)\n \n response.raise_for_status()\n result_json = response.json()\n \n # 4. Парсим ответ\n # Структура: result -> textAnnotation -> fullText\n # Или (если fullText нет) blocks -> lines -> text\n \n text_annotation = result_json.get(\"result\", {}).get(\"textAnnotation\", {})\n \n if not text_annotation:\n logger.warning(\"Yandex returned success but no textAnnotation found.\")\n return \"\"\n \n # Самый простой способ - взять fullText, он обычно склеен с \\n\n full_text = text_annotation.get(\"fullText\", \"\")\n \n if not full_text:\n # Фолбэк: если fullText пуст, собираем вручную по блокам\n logger.info(\"fullText empty, assembling from blocks...\")\n lines_text = []\n for block in text_annotation.get(\"blocks\", []):\n for line in block.get(\"lines\", []):\n lines_text.append(line.get(\"text\", \"\"))\n full_text = \"\\n\".join(lines_text)\n\n return full_text\n\n except Exception as e:\n logger.error(f\"Error during Yandex Vision request: {e}\")\n return \"\"\n\n# Глобальный инстанс\nyandex_engine = YandexOCREngine()",
"app/services/qr.py": "import logging\nimport requests\nimport re\nfrom typing import Optional, List\nfrom pyzbar.pyzbar import decode\nfrom PIL import Image\nimport numpy as np\n\nfrom app.schemas.models import ParsedItem\n\nAPI_TOKEN = \"36590.yqtiephCvvkYUKM2W\"\nAPI_URL = \"https://proverkacheka.com/api/v1/check/get\"\n\nlogger = logging.getLogger(__name__)\n\ndef extract_fiscal_data(text: str) -> Optional[str]:\n \"\"\"\n Ищет в тексте:\n - Дата и время (t): 19.12.25 12:16 -> 20251219T1216\n - Сумма (s): ИТОГ: 770.00\n - ФН (fn): 16 цифр, начинается на 73\n - ФД (i): до 8 цифр (ищем после Д: или ФД:)\n - ФП (fp): 10 цифр (ищем после П: или ФП:)\n \n Возвращает строку формата: t=20251219T1216&s=770.00&fn=7384440800514469&i=11194&fp=3334166168&n=1\n \"\"\"\n # 1. Поиск даты и времени\n # Ищем форматы DD.MM.YY HH:MM или DD.MM.YYYY HH:MM\n date_match = re.search(r'(\\d{2})\\.(\\d{2})\\.(\\d{2,4})\\s+(\\d{2}):(\\d{2})', text)\n t_param = \"\"\n if date_match:\n d, m, y, hh, mm = date_match.groups()\n if len(y) == 2: y = \"20\" + y\n t_param = f\"{y}{m}{d}T{hh}{mm}\"\n\n # 2. Поиск суммы (Итог)\n # Ищем слово ИТОГ и число после него\n sum_match = re.search(r'(?:ИТОГ|СУММА|СУММА:)\\s*[:]*\\s*(\\d+[.,]\\d{2})', text, re.IGNORECASE)\n s_param = \"\"\n if sum_match:\n s_param = sum_match.group(1).replace(',', '.')\n\n # 3. Поиск ФН (16 цифр, начинается с 73)\n fn_match = re.search(r'\\b(73\\d{14})\\b', text)\n fn_param = fn_match.group(1) if fn_match else \"\"\n\n # 4. Поиск ФД (i) - ищем после маркеров Д: или ФД:\n # Берем набор цифр до 8 знаков\n fd_match = re.search(r'(?:ФД|Д)[:\\s]+(\\d{1,8})\\b', text)\n i_param = fd_match.group(1) if fd_match else \"\"\n\n # 5. Поиск ФП (fp) - ищем после маркеров П: или ФП:\n # Строго 10 цифр\n fp_match = re.search(r'(?:ФП|П)[:\\s]+(\\d{10})\\b', text)\n fp_param = fp_match.group(1) if fp_match else \"\"\n\n # Валидация: для формирования запроса к API нам критически важны все параметры\n if all([t_param, s_param, fn_param, i_param, fp_param]):\n return f\"t={t_param}&s={s_param}&fn={fn_param}&i={i_param}&fp={fp_param}&n=1\"\n \n return None\n\ndef is_valid_fiscal_qr(qr_string: str) -> bool:\n \"\"\"\n Проверяет, соответствует ли строка формату фискального чека ФНС.\n Ожидаемый формат: t=...&s=...&fn=...&i=...&fp=...&n=...\n Мы проверяем наличие хотя бы 3-х ключевых параметров.\n \"\"\"\n if not qr_string:\n return False\n \n # Ключевые параметры, которые обязаны быть в строке чека\n required_keys = [\"t=\", \"s=\", \"fn=\"]\n \n # Проверяем, что все ключевые параметры присутствуют\n # (порядок может отличаться, поэтому проверяем вхождение каждого)\n matches = [key in qr_string for key in required_keys]\n \n return all(matches)\n\ndef detect_and_decode_qr(image: np.ndarray) -> Optional[str]:\n \"\"\"\n Ищет ВСЕ QR-коды на изображении и возвращает только тот, \n который похож на фискальный чек.\n \"\"\"\n try:\n pil_img = Image.fromarray(image)\n \n # Декодируем все коды на картинке\n decoded_objects = decode(pil_img)\n \n if not decoded_objects:\n logger.info(\"No QR codes detected on the image.\")\n return None\n \n logger.info(f\"Detected {len(decoded_objects)} code(s). Scanning for fiscal data...\")\n\n for obj in decoded_objects:\n if obj.type == 'QRCODE':\n qr_data = obj.data.decode(\"utf-8\")\n \n # Логируем найденное (для отладки, если вдруг формат хитрый)\n # Обрезаем длинные строки, чтобы не засорять лог\n log_preview = (qr_data[:75] + '..') if len(qr_data) > 75 else qr_data\n logger.info(f\"Checking QR content: {log_preview}\")\n \n if is_valid_fiscal_qr(qr_data):\n logger.info(\"Valid fiscal QR found!\")\n return qr_data\n else:\n logger.info(\"QR skipped (not a fiscal receipt pattern).\")\n \n logger.warning(\"QR codes were found, but none matched the fiscal receipt format.\")\n return None\n\n except Exception as e:\n logger.error(f\"Error during QR detection: {e}\")\n return None\n\ndef fetch_data_from_api(qr_raw: str) -> List[ParsedItem]:\n \"\"\"\n Отправляет данные QR-кода в API proverkacheka.com и парсит JSON-ответ.\n \"\"\"\n try:\n payload = {\n 'qrraw': qr_raw,\n 'token': API_TOKEN\n }\n \n logger.info(\"Sending request to Check API...\")\n response = requests.post(API_URL, data=payload, timeout=10)\n \n if response.status_code != 200:\n logger.error(f\"API Error: Status {response.status_code}\")\n return []\n\n data = response.json()\n \n if data.get('code') != 1:\n logger.warning(f\"API returned non-success code: {data.get('code')}\")\n return []\n \n json_data = data.get('data', {}).get('json', {})\n items_data = json_data.get('items', [])\n \n parsed_items = []\n \n for item in items_data:\n price = float(item.get('price', 0)) / 100.0\n total_sum = float(item.get('sum', 0)) / 100.0\n quantity = float(item.get('quantity', 0))\n name = item.get('name', 'Unknown')\n \n parsed_items.append(ParsedItem(\n raw_name=name,\n amount=quantity,\n price=price,\n sum=total_sum\n ))\n \n return parsed_items\n\n except Exception as e:\n logger.error(f\"Error fetching/parsing API data: {e}\")\n return []",
"requirements.txt": "fastapi\nuvicorn\npython-multipart\npydantic\nnumpy\nopencv-python-headless\nrequests\npyzbar\npillow\ncertifi",
"scripts/collect_data_raw.py": "import os\nimport requests\nimport json\nimport mimetypes\n\n# Папка, куда вы положите фото чеков для теста\nINPUT_DIR = \"./test_receipts\"\n# Папка, куда сохраним сырой текст\nOUTPUT_DIR = \"./raw_outputs\"\n# Адрес запущенного OCR сервиса\nAPI_URL = \"http://10.25.100.250:5006/recognize\"\n\ndef process_images():\n if not os.path.exists(OUTPUT_DIR):\n os.makedirs(OUTPUT_DIR)\n\n if not os.path.exists(INPUT_DIR):\n os.makedirs(INPUT_DIR)\n print(f\"Папка {INPUT_DIR} создана. Положите туда фото чеков и перезапустите скрипт.\")\n return\n\n files = [f for f in os.listdir(INPUT_DIR) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]\n \n if not files:\n print(f\"В папке {INPUT_DIR} нет изображений.\")\n return\n\n print(f\"Найдено {len(files)} файлов. Начинаю обработку...\")\n\n for filename in files:\n file_path = os.path.join(INPUT_DIR, filename)\n mime_type, _ = mimetypes.guess_type(file_path)\n \n print(f\"Processing {filename}...\", end=\" \")\n \n try:\n with open(file_path, 'rb') as f:\n files = {'image': (filename, f, mime_type or 'image/jpeg')}\n response = requests.post(API_URL, files=files, timeout=30)\n \n if response.status_code == 200:\n data = response.json()\n raw_text = data.get(\"raw_text\", \"\")\n source = data.get(\"source\", \"unknown\")\n \n # Сохраняем RAW текст\n out_name = f\"{filename}_RAW.txt\"\n with open(os.path.join(OUTPUT_DIR, out_name), \"w\", encoding=\"utf-8\") as out:\n out.write(f\"Source: {source}\\n\")\n out.write(\"=\"*20 + \"\\n\")\n out.write(raw_text)\n \n print(f\"OK ({source}) -> {out_name}\")\n else:\n print(f\"FAIL: {response.status_code} - {response.text}\")\n \n except Exception as e:\n print(f\"ERROR: {e}\")\n\nif __name__ == \"__main__\":\n process_images()",
"scripts/test_parsing_quality.py": "import os\nimport requests\nimport json\nimport mimetypes\n\n# Папка с фото\nINPUT_DIR = \"./test_receipts\"\n# Папка для результатов\nOUTPUT_DIR = \"./json_results\"\n# Адрес сервиса\nAPI_URL = \"http://10.25.100.250:5006/recognize\"\n\ndef test_parsing():\n if not os.path.exists(OUTPUT_DIR):\n os.makedirs(OUTPUT_DIR)\n\n if not os.path.exists(INPUT_DIR):\n print(f\"Папка {INPUT_DIR} не найдена.\")\n return\n\n files = [f for f in os.listdir(INPUT_DIR) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]\n \n print(f\"Найдено {len(files)} файлов. Тестируем парсинг...\")\n\n for filename in files:\n file_path = os.path.join(INPUT_DIR, filename)\n mime_type, _ = mimetypes.guess_type(file_path)\n \n print(f\"Processing {filename}...\", end=\" \")\n \n try:\n with open(file_path, 'rb') as f:\n files = {'image': (filename, f, mime_type or 'image/jpeg')}\n response = requests.post(API_URL, files=files, timeout=60) # Увеличили таймаут для LLM\n \n if response.status_code == 200:\n data = response.json()\n items = data.get(\"items\", [])\n source = data.get(\"source\", \"unknown\")\n \n # Сохраняем JSON\n out_name = f\"{filename}_RESULT.json\"\n with open(os.path.join(OUTPUT_DIR, out_name), \"w\", encoding=\"utf-8\") as out:\n json.dump(data, out, ensure_ascii=False, indent=2)\n \n print(f\"OK ({source}) -> Found {len(items)} items\")\n else:\n print(f\"FAIL: {response.status_code} - {response.text}\")\n \n except Exception as e:\n print(f\"ERROR: {e}\")\n\nif __name__ == \"__main__\":\n test_parsing()",
}