Compare commits

...

3 Commits

Author SHA1 Message Date
0fa41e6d46 prod testing
Some checks failed
Build and Deploy / build_and_deploy (push) Failing after 17s
2025-07-24 11:40:21 +03:00
da6d345609 ready to deploy
Some checks failed
Build and Deploy / build_and_deploy (push) Failing after 10s
2025-07-24 11:28:34 +03:00
1c14f9bcc1 docker-ready 2025-07-22 01:15:08 +03:00
9 changed files with 195 additions and 24 deletions

16
.dockerignore Normal file
View File

@@ -0,0 +1,16 @@
# .dockerignore
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
env/
venv/
.env
*.db
*.db-journal
test_output/
logs/
.git
.gitignore
.idea/

View File

@@ -0,0 +1,39 @@
name: Build and Deploy
on:
push:
branches:
- prod
jobs:
build_and_deploy:
runs-on: docker
steps:
- name: Checkout Repository
uses: actions/checkout@v4
- name: Define Image Name
id: image_vars
run: |
IMAGE_TAG="${{ GITEA_BRANCH }}-${{ GITEA_SHA::7 }}"
FULL_IMAGE_NAME="${{ GITEA_REPO_NAME }}:${IMAGE_TAG}"
echo "::set-output name=FULL_IMAGE_NAME::${FULL_IMAGE_NAME}"
- name: Build Docker Image
run: |
docker buildx build --load --platform linux/amd64 \
-t "${{ steps.image_vars.outputs.FULL_IMAGE_NAME }}" .
- name: Set up SSH
run: |
mkdir -p ~/.ssh
echo "${{ secrets.PROD_SSH_KEY }}" > ~/.ssh/id_ed25519
chmod 600 ~/.ssh/id_ed25519
- name: Save, Transfer and Deploy Image
run: |
IMAGE_TO_DEPLOY="${{ steps.image_vars.outputs.FULL_IMAGE_NAME }}"
echo "Transferring image ${IMAGE_TO_DEPLOY} to production..."
docker save "${IMAGE_TO_DEPLOY}" | ssh -o StrictHostKeyChecking=no deployer@YOUR_PROD_SERVER_IP "/home/deployer/deploy.sh '${IMAGE_TO_DEPLOY}'"

18
Dockerfile Normal file
View File

@@ -0,0 +1,18 @@
# Этап 1: Используем официальный легковесный образ Python
FROM python:3.9-slim
# Устанавливаем рабочую директорию внутри контейнера
WORKDIR /app
# Обновляем pip и устанавливаем зависимости
# Копируем requirements.txt отдельно, чтобы Docker мог кэшировать этот слой.
# Зависимости переустановятся только если изменится requirements.txt.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Копируем весь исходный код приложения в рабочую директорию
COPY . .
# Указываем команду, которая будет выполняться при запуске контейнера
CMD ["python", "main.py"]

View File

@@ -1,4 +1,3 @@
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -12,6 +11,8 @@ if not all([SD_ACCESS_KEY, SD_BASE_URL]):
raise ValueError("Необходимо задать переменные окружения SDKEY и SD_BASE_URL") raise ValueError("Необходимо задать переменные окружения SDKEY и SD_BASE_URL")
# --- Paths --- # --- Paths ---
# Директория для хранения лог-файлов
LOG_PATH = os.getenv("LOGPATH", ".")
DB_PATH = os.getenv("BDPATH", ".") # По умолчанию - текущая папка DB_PATH = os.getenv("BDPATH", ".") # По умолчанию - текущая папка
DB_NAME = "fiscals.db" DB_NAME = "fiscals.db"
DB_FULL_PATH = os.path.join(DB_PATH, DB_NAME) DB_FULL_PATH = os.path.join(DB_PATH, DB_NAME)

View File

