2612-есть ок OCR, нужно допиливать бота под новый flow для операторов

This commit is contained in:
2026-01-27 00:17:10 +03:00
parent 7d2ffb54b5
commit 1843cb9c20
22 changed files with 1011 additions and 577 deletions

View File

@@ -0,0 +1,68 @@
import os
import time
import logging
import requests
from typing import Optional
logger = logging.getLogger(__name__)
IAM_TOKEN_URL = "https://iam.api.cloud.yandex.net/iam/v1/tokens"
class YandexAuthManager:
def __init__(self):
self.oauth_token = os.getenv("YANDEX_OAUTH_TOKEN")
# Кэширование IAM токена
self._iam_token = None
self._token_expire_time = 0
if not self.oauth_token:
logger.warning("YANDEX_OAUTH_TOKEN not set. Yandex services will be unavailable.")
def is_configured(self) -> bool:
return bool(self.oauth_token)
def reset_token(self):
"""Сбрасывает кэшированный токен, заставляя получить новый при следующем вызове."""
self._iam_token = None
self._token_expire_time = 0
def get_iam_token(self) -> Optional[str]:
"""
Получает IAM-токен. Если есть живой кэшированный — возвращает его.
Если нет — обменивает OAuth на IAM.
"""
current_time = time.time()
# Если токен есть и он "свежий" (с запасом в 5 минут)
if self._iam_token and current_time < self._token_expire_time - 300:
return self._iam_token
if not self.oauth_token:
logger.error("OAuth token not available.")
return None
logger.info("Obtaining new IAM token from Yandex...")
try:
response = requests.post(
IAM_TOKEN_URL,
json={"yandexPassportOauthToken": self.oauth_token},
timeout=10
)
response.raise_for_status()
data = response.json()
self._iam_token = data["iamToken"]
# Токен обычно живет 12 часов, но мы будем ориентироваться на поле expiresAt если нужно,
# или просто поставим таймер. Для простоты берем 1 час жизни кэша.
self._token_expire_time = current_time + 3600
logger.info("IAM token received successfully.")
return self._iam_token
except Exception as e:
logger.error(f"Failed to get IAM token: {e}")
return None
# Глобальный инстанс
yandex_auth = YandexAuthManager()

View File

@@ -0,0 +1,46 @@
import io
import logging
import re
from openpyxl import load_workbook
logger = logging.getLogger(__name__)
def extract_text_from_excel(content: bytes) -> str:
"""
Извлекает текстовое содержимое из Excel файла (.xlsx).
Проходит по всем строкам активного листа, собирает значения ячеек
и формирует текстовую строку для передачи в LLM парсер.
Args:
content: Байтовое содержимое Excel файла
Returns:
Строка с текстовым представлением содержимого Excel файла
"""
try:
# Загружаем workbook из байтов, data_only=True берет значения, а не формулы
wb = load_workbook(filename=io.BytesIO(content), data_only=True)
sheet = wb.active
lines = []
for row in sheet.iter_rows(values_only=True):
# Собираем непустые значения в строку через разделитель
row_text = " | ".join([
str(cell).strip()
for cell in row
if cell is not None and str(cell).strip() != ""
])
# Простая эвристика: строка должна содержать хотя бы одну букву (кириллица/латиница) И хотя бы одну цифру.
# Это отсеет пустые разделители и чистые заголовки.
if row_text and re.search(r'[a-zA-Zа-яА-Я]', row_text) and re.search(r'\d', row_text):
lines.append(row_text)
result = "\n".join(lines)
logger.info(f"Extracted {len(lines)} lines from Excel file")
return result
except Exception as e:
logger.error(f"Error extracting text from Excel: {e}", exc_info=True)
raise

View File

