Files
MHservice/sync_logic.py
SERTY da6d345609
Some checks failed
Build and Deploy / build_and_deploy (push) Failing after 10s
ready to deploy
2025-07-24 11:28:34 +03:00

395 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# sync_logic.py
import re
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from dateutil import parser
# Импортируем наши модули
import config
from database import DatabaseManager
from sd_api import ServiceDeskClient, ServiceDeskAPIError
import ftp_parser
log = logging.getLogger(__name__)
def _clean_sd_remote_id(raw_id: Optional[str]) -> Optional[str]:
"""
Очищает ID удаленного доступа, полученный из ServiceDesk..
"""
if not raw_id or raw_id.lower() == 'none':
return None
no_spaces_id = re.sub(r'\s+', '', raw_id)
match = re.search(r'\d+', no_spaces_id)
return match.group(0) if match else None
def _extract_srok_fn_from_execution(fn_execution: Optional[str]) -> Optional[str]:
"""Извлекает срок ФН (13, 15, 36) из строки fnExecution."""
if not fn_execution:
return None
# Ищем числа 13, 15 или 36 в строке
match = re.search(r'(13|15|36)', fn_execution)
return match.group(1) if match else None
def _map_ffd_version(ffd_from_ftp: Optional[str]) -> Optional[str]:
"""Сопоставляет версию ФФД из FTP ('120') с версией в SD ('1.2')."""
if ffd_from_ftp == '120':
return '1.2'
if ffd_from_ftp == '105':
return '1.05'
# Можно добавить другие сопоставления, если появятся
return ffd_from_ftp # Возвращаем как есть, если нет сопоставления
class Synchronizer:
"""
Класс, инкапсулирующий всю логику синхронизации данных.
"""
def __init__(self, db_manager: DatabaseManager, sd_client: ServiceDeskClient):
self.db = db_manager
self.sd = sd_client
self.lookup_cache: Dict[str, Dict[str, str]] = {}
def _prepare_data_for_db(self, fr_list: List[Dict], source: str) -> List[tuple]:
"""
Преобразует список словарей в список кортежей для массовой вставки в БД.
Также логирует отсутствующие, но важные поля.
"""
prepared_data = []
if source == 'pos':
# Ожидаемые колонки для pos_fiscals
columns = ('serialNumber', 'modelName', 'RNM', 'organizationName', 'fn_serial',
'datetime_reg', 'dateTime_end', 'ofdName', 'bootVersion', 'ffdVersion',
'fnExecution', 'INN', 'anydesk_id', 'teamviewer_id', 'lastModifiedDate')
for fr in fr_list:
# Проверка на наличие обязательных полей
if not all(fr.get(key) for key in ['serialNumber', 'modelName', 'dateTime_end']):
log.warning(f"Пропущена запись из FTP: отсутствует одно из ключевых полей "
f"(serialNumber, modelName, dateTime_end). Данные: {fr}")
continue
prepared_data.append(tuple(fr.get(col) for col in columns))
elif source == 'sd':
# Ожидаемые колонки для sd_fiscals
columns = ('UUID', 'serialNumber', 'modelName', 'RNM', 'organizationName', 'fn_serial',
'datetime_reg', 'dateTime_end', 'ofdName', 'bootVersion', 'ffdVersion',
'owner_uuid', 'lastModifiedDate')
for fr in fr_list:
if not fr.get('UUID') or not fr.get('FRSerialNumber'):
log.warning(f"Пропущена запись из SD: отсутствует UUID или FRSerialNumber. Данные: {fr}")
continue
# Извлекаем и нормализуем данные
owner = fr.get('owner')
model = fr.get('ModelKKT')
ofd = fr.get('OFDName')
ffd = fr.get('FFD')
prepared_data.append((
fr.get('UUID'),
fr.get('FRSerialNumber'),
model.get('title') if model else None,
fr.get('RNKKT'),
fr.get('LegalName'),
fr.get('FNNumber'),
fr.get('KKTRegDate'),
fr.get('FNExpireDate'),
ofd.get('title') if ofd else None,
fr.get('FRDownloader'),
ffd.get('title') if ffd else None,
owner.get('UUID') if owner else None,
fr.get('lastModifiedDate')
))
return prepared_data, columns
def _fetch_and_cache_lookups(self):
"""Запрашивает и кэширует UUID справочников из SD."""
log.info("Кэширование справочников из ServiceDesk...")
metaclasses = ['ModeliFR', 'SrokiFN', 'FFD']
for mc in metaclasses:
try:
values = self.sd.get_lookup_values(mc)
# Создаем словарь вида {'Название': 'UUID'}
self.lookup_cache[mc] = {item['title']: item['UUID'] for item in values}
except ServiceDeskAPIError as e:
log.error(f"Не удалось загрузить справочник {mc}: {e}")
# Если не удалось загрузить критически важные справочники, прерываем работу
raise
log.info("Справочники успешно закэшированы.")
def run_full_sync(self):
"""Выполняет полный цикл синхронизации."""
log.info("--- НАЧАЛО ЦИКЛА СИНХРОНИЗАЦИИ ---")
try:
# 1. Кэшируем справочники
self._fetch_and_cache_lookups()
# 2. Очищаем таблицы перед заполнением
self.db.clear_tables(['pos_fiscals', 'sd_fiscals', 'workstations'])
# 3. Сбор и загрузка данных с FTP
pos_frs_raw = ftp_parser.process_json_files(config.JSON_FILES_PATH)
pos_frs_data, pos_cols = self._prepare_data_for_db(pos_frs_raw, 'pos')
self.db.bulk_insert('pos_fiscals', pos_cols, pos_frs_data)
# 4. Сбор и загрузка данных из ServiceDesk (ФР)
sd_frs_raw = self.sd.get_all_frs()
sd_frs_data, sd_cols = self._prepare_data_for_db(sd_frs_raw, 'sd')
self.db.bulk_insert('sd_fiscals', sd_cols, sd_frs_data)
# 5. Сбор и загрузка данных из ServiceDesk (Рабочие станции)
workstations_raw = self.sd.get_all_workstations()
workstations_data = []
for ws in workstations_raw:
owner = ws.get('owner')
if ws.get('UUID') and owner and owner.get('UUID'):
workstations_data.append((
ws['UUID'],
owner['UUID'],
_clean_sd_remote_id(ws.get('AnyDesk')),
_clean_sd_remote_id(ws.get('Teamviewer')),
ws.get('lastModifiedDate')
))
self.db.bulk_insert('workstations', ['uuid', 'owner_uuid', 'clean_anydesk_id', 'clean_teamviewer_id', 'lastModifiedDate'], workstations_data)
# 6. Логика синхронизации
self._update_existing_frs()
self._create_new_frs()
log.info("--- ЦИКЛ СИНХРОНИЗАЦИИ УСПЕШНО ЗАВЕРШЕН ---")
except Exception as e:
log.critical(f"Критическая ошибка в цикле синхронизации: {e}", exc_info=True)
def _update_existing_frs(self):
"""
Находит и обновляет ФР с отличающимися датами окончания ФН.
Пропускает обновление, если разница в датах более 10 лет.
"""
log.info("Поиск ФР для обновления...")
# 1. SQL-запрос для поиска расхождений.
# Он объединяет таблицы по serialNumber и выбирает все необходимые поля
# сразу, чтобы избежать вложенных циклов.
query = """
SELECT
pos.serialNumber,
pos.dateTime_end AS pos_date_end,
pos.datetime_reg AS pos_date_reg,
pos.fn_serial AS pos_fn_serial,
pos.RNM AS pos_rnm,
pos.bootVersion AS pos_boot_version,
pos.organizationName AS pos_org_name,
pos.INN AS pos_inn,
sd.UUID AS sd_uuid,
sd.dateTime_end AS sd_date_end
FROM pos_fiscals pos
JOIN sd_fiscals sd ON pos.serialNumber = sd.serialNumber
"""
records_to_check = self.db._execute_query(query, fetch='all')
update_counter = 0
#Порог для сравнения дат
archive_delta = timedelta(days=365*10)
for rec in records_to_check:
try:
# 2. Приводим даты к одному "знаменателю" - объектам datetime
# dateutil.parser отлично справляется с разными форматами
# Проверяем, что обе даты существуют, прежде чем их парсить
if not rec['pos_date_end'] or not rec['sd_date_end']:
log.warning(f"Пропуск сравнения для S/N {rec['serialNumber']}: одна из дат отсутствует.")
continue
pos_date = parser.parse(rec['pos_date_end'])
sd_date = parser.parse(rec['sd_date_end'])
# 3. Сравниваем даты. Если они различаются, готовим обновление.
# Проверяем на неравенство. Величина различия не важна.
if pos_date != sd_date:
# Проверяем, что разница не больше 10 лет
if abs(pos_date - sd_date) > archive_delta:
log.warning(f"Пропуск сравнения для S/N {rec['serialNumber']}: разница в датах больше 10 лет."
f"FTP: {pos_date.date()}, SD: {sd_date.date()}.")
continue
log.info(f"Найдено расхождение в дате для S/N {rec['serialNumber']} (UUID: {rec['sd_uuid']}). "
f"FTP: {pos_date}, SD: {sd_date}. Подготовка к обновлению.")
# 4. Формируем данные для обновления
# Проверяем, закончился ли ФН
if rec['pos_rnm'] == '0000000000000000':
legal_name = 'ЗАКОНЧИЛСЯ ФН'
else:
legal_name = f"{rec['pos_org_name']} ИНН:{rec['pos_inn']}" if rec['pos_org_name'] and rec['pos_inn'] else rec['pos_org_name']
# Форматируем даты для API ServiceDesk
formatted_expire_date = pos_date.strftime('%Y.%m.%d %H:%M:%S')
# Убеждаемся, что дата регистрации тоже есть, прежде чем форматировать
formatted_reg_date = parser.parse(rec['pos_date_reg']).strftime('%Y.%m.%d %H:%M:%S') if rec['pos_date_reg'] else None
params_to_update = {
'FNNumber': rec['pos_fn_serial'],
'FNExpireDate': formatted_expire_date,
'LegalName': legal_name,
'RNKKT': rec['pos_rnm'],
'FRDownloader': rec['pos_boot_version'],
'KKTRegDate': formatted_reg_date
}
# Удаляем ключи с None-значениями, чтобы не отправлять их в API, если они не обязательны
params_to_update = {k: v for k, v in params_to_update.items() if v is not None}
try:
self.sd.update_fr(rec['sd_uuid'], params_to_update)
update_counter += 1
except ServiceDeskAPIError as e:
log.error(f"Не удалось обновить ФР с UUID {rec['sd_uuid']}: {e}")
except (parser.ParserError, TypeError) as e:
log.error(f"Ошибка парсинга даты для S/N {rec['serialNumber']}. "
f"FTP_date='{rec['pos_date_end']}', SD_date='{rec['sd_date_end']}'. Ошибка: {e}")
continue
log.info(f"Проверка обновлений завершена. Обновлено записей: {update_counter}.")
def _find_lookup_uuid_by_substring(self, lookup_type: str, substring: str) -> Optional[str]:
"""
Ищет UUID в кэше справочников по подстроке в ключе (title).
Например, ищет '13' в ключе '13 мес.'.
:param lookup_type: Тип справочника ('SrokiFN', 'FFD', etc.).
:param substring: Подстрока для поиска (например, '13').
:return: UUID или None, если не найдено.
"""
if not substring or lookup_type not in self.lookup_cache:
return None
# Проходим по всем парам (title, uuid) в нужном справочнике
for title, uuid in self.lookup_cache[lookup_type].items():
# Ищем точное вхождение подстроки, окруженное не-цифрами или границами строки,
# чтобы '15' не совпало с '150'.
if re.search(r'\b' + re.escape(substring) + r'\b', title):
return uuid
return None
def _create_new_frs(self):
"""Находит, определяет владельца и создает новые ФР."""
log.info("Поиск новых ФР для добавления в ServiceDesk...")
query = """
SELECT pos.*
FROM pos_fiscals pos
LEFT JOIN sd_fiscals sd ON pos.serialNumber = sd.serialNumber
WHERE sd.serialNumber IS NULL
"""
new_frs = self.db._execute_query(query, fetch='all')
log.info(f"Найдено {len(new_frs)} новых ФР.")
for fr in new_frs:
owner_uuid = self._find_owner_uuid(fr['anydesk_id'], fr['teamviewer_id'])
# Собираем данные для создания
model_uuid = self.lookup_cache.get('ModeliFR', {}).get(fr['modelName'])
srok_fn_str = _extract_srok_fn_from_execution(fr['fnExecution'])
if not srok_fn_str:
srok_fn_str = '13'
log.warning(f"Для S/N {fr['serialNumber']} не удалось определить срок ФН из fnExecution. "
f"Установлен срок по умолчанию: {srok_fn_str} мес.")
srok_fn_uuid = self._find_lookup_uuid_by_substring('SrokiFN', srok_fn_str)
ffd_version_str = _map_ffd_version(fr['ffdVersion'])
ffd_uuid = self._find_lookup_uuid_by_substring('FFD', ffd_version_str)
if not all([model_uuid, srok_fn_uuid, ffd_uuid]):
log.error(f"Не удалось создать ФР с S/N {fr['serialNumber']}: не найдены UUID для справочников. "
f"Model: {fr['modelName']}({model_uuid}), SrokFN: {srok_fn_str}({srok_fn_uuid}), "
f"FFD: {ffd_version_str}({ffd_uuid})")
continue
legal_name = f"{fr['organizationName']} ИНН:{fr['INN']}" if fr['organizationName'] and fr['INN'] else ""
creation_data = {
"RNKKT": fr['RNM'],
"KKTRegDate": parser.parse(fr['datetime_reg']).strftime('%Y.%m.%d %H:%M:%S'),
"FRSerialNumber": fr['serialNumber'],
"LegalName": legal_name,
"FNExpireDate": parser.parse(fr['dateTime_end']).strftime('%Y.%m.%d %H:%M:%S'),
"FNNumber": fr['fn_serial'],
"ModelKKT": model_uuid,
"SrokFN": srok_fn_uuid,
"FFD": ffd_uuid,
"owner": owner_uuid
}
try:
self.sd.create_fr(creation_data)
except ServiceDeskAPIError as e:
log.error(f"Не удалось создать ФР с S/N {fr['serialNumber']}: {e}")
def _find_owner_uuid(self, anydesk_id: Optional[str], teamviewer_id: Optional[str]) -> str:
"""
Ищет владельца по ID. При наличии нескольких совпадений для одного ID,
выбирает запись с самой свежей датой lastModifiedDate.
Возвращает UUID владельца или UUID по умолчанию.
"""
def query_best_owner(id_value: Optional[str], id_type_column: str) -> Optional[str]:
"""
Запрашивает всех владельцев по ID и возвращает UUID самого "свежего".
"""
if not id_value:
return None
# Запрашиваем всех кандидатов, сортируя по дате в порядке убывания (самые свежие - первые)
query = f"""
SELECT owner_uuid, lastModifiedDate
FROM workstations
WHERE {id_type_column} = ?
ORDER BY lastModifiedDate DESC
"""
results = self.db._execute_query(query, (id_value,), fetch='all')
if not results:
return None
if len(results) > 1:
log.warning(f"Найдено несколько ({len(results)}) рабочих станций с {id_type_column} = {id_value}. "
f"Выбрана самая свежая запись от {results[0]['lastModifiedDate']}.")
# Так как мы отсортировали по DESC, первая запись и есть самая свежая
return results[0]['owner_uuid']
owner_by_anydesk = query_best_owner(anydesk_id, 'clean_anydesk_id')
owner_by_teamviewer = query_best_owner(teamviewer_id, 'clean_teamviewer_id')
# Логика выбора остается прежней, но теперь она работает с "лучшими" кандидатами
if owner_by_anydesk and owner_by_teamviewer and owner_by_anydesk == owner_by_teamviewer:
log.info(f"Владелец {owner_by_anydesk} уверенно найден по обоим ID: Anydesk={anydesk_id}, TV={teamviewer_id}.")
return owner_by_anydesk
if owner_by_anydesk:
log.info(f"Владелец {owner_by_anydesk} найден по Anydesk ID: {anydesk_id}.")
return owner_by_anydesk
if owner_by_teamviewer:
log.info(f"Владелец {owner_by_teamviewer} найден по TeamViewer ID: {teamviewer_id}.")
return owner_by_teamviewer
log.warning(f"Владелец не найден для Anydesk={anydesk_id}, TV={teamviewer_id}. "
f"Будет назначен владелец по умолчанию: {config.DEFAULT_OWNER_UUID}")
return config.DEFAULT_OWNER_UUID