diff --git a/config.yaml b/config.yaml index 31acbd2..9ac3362 100644 --- a/config.yaml +++ b/config.yaml @@ -4,6 +4,8 @@ app: drop_tables: false storage_path: "./uploads" public_url: "https://rmser.serty.top" + maintenance_mode: true + dev_ids: [665599275] # Укажите здесь ваш ID и ID тестировщиков db: dsn: "host=postgres user=rmser password=mhrcadmin994525 dbname=rmser_db port=5432 sslmode=disable TimeZone=Europe/Moscow" diff --git a/config/config.go b/config/config.go index 9267d5b..462e923 100644 --- a/config/config.go +++ b/config/config.go @@ -24,6 +24,9 @@ type AppConfig struct { DropTables bool `mapstructure:"drop_tables"` StoragePath string `mapstructure:"storage_path"` PublicURL string `mapstructure:"public_url"` + + MaintenanceMode bool `mapstructure:"maintenance_mode"` + DevIDs []int64 `mapstructure:"dev_ids"` // Whitelist для режима разработки } type DBConfig struct { diff --git a/ocr-service/python_project_dump.py b/ocr-service/python_project_dump.py index f5bad8a..697c281 100644 --- a/ocr-service/python_project_dump.py +++ b/ocr-service/python_project_dump.py @@ -13,17 +13,16 @@ ocr-service/ ├── parser.py ├── qr_manager.py ├── requirements.txt -├── system-prompt.md └── yandex_ocr.py ''' project_files = { "imgproc.py": "import cv2\nimport numpy as np\nimport logging\n\nlogger = logging.getLogger(__name__)\n\ndef order_points(pts):\n rect = np.zeros((4, 2), dtype=\"float32\")\n s = pts.sum(axis=1)\n rect[0] = pts[np.argmin(s)]\n rect[2] = pts[np.argmax(s)]\n diff = np.diff(pts, axis=1)\n rect[1] = pts[np.argmin(diff)]\n rect[3] = pts[np.argmax(diff)]\n return rect\n\ndef four_point_transform(image, pts):\n rect = order_points(pts)\n (tl, tr, br, bl) = rect\n\n widthA = np.sqrt(((br[0] - bl[0]) ** 2) + ((br[1] - bl[1]) ** 2))\n widthB = np.sqrt(((tr[0] - tl[0]) ** 2) + ((tr[1] - tl[1]) ** 2))\n maxWidth = max(int(widthA), int(widthB))\n\n heightA = np.sqrt(((tr[0] - br[0]) ** 2) + ((tr[1] - br[1]) ** 2))\n heightB = np.sqrt(((tl[0] - bl[0]) ** 2) + ((tl[1] - bl[1]) ** 2))\n maxHeight = max(int(heightA), int(heightB))\n\n dst = np.array([\n [0, 0],\n [maxWidth - 1, 0],\n [maxWidth - 1, maxHeight - 1],\n [0, maxHeight - 1]], dtype=\"float32\")\n\n M = cv2.getPerspectiveTransform(rect, dst)\n warped = cv2.warpPerspective(image, M, (maxWidth, maxHeight))\n return warped\n\ndef preprocess_image(image_bytes: bytes) -> np.ndarray:\n \"\"\"\n Возвращает БИНАРНОЕ (Ч/Б) изображение для Tesseract.\n \"\"\"\n nparr = np.frombuffer(image_bytes, np.uint8)\n image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)\n \n if image is None:\n raise ValueError(\"Could not decode image\")\n\n # Ресайз для поиска контуров\n ratio = image.shape[0] / 500.0\n orig = image.copy()\n image_small = cv2.resize(image, (int(image.shape[1] / ratio), 500))\n\n gray = cv2.cvtColor(image_small, cv2.COLOR_BGR2GRAY)\n gray = cv2.GaussianBlur(gray, (5, 5), 0)\n edged = cv2.Canny(gray, 75, 200)\n\n cnts, _ = cv2.findContours(edged.copy(), cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)\n cnts = sorted(cnts, key=cv2.contourArea, reverse=True)[:5]\n\n screenCnt = None\n found = False\n\n for c in cnts:\n peri = cv2.arcLength(c, True)\n approx = cv2.approxPolyDP(c, 0.02 * peri, True)\n if len(approx) == 4:\n screenCnt = approx\n found = True\n break\n\n # Изображение, с которым будем работать дальше\n target_img = None\n\n if found:\n logger.info(\"Receipt contour found (Tesseract mode).\")\n target_img = four_point_transform(orig, screenCnt.reshape(4, 2) * ratio)\n else:\n logger.warning(\"Receipt contour NOT found. Using full image.\")\n target_img = orig\n\n # --- Подготовка для Tesseract (Бинаризация) ---\n # Переводим в Gray\n gray_final = cv2.cvtColor(target_img, cv2.COLOR_BGR2GRAY)\n \n # Адаптивный порог (превращаем в чисто черное и белое)\n # block_size=11, C=2 - классические параметры для текста\n thresh = cv2.adaptiveThreshold(\n gray_final, 255, \n cv2.ADAPTIVE_THRESH_GAUSSIAN_C, \n cv2.THRESH_BINARY, 11, 2\n )\n \n # Немного убираем шум\n # thresh = cv2.medianBlur(thresh, 3) \n \n return thresh", - "llm_parser.py": "import os\nimport requests\nimport logging\nimport json\nfrom typing import List\nfrom parser import ParsedItem\n\nlogger = logging.getLogger(__name__)\n\nYANDEX_GPT_URL = \"https://llm.api.cloud.yandex.net/foundationModels/v1/completion\"\n\nclass YandexGPTParser:\n def __init__(self):\n self.folder_id = os.getenv(\"YANDEX_FOLDER_ID\")\n self.api_key = os.getenv(\"YANDEX_OAUTH_TOKEN\") # Используем тот же доступ\n\n def parse_with_llm(self, raw_text: str, iam_token: str) -> List[ParsedItem]:\n \"\"\"\n Отправляет текст в YandexGPT для структурирования.\n \"\"\"\n if not iam_token:\n return []\n\n prompt = {\n \"modelUri\": f\"gpt://{self.folder_id}/yandexgpt/latest\",\n \"completionOptions\": {\n \"stream\": False,\n \"temperature\": 0.1, # Низкая температура для точности\n \"maxTokens\": \"2000\"\n },\n \"messages\": [\n {\n \"role\": \"system\",\n \"text\": (\n \"Ты — помощник по бухгалтерии. Извлеки список товаров из текста документа. \"\n \"Верни ответ строго в формате JSON: \"\n '[{\"raw_name\": string, \"amount\": float, \"price\": float, \"sum\": float}]. '\n \"Если количество не указано, считай 1.0. Не пиши ничего, кроме JSON.\"\n )\n },\n {\n \"role\": \"user\",\n \"text\": raw_text\n }\n ]\n }\n\n headers = {\n \"Content-Type\": \"application/json\",\n \"Authorization\": f\"Bearer {iam_token}\",\n \"x-folder-id\": self.folder_id\n }\n\n try:\n response = requests.post(YANDEX_GPT_URL, headers=headers, json=prompt, timeout=30)\n response.raise_for_status()\n result = response.json()\n \n # Извлекаем текст ответа\n content = result['result']['alternatives'][0]['message']['text']\n \n # Очищаем от возможных markdown-оберток ```json ... ```\n clean_json = content.replace(\"```json\", \"\").replace(\"```\", \"\").strip()\n \n items_raw = json.loads(clean_json)\n \n parsed_items = [ParsedItem(**item) for item in items_raw]\n return parsed_items\n \n except Exception as e:\n logger.error(f\"LLM Parsing error: {e}\")\n return []\n\nllm_parser = YandexGPTParser()", - "main.py": "import logging\nimport os\nfrom typing import List\n\nfrom fastapi import FastAPI, File, UploadFile, HTTPException\nfrom pydantic import BaseModel\nimport cv2\nimport numpy as np\n\n# Импортируем модули\nfrom imgproc import preprocess_image\nfrom parser import parse_receipt_text, ParsedItem\nfrom ocr import ocr_engine\nfrom qr_manager import detect_and_decode_qr, fetch_data_from_api\n# Импортируем новый модуль\nfrom yandex_ocr import yandex_engine\nfrom llm_parser import llm_parser\n\nlogging.basicConfig(\n level=logging.INFO,\n format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'\n)\nlogger = logging.getLogger(__name__)\n\napp = FastAPI(title=\"RMSER OCR Service (Hybrid: QR + Yandex + Tesseract)\")\n\nclass RecognitionResult(BaseModel):\n source: str # 'qr_api', 'yandex_vision', 'tesseract_ocr'\n items: List[ParsedItem]\n raw_text: str = \"\"\n\n@app.get(\"/health\")\ndef health_check():\n return {\"status\": \"ok\"}\n\n@app.post(\"/recognize\", response_model=RecognitionResult)\nasync def recognize_receipt(image: UploadFile = File(...)):\n \"\"\"\n Стратегия:\n 1. QR Code + FNS API (Приоритет 1 - Идеальная точность)\n 2. Yandex Vision OCR (Приоритет 2 - Высокая точность, если настроен)\n 3. Tesseract OCR (Приоритет 3 - Локальный фолбэк)\n \"\"\"\n logger.info(f\"Received file: {image.filename}, content_type: {image.content_type}\")\n\n if not image.content_type.startswith(\"image/\"):\n raise HTTPException(status_code=400, detail=\"File must be an image\")\n\n try:\n # Читаем сырые байты\n content = await image.read()\n \n # Конвертируем в numpy для QR и локального препроцессинга\n nparr = np.frombuffer(content, np.uint8)\n original_cv_image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)\n\n if original_cv_image is None:\n raise HTTPException(status_code=400, detail=\"Invalid image data\")\n\n # --- ЭТАП 1: QR Code Strategy ---\n logger.info(\"--- Stage 1: QR Code Detection ---\")\n qr_raw = detect_and_decode_qr(original_cv_image)\n \n if qr_raw:\n logger.info(\"QR found! Fetching data from API...\")\n api_items = fetch_data_from_api(qr_raw)\n \n if api_items:\n logger.info(f\"Success: Retrieved {len(api_items)} items via QR API.\")\n return RecognitionResult(\n source=\"qr_api\",\n items=api_items,\n raw_text=f\"QR Content: {qr_raw}\"\n )\n else:\n logger.warning(\"QR found but API failed. Falling back to OCR.\")\n else:\n logger.info(\"QR code not found. Proceeding to OCR.\")\n\n # --- ЭТАП 2: Yandex Vision Strategy (Cloud OCR) ---\n # Проверяем, настроен ли Яндекс\n if yandex_engine.oauth_token and yandex_engine.folder_id:\n logger.info(\"--- Stage 2: Yandex Vision OCR ---\")\n \n # Яндекс принимает сырые байты картинки (Base64), ему не нужен наш препроцессинг\n yandex_text = yandex_engine.recognize(content)\n \n if yandex_text and len(yandex_text) > 10:\n logger.info(f\"Yandex OCR success. Text length: {len(yandex_text)}\")\n logger.info(f\"Yandex RAW OUTPUT:\\n{yandex_text}\") \n yandex_items = parse_receipt_text(yandex_text)\n logger.info(f\"Parsed items preview: {yandex_items[:3]}...\") \n # Если Regex не нашел позиций (как в нашем случае со счетом)\n if not yandex_items:\n logger.info(\"Regex found nothing. Calling YandexGPT for semantic parsing...\")\n iam_token = yandex_engine._get_iam_token()\n yandex_items = llm_parser.parse_with_llm(yandex_text, iam_token)\n logger.info(f\"Semantic parsed items preview: {yandex_items[:3]}...\")\n \n return RecognitionResult(\n source=\"yandex_vision\",\n items=yandex_items,\n raw_text=yandex_text\n )\n else:\n logger.warning(\"Yandex Vision returned empty text or failed. Falling back to Tesseract.\")\n else:\n logger.info(\"Yandex Vision credentials not set. Skipping Stage 2.\")\n\n # --- ЭТАП 3: Tesseract Strategy (Local Fallback) ---\n logger.info(\"--- Stage 3: Tesseract OCR (Local) ---\")\n \n # 1. Image Processing (бинаризация, выравнивание)\n processed_img = preprocess_image(content)\n \n # 2. OCR\n tesseract_text = ocr_engine.recognize(processed_img)\n \n # 3. Parsing\n ocr_items = parse_receipt_text(tesseract_text)\n \n return RecognitionResult(\n source=\"tesseract_ocr\",\n items=ocr_items,\n raw_text=tesseract_text\n )\n\n except Exception as e:\n logger.error(f\"Error processing request: {e}\", exc_info=True)\n raise HTTPException(status_code=500, detail=str(e))\n\nif __name__ == \"__main__\":\n import uvicorn\n uvicorn.run(app, host=\"0.0.0.0\", port=5000)", + "llm_parser.py": "import os\nimport requests\nimport logging\nimport json\nimport uuid\nimport time\nfrom typing import List, Optional\nfrom parser import ParsedItem\n\nlogger = logging.getLogger(__name__)\n\nYANDEX_GPT_URL = \"https://llm.api.cloud.yandex.net/foundationModels/v1/completion\"\nGIGACHAT_OAUTH_URL = \"https://ngw.devices.sberbank.ru:9443/api/v2/oauth\"\nGIGACHAT_COMPLETION_URL = \"https://gigachat.devices.sberbank.ru/api/v1/chat/completions\"\n\nclass YandexGPTParser:\n def __init__(self):\n self.folder_id = os.getenv(\"YANDEX_FOLDER_ID\")\n self.api_key = os.getenv(\"YANDEX_OAUTH_TOKEN\")\n\n def parse(self, raw_text: str, iam_token: str) -> List[ParsedItem]:\n if not iam_token:\n return []\n \n prompt = {\n \"modelUri\": f\"gpt://{self.folder_id}/yandexgpt/latest\",\n \"completionOptions\": {\"stream\": False, \"temperature\": 0.1, \"maxTokens\": \"2000\"},\n \"messages\": [\n {\n \"role\": \"system\",\n \"text\": (\n \"Ты — помощник по бухгалтерии. Извлеки список товаров из текста документа. \"\n \"Верни ответ строго в формате JSON: \"\n '[{\"raw_name\": string, \"amount\": float, \"price\": float, \"sum\": float}]. '\n \"Если количество не указано, считай 1.0. Не пиши ничего, кроме JSON.\"\n )\n },\n {\"role\": \"user\", \"text\": raw_text}\n ]\n }\n\n headers = {\n \"Content-Type\": \"application/json\",\n \"Authorization\": f\"Bearer {iam_token}\",\n \"x-folder-id\": self.folder_id\n }\n\n try:\n response = requests.post(YANDEX_GPT_URL, headers=headers, json=prompt, timeout=30)\n response.raise_for_status()\n content = response.json()['result']['alternatives'][0]['message']['text']\n clean_json = content.replace(\"```json\", \"\").replace(\"```\", \"\").strip()\n return [ParsedItem(**item) for item in json.loads(clean_json)]\n except Exception as e:\n logger.error(f\"YandexGPT Parsing error: {e}\")\n return []\n\nclass GigaChatParser:\n def __init__(self):\n self.auth_key = os.getenv(\"GIGACHAT_AUTH_KEY\")\n self._access_token = None\n self._expires_at = 0\n\n def _get_token(self) -> Optional[str]:\n if self._access_token and time.time() < self._expires_at:\n return self._access_token\n\n logger.info(\"Obtaining GigaChat access token...\")\n headers = {\n 'Content-Type': 'application/x-www-form-urlencoded',\n 'Accept': 'application/json',\n 'RqUID': str(uuid.uuid4()),\n 'Authorization': f'Basic {self.auth_key}'\n }\n payload = {'scope': 'GIGACHAT_API_PERS'}\n\n try:\n # verify=False может понадобиться, если сертификаты Минцифры не в системном хранилище, \n # но вы указали, что установите их в контейнер.\n response = requests.post(GIGACHAT_OAUTH_URL, headers=headers, data=payload, timeout=10)\n response.raise_for_status()\n data = response.json()\n self._access_token = data['access_token']\n self._expires_at = data['expires_at'] / 1000 # Переводим мс в сек\n return self._access_token\n except Exception as e:\n logger.error(f\"GigaChat Auth error: {e}\")\n return None\n\n def parse(self, raw_text: str) -> List[ParsedItem]:\n token = self._get_token()\n if not token:\n return []\n\n headers = {\n 'Content-Type': 'application/json',\n 'Accept': 'application/json',\n 'Authorization': f'Bearer {token}'\n }\n\n payload = {\n \"model\": \"GigaChat\",\n \"messages\": [\n {\n \"role\": \"system\",\n \"content\": (\n \"Ты — эксперт по распознаванию чеков. Извлеки товары из текста. \"\n \"Верни ТОЛЬКО JSON массив объектов с полями: raw_name (строка), \"\n \"amount (число), price (число), sum (число). \"\n \"Если данных нет, верни []. Никаких пояснений.\"\n )\n },\n {\"role\": \"user\", \"content\": raw_text}\n ],\n \"temperature\": 0.1\n }\n\n try:\n response = requests.post(GIGACHAT_COMPLETION_URL, headers=headers, json=payload, timeout=30)\n response.raise_for_status()\n content = response.json()['choices'][0]['message']['content']\n clean_json = content.replace(\"```json\", \"\").replace(\"```\", \"\").strip()\n return [ParsedItem(**item) for item in json.loads(clean_json)]\n except Exception as e:\n logger.error(f\"GigaChat Parsing error: {e}\")\n return []\n\nclass LLMManager:\n def __init__(self):\n self.yandex = YandexGPTParser()\n self.giga = GigaChatParser()\n self.engine = os.getenv(\"LLM_ENGINE\", \"yandex\").lower()\n\n def parse_with_priority(self, raw_text: str, yandex_iam_token: Optional[str] = None) -> List[ParsedItem]:\n if self.engine == \"gigachat\":\n logger.info(\"Using GigaChat as primary LLM\")\n items = self.giga.parse(raw_text)\n if not items and yandex_iam_token:\n logger.info(\"GigaChat failed, falling back to YandexGPT\")\n items = self.yandex.parse(raw_text, yandex_iam_token)\n return items\n else:\n logger.info(\"Using YandexGPT as primary LLM\")\n items = self.yandex.parse(raw_text, yandex_iam_token) if yandex_iam_token else []\n if not items:\n logger.info(\"YandexGPT failed, falling back to GigaChat\")\n items = self.giga.parse(raw_text)\n return items\n\nllm_parser = LLMManager()", + "main.py": "import logging\nimport os\nfrom typing import List\n\nfrom fastapi import FastAPI, File, UploadFile, HTTPException\nfrom pydantic import BaseModel\nimport cv2\nimport numpy as np\n\n# Импортируем модули\nfrom imgproc import preprocess_image\nfrom parser import parse_receipt_text, ParsedItem, extract_fiscal_data\nfrom ocr import ocr_engine\nfrom qr_manager import detect_and_decode_qr, fetch_data_from_api\n# Импортируем новый модуль\nfrom yandex_ocr import yandex_engine\nfrom llm_parser import llm_parser\n\nlogging.basicConfig(\n level=logging.INFO,\n format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'\n)\nlogger = logging.getLogger(__name__)\n\napp = FastAPI(title=\"RMSER OCR Service (Hybrid: QR + Yandex + Tesseract)\")\n\nclass RecognitionResult(BaseModel):\n source: str # 'qr_api', 'yandex_vision', 'tesseract_ocr'\n items: List[ParsedItem]\n raw_text: str = \"\"\n\n@app.get(\"/health\")\ndef health_check():\n return {\"status\": \"ok\"}\n\n@app.post(\"/recognize\", response_model=RecognitionResult)\nasync def recognize_receipt(image: UploadFile = File(...)):\n \"\"\"\n Стратегия:\n 1. QR Code + FNS API (Приоритет 1 - Идеальная точность)\n 2. Yandex Vision OCR (Приоритет 2 - Высокая точность, если настроен)\n 3. Tesseract OCR (Приоритет 3 - Локальный фолбэк)\n \"\"\"\n logger.info(f\"Received file: {image.filename}, content_type: {image.content_type}\")\n\n if not image.content_type.startswith(\"image/\"):\n raise HTTPException(status_code=400, detail=\"File must be an image\")\n\n try:\n # Читаем сырые байты\n content = await image.read()\n \n # Конвертируем в numpy для QR и локального препроцессинга\n nparr = np.frombuffer(content, np.uint8)\n original_cv_image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)\n\n if original_cv_image is None:\n raise HTTPException(status_code=400, detail=\"Invalid image data\")\n\n # --- ЭТАП 1: QR Code Strategy ---\n logger.info(\"--- Stage 1: QR Code Detection ---\")\n qr_raw = detect_and_decode_qr(original_cv_image)\n \n if qr_raw:\n logger.info(\"QR found! Fetching data from API...\")\n api_items = fetch_data_from_api(qr_raw)\n \n if api_items:\n logger.info(f\"Success: Retrieved {len(api_items)} items via QR API.\")\n return RecognitionResult(\n source=\"qr_api\",\n items=api_items,\n raw_text=f\"QR Content: {qr_raw}\"\n )\n else:\n logger.warning(\"QR found but API failed. Falling back to OCR.\")\n else:\n logger.info(\"QR code not found. Proceeding to OCR.\")\n\n # --- ЭТАП 2: OCR + Virtual QR Strategy ---\n if yandex_engine.oauth_token and yandex_engine.folder_id:\n logger.info(\"--- Stage 2: Yandex Vision OCR + Virtual QR ---\")\n yandex_text = yandex_engine.recognize(content)\n \n if yandex_text and len(yandex_text) > 10:\n logger.info(f\"OCR success. Raw text length: {len(yandex_text)}\")\n \n # Попытка собрать виртуальный QR из текста\n virtual_qr = extract_fiscal_data(yandex_text)\n if virtual_qr:\n logger.info(f\"Virtual QR constructed: {virtual_qr}\")\n api_items = fetch_data_from_api(virtual_qr)\n if api_items:\n logger.info(f\"Success: Retrieved {len(api_items)} items via Virtual QR API.\")\n return RecognitionResult(\n source=\"virtual_qr_api\",\n items=api_items,\n raw_text=yandex_text\n )\n \n # Если виртуальный QR не сработал, пробуем Regex\n yandex_items = parse_receipt_text(yandex_text)\n \n # Если Regex пуст — вызываем LLM (GigaChat / YandexGPT)\n if not yandex_items:\n logger.info(\"Regex found nothing. Calling LLM Manager...\")\n iam_token = yandex_engine._get_iam_token()\n yandex_items = llm_parser.parse_with_priority(yandex_text, iam_token)\n \n return RecognitionResult(\n source=\"yandex_vision_llm\",\n items=yandex_items,\n raw_text=yandex_text\n )\n else:\n logger.warning(\"Yandex Vision returned empty text or failed. Falling back to Tesseract.\")\n else:\n logger.info(\"Yandex Vision credentials not set. Skipping Stage 2.\")\n\n # --- ЭТАП 3: Tesseract Strategy (Local Fallback) ---\n logger.info(\"--- Stage 3: Tesseract OCR (Local) ---\")\n \n # 1. Image Processing (бинаризация, выравнивание)\n processed_img = preprocess_image(content)\n \n # 2. OCR\n tesseract_text = ocr_engine.recognize(processed_img)\n \n # 3. Parsing\n ocr_items = parse_receipt_text(tesseract_text)\n \n return RecognitionResult(\n source=\"tesseract_ocr\",\n items=ocr_items,\n raw_text=tesseract_text\n )\n\n except Exception as e:\n logger.error(f\"Error processing request: {e}\", exc_info=True)\n raise HTTPException(status_code=500, detail=str(e))\n\nif __name__ == \"__main__\":\n import uvicorn\n uvicorn.run(app, host=\"0.0.0.0\", port=5000)", "ocr.py": "import logging\nimport pytesseract\nfrom PIL import Image\nimport numpy as np\n\nlogger = logging.getLogger(__name__)\n\n# Если tesseract не в PATH, раскомментируй и укажи путь:\n# pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'\n\nclass OCREngine:\n def __init__(self):\n logger.info(\"Initializing Tesseract OCR wrapper...\")\n # Tesseract не требует загрузки моделей в память, \n # проверка версии просто чтобы убедиться, что он установлен\n try:\n version = pytesseract.get_tesseract_version()\n logger.info(f\"Tesseract version found: {version}\")\n except Exception as e:\n logger.error(\"Tesseract not found! Make sure it is installed (apt install tesseract-ocr).\")\n raise e\n\n def recognize(self, image: np.ndarray) -> str:\n \"\"\"\n Принимает бинарное изображение (numpy array).\n \"\"\"\n # Tesseract работает лучше с PIL Image\n pil_img = Image.fromarray(image)\n \n # Конфигурация:\n # -l rus+eng: русский и английский\n # --psm 6: Assume a single uniform block of text (хорошо для чеков)\n custom_config = r'--oem 3 --psm 6'\n \n text = pytesseract.image_to_string(pil_img, lang='rus+eng', config=custom_config)\n return text\n\nocr_engine = OCREngine()", - "parser.py": "import re\nfrom typing import List\nfrom pydantic import BaseModel\n\nclass ParsedItem(BaseModel):\n raw_name: str\n amount: float\n price: float\n sum: float\n\n# Регулярка для поиска чисел с плавающей точкой: 123.00, 123,00, 10.5\nFLOAT_RE = r'\\d+[.,]\\d{2}'\n\ndef clean_text(text: str) -> str:\n \"\"\"Удаляет лишние символы из названия товара.\"\"\"\n # Оставляем буквы, цифры, пробелы и базовые знаки\n return re.sub(r'[^\\w\\s.,%/-]', '', text).strip()\n\ndef parse_float(val: str) -> float:\n \"\"\"Преобразует строку '123,45' или '123.45' в float.\"\"\"\n if not val:\n return 0.0\n # Заменяем запятую на точку и убираем возможные пробелы\n return float(val.replace(',', '.').replace(' ', ''))\n\ndef parse_receipt_text(text: str) -> List[ParsedItem]:\n \"\"\"\n Парсит текст чека построчно.\n Логика: накапливаем строки названия, пока не встретим строку с математикой (цена/сумма).\n \"\"\"\n lines = text.split('\\n')\n items = []\n name_buffer = []\n\n for line in lines:\n line = line.strip()\n if not line:\n continue\n\n # Ищем все числа, похожие на цену (с двумя знаками после запятой/точки)\n # Пример: 129.99\n floats = re.findall(FLOAT_RE, line)\n \n # Эвристика: строка считается \"товарной позицией с ценой\", если в ней есть минимум 2 числа\n # (обычно Цена и Сумма, или Кол-во и Сумма)\n # ИЛИ одно число, если это итоговая сумма, но мы ищем товары.\n \n is_price_line = False\n \n if len(floats) >= 2:\n is_price_line = True\n \n vals = [parse_float(f) for f in floats]\n \n # Попытка определить структуру: Цена x Кол-во = Сумма\n price = 0.0\n amount = 1.0\n total = 0.0\n \n # Обычно последнее число - это сумма (итог по строке)\n total = vals[-1]\n \n if len(vals) == 2:\n # Скорее всего: Цена ... Сумма (кол-во = 1)\n # Или: Кол-во ... Сумма (если цена не распозналась)\n # Предположим amount=1, тогда первое число - цена\n price = vals[0]\n amount = 1.0\n \n # Проверка на адекватность: сумма обычно >= цены\n # Если total < price, возможно порядок перепутан или это скидка\n if total < price and total != 0:\n # Если total сильно меньше, возможно это не сумма\n pass \n elif total > price and price > 0:\n # Пытаемся вычислить кол-во\n calc_amount = total / price\n # Если результат близок к целому (например 1.999 -> 2), то ок\n if abs(round(calc_amount) - calc_amount) < 0.05:\n amount = float(round(calc_amount))\n\n elif len(vals) >= 3:\n # Варианты: [Цена, Кол-во, Сумма] или [Кол-во, Цена, Сумма]\n # Проверяем математику: A * B = C\n v1, v2 = vals[-3], vals[-2]\n \n if abs(v1 * v2 - total) < 0.5: # Допуск 0.5 руб\n price = v1\n amount = v2\n elif abs(v2 * v1 - total) < 0.5:\n price = v2\n amount = v1\n else:\n # Если математика не сходится, берем предпоследнее как цену\n price = vals[-2]\n amount = 1.0\n\n # Сборка названия\n full_name = \" \".join(name_buffer).strip()\n \n # Если буфер пуст, возможно название в этой же строке слева\n if not full_name:\n # Удаляем найденные числа из строки, остаток считаем названием\n text_without_floats = re.sub(FLOAT_RE, '', line)\n full_name = clean_text(text_without_floats)\n\n # Фильтрация мусора (слишком короткие названия или нулевые суммы)\n if len(full_name) > 2 and total > 0:\n items.append(ParsedItem(\n raw_name=full_name,\n amount=amount,\n price=price,\n sum=total\n ))\n \n # Очищаем буфер, так как позиция закрыта\n name_buffer = []\n \n else:\n # Строка не похожа на цену.\n # Проверяем на стоп-слова (конец чека)\n upper_line = line.upper()\n if \"ИТОГ\" in upper_line or \"СУММА\" in upper_line or \"ПРИХОД\" in upper_line:\n # Считаем, что товары закончились\n name_buffer = [] \n # Можно здесь сделать break, если уверены, что ниже товаров нет\n continue\n \n # Добавляем в буфер названия\n name_buffer.append(line)\n\n return items", + "parser.py": "import re\nfrom typing import List, Optional\nfrom pydantic import BaseModel\nfrom datetime import datetime\n\nclass ParsedItem(BaseModel):\n raw_name: str\n amount: float\n price: float\n sum: float\n\n# Регулярка для поиска чисел с плавающей точкой: 123.00, 123,00, 10.5\nFLOAT_RE = r'\\d+[.,]\\d{2}'\n\ndef clean_text(text: str) -> str:\n \"\"\"Удаляет лишние символы из названия товара.\"\"\"\n return re.sub(r'[^\\w\\s.,%/-]', '', text).strip()\n\ndef parse_float(val: str) -> float:\n \"\"\"Преобразует строку '123,45' или '123.45' в float.\"\"\"\n if not val:\n return 0.0\n return float(val.replace(',', '.').replace(' ', ''))\n\ndef extract_fiscal_data(text: str) -> Optional[str]:\n \"\"\"\n Ищет в тексте:\n - Дата и время (t): 19.12.25 12:16 -> 20251219T1216\n - Сумма (s): ИТОГ: 770.00\n - ФН (fn): 16 цифр, начинается на 73\n - ФД (i): до 8 цифр (ищем после Д: или ФД:)\n - ФП (fp): 10 цифр (ищем после П: или ФП:)\n \n Возвращает строку формата: t=20251219T1216&s=770.00&fn=7384440800514469&i=11194&fp=3334166168&n=1\n \"\"\"\n # 1. Поиск даты и времени\n # Ищем форматы DD.MM.YY HH:MM или DD.MM.YYYY HH:MM\n date_match = re.search(r'(\\d{2})\\.(\\d{2})\\.(\\d{2,4})\\s+(\\d{2}):(\\d{2})', text)\n t_param = \"\"\n if date_match:\n d, m, y, hh, mm = date_match.groups()\n if len(y) == 2: y = \"20\" + y\n t_param = f\"{y}{m}{d}T{hh}{mm}\"\n\n # 2. Поиск суммы (Итог)\n # Ищем слово ИТОГ и число после него\n sum_match = re.search(r'(?:ИТОГ|СУММА|СУММА:)\\s*[:]*\\s*(\\d+[.,]\\d{2})', text, re.IGNORECASE)\n s_param = \"\"\n if sum_match:\n s_param = sum_match.group(1).replace(',', '.')\n\n # 3. Поиск ФН (16 цифр, начинается с 73)\n fn_match = re.search(r'\\b(73\\d{14})\\b', text)\n fn_param = fn_match.group(1) if fn_match else \"\"\n\n # 4. Поиск ФД (i) - ищем после маркеров Д: или ФД:\n # Берем набор цифр до 8 знаков\n fd_match = re.search(r'(?:ФД|Д)[:\\s]+(\\d{1,8})\\b', text)\n i_param = fd_match.group(1) if fd_match else \"\"\n\n # 5. Поиск ФП (fp) - ищем после маркеров П: или ФП:\n # Строго 10 цифр\n fp_match = re.search(r'(?:ФП|П)[:\\s]+(\\d{10})\\b', text)\n fp_param = fp_match.group(1) if fp_match else \"\"\n\n # Валидация: для формирования запроса к API нам критически важны все параметры\n if all([t_param, s_param, fn_param, i_param, fp_param]):\n return f\"t={t_param}&s={s_param}&fn={fn_param}&i={i_param}&fp={fp_param}&n=1\"\n \n return None\n\ndef parse_receipt_text(text: str) -> List[ParsedItem]:\n \"\"\"\n Парсит текст чека построчно (Regex-метод).\n \"\"\"\n lines = text.split('\\n')\n items = []\n name_buffer = []\n\n for line in lines:\n line = line.strip()\n if not line:\n continue\n\n floats = re.findall(FLOAT_RE, line)\n is_price_line = False\n \n if len(floats) >= 2:\n is_price_line = True\n vals = [parse_float(f) for f in floats]\n \n price = 0.0\n amount = 1.0\n total = vals[-1]\n \n if len(vals) == 2:\n price = vals[0]\n amount = 1.0\n if total > price and price > 0:\n calc_amount = total / price\n if abs(round(calc_amount) - calc_amount) < 0.05:\n amount = float(round(calc_amount))\n elif len(vals) >= 3:\n v1, v2 = vals[-3], vals[-2]\n if abs(v1 * v2 - total) < 0.5:\n price, amount = v1, v2\n elif abs(v2 * v1 - total) < 0.5:\n price, amount = v2, v1\n else:\n price, amount = vals[-2], 1.0\n\n full_name = \" \".join(name_buffer).strip()\n if not full_name:\n text_without_floats = re.sub(FLOAT_RE, '', line)\n full_name = clean_text(text_without_floats)\n\n if len(full_name) > 2 and total > 0:\n items.append(ParsedItem(\n raw_name=full_name,\n amount=amount,\n price=price,\n sum=total\n ))\n name_buffer = []\n else:\n upper_line = line.upper()\n if any(stop in upper_line for stop in [\"ИТОГ\", \"СУММА\", \"ПРИХОД\"]):\n name_buffer = [] \n continue\n name_buffer.append(line)\n\n return items", "qr_manager.py": "import logging\nimport requests\nfrom typing import Optional, List\nfrom pyzbar.pyzbar import decode\nfrom PIL import Image\nimport numpy as np\n\n# Импортируем модель из parser.py\nfrom parser import ParsedItem \n\nAPI_TOKEN = \"36590.yqtiephCvvkYUKM2W\"\nAPI_URL = \"https://proverkacheka.com/api/v1/check/get\"\n\nlogger = logging.getLogger(__name__)\n\ndef is_valid_fiscal_qr(qr_string: str) -> bool:\n \"\"\"\n Проверяет, соответствует ли строка формату фискального чека ФНС.\n Ожидаемый формат: t=...&s=...&fn=...&i=...&fp=...&n=...\n Мы проверяем наличие хотя бы 3-х ключевых параметров.\n \"\"\"\n if not qr_string:\n return False\n \n # Ключевые параметры, которые обязаны быть в строке чека\n required_keys = [\"t=\", \"s=\", \"fn=\"]\n \n # Проверяем, что все ключевые параметры присутствуют\n # (порядок может отличаться, поэтому проверяем вхождение каждого)\n matches = [key in qr_string for key in required_keys]\n \n return all(matches)\n\ndef detect_and_decode_qr(image: np.ndarray) -> Optional[str]:\n \"\"\"\n Ищет ВСЕ QR-коды на изображении и возвращает только тот, \n который похож на фискальный чек.\n \"\"\"\n try:\n pil_img = Image.fromarray(image)\n \n # Декодируем все коды на картинке\n decoded_objects = decode(pil_img)\n \n if not decoded_objects:\n logger.info(\"No QR codes detected on the image.\")\n return None\n \n logger.info(f\"Detected {len(decoded_objects)} code(s). Scanning for fiscal data...\")\n\n for obj in decoded_objects:\n if obj.type == 'QRCODE':\n qr_data = obj.data.decode(\"utf-8\")\n \n # Логируем найденное (для отладки, если вдруг формат хитрый)\n # Обрезаем длинные строки, чтобы не засорять лог\n log_preview = (qr_data[:75] + '..') if len(qr_data) > 75 else qr_data\n logger.info(f\"Checking QR content: {log_preview}\")\n \n if is_valid_fiscal_qr(qr_data):\n logger.info(\"Valid fiscal QR found!\")\n return qr_data\n else:\n logger.info(\"QR skipped (not a fiscal receipt pattern).\")\n \n logger.warning(\"QR codes were found, but none matched the fiscal receipt format.\")\n return None\n\n except Exception as e:\n logger.error(f\"Error during QR detection: {e}\")\n return None\n\ndef fetch_data_from_api(qr_raw: str) -> List[ParsedItem]:\n \"\"\"\n Отправляет данные QR-кода в API proverkacheka.com и парсит JSON-ответ.\n \"\"\"\n try:\n payload = {\n 'qrraw': qr_raw,\n 'token': API_TOKEN\n }\n \n logger.info(\"Sending request to Check API...\")\n response = requests.post(API_URL, data=payload, timeout=10)\n \n if response.status_code != 200:\n logger.error(f\"API Error: Status {response.status_code}\")\n return []\n\n data = response.json()\n \n if data.get('code') != 1:\n logger.warning(f\"API returned non-success code: {data.get('code')}\")\n return []\n \n json_data = data.get('data', {}).get('json', {})\n items_data = json_data.get('items', [])\n \n parsed_items = []\n \n for item in items_data:\n price = float(item.get('price', 0)) / 100.0\n total_sum = float(item.get('sum', 0)) / 100.0\n quantity = float(item.get('quantity', 0))\n name = item.get('name', 'Unknown')\n \n parsed_items.append(ParsedItem(\n raw_name=name,\n amount=quantity,\n price=price,\n sum=total_sum\n ))\n \n return parsed_items\n\n except Exception as e:\n logger.error(f\"Error fetching/parsing API data: {e}\")\n return []", - "requirements.txt": "fastapi\nuvicorn\npython-multipart\npydantic\nnumpy\nopencv-python-headless\npytesseract\nrequests\npyzbar\npillow", + "requirements.txt": "fastapi\nuvicorn\npython-multipart\npydantic\nnumpy\nopencv-python-headless\npytesseract\nrequests\npyzbar\npillow\ncertifi", "yandex_ocr.py": "import os\nimport time\nimport json\nimport base64\nimport logging\nimport requests\nfrom typing import Optional\n\nlogger = logging.getLogger(__name__)\n\nIAM_TOKEN_URL = \"https://iam.api.cloud.yandex.net/iam/v1/tokens\"\nVISION_URL = \"https://ocr.api.cloud.yandex.net/ocr/v1/recognizeText\"\n\nclass YandexOCREngine:\n def __init__(self):\n self.oauth_token = os.getenv(\"YANDEX_OAUTH_TOKEN\")\n self.folder_id = os.getenv(\"YANDEX_FOLDER_ID\")\n \n # Кэширование IAM токена\n self._iam_token = None\n self._token_expire_time = 0\n\n if not self.oauth_token or not self.folder_id:\n logger.warning(\"Yandex OCR credentials (YANDEX_OAUTH_TOKEN, YANDEX_FOLDER_ID) not set. Yandex OCR will be unavailable.\")\n\n def _get_iam_token(self) -> Optional[str]:\n \"\"\"\n Получает IAM-токен. Если есть живой кэшированный — возвращает его.\n Если нет — обменивает OAuth на IAM.\n \"\"\"\n current_time = time.time()\n \n # Если токен есть и он \"свежий\" (с запасом в 5 минут)\n if self._iam_token and current_time < self._token_expire_time - 300:\n return self._iam_token\n\n logger.info(\"Obtaining new IAM token from Yandex...\")\n try:\n response = requests.post(\n IAM_TOKEN_URL,\n json={\"yandexPassportOauthToken\": self.oauth_token},\n timeout=10\n )\n response.raise_for_status()\n data = response.json()\n \n self._iam_token = data[\"iamToken\"]\n \n # Токен обычно живет 12 часов, но мы будем ориентироваться на поле expiresAt если нужно,\n # или просто поставим таймер. Для простоты берем 1 час жизни кэша.\n self._token_expire_time = current_time + 3600 \n \n logger.info(\"IAM token received successfully.\")\n return self._iam_token\n except Exception as e:\n logger.error(f\"Failed to get IAM token: {e}\")\n return None\n\n def recognize(self, image_bytes: bytes) -> str:\n \"\"\"\n Отправляет изображение в Yandex Vision и возвращает полный текст.\n \"\"\"\n if not self.oauth_token or not self.folder_id:\n logger.error(\"Yandex credentials missing.\")\n return \"\"\n\n iam_token = self._get_iam_token()\n if not iam_token:\n return \"\"\n\n # 1. Кодируем в Base64\n b64_image = base64.b64encode(image_bytes).decode(\"utf-8\")\n\n # 2. Формируем тело запроса\n # Используем модель 'page' (для документов) и '*' для автоопределения языка\n payload = {\n \"mimeType\": \"JPEG\", # Yandex переваривает и PNG под видом JPEG часто, но лучше быть аккуратным.\n # В идеале определять mime-type из файла, но JPEG - безопасный дефолт для фото.\n \"languageCodes\": [\"*\"],\n \"model\": \"page\",\n \"content\": b64_image\n }\n\n headers = {\n \"Content-Type\": \"application/json\",\n \"Authorization\": f\"Bearer {iam_token}\",\n \"x-folder-id\": self.folder_id,\n \"x-data-logging-enabled\": \"true\"\n }\n\n # 3. Отправляем запрос\n try:\n logger.info(\"Sending request to Yandex Vision OCR...\")\n response = requests.post(VISION_URL, headers=headers, json=payload, timeout=20)\n \n # Если 401 Unauthorized, возможно токен протух раньше времени (редко, но бывает)\n if response.status_code == 401:\n logger.warning(\"Got 401 from Yandex. Retrying with fresh token...\")\n self._iam_token = None # сброс кэша\n iam_token = self._get_iam_token()\n if iam_token:\n headers[\"Authorization\"] = f\"Bearer {iam_token}\"\n response = requests.post(VISION_URL, headers=headers, json=payload, timeout=20)\n \n response.raise_for_status()\n result_json = response.json()\n \n # 4. Парсим ответ\n # Структура: result -> textAnnotation -> fullText\n # Или (если fullText нет) blocks -> lines -> text\n \n text_annotation = result_json.get(\"result\", {}).get(\"textAnnotation\", {})\n \n if not text_annotation:\n logger.warning(\"Yandex returned success but no textAnnotation found.\")\n return \"\"\n \n # Самый простой способ - взять fullText, он обычно склеен с \\n\n full_text = text_annotation.get(\"fullText\", \"\")\n \n if not full_text:\n # Фолбэк: если fullText пуст, собираем вручную по блокам\n logger.info(\"fullText empty, assembling from blocks...\")\n lines_text = []\n for block in text_annotation.get(\"blocks\", []):\n for line in block.get(\"lines\", []):\n lines_text.append(line.get(\"text\", \"\"))\n full_text = \"\\n\".join(lines_text)\n\n return full_text\n\n except Exception as e:\n logger.error(f\"Error during Yandex Vision request: {e}\")\n return \"\"\n\n# Глобальный инстанс\nyandex_engine = YandexOCREngine()", } diff --git a/pack_go_files.py b/pack_go_files.py index e61dad7..afb9b72 100644 --- a/pack_go_files.py +++ b/pack_go_files.py @@ -25,7 +25,8 @@ IGNORE_LIST = [ "ftp_cache", "ocr-service", "rmser-view", - "temp" + "temp", + "pack_go_files.py" ] @@ -134,7 +135,7 @@ def write_to_py(files, tree_str, output_file): def main(): root_dir = "." - output_file = "project_dump.py" + output_file = "go_backend_dump.py" if len(sys.argv) > 1: output_file = sys.argv[1] diff --git a/rmser-view/.gitignore b/rmser-view/.gitignore index a547bf3..2530b18 100644 --- a/rmser-view/.gitignore +++ b/rmser-view/.gitignore @@ -22,3 +22,4 @@ dist-ssr *.njsproj *.sln *.sw? +*.txt \ No newline at end of file diff --git a/rmser-view/dump_react.py b/rmser-view/dump_react.py new file mode 100644 index 0000000..c166a94 --- /dev/null +++ b/rmser-view/dump_react.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +pack_project_dump.py + +Упаковывает код проекта в один текстовый файл, удобный для анализа: +- дерево файлов +- затем содержимое каждого файла в блоках с маркерами +- фильтрация мусорных директорий (node_modules, dist, build и т.п.) +- лимит размера на файл, чтобы не раздувать дамп +- попытка декодирования utf-8 с заменой ошибок + +Пример: + python pack_project_dump.py --root . --out project_dump.txt +""" + +from __future__ import annotations + +import argparse +import fnmatch +import hashlib +import os +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterable, List, Optional, Tuple + + +DEFAULT_EXCLUDE_DIRS = { + "node_modules", + "dist", + "build", + ".next", + ".cache", + ".turbo", + ".vercel", + "coverage", + ".git", + ".idea", + ".vscode", +} + +DEFAULT_EXCLUDE_FILES = { + "package-lock.json", # можно оставить, но часто огромный + "yarn.lock", # можно оставить, но часто огромный + "pnpm-lock.yaml", # можно оставить, но часто огромный +} + +DEFAULT_TEXT_EXTS = { + ".js", ".jsx", ".ts", ".tsx", + ".json", ".md", ".css", ".scss", ".sass", ".less", + ".html", ".yml", ".yaml", + ".env", ".env.example", + ".gitignore", ".editorconfig", + ".txt", + ".mjs", ".cjs", "Dockerfile", +} + + +@dataclass(frozen=True) +class FileEntry: + rel_path: str + size: int + sha256: str + + +def sha256_bytes(data: bytes) -> str: + h = hashlib.sha256() + h.update(data) + return h.hexdigest() + + +def is_probably_text(path: Path, extra_exts: Optional[set[str]] = None) -> bool: + ext = path.suffix.lower() + if extra_exts and ext in extra_exts: + return True + if ext in DEFAULT_TEXT_EXTS: + return True + # Файлы без расширения, но “текстовые” по имени + if path.name in {".eslintrc", ".prettierrc"}: + return True + return False + + +def should_exclude_path( + rel_parts: Tuple[str, ...], + exclude_dirs: set[str], + exclude_file_globs: List[str], + exclude_files: set[str], +) -> bool: + # исключаем директории по любому сегменту пути + if any(part in exclude_dirs for part in rel_parts[:-1]): + return True + + name = rel_parts[-1] if rel_parts else "" + if name in exclude_files: + return True + + rel_str = "/".join(rel_parts) + for pat in exclude_file_globs: + if fnmatch.fnmatch(rel_str, pat) or fnmatch.fnmatch(name, pat): + return True + + return False + + +def iter_project_files( + root: Path, + exclude_dirs: set[str], + exclude_files: set[str], + exclude_file_globs: List[str], +) -> Iterable[Path]: + for dirpath, dirnames, filenames in os.walk(root): + # фильтруем dirnames на месте, чтобы os.walk не заходил внутрь + dirnames[:] = [d for d in dirnames if d not in exclude_dirs] + + for fname in filenames: + p = Path(dirpath) / fname + rel = p.relative_to(root) + rel_parts = tuple(rel.parts) + if should_exclude_path(rel_parts, exclude_dirs, exclude_file_globs, exclude_files): + continue + yield p + + +def build_tree_listing(paths: List[Path], root: Path) -> str: + rels = sorted(str(p.relative_to(root)).replace(os.sep, "/") for p in paths) + lines = ["Дерево файлов:"] + for r in rels: + lines.append(f"- {r}") + return "\n".join(lines) + "\n" + + +def read_file_bytes(path: Path, max_file_bytes: int) -> Tuple[bytes, bool]: + data = path.read_bytes() + if len(data) > max_file_bytes: + return data[:max_file_bytes], True + return data, False + + +def decode_text(data: bytes) -> str: + # Пытаемся utf-8; если ошибки — заменяем, чтобы не падать + return data.decode("utf-8", errors="replace") + + +def pack_dump( + root: Path, + out_path: Path, + include_globs: List[str], + exclude_dirs: set[str], + exclude_files: set[str], + exclude_file_globs: List[str], + max_file_kb: int, + only_text: bool, +) -> None: + max_file_bytes = max_file_kb * 1024 + + all_files = list(iter_project_files(root, exclude_dirs, exclude_files, exclude_file_globs)) + + # apply include globs if provided + if include_globs: + def match_any(rel: str) -> bool: + return any(fnmatch.fnmatch(rel, g) for g in include_globs) + + filtered = [] + for p in all_files: + rel = str(p.relative_to(root)).replace(os.sep, "/") + if match_any(rel): + filtered.append(p) + all_files = filtered + + entries: List[FileEntry] = [] + blocks: List[str] = [] + + # дерево проекта + blocks.append(f"Снимок проекта: {root.resolve()}") + blocks.append(f"Дата (UTC): {datetime.now(timezone.utc).isoformat()}") + blocks.append("") + blocks.append(build_tree_listing(all_files, root)) + + for p in sorted(all_files, key=lambda x: str(x)): + rel = str(p.relative_to(root)).replace(os.sep, "/") + + if only_text and not is_probably_text(p): + continue + + try: + raw, truncated = read_file_bytes(p, max_file_bytes) + except Exception as e: + blocks.append("<<>>") + blocks.append(f"path: {rel}") + blocks.append("error: не удалось прочитать файл") + blocks.append(f"exception: {type(e).__name__}: {e}") + blocks.append("<<>>") + blocks.append("") + continue + + sha = sha256_bytes(raw) + size_on_disk = p.stat().st_size + entries.append(FileEntry(rel_path=rel, size=size_on_disk, sha256=sha)) + + text = decode_text(raw) + + blocks.append("<<>>") + blocks.append(f"path: {rel}") + blocks.append(f"size_bytes: {size_on_disk}") + blocks.append(f"sha256_first_{max_file_kb}kb: {sha}") + if truncated: + blocks.append(f"truncated: true (первые {max_file_kb} KB)") + else: + blocks.append("truncated: false") + blocks.append("<<>>") + blocks.append(text) + blocks.append("<<>>") + blocks.append("") + + # краткий индекс + blocks.insert( + 0, + "Индекс файлов (путь | размер | sha256 первых N KB):\n" + + "\n".join(f"- {e.rel_path} | {e.size} | {e.sha256}" for e in entries) + + "\n" + ) + + out_path.write_text("\n".join(blocks), encoding="utf-8") + + +def parse_args() -> argparse.Namespace: + ap = argparse.ArgumentParser() + ap.add_argument("--root", default=".", help="Корень проекта") + ap.add_argument("--out", default="react_ts_frontend.txt", help="Файл-выход (один)") + ap.add_argument( + "--include", + action="append", + default=[], + help="Глоб-паттерн для включения (можно несколько), например: 'src/**' или '**/*.tsx'", + ) + ap.add_argument( + "--exclude-file", + action="append", + default=[], + help="Глоб-паттерн для исключения файлов, например: '**/*.min.js'", + ) + ap.add_argument( + "--max-file-kb", + type=int, + default=512, + help="Максимальный объём на один файл (KB). Остальное отрежется.", + ) + ap.add_argument( + "--only-text", + action="store_true", + help="Включать только вероятно текстовые файлы по расширению/имени", + ) + return ap.parse_args() + + +def main() -> None: + args = parse_args() + root = Path(args.root).resolve() + out_path = Path(args.out).resolve() + + pack_dump( + root=root, + out_path=out_path, + include_globs=args.include, + exclude_dirs=set(DEFAULT_EXCLUDE_DIRS), + exclude_files=set(DEFAULT_EXCLUDE_FILES), + exclude_file_globs=args.exclude_file, + max_file_kb=args.max_file_kb, + only_text=args.only_text, + ) + + print(f"Готово: {out_path}") + + +if __name__ == "__main__": + main() \ No newline at end of file