@@ -1,9 +1,10 @@
# ftp_parser.py
import os import os
import json import json
import logging import logging
from typing import List, Dict from typing import List, Dict
from datetime import datetime, timedelta
from dateutil import parser
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -11,6 +12,7 @@ def process_json_files(directory: str) -> List[Dict]:
""" """
Сканирует директорию, читает все .json файлы и извлекает из них данные Сканирует директорию, читает все .json файлы и извлекает из них данные
о фискальных регистраторах в стандартизированном формате. о фискальных регистраторах в стандартизированном формате.
Файлы, старше 21 дня (по полю 'current_time'), игнорируются.
:param directory: Путь к директории с JSON файлами. :param directory: Путь к директории с JSON файлами.
:return: Список словарей, где каждый словарь представляет один ФР. :return: Список словарей, где каждый словарь представляет один ФР.
@@ -22,6 +24,9 @@ def process_json_files(directory: str) -> List[Dict]:
all_fr_data = [] all_fr_data = []
log.info(f"Начинаю обработку JSON файлов из директории: {directory}") log.info(f"Начинаю обработку JSON файлов из директории: {directory}")
# Пороговая дата: всё что старше, считаем неактуальным
freshness_threshold = datetime.now() - timedelta(days=21)
for filename in os.listdir(directory): for filename in os.listdir(directory):
if filename.lower().endswith('.json'): if filename.lower().endswith('.json'):
file_path = os.path.join(directory, filename) file_path = os.path.join(directory, filename)
@@ -29,6 +34,26 @@ def process_json_files(directory: str) -> List[Dict]:
with open(file_path, 'r', encoding='utf-8') as f: with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f) data = json.load(f)
current_time_str = data.get('current_time')
if not current_time_str:
log.warning(f"Пропущен файл {filename}: отсутствует поле 'current_time' для проверки актуальности.")
continue
try:
file_datetime = parser.parse(current_time_str)
# Приводим к aware-объекту с локальной таймзоной, если он naive, для корректного сравнения
if file_datetime.tzinfo is None:
file_datetime = file_datetime.astimezone()
# Сравниваем с пороговым значением (пороговое значение тоже делаем aware)
if file_datetime < freshness_threshold.astimezone(file_datetime.tzinfo):
log.warning(f"Пропущен файл {filename}: данные неактуальны (старше 21 дня). "
f"Дата файла: {file_datetime.strftime('%Y-%m-%d')}")
continue
except parser.ParserError:
log.error(f"Не удалось распознать дату в поле 'current_time' в файле {filename}: '{current_time_str}'")
continue
if 'serialNumber' not in data or not data['serialNumber']: if 'serialNumber' not in data or not data['serialNumber']:
log.warning(f"Пропущен файл {filename}: отсутствует serialNumber.") log.warning(f"Пропущен файл {filename}: отсутствует serialNumber.")
continue continue

54
main.py
View File

@@ -1,8 +1,8 @@
# main.py
import sys import sys
import time import time
import logging import logging
import logging.handlers
import os
import schedule import schedule
import config import config
@@ -11,15 +11,53 @@ from sd_api import ServiceDeskClient
from sync_logic import Synchronizer from sync_logic import Synchronizer
def setup_logging(): def setup_logging():
"""Настраивает базовую конфигурацию логирования.""" """
logging.basicConfig( Настраивает продвинутую конфигурацию логирования с ротацией файлов
level=logging.INFO, и разными уровнями для консоли и файла.
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', """
stream=sys.stdout, # Вывод логов в stdout # 1. Устанавливаем корневому логгеру самый низкий уровень (DEBUG).
# Это позволяет хэндлерам самим решать, что пропускать.
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
# 2. Создаем директорию для логов, если она не существует.
try:
if not os.path.exists(config.LOG_PATH):
os.makedirs(config.LOG_PATH)
except OSError as e:
print(f"Ошибка создания директории для логов {config.LOG_PATH}: {e}")
# В случае ошибки выходим, т.к. логирование в файл не будет работать
sys.exit(1)
# 3. Настраиваем форматтер для логов.
log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 4. Настраиваем хэндлер для вывода в консоль (уровень INFO).
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(log_formatter)
# 5. Настраиваем хэндлер для записи в файл с ежедневной ротацией (уровень DEBUG).
# Файлы будут вида sync_service.log, sync_service.log.2023-10-27 и т.д.
log_file = os.path.join(config.LOG_PATH, 'sync_service.log')
file_handler = logging.handlers.TimedRotatingFileHandler(
log_file,
when='midnight', # Ротация в полночь
interval=1, # Каждый день
backupCount=7, # Хранить 7 старых файлов
encoding='utf-8'
) )
# Отключаем слишком "болтливые" логи от сторонних библиотек file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(log_formatter)
# 6. Добавляем оба хэндлера к корневому логгеру.
root_logger.addHandler(console_handler)
root_logger.addHandler(file_handler)
# 7. Отключаем слишком "болтливые" логи от сторонних библиотек.
logging.getLogger("requests").setLevel(logging.WARNING) logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("schedule").setLevel(logging.INFO)
def job(): def job():
""" """

View File

@@ -2,3 +2,4 @@ python-dotenv
requests requests
schedule schedule
python-dateutil python-dateutil
urllib3

View File

