# -*- coding: utf-8 -*- # PYTHON PROJECT DUMP # GENERATED BY pack_python_project.py project_tree = ''' ocr-service/ ├── .dockerignore ├── .env ├── Dockerfile ├── app │ ├── main.py │ ├── schemas │ │ └── models.py │ └── services │ ├── auth.py │ ├── llm.py │ ├── ocr.py │ └── qr.py ├── requirements.txt └── scripts ├── collect_data_raw.py └── test_parsing_quality.py ''' project_files = { "app/main.py": "import logging\nimport os\nfrom typing import List\n\nfrom fastapi import FastAPI, File, UploadFile, HTTPException\nfrom pydantic import BaseModel\nimport cv2\nimport numpy as np\n\n# Импортируем модули\nfrom app.schemas.models import ParsedItem, RecognitionResult\nfrom app.services.qr import detect_and_decode_qr, fetch_data_from_api, extract_fiscal_data\n# Импортируем новый модуль\nfrom app.services.ocr import yandex_engine\nfrom app.services.llm import llm_parser\n\nlogging.basicConfig(\n level=logging.INFO,\n format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'\n)\nlogger = logging.getLogger(__name__)\n\napp = FastAPI(title=\"RMSER OCR Service (Cloud-only: QR + Yandex + GigaChat)\")\n\n@app.get(\"/health\")\ndef health_check():\n return {\"status\": \"ok\"}\n\n@app.post(\"/recognize\", response_model=RecognitionResult)\nasync def recognize_receipt(image: UploadFile = File(...)):\n \"\"\"\n Стратегия:\n 1. QR Code + FNS API (Приоритет 1 - Идеальная точность)\n 2. Yandex Vision OCR + LLM (Приоритет 2 - Высокая точность, если настроен)\n Если ничего не найдено, возвращает пустой результат.\n \"\"\"\n logger.info(f\"Received file: {image.filename}, content_type: {image.content_type}\")\n\n if not image.content_type.startswith(\"image/\"):\n raise HTTPException(status_code=400, detail=\"File must be an image\")\n\n try:\n # Читаем сырые байты\n content = await image.read()\n \n # Конвертируем в numpy для QR\n nparr = np.frombuffer(content, np.uint8)\n original_cv_image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)\n\n if original_cv_image is None:\n raise HTTPException(status_code=400, detail=\"Invalid image data\")\n\n # --- ЭТАП 1: QR Code Strategy ---\n logger.info(\"--- Stage 1: QR Code Detection ---\")\n qr_raw = detect_and_decode_qr(original_cv_image)\n \n if qr_raw:\n logger.info(\"QR found! Fetching data from API...\")\n api_items = fetch_data_from_api(qr_raw)\n \n if api_items:\n logger.info(f\"Success: Retrieved {len(api_items)} items via QR API.\")\n return RecognitionResult(\n source=\"qr_api\",\n items=api_items,\n raw_text=f\"QR Content: {qr_raw}\"\n )\n else:\n logger.warning(\"QR found but API failed. Falling back to OCR.\")\n else:\n logger.info(\"QR code not found. Proceeding to OCR.\")\n\n # --- ЭТАП 2: OCR + Virtual QR Strategy ---\n if yandex_engine.is_configured():\n logger.info(\"--- Stage 2: Yandex Vision OCR + Virtual QR ---\")\n yandex_text = yandex_engine.recognize(content)\n\n if yandex_text and len(yandex_text) > 10:\n logger.info(f\"OCR success. Raw text length: {len(yandex_text)}\")\n\n # Попытка собрать виртуальный QR из текста\n virtual_qr = extract_fiscal_data(yandex_text)\n if virtual_qr:\n logger.info(f\"Virtual QR constructed: {virtual_qr}\")\n api_items = fetch_data_from_api(virtual_qr)\n if api_items:\n logger.info(f\"Success: Retrieved {len(api_items)} items via Virtual QR API.\")\n return RecognitionResult(\n source=\"virtual_qr_api\",\n items=api_items,\n raw_text=yandex_text\n )\n\n # Вызываем LLM для парсинга текста\n logger.info(\"Calling LLM Manager to parse text...\")\n yandex_items = llm_parser.parse_receipt(yandex_text)\n\n return RecognitionResult(\n source=\"yandex_vision_llm\",\n items=yandex_items,\n raw_text=yandex_text\n )\n else:\n logger.warning(\"Yandex Vision returned empty text or failed. No fallback available.\")\n return RecognitionResult(\n source=\"none\",\n items=[],\n raw_text=\"\"\n )\n else:\n logger.info(\"Yandex Vision credentials not set. No OCR available.\")\n return RecognitionResult(\n source=\"none\",\n items=[],\n raw_text=\"\"\n )\n\n except Exception as e:\n logger.error(f\"Error processing request: {e}\", exc_info=True)\n raise HTTPException(status_code=500, detail=str(e))\n\nif __name__ == \"__main__\":\n import uvicorn\n uvicorn.run(app, host=\"0.0.0.0\", port=5000)", "app/schemas/models.py": "from typing import List\nfrom pydantic import BaseModel\n\nclass ParsedItem(BaseModel):\n raw_name: str\n amount: float\n price: float\n sum: float\n\nclass RecognitionResult(BaseModel):\n source: str # 'qr_api', 'virtual_qr_api', 'yandex_vision_llm', 'none'\n items: List[ParsedItem]\n raw_text: str = \"\"", "app/services/auth.py": "import os\nimport time\nimport logging\nimport requests\nfrom typing import Optional\n\nlogger = logging.getLogger(__name__)\n\nIAM_TOKEN_URL = \"https://iam.api.cloud.yandex.net/iam/v1/tokens\"\n\nclass YandexAuthManager:\n def __init__(self):\n self.oauth_token = os.getenv(\"YANDEX_OAUTH_TOKEN\")\n \n # Кэширование IAM токена\n self._iam_token = None\n self._token_expire_time = 0\n\n if not self.oauth_token:\n logger.warning(\"YANDEX_OAUTH_TOKEN not set. Yandex services will be unavailable.\")\n\n def is_configured(self) -> bool:\n return bool(self.oauth_token)\n\n def reset_token(self):\n \"\"\"Сбрасывает кэшированный токен, заставляя получить новый при следующем вызове.\"\"\"\n self._iam_token = None\n self._token_expire_time = 0\n\n def get_iam_token(self) -> Optional[str]:\n \"\"\"\n Получает IAM-токен. Если есть живой кэшированный — возвращает его.\n Если нет — обменивает OAuth на IAM.\n \"\"\"\n current_time = time.time()\n \n # Если токен есть и он \"свежий\" (с запасом в 5 минут)\n if self._iam_token and current_time < self._token_expire_time - 300:\n return self._iam_token\n\n if not self.oauth_token:\n logger.error(\"OAuth token not available.\")\n return None\n\n logger.info(\"Obtaining new IAM token from Yandex...\")\n try:\n response = requests.post(\n IAM_TOKEN_URL,\n json={\"yandexPassportOauthToken\": self.oauth_token},\n timeout=10\n )\n response.raise_for_status()\n data = response.json()\n \n self._iam_token = data[\"iamToken\"]\n \n # Токен обычно живет 12 часов, но мы будем ориентироваться на поле expiresAt если нужно,\n # или просто поставим таймер. Для простоты берем 1 час жизни кэша.\n self._token_expire_time = current_time + 3600 \n \n logger.info(\"IAM token received successfully.\")\n return self._iam_token\n except Exception as e:\n logger.error(f\"Failed to get IAM token: {e}\")\n return None\n\n# Глобальный инстанс\nyandex_auth = YandexAuthManager()", "app/services/llm.py": "import os\nimport requests\nfrom requests.exceptions import SSLError\nimport logging\nimport json\nimport uuid\nimport time\nimport re\nfrom abc import ABC, abstractmethod\nfrom typing import List, Optional\n\nfrom app.schemas.models import ParsedItem\nfrom app.services.auth import yandex_auth\n\nlogger = logging.getLogger(__name__)\n\nYANDEX_GPT_URL = \"https://llm.api.cloud.yandex.net/foundationModels/v1/completion\"\nGIGACHAT_OAUTH_URL = \"https://ngw.devices.sberbank.ru:9443/api/v2/oauth\"\nGIGACHAT_COMPLETION_URL = \"https://gigachat.devices.sberbank.ru/api/v1/chat/completions\"\n\nclass LLMProvider(ABC):\n @abstractmethod\n def generate(self, system_prompt: str, user_text: str) -> str:\n pass\n\nclass YandexGPTProvider(LLMProvider):\n def __init__(self):\n self.folder_id = os.getenv(\"YANDEX_FOLDER_ID\")\n\n def generate(self, system_prompt: str, user_text: str) -> str:\n iam_token = yandex_auth.get_iam_token()\n if not iam_token:\n raise Exception(\"Failed to get IAM token\")\n\n prompt = {\n \"modelUri\": f\"gpt://{self.folder_id}/yandexgpt/latest\",\n \"completionOptions\": {\"stream\": False, \"temperature\": 0.1, \"maxTokens\": \"2000\"},\n \"messages\": [\n {\"role\": \"system\", \"text\": system_prompt},\n {\"role\": \"user\", \"text\": user_text}\n ]\n }\n\n headers = {\n \"Content-Type\": \"application/json\",\n \"Authorization\": f\"Bearer {iam_token}\",\n \"x-folder-id\": self.folder_id\n }\n\n response = requests.post(YANDEX_GPT_URL, headers=headers, json=prompt, timeout=30)\n response.raise_for_status()\n content = response.json()['result']['alternatives'][0]['message']['text']\n return content\n\nclass GigaChatProvider(LLMProvider):\n def __init__(self):\n self.auth_key = os.getenv(\"GIGACHAT_AUTH_KEY\")\n self._access_token = None\n self._expires_at = 0\n\n def _get_token(self) -> Optional[str]:\n if self._access_token and time.time() < self._expires_at:\n return self._access_token\n\n logger.info(\"Obtaining GigaChat access token...\")\n headers = {\n 'Content-Type': 'application/x-www-form-urlencoded',\n 'Accept': 'application/json',\n 'RqUID': str(uuid.uuid4()),\n 'Authorization': f'Basic {self.auth_key}'\n }\n payload = {'scope': 'GIGACHAT_API_PERS'}\n\n response = requests.post(GIGACHAT_OAUTH_URL, headers=headers, data=payload, timeout=10)\n response.raise_for_status()\n data = response.json()\n self._access_token = data['access_token']\n self._expires_at = data['expires_at'] / 1000 # Переводим мс в сек\n return self._access_token\n\n def generate(self, system_prompt: str, user_text: str) -> str:\n token = self._get_token()\n if not token:\n raise Exception(\"Failed to get GigaChat token\")\n\n headers = {\n 'Content-Type': 'application/json',\n 'Accept': 'application/json',\n 'Authorization': f'Bearer {token}'\n }\n\n payload = {\n \"model\": \"GigaChat\",\n \"messages\": [\n {\"role\": \"system\", \"content\": system_prompt},\n {\"role\": \"user\", \"content\": user_text}\n ],\n \"temperature\": 0.1\n }\n\n try:\n response = requests.post(GIGACHAT_COMPLETION_URL, headers=headers, json=payload, timeout=30)\n response.raise_for_status()\n content = response.json()['choices'][0]['message']['content']\n return content\n except SSLError as e:\n logger.error(\"SSL Error with GigaChat. Check certificates\")\n raise e\n\nclass LLMManager:\n def __init__(self):\n engine = os.getenv(\"LLM_ENGINE\", \"yandex\").lower()\n self.engine = engine\n if engine == \"gigachat\":\n self.provider = GigaChatProvider()\n else:\n self.provider = YandexGPTProvider()\n logger.info(f\"LLM Engine initialized: {self.engine}\")\n\n def parse_receipt(self, raw_text: str) -> List[ParsedItem]:\n system_prompt = \"\"\"\nТы — профессиональный бухгалтер. Твоя задача — извлечь из \"сырого\" текста (OCR) структурированные данные о товарах.\n\nВХОДНЫЕ ДАННЫЕ:\nТекст чека или накладной. Может содержать мусор, заголовки, итоги, служебную информацию.\n\nФОРМАТ ОТВЕТА:\nТолько валидный JSON массив.\n[\n {\n \"raw_name\": \"Название товара\",\n \"amount\": 1.0, // Количество\n \"price\": 100.0, // Цена за единицу (с учетом скидок)\n \"sum\": 100.0 // Стоимость позиции\n }\n]\n\nПРАВИЛА:\n1. Игнорируй строки: \"ИТОГ\", \"СУММА\", \"НДС\", \"Кассир\", \"Сдача\", \"Безналичными\", телефоны, адреса, ссылки.\n2. Очищай числа от пробелов (например, \"1 630,00\" -> 1630.0).\n3. Если количество (amount) явно не указано, считай его равным 1.\n4. **КРИТИЧНО:** Один товар ЧАСТО занимает НЕСКОЛЬКО СТРОК. Название товара в первой строке, цифры (цена, количество, сумма) в последующих. **НИКОГДА НЕ ОБЪЕДИНЯЙ РАЗНЫЕ ТОВАРЫ В ОДНУ ЗАПИСЬ.** Анализируй блоки строк внимательно.\n **КРИТИЧНО:** Если в тексте есть строки вида `66.99 66.99 3шт. 200.97`, это значит: Цена=66.99, Кол-во=3, Сумма=200.97. Название товара находится СТРОГО В ПРЕДЫДУЩЕЙ СТРОКЕ.\n5. Название товара (raw_name) должно быть полным, но без цены и количества.\n\nВАЖНОЕ ПРАВИЛО ДЛЯ ЧЕКОВ ТИПА \"МАГНИТ\":\nЧасто информация о товаре разбита на две строки:\n1. Строка с названием: \"Яйцо столовое СО 10шт бокс:20\"\n2. Строка с цифрами: \"66.99 66.99 3шт. 200.97\"\n\nЕсли ты видишь строку с цифрами, ищи название в ПРЕДЫДУЩЕЙ строке. Объединяй их в один объект.\nНе создавай отдельный товар из строки с цифрами.\n\nПРИМЕР 1 (Простой):\nВход:\nХЛЕБ БОРОДИН 1 Х 45.00 45.00\nМОЛОКО 3.2% 2 Х 80.00 160.00\nВыход:\n[\n {\"raw_name\": \"ХЛЕБ БОРОДИН\", \"amount\": 1.0, \"price\": 45.0, \"sum\": 45.0},\n {\"raw_name\": \"МОЛОКО 3.2%\", \"amount\": 2.0, \"price\": 80.0, \"sum\": 160.0}\n]\n\nПРИМЕР 2 (Сложный, многострочный):\nВход:\nЯйцо столовое СО 10шт бокс:20\n66.99\n66.99\n3шт.\n200.97\nМАГНИТ Пакет-майка большой 15кг\n9.99\n9.99\n1шт.\n9.99\nСВЕКЛА 1кг\n32.99 32.99 1.292кг 42.62\nВыход:\n[\n {\"raw_name\": \"Яйцо столовое СО 10шт бокс:20\", \"amount\": 3.0, \"price\": 66.99, \"sum\": 200.97},\n {\"raw_name\": \"МАГНИТ Пакет-майка большой 15кг\", \"amount\": 1.0, \"price\": 9.99, \"sum\": 9.99},\n {\"raw_name\": \"СВЕКЛА 1кг\", \"amount\": 1.292, \"price\": 32.99, \"sum\": 42.62}\n]\n\"\"\"\n\n try:\n logger.info(f\"Parsing receipt using engine: {self.engine}\")\n response = self.provider.generate(system_prompt, raw_text)\n # Очистка от Markdown\n clean_json = response.replace(\"```json\", \"\").replace(\"```\", \"\").strip()\n # Парсинг JSON\n data = json.loads(clean_json)\n # Пост-обработка: исправление чисел\n for item in data:\n for key in ['amount', 'price', 'sum']:\n if isinstance(item[key], str):\n # Удаляем пробелы внутри чисел\n item[key] = float(re.sub(r'\\s+', '', item[key]).replace(',', '.'))\n elif isinstance(item[key], (int, float)):\n item[key] = float(item[key])\n return [ParsedItem(**item) for item in data]\n except Exception as e:\n logger.error(f\"LLM Parsing error: {e}\")\n return []\n\nllm_parser = LLMManager()", "app/services/ocr.py": "from abc import ABC, abstractmethod\nfrom typing import Optional\nimport os\nimport json\nimport base64\nimport logging\nimport requests\n\nfrom app.services.auth import yandex_auth\n\nlogger = logging.getLogger(__name__)\n\nVISION_URL = \"https://ocr.api.cloud.yandex.net/ocr/v1/recognizeText\"\n\nclass OCREngine(ABC):\n @abstractmethod\n def recognize(self, image_bytes: bytes) -> str:\n \"\"\"Распознает текст из изображения и возвращает строку.\"\"\"\n pass\n\n @abstractmethod\n def is_configured(self) -> bool:\n \"\"\"Проверяет, настроен ли движок (наличие ключей/настроек).\"\"\"\n pass\n\nclass YandexOCREngine(OCREngine):\n def __init__(self):\n self.folder_id = os.getenv(\"YANDEX_FOLDER_ID\")\n\n if not yandex_auth.is_configured() or not self.folder_id:\n logger.warning(\"Yandex OCR credentials (YANDEX_OAUTH_TOKEN, YANDEX_FOLDER_ID) not set. Yandex OCR will be unavailable.\")\n\n def is_configured(self) -> bool:\n return yandex_auth.is_configured() and bool(self.folder_id)\n\n def recognize(self, image_bytes: bytes) -> str:\n \"\"\"\n Отправляет изображение в Yandex Vision и возвращает полный текст.\n \"\"\"\n if not self.is_configured():\n logger.error(\"Yandex credentials missing.\")\n return \"\"\n\n iam_token = yandex_auth.get_iam_token()\n if not iam_token:\n return \"\"\n\n # 1. Кодируем в Base64\n b64_image = base64.b64encode(image_bytes).decode(\"utf-8\")\n\n # 2. Формируем тело запроса\n # Используем модель 'page' (для документов) и '*' для автоопределения языка\n payload = {\n \"mimeType\": \"JPEG\", # Yandex переваривает и PNG под видом JPEG часто, но лучше быть аккуратным.\n # В идеале определять mime-type из файла, но JPEG - безопасный дефолт для фото.\n \"languageCodes\": [\"*\"],\n \"model\": \"page\",\n \"content\": b64_image\n }\n\n headers = {\n \"Content-Type\": \"application/json\",\n \"Authorization\": f\"Bearer {iam_token}\",\n \"x-folder-id\": self.folder_id,\n \"x-data-logging-enabled\": \"true\"\n }\n\n # 3. Отправляем запрос\n try:\n logger.info(\"Sending request to Yandex Vision OCR...\")\n response = requests.post(VISION_URL, headers=headers, json=payload, timeout=20)\n \n # Если 401 Unauthorized, возможно токен протух раньше времени (редко, но бывает)\n if response.status_code == 401:\n logger.warning(\"Got 401 from Yandex. Retrying with fresh token...\")\n yandex_auth.reset_token() # сброс кэша\n iam_token = yandex_auth.get_iam_token()\n if iam_token:\n headers[\"Authorization\"] = f\"Bearer {iam_token}\"\n response = requests.post(VISION_URL, headers=headers, json=payload, timeout=20)\n \n response.raise_for_status()\n result_json = response.json()\n \n # 4. Парсим ответ\n # Структура: result -> textAnnotation -> fullText\n # Или (если fullText нет) blocks -> lines -> text\n \n text_annotation = result_json.get(\"result\", {}).get(\"textAnnotation\", {})\n \n if not text_annotation:\n logger.warning(\"Yandex returned success but no textAnnotation found.\")\n return \"\"\n \n # Самый простой способ - взять fullText, он обычно склеен с \\n\n full_text = text_annotation.get(\"fullText\", \"\")\n \n if not full_text:\n # Фолбэк: если fullText пуст, собираем вручную по блокам\n logger.info(\"fullText empty, assembling from blocks...\")\n lines_text = []\n for block in text_annotation.get(\"blocks\", []):\n for line in block.get(\"lines\", []):\n lines_text.append(line.get(\"text\", \"\"))\n full_text = \"\\n\".join(lines_text)\n\n return full_text\n\n except Exception as e:\n logger.error(f\"Error during Yandex Vision request: {e}\")\n return \"\"\n\n# Глобальный инстанс\nyandex_engine = YandexOCREngine()", "app/services/qr.py": "import logging\nimport requests\nimport re\nfrom typing import Optional, List\nfrom pyzbar.pyzbar import decode\nfrom PIL import Image\nimport numpy as np\n\nfrom app.schemas.models import ParsedItem\n\nAPI_TOKEN = \"36590.yqtiephCvvkYUKM2W\"\nAPI_URL = \"https://proverkacheka.com/api/v1/check/get\"\n\nlogger = logging.getLogger(__name__)\n\ndef extract_fiscal_data(text: str) -> Optional[str]:\n \"\"\"\n Ищет в тексте:\n - Дата и время (t): 19.12.25 12:16 -> 20251219T1216\n - Сумма (s): ИТОГ: 770.00\n - ФН (fn): 16 цифр, начинается на 73\n - ФД (i): до 8 цифр (ищем после Д: или ФД:)\n - ФП (fp): 10 цифр (ищем после П: или ФП:)\n \n Возвращает строку формата: t=20251219T1216&s=770.00&fn=7384440800514469&i=11194&fp=3334166168&n=1\n \"\"\"\n # 1. Поиск даты и времени\n # Ищем форматы DD.MM.YY HH:MM или DD.MM.YYYY HH:MM\n date_match = re.search(r'(\\d{2})\\.(\\d{2})\\.(\\d{2,4})\\s+(\\d{2}):(\\d{2})', text)\n t_param = \"\"\n if date_match:\n d, m, y, hh, mm = date_match.groups()\n if len(y) == 2: y = \"20\" + y\n t_param = f\"{y}{m}{d}T{hh}{mm}\"\n\n # 2. Поиск суммы (Итог)\n # Ищем слово ИТОГ и число после него\n sum_match = re.search(r'(?:ИТОГ|СУММА|СУММА:)\\s*[:]*\\s*(\\d+[.,]\\d{2})', text, re.IGNORECASE)\n s_param = \"\"\n if sum_match:\n s_param = sum_match.group(1).replace(',', '.')\n\n # 3. Поиск ФН (16 цифр, начинается с 73)\n fn_match = re.search(r'\\b(73\\d{14})\\b', text)\n fn_param = fn_match.group(1) if fn_match else \"\"\n\n # 4. Поиск ФД (i) - ищем после маркеров Д: или ФД:\n # Берем набор цифр до 8 знаков\n fd_match = re.search(r'(?:ФД|Д)[:\\s]+(\\d{1,8})\\b', text)\n i_param = fd_match.group(1) if fd_match else \"\"\n\n # 5. Поиск ФП (fp) - ищем после маркеров П: или ФП:\n # Строго 10 цифр\n fp_match = re.search(r'(?:ФП|П)[:\\s]+(\\d{10})\\b', text)\n fp_param = fp_match.group(1) if fp_match else \"\"\n\n # Валидация: для формирования запроса к API нам критически важны все параметры\n if all([t_param, s_param, fn_param, i_param, fp_param]):\n return f\"t={t_param}&s={s_param}&fn={fn_param}&i={i_param}&fp={fp_param}&n=1\"\n \n return None\n\ndef is_valid_fiscal_qr(qr_string: str) -> bool:\n \"\"\"\n Проверяет, соответствует ли строка формату фискального чека ФНС.\n Ожидаемый формат: t=...&s=...&fn=...&i=...&fp=...&n=...\n Мы проверяем наличие хотя бы 3-х ключевых параметров.\n \"\"\"\n if not qr_string:\n return False\n \n # Ключевые параметры, которые обязаны быть в строке чека\n required_keys = [\"t=\", \"s=\", \"fn=\"]\n \n # Проверяем, что все ключевые параметры присутствуют\n # (порядок может отличаться, поэтому проверяем вхождение каждого)\n matches = [key in qr_string for key in required_keys]\n \n return all(matches)\n\ndef detect_and_decode_qr(image: np.ndarray) -> Optional[str]:\n \"\"\"\n Ищет ВСЕ QR-коды на изображении и возвращает только тот, \n который похож на фискальный чек.\n \"\"\"\n try:\n pil_img = Image.fromarray(image)\n \n # Декодируем все коды на картинке\n decoded_objects = decode(pil_img)\n \n if not decoded_objects:\n logger.info(\"No QR codes detected on the image.\")\n return None\n \n logger.info(f\"Detected {len(decoded_objects)} code(s). Scanning for fiscal data...\")\n\n for obj in decoded_objects:\n if obj.type == 'QRCODE':\n qr_data = obj.data.decode(\"utf-8\")\n \n # Логируем найденное (для отладки, если вдруг формат хитрый)\n # Обрезаем длинные строки, чтобы не засорять лог\n log_preview = (qr_data[:75] + '..') if len(qr_data) > 75 else qr_data\n logger.info(f\"Checking QR content: {log_preview}\")\n \n if is_valid_fiscal_qr(qr_data):\n logger.info(\"Valid fiscal QR found!\")\n return qr_data\n else:\n logger.info(\"QR skipped (not a fiscal receipt pattern).\")\n \n logger.warning(\"QR codes were found, but none matched the fiscal receipt format.\")\n return None\n\n except Exception as e:\n logger.error(f\"Error during QR detection: {e}\")\n return None\n\ndef fetch_data_from_api(qr_raw: str) -> List[ParsedItem]:\n \"\"\"\n Отправляет данные QR-кода в API proverkacheka.com и парсит JSON-ответ.\n \"\"\"\n try:\n payload = {\n 'qrraw': qr_raw,\n 'token': API_TOKEN\n }\n \n logger.info(\"Sending request to Check API...\")\n response = requests.post(API_URL, data=payload, timeout=10)\n \n if response.status_code != 200:\n logger.error(f\"API Error: Status {response.status_code}\")\n return []\n\n data = response.json()\n \n if data.get('code') != 1:\n logger.warning(f\"API returned non-success code: {data.get('code')}\")\n return []\n \n json_data = data.get('data', {}).get('json', {})\n items_data = json_data.get('items', [])\n \n parsed_items = []\n \n for item in items_data:\n price = float(item.get('price', 0)) / 100.0\n total_sum = float(item.get('sum', 0)) / 100.0\n quantity = float(item.get('quantity', 0))\n name = item.get('name', 'Unknown')\n \n parsed_items.append(ParsedItem(\n raw_name=name,\n amount=quantity,\n price=price,\n sum=total_sum\n ))\n \n return parsed_items\n\n except Exception as e:\n logger.error(f\"Error fetching/parsing API data: {e}\")\n return []", "requirements.txt": "fastapi\nuvicorn\npython-multipart\npydantic\nnumpy\nopencv-python-headless\nrequests\npyzbar\npillow\ncertifi", "scripts/collect_data_raw.py": "import os\nimport requests\nimport json\nimport mimetypes\n\n# Папка, куда вы положите фото чеков для теста\nINPUT_DIR = \"./test_receipts\"\n# Папка, куда сохраним сырой текст\nOUTPUT_DIR = \"./raw_outputs\"\n# Адрес запущенного OCR сервиса\nAPI_URL = \"http://10.25.100.250:5006/recognize\"\n\ndef process_images():\n if not os.path.exists(OUTPUT_DIR):\n os.makedirs(OUTPUT_DIR)\n\n if not os.path.exists(INPUT_DIR):\n os.makedirs(INPUT_DIR)\n print(f\"Папка {INPUT_DIR} создана. Положите туда фото чеков и перезапустите скрипт.\")\n return\n\n files = [f for f in os.listdir(INPUT_DIR) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]\n \n if not files:\n print(f\"В папке {INPUT_DIR} нет изображений.\")\n return\n\n print(f\"Найдено {len(files)} файлов. Начинаю обработку...\")\n\n for filename in files:\n file_path = os.path.join(INPUT_DIR, filename)\n mime_type, _ = mimetypes.guess_type(file_path)\n \n print(f\"Processing {filename}...\", end=\" \")\n \n try:\n with open(file_path, 'rb') as f:\n files = {'image': (filename, f, mime_type or 'image/jpeg')}\n response = requests.post(API_URL, files=files, timeout=30)\n \n if response.status_code == 200:\n data = response.json()\n raw_text = data.get(\"raw_text\", \"\")\n source = data.get(\"source\", \"unknown\")\n \n # Сохраняем RAW текст\n out_name = f\"{filename}_RAW.txt\"\n with open(os.path.join(OUTPUT_DIR, out_name), \"w\", encoding=\"utf-8\") as out:\n out.write(f\"Source: {source}\\n\")\n out.write(\"=\"*20 + \"\\n\")\n out.write(raw_text)\n \n print(f\"OK ({source}) -> {out_name}\")\n else:\n print(f\"FAIL: {response.status_code} - {response.text}\")\n \n except Exception as e:\n print(f\"ERROR: {e}\")\n\nif __name__ == \"__main__\":\n process_images()", "scripts/test_parsing_quality.py": "import os\nimport requests\nimport json\nimport mimetypes\n\n# Папка с фото\nINPUT_DIR = \"./test_receipts\"\n# Папка для результатов\nOUTPUT_DIR = \"./json_results\"\n# Адрес сервиса\nAPI_URL = \"http://10.25.100.250:5006/recognize\"\n\ndef test_parsing():\n if not os.path.exists(OUTPUT_DIR):\n os.makedirs(OUTPUT_DIR)\n\n if not os.path.exists(INPUT_DIR):\n print(f\"Папка {INPUT_DIR} не найдена.\")\n return\n\n files = [f for f in os.listdir(INPUT_DIR) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]\n \n print(f\"Найдено {len(files)} файлов. Тестируем парсинг...\")\n\n for filename in files:\n file_path = os.path.join(INPUT_DIR, filename)\n mime_type, _ = mimetypes.guess_type(file_path)\n \n print(f\"Processing {filename}...\", end=\" \")\n \n try:\n with open(file_path, 'rb') as f:\n files = {'image': (filename, f, mime_type or 'image/jpeg')}\n response = requests.post(API_URL, files=files, timeout=60) # Увеличили таймаут для LLM\n \n if response.status_code == 200:\n data = response.json()\n items = data.get(\"items\", [])\n source = data.get(\"source\", \"unknown\")\n \n # Сохраняем JSON\n out_name = f\"{filename}_RESULT.json\"\n with open(os.path.join(OUTPUT_DIR, out_name), \"w\", encoding=\"utf-8\") as out:\n json.dump(data, out, ensure_ascii=False, indent=2)\n \n print(f\"OK ({source}) -> Found {len(items)} items\")\n else:\n print(f\"FAIL: {response.status_code} - {response.text}\")\n \n except Exception as e:\n print(f\"ERROR: {e}\")\n\nif __name__ == \"__main__\":\n test_parsing()", }