@@ -0,0 +1,237 @@
import os
import requests
from requests.exceptions import SSLError
import logging
import json
import uuid
import time
import re
from abc import ABC, abstractmethod
from typing import List, Optional
from app.schemas.models import ParsedItem
from app.services.auth import yandex_auth
logger = logging.getLogger(__name__)
YANDEX_GPT_URL = "https://llm.api.cloud.yandex.net/foundationModels/v1/completion"
GIGACHAT_OAUTH_URL = "https://ngw.devices.sberbank.ru:9443/api/v2/oauth"
GIGACHAT_COMPLETION_URL = "https://gigachat.devices.sberbank.ru/api/v1/chat/completions"
class LLMProvider(ABC):
@abstractmethod
def generate(self, system_prompt: str, user_text: str) -> str:
pass
class YandexGPTProvider(LLMProvider):
def __init__(self):
self.folder_id = os.getenv("YANDEX_FOLDER_ID")
def generate(self, system_prompt: str, user_text: str) -> str:
iam_token = yandex_auth.get_iam_token()
if not iam_token:
raise Exception("Failed to get IAM token")
prompt = {
"modelUri": f"gpt://{self.folder_id}/yandexgpt/latest",
"completionOptions": {"stream": False, "temperature": 0.1, "maxTokens": "2000"},
"messages": [
{"role": "system", "text": system_prompt},
{"role": "user", "text": user_text}
]
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {iam_token}",
"x-folder-id": self.folder_id
}
response = requests.post(YANDEX_GPT_URL, headers=headers, json=prompt, timeout=30)
response.raise_for_status()
content = response.json()['result']['alternatives'][0]['message']['text']
return content
class GigaChatProvider(LLMProvider):
def __init__(self):
self.auth_key = os.getenv("GIGACHAT_AUTH_KEY")
self._access_token = None
self._expires_at = 0
def _get_token(self) -> Optional[str]:
if self._access_token and time.time() < self._expires_at:
return self._access_token
logger.info("Obtaining GigaChat access token...")
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': 'application/json',
'RqUID': str(uuid.uuid4()),
'Authorization': f'Basic {self.auth_key}'
}
payload = {'scope': 'GIGACHAT_API_PERS'}
response = requests.post(GIGACHAT_OAUTH_URL, headers=headers, data=payload, timeout=10)
response.raise_for_status()
data = response.json()
self._access_token = data['access_token']
self._expires_at = data['expires_at'] / 1000 # Переводим мс в сек
return self._access_token
def generate(self, system_prompt: str, user_text: str) -> str:
token = self._get_token()
if not token:
raise Exception("Failed to get GigaChat token")
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': f'Bearer {token}'
}
payload = {
"model": "GigaChat",
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_text}
],
"temperature": 0.1
}
try:
response = requests.post(GIGACHAT_COMPLETION_URL, headers=headers, json=payload, timeout=30)
response.raise_for_status()
content = response.json()['choices'][0]['message']['content']
return content
except SSLError as e:
logger.error("SSL Error with GigaChat. Check certificates")
raise e
class LLMManager:
def __init__(self):
engine = os.getenv("LLM_ENGINE", "yandex").lower()
self.engine = engine
if engine == "gigachat":
self.provider = GigaChatProvider()
else:
self.provider = YandexGPTProvider()
logger.info(f"LLM Engine initialized: {self.engine}")
def parse_receipt(self, raw_text: str) -> dict:
system_prompt = """
Ты — профессиональный бухгалтер. Твоя задача — извлечь из "сырого" текста (OCR или Excel) данные о товарах и реквизиты документа.
ВХОДНЫЕ ДАННЫЕ:
Текст чека, накладной или УПД. В Excel-файлах колонки разделены символом "|".
ФОРМАТ ОТВЕТА (JSON):
{
"doc_number": "Номер документа (или ФД для чеков)",
"doc_date": "Дата документа (DD.MM.YYYY)",
"items": [
{
"raw_name": "Название товара",
"amount": 1.0,
"price": 100.0, // Цена за единицу (с учетом скидок)
"sum": 100.0 // Стоимость позиции (ВСЕГО с НДС)
}
]
}
ПРАВИЛА ПОИСКА РЕКВИЗИТОВ:
1. Номер документа: Ищи "Счет-фактура №", "УПД №", "Накладная №", "ФД:", "Документ №".
- Пример: "УПД № 556430/123" -> "556430/123"
- Пример: "ФД: 12345" -> "12345"
2. Дата: Ищи рядом с номером. Преобразуй в формат DD.MM.YYYY.
ОБЩИЕ ПРАВИЛА ОБРАБОТКИ ТОВАРОВ:
1. **ЗАПРЕТ ГРУППИРОВКИ:** Если в чеке 5 раз подряд идет "Масло сливочное" (даже с разными кодами), ты должен вернуть **5 отдельных объектов**. НИКОГДА не объединяй их и не суммируй количество, если это не указано явно в одной строке (типа "5 шт x 100").
2. **ОЧИСТКА:** Игнорируй строки "ИТОГ", "СУММА", "НДС", "Всего к оплате", "Грузоотправитель", "Продавец". Числа приводи к float.
СТРАТЕГИЯ ДЛЯ EXCEL (УПД):
1. Разделитель колонок — "|".
2. Название — самая длинная текстовая ячейка.
3. Сумма — колонка "Стоимость с налогом - всего" (обычно крайняя правая сумма в строке). Если есть "без налога" и "с налогом", бери С НАЛОГОМ.
СТРАТЕГИЯ ДЛЯ ЧЕКОВ (OCR):
1. **МАРКЕР НАЧАЛА:** Часто строка товара начинается с цифрового кода (артикула). Пример: `6328 Масло...`. Если видишь число в начале строки, за которым идет текст — это НАЧАЛО нового товара.
2. **МНОГОСТРОЧНОСТЬ:** Название товара может быть разбито на 2-4 строки.
- Если видишь строку с цифрами (цена, кол-во), а перед ней текст — это конец описания товара. Склей текст с предыдущими строками.
- Пример:
`6298 Масло`
`ТРАД.сл.`
`1 шт 174.24`
-> Название: "6298 Масло ТРАД.сл."
3. **ЦЕНА:** Если указана "Цена со скидкой", бери её.
ПРИМЕР 1 (УПД):
Вход:
Код | Товар | Кол-во | Цена | Сумма без НДС | Сумма с НДС
1 | Сок ананасовый | 4 | 533.64 | 2134.56 | 2348.00
Выход:
{
"doc_number": "", "doc_date": "",
"items": [
{"raw_name": "Сок ананасовый", "amount": 4.0, "price": 533.64, "sum": 2348.00}
]
}
ПРИМЕР 2 (Сложный чек):
Вход:
5603 СЫР ПЛАВ.
45% 200Г
169.99 1шт 169.99
6328 Масло ТРАД.
Сл.82.5% 175г
204.99 30.75 174.24 1шт 174.24
6298 Масло ТРАД.
Сл.82.5%
175г
1шт 174.24
Выход:
{
"doc_number": "", "doc_date": "",
"items": [
{"raw_name": "5603 СЫР ПЛАВ. 45% 200Г", "amount": 1.0, "price": 169.99, "sum": 169.99},
{"raw_name": "6328 Масло ТРАД. Сл.82.5% 175г", "amount": 1.0, "price": 174.24, "sum": 174.24},
{"raw_name": "6298 Масло ТРАД. Сл.82.5% 175г", "amount": 1.0, "price": 174.24, "sum": 174.24}
]
}
"""
try:
logger.info(f"Parsing receipt using engine: {self.engine}")
response = self.provider.generate(system_prompt, raw_text)
# Очистка от Markdown
clean_json = response.replace("```json", "").replace("```", "").strip()
# Парсинг JSON
data = json.loads(clean_json)
# Обработка обратной совместимости: если вернулся список, оборачиваем в словарь
if isinstance(data, list):
data = {"items": data, "doc_number": "", "doc_date": ""}
# Извлекаем товары
items_data = data.get("items", [])
# Пост-обработка: исправление чисел в товарах
for item in items_data:
for key in ['amount', 'price', 'sum']:
if isinstance(item[key], str):
# Удаляем пробелы внутри чисел
item[key] = float(re.sub(r'\s+', '', item[key]).replace(',', '.'))
elif isinstance(item[key], (int, float)):
item[key] = float(item[key])
# Формируем результат
result = {
"items": [ParsedItem(**item) for item in items_data],
"doc_number": data.get("doc_number", ""),
"doc_date": data.get("doc_date", "")
}
return result
except Exception as e:
logger.error(f"LLM Parsing error: {e}")
return {"items": [], "doc_number": "", "doc_date": ""}
llm_parser = LLMManager()