@@ -6,6 +6,8 @@ import json
import os import os
from typing import List, Dict, Any from typing import List, Dict, Any
from datetime import datetime from datetime import datetime
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Импортируем нашу конфигурацию # Импортируем нашу конфигурацию
import config import config
@@ -19,6 +21,7 @@ class ServiceDeskAPIError(Exception):
class ServiceDeskClient: class ServiceDeskClient:
""" """
Клиент для взаимодействия с REST API ServiceDesk. Клиент для взаимодействия с REST API ServiceDesk.
Оснащен механизмом повторных запросов для повышения отказоустойчивости.
""" """
def __init__(self, access_key: str, base_url: str): def __init__(self, access_key: str, base_url: str):
if not access_key or not base_url: if not access_key or not base_url:
@@ -29,32 +32,55 @@ class ServiceDeskClient:
# Устанавливаем accessKey как параметр по умолчанию для всех запросов # Устанавливаем accessKey как параметр по умолчанию для всех запросов
self.session.params = {'accessKey': access_key} self.session.params = {'accessKey': access_key}
# --- НАСТРОЙКА ЛОГИКИ ПОВТОРНЫХ ЗАПРОСОВ ---
retry_strategy = Retry(
total=3, # Общее количество повторных попыток
backoff_factor=1, # Множитель для задержки (sleep_time = {backoff_factor} * (2 ** ({number of total retries} - 1)))
# Задержки будут примерно 0.5с, 1с, 2с
status_forcelist=[500, 502, 503, 504], # Коды, которые нужно повторять
allowed_methods=["POST", "GET"] # Методы, для которых будет работать retry. API использует POST даже для поиска.
)
# Создаем адаптер с нашей стратегией
adapter = HTTPAdapter(max_retries=retry_strategy)
# "Монтируем" адаптер для всех http и https запросов
self.session.mount("https://", adapter)
self.session.mount("http://", adapter)
log.info("Клиент ServiceDesk инициализирован с логикой повторных запросов (retries).")
def _make_request(self, method: str, url: str, params: Dict = None, json_data: Dict = None) -> Any: def _make_request(self, method: str, url: str, params: Dict = None, json_data: Dict = None) -> Any:
""" """
Внутренний метод для выполнения HTTP-запросов. Внутренний метод для выполнения HTTP-запросов.
Теперь он автоматически использует логику retry, настроенную в __init__.
:param method: HTTP метод ('GET', 'POST', etc.) :param method: HTTP метод ('GET', 'POST', etc.)
:param url: Полный URL для запроса. :param url: Полный URL для запроса.
:param params: Дополнительные параметры URL (кроме accessKey). :param params: Дополнительные параметры URL (кроме accessKey).
:param json_data: Тело запроса в формате JSON для POST/PUT. :param json_data: Тело запроса в формате JSON для POST/PUT.
:return: Ответ сервера в виде JSON. :return: Ответ сервера в виде JSON.
:raises ServiceDeskAPIError: в случае ошибки API или сети. :raises ServiceDeskAPIError: в случае ошибки API или сети ПОСЛЕ всех попыток.
""" """
try: try:
# Теперь этот вызов будет автоматически повторяться в случае сбоев
response = self.session.request(method, url, params=params, json=json_data, timeout=30) response = self.session.request(method, url, params=params, json=json_data, timeout=30)
# Проверяем на ошибки ПОСЛЕ успешного выполнения запроса (или исчерпания попыток)
response.raise_for_status() # Вызовет исключение для кодов 4xx/5xx response.raise_for_status() # Вызовет исключение для кодов 4xx/5xx
# Некоторые ответы могут быть пустыми (например, при успешном редактировании) # Некоторые ответы могут быть пустыми (например, при успешном редактировании - 204)
if response.status_code == 204 or not response.text: # или успешном создании (201)
if response.status_code in [204, 201] or not response.text:
return None return None
return response.json() return response.json()
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
# Эта ошибка возникнет, если после всех попыток сервер все равно вернул 4xx или 5xx
error_message = f"HTTP Error: {e.response.status_code} for URL {url}. Response: {e.response.text}" error_message = f"HTTP Error: {e.response.status_code} for URL {url}. Response: {e.response.text}"
log.error(error_message) log.error(error_message)
raise ServiceDeskAPIError(error_message) from e raise ServiceDeskAPIError(error_message) from e
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
# Эта ошибка возникнет, если после всех попыток не удалось подключиться к серверу (например, DNS или таймаут)
error_message = f"Request failed for URL {url}: {e}" error_message = f"Request failed for URL {url}: {e}"
log.error(error_message) log.error(error_message)
raise ServiceDeskAPIError(error_message) from e raise ServiceDeskAPIError(error_message) from e
@@ -73,7 +99,8 @@ class ServiceDeskClient:
} }
frs = self._make_request('POST', config.FIND_FRS_URL, params=params) frs = self._make_request('POST', config.FIND_FRS_URL, params=params)
log.info(f"Получено {len(frs)} записей о ФР.") log.info(f"Получено {len(frs)} записей о ФР.")
return frs # frs может быть None если ответ пустой, вернем пустой список для консистентности
return frs or []
def get_all_workstations(self) -> List[Dict]: def get_all_workstations(self) -> List[Dict]:
"""Получает список всех рабочих станций.""" """Получает список всех рабочих станций."""
@@ -83,7 +110,7 @@ class ServiceDeskClient:
} }
workstations = self._make_request('POST', config.FIND_WORKSTATIONS_URL, params=params) workstations = self._make_request('POST', config.FIND_WORKSTATIONS_URL, params=params)
log.info(f"Получено {len(workstations)} записей о рабочих станциях.") log.info(f"Получено {len(workstations)} записей о рабочих станциях.")
return workstations return workstations or []
def get_lookup_values(self, metaclass: str) -> List[Dict]: def get_lookup_values(self, metaclass: str) -> List[Dict]:
""" """
@@ -97,7 +124,7 @@ class ServiceDeskClient:
params = {'attrs': 'UUID,title'} params = {'attrs': 'UUID,title'}
lookups = self._make_request('POST', url, params=params) lookups = self._make_request('POST', url, params=params)
log.info(f"Получено {len(lookups)} значений для {metaclass}.") log.info(f"Получено {len(lookups)} значений для {metaclass}.")
return lookups return lookups or []
def update_fr(self, uuid: str, data: Dict) -> None: def update_fr(self, uuid: str, data: Dict) -> None:
""" """
@@ -125,9 +152,6 @@ class ServiceDeskClient:
return return
log.info(f"Обновление объекта ФР с UUID: {uuid}...") log.info(f"Обновление объекта ФР с UUID: {uuid}...")
# Примечание: старый код использовал POST с параметрами для редактирования.
# Если API требует form-encoded data, а не JSON, нужно использовать `data=data` вместо `json=data`.
# Судя по вашему коду, это POST-запрос с параметрами в URL, а не в теле.
url = config.EDIT_FR_URL_TEMPLATE.format(uuid=uuid) url = config.EDIT_FR_URL_TEMPLATE.format(uuid=uuid)
self._make_request('POST', url, params=data) self._make_request('POST', url, params=data)
log.info(f"Объект {uuid} успешно обновлен.") log.info(f"Объект {uuid} успешно обновлен.")