View File

@@ -0,0 +1,114 @@
from abc import ABC, abstractmethod
from typing import Optional
import os
import json
import base64
import logging
import requests
from app.services.auth import yandex_auth
logger = logging.getLogger(__name__)
VISION_URL = "https://ocr.api.cloud.yandex.net/ocr/v1/recognizeText"
class OCREngine(ABC):
@abstractmethod
def recognize(self, image_bytes: bytes) -> str:
"""Распознает текст из изображения и возвращает строку."""
pass
@abstractmethod
def is_configured(self) -> bool:
"""Проверяет, настроен ли движок (наличие ключей/настроек)."""
pass
class YandexOCREngine(OCREngine):
def __init__(self):
self.folder_id = os.getenv("YANDEX_FOLDER_ID")
if not yandex_auth.is_configured() or not self.folder_id:
logger.warning("Yandex OCR credentials (YANDEX_OAUTH_TOKEN, YANDEX_FOLDER_ID) not set. Yandex OCR will be unavailable.")
def is_configured(self) -> bool:
return yandex_auth.is_configured() and bool(self.folder_id)
def recognize(self, image_bytes: bytes) -> str:
"""
Отправляет изображение в Yandex Vision и возвращает полный текст.
"""
if not self.is_configured():
logger.error("Yandex credentials missing.")
return ""
iam_token = yandex_auth.get_iam_token()
if not iam_token:
return ""
# 1. Кодируем в Base64
b64_image = base64.b64encode(image_bytes).decode("utf-8")
# 2. Формируем тело запроса
# Используем модель 'page' (для документов) и '*' для автоопределения языка
payload = {
"mimeType": "JPEG", # Yandex переваривает и PNG под видом JPEG часто, но лучше быть аккуратным.
# В идеале определять mime-type из файла, но JPEG - безопасный дефолт для фото.
"languageCodes": ["*"],
"model": "page",
"content": b64_image
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {iam_token}",
"x-folder-id": self.folder_id,
"x-data-logging-enabled": "true"
}
# 3. Отправляем запрос
try:
logger.info("Sending request to Yandex Vision OCR...")
response = requests.post(VISION_URL, headers=headers, json=payload, timeout=20)
# Если 401 Unauthorized, возможно токен протух раньше времени (редко, но бывает)
if response.status_code == 401:
logger.warning("Got 401 from Yandex. Retrying with fresh token...")
yandex_auth.reset_token() # сброс кэша
iam_token = yandex_auth.get_iam_token()
if iam_token:
headers["Authorization"] = f"Bearer {iam_token}"
response = requests.post(VISION_URL, headers=headers, json=payload, timeout=20)
response.raise_for_status()
result_json = response.json()
# 4. Парсим ответ
# Структура: result -> textAnnotation -> fullText
# Или (если fullText нет) blocks -> lines -> text
text_annotation = result_json.get("result", {}).get("textAnnotation", {})
if not text_annotation:
logger.warning("Yandex returned success but no textAnnotation found.")
return ""
# Самый простой способ - взять fullText, он обычно склеен с \n
full_text = text_annotation.get("fullText", "")
if not full_text:
# Фолбэк: если fullText пуст, собираем вручную по блокам
logger.info("fullText empty, assembling from blocks...")
lines_text = []
for block in text_annotation.get("blocks", []):
for line in block.get("lines", []):
lines_text.append(line.get("text", ""))
full_text = "\n".join(lines_text)
return full_text
except Exception as e:
logger.error(f"Error during Yandex Vision request: {e}")
return ""
# Глобальный инстанс
yandex_engine = YandexOCREngine()

View File

@@ -0,0 +1,165 @@
import logging
import requests
import re
from typing import Optional, List
from pyzbar.pyzbar import decode
from PIL import Image
import numpy as np
from app.schemas.models import ParsedItem
API_TOKEN = "36590.yqtiephCvvkYUKM2W"
API_URL = "https://proverkacheka.com/api/v1/check/get"
logger = logging.getLogger(__name__)
def extract_fiscal_data(text: str) -> Optional[str]:
"""
Ищет в тексте:
- Дата и время (t): 19.12.25 12:16 -> 20251219T1216
- Сумма (s): ИТОГ: 770.00
- ФН (fn): 16 цифр, начинается на 73
- ФД (i): до 8 цифр (ищем после Д: или ФД:)
- ФП (fp): 10 цифр (ищем после П: или ФП:)
Возвращает строку формата: t=20251219T1216&s=770.00&fn=7384440800514469&i=11194&fp=3334166168&n=1
"""
# 1. Поиск даты и времени
# Ищем форматы DD.MM.YY HH:MM или DD.MM.YYYY HH:MM
date_match = re.search(r'(\d{2})\.(\d{2})\.(\d{2,4})\s+(\d{2}):(\d{2})', text)
t_param = ""
if date_match:
d, m, y, hh, mm = date_match.groups()
if len(y) == 2: y = "20" + y
t_param = f"{y}{m}{d}T{hh}{mm}"
# 2. Поиск суммы (Итог)
# Ищем слово ИТОГ и число после него
sum_match = re.search(r'(?:ИТОГ|СУММА|СУММА:)\s*[:]*\s*(\d+[.,]\d{2})', text, re.IGNORECASE)
s_param = ""
if sum_match:
s_param = sum_match.group(1).replace(',', '.')
# 3. Поиск ФН (16 цифр, начинается с 73)
fn_match = re.search(r'\b(73\d{14})\b', text)
fn_param = fn_match.group(1) if fn_match else ""
# 4. Поиск ФД (i) - ищем после маркеров Д: или ФД:
# Берем набор цифр до 8 знаков
fd_match = re.search(r'(?:ФД|Д)[:\s]+(\d{1,8})\b', text)
i_param = fd_match.group(1) if fd_match else ""
# 5. Поиск ФП (fp) - ищем после маркеров П: или ФП:
# Строго 10 цифр
fp_match = re.search(r'(?:ФП|П)[:\s]+(\d{10})\b', text)
fp_param = fp_match.group(1) if fp_match else ""
# Валидация: для формирования запроса к API нам критически важны все параметры
if all([t_param, s_param, fn_param, i_param, fp_param]):
return f"t={t_param}&s={s_param}&fn={fn_param}&i={i_param}&fp={fp_param}&n=1"
return None
def is_valid_fiscal_qr(qr_string: str) -> bool:
"""
Проверяет, соответствует ли строка формату фискального чека ФНС.
Ожидаемый формат: t=...&s=...&fn=...&i=...&fp=...&n=...
Мы проверяем наличие хотя бы 3-х ключевых параметров.
"""
if not qr_string:
return False
# Ключевые параметры, которые обязаны быть в строке чека
required_keys = ["t=", "s=", "fn="]
# Проверяем, что все ключевые параметры присутствуют
# (порядок может отличаться, поэтому проверяем вхождение каждого)
matches = [key in qr_string for key in required_keys]
return all(matches)
def detect_and_decode_qr(image: np.ndarray) -> Optional[str]:
"""
Ищет ВСЕ QR-коды на изображении и возвращает только тот,
который похож на фискальный чек.
"""
try:
pil_img = Image.fromarray(image)
# Декодируем все коды на картинке
decoded_objects = decode(pil_img)
if not decoded_objects:
logger.info("No QR codes detected on the image.")
return None
logger.info(f"Detected {len(decoded_objects)} code(s). Scanning for fiscal data...")
for obj in decoded_objects:
if obj.type == 'QRCODE':
qr_data = obj.data.decode("utf-8")
# Логируем найденное (для отладки, если вдруг формат хитрый)
# Обрезаем длинные строки, чтобы не засорять лог
log_preview = (qr_data[:75] + '..') if len(qr_data) > 75 else qr_data
logger.info(f"Checking QR content: {log_preview}")
if is_valid_fiscal_qr(qr_data):
logger.info("Valid fiscal QR found!")
return qr_data
else:
logger.info("QR skipped (not a fiscal receipt pattern).")
logger.warning("QR codes were found, but none matched the fiscal receipt format.")
return None
except Exception as e:
logger.error(f"Error during QR detection: {e}")
return None
def fetch_data_from_api(qr_raw: str) -> List[ParsedItem]:
"""
Отправляет данные QR-кода в API proverkacheka.com и парсит JSON-ответ.
"""
try:
payload = {
'qrraw': qr_raw,
'token': API_TOKEN
}
logger.info("Sending request to Check API...")
response = requests.post(API_URL, data=payload, timeout=10)
if response.status_code != 200:
logger.error(f"API Error: Status {response.status_code}")
return []
data = response.json()
if data.get('code') != 1:
logger.warning(f"API returned non-success code: {data.get('code')}")
return []
json_data = data.get('data', {}).get('json', {})
items_data = json_data.get('items', [])
parsed_items = []
for item in items_data:
price = float(item.get('price', 0)) / 100.0
total_sum = float(item.get('sum', 0)) / 100.0
quantity = float(item.get('quantity', 0))
name = item.get('name', 'Unknown')
parsed_items.append(ParsedItem(
raw_name=name,
amount=quantity,
price=price,
sum=total_sum
))
return parsed_items
except Exception as e:
logger.error(f"Error fetching/parsing API data: {e}")
return []