View File

@@ -3,7 +3,7 @@
import re import re
import logging import logging
from typing import Dict, List, Optional, Any from typing import Dict, List, Optional, Any
from datetime import datetime from datetime import datetime, timedelta
from dateutil import parser from dateutil import parser
# Импортируем наши модули # Импортируем наши модули
@@ -172,8 +172,8 @@ class Synchronizer:
def _update_existing_frs(self): def _update_existing_frs(self):
""" """
Находит и обновляет ФР с отличающимися датами окончания ФН, Находит и обновляет ФР с отличающимися датами окончания ФН.
воспроизводя проверенную логику в более производительном виде. Пропускает обновление, если разница в датах более 10 лет.
""" """
log.info("Поиск ФР для обновления...") log.info("Поиск ФР для обновления...")
@@ -199,6 +199,9 @@ class Synchronizer:
records_to_check = self.db._execute_query(query, fetch='all') records_to_check = self.db._execute_query(query, fetch='all')
update_counter = 0 update_counter = 0
#Порог для сравнения дат
archive_delta = timedelta(days=365*10)
for rec in records_to_check: for rec in records_to_check:
try: try:
# 2. Приводим даты к одному "знаменателю" - объектам datetime # 2. Приводим даты к одному "знаменателю" - объектам datetime
@@ -214,12 +217,18 @@ class Synchronizer:
# 3. Сравниваем даты. Если они различаются, готовим обновление. # 3. Сравниваем даты. Если они различаются, готовим обновление.
# Проверяем на неравенство. Величина различия не важна. # Проверяем на неравенство. Величина различия не важна.
if pos_date != sd_date: if pos_date != sd_date:
# Проверяем, что разница не больше 10 лет
if abs(pos_date - sd_date) > archive_delta:
log.warning(f"Пропуск сравнения для S/N {rec['serialNumber']}: разница в датах больше 10 лет."
f"FTP: {pos_date.date()}, SD: {sd_date.date()}.")
continue
log.info(f"Найдено расхождение в дате для S/N {rec['serialNumber']} (UUID: {rec['sd_uuid']}). " log.info(f"Найдено расхождение в дате для S/N {rec['serialNumber']} (UUID: {rec['sd_uuid']}). "
f"FTP: {pos_date}, SD: {sd_date}. Подготовка к обновлению.") f"FTP: {pos_date}, SD: {sd_date}. Подготовка к обновлению.")
# 4. Формируем данные для обновления # 4. Формируем данные для обновления
# 5. Проверяем, закончился ли ФН # Проверяем, закончился ли ФН
if rec['pos_rnm'] == '0000000000000000': if rec['pos_rnm'] == '0000000000000000':
legal_name = 'ЗАКОНЧИЛСЯ ФН' legal_name = 'ЗАКОНЧИЛСЯ ФН'
else: else: