299 lines
15 KiB
Python
299 lines
15 KiB
Python
# sync_logic.py
|
||
|
||
import re
|
||
import logging
|
||
from typing import Dict, List, Optional, Any
|
||
from datetime import datetime
|
||
from dateutil import parser
|
||
|
||
# Импортируем наши модули
|
||
import config
|
||
from database import DatabaseManager
|
||
from sd_api import ServiceDeskClient, ServiceDeskAPIError
|
||
import ftp_parser
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
|
||
def _clean_sd_remote_id(raw_id: Optional[str]) -> Optional[str]:
|
||
"""
|
||
Очищает ID удаленного доступа, полученный из ServiceDesk.
|
||
(Эта функция была ранее в ftp_parser, но перенесена сюда,
|
||
так как "грязные" ID приходят из SD).
|
||
"""
|
||
if not raw_id or raw_id.lower() == 'none':
|
||
return None
|
||
no_spaces_id = re.sub(r'\s+', '', raw_id)
|
||
match = re.search(r'\d+', no_spaces_id)
|
||
return match.group(0) if match else None
|
||
|
||
|
||
def _extract_srok_fn_from_execution(fn_execution: Optional[str]) -> Optional[str]:
|
||
"""Извлекает срок ФН (13, 15, 36) из строки fnExecution."""
|
||
if not fn_execution:
|
||
return None
|
||
# Ищем числа 13, 15 или 36 в строке
|
||
match = re.search(r'(13|15|36)', fn_execution)
|
||
return match.group(1) if match else None
|
||
|
||
|
||
def _map_ffd_version(ffd_from_ftp: Optional[str]) -> Optional[str]:
|
||
"""Сопоставляет версию ФФД из FTP ('120') с версией в SD ('1.2')."""
|
||
if ffd_from_ftp == '120':
|
||
return '1.2'
|
||
if ffd_from_ftp == '105':
|
||
return '1.05'
|
||
# Можно добавить другие сопоставления, если появятся
|
||
return ffd_from_ftp # Возвращаем как есть, если нет сопоставления
|
||
|
||
|
||
class Synchronizer:
|
||
"""
|
||
Класс, инкапсулирующий всю логику синхронизации данных.
|
||
"""
|
||
def __init__(self, db_manager: DatabaseManager, sd_client: ServiceDeskClient):
|
||
self.db = db_manager
|
||
self.sd = sd_client
|
||
self.lookup_cache: Dict[str, Dict[str, str]] = {}
|
||
|
||
def _prepare_data_for_db(self, fr_list: List[Dict], source: str) -> List[tuple]:
|
||
"""
|
||
Преобразует список словарей в список кортежей для массовой вставки в БД.
|
||
Также логирует отсутствующие, но важные поля.
|
||
"""
|
||
prepared_data = []
|
||
|
||
if source == 'pos':
|
||
# Ожидаемые колонки для pos_fiscals
|
||
columns = ('serialNumber', 'modelName', 'RNM', 'organizationName', 'fn_serial',
|
||
'datetime_reg', 'dateTime_end', 'ofdName', 'bootVersion', 'ffdVersion',
|
||
'fnExecution', 'INN', 'anydesk_id', 'teamviewer_id', 'lastModifiedDate')
|
||
|
||
for fr in fr_list:
|
||
# Проверка на наличие обязательных полей
|
||
if not all(fr.get(key) for key in ['serialNumber', 'modelName', 'dateTime_end']):
|
||
log.warning(f"Пропущена запись из FTP: отсутствует одно из ключевых полей "
|
||
f"(serialNumber, modelName, dateTime_end). Данные: {fr}")
|
||
continue
|
||
|
||
prepared_data.append(tuple(fr.get(col) for col in columns))
|
||
|
||
elif source == 'sd':
|
||
# Ожидаемые колонки для sd_fiscals
|
||
columns = ('UUID', 'serialNumber', 'modelName', 'RNM', 'organizationName', 'fn_serial',
|
||
'datetime_reg', 'dateTime_end', 'ofdName', 'bootVersion', 'ffdVersion',
|
||
'owner_uuid', 'lastModifiedDate')
|
||
|
||
for fr in fr_list:
|
||
if not fr.get('UUID') or not fr.get('FRSerialNumber'):
|
||
log.warning(f"Пропущена запись из SD: отсутствует UUID или FRSerialNumber. Данные: {fr}")
|
||
continue
|
||
|
||
# Извлекаем и нормализуем данные
|
||
owner = fr.get('owner')
|
||
model = fr.get('ModelKKT')
|
||
ofd = fr.get('OFDName')
|
||
ffd = fr.get('FFD')
|
||
|
||
prepared_data.append((
|
||
fr.get('UUID'),
|
||
fr.get('FRSerialNumber'),
|
||
model.get('title') if model else None,
|
||
fr.get('RNKKT'),
|
||
fr.get('LegalName'),
|
||
fr.get('FNNumber'),
|
||
fr.get('KKTRegDate'),
|
||
fr.get('FNExpireDate'),
|
||
ofd.get('title') if ofd else None,
|
||
fr.get('FRDownloader'),
|
||
ffd.get('title') if ffd else None,
|
||
owner.get('UUID') if owner else None,
|
||
fr.get('lastModifiedDate')
|
||
))
|
||
return prepared_data, columns
|
||
|
||
def _fetch_and_cache_lookups(self):
|
||
"""Запрашивает и кэширует UUID справочников из SD."""
|
||
log.info("Кэширование справочников из ServiceDesk...")
|
||
metaclasses = ['ModeliFR', 'SrokiFN', 'FFD']
|
||
for mc in metaclasses:
|
||
try:
|
||
values = self.sd.get_lookup_values(mc)
|
||
# Создаем словарь вида {'Название': 'UUID'}
|
||
self.lookup_cache[mc] = {item['title']: item['UUID'] for item in values}
|
||
except ServiceDeskAPIError as e:
|
||
log.error(f"Не удалось загрузить справочник {mc}: {e}")
|
||
# Если не удалось загрузить критически важные справочники, прерываем работу
|
||
raise
|
||
log.info("Справочники успешно закэшированы.")
|
||
|
||
def run_full_sync(self):
|
||
"""Выполняет полный цикл синхронизации."""
|
||
log.info("--- НАЧАЛО ЦИКЛА СИНХРОНИЗАЦИИ ---")
|
||
try:
|
||
# 1. Кэшируем справочники
|
||
self._fetch_and_cache_lookups()
|
||
|
||
# 2. Очищаем таблицы перед заполнением
|
||
self.db.clear_tables(['pos_fiscals', 'sd_fiscals', 'workstations'])
|
||
|
||
# 3. Сбор и загрузка данных с FTP
|
||
pos_frs_raw = ftp_parser.process_json_files(config.JSON_FILES_PATH)
|
||
pos_frs_data, pos_cols = self._prepare_data_for_db(pos_frs_raw, 'pos')
|
||
self.db.bulk_insert('pos_fiscals', pos_cols, pos_frs_data)
|
||
|
||
# 4. Сбор и загрузка данных из ServiceDesk (ФР)
|
||
sd_frs_raw = self.sd.get_all_frs()
|
||
sd_frs_data, sd_cols = self._prepare_data_for_db(sd_frs_raw, 'sd')
|
||
self.db.bulk_insert('sd_fiscals', sd_cols, sd_frs_data)
|
||
|
||
# 5. Сбор и загрузка данных из ServiceDesk (Рабочие станции)
|
||
workstations_raw = self.sd.get_all_workstations()
|
||
workstations_data = []
|
||
for ws in workstations_raw:
|
||
owner = ws.get('owner')
|
||
if ws.get('UUID') and owner and owner.get('UUID'):
|
||
workstations_data.append((
|
||
ws['UUID'],
|
||
owner['UUID'],
|
||
_clean_sd_remote_id(ws.get('AnyDesk')),
|
||
_clean_sd_remote_id(ws.get('Teamviewer'))
|
||
))
|
||
self.db.bulk_insert('workstations', ['uuid', 'owner_uuid', 'clean_anydesk_id', 'clean_teamviewer_id'], workstations_data)
|
||
|
||
# 6. Логика синхронизации
|
||
self._update_existing_frs()
|
||
self._create_new_frs()
|
||
|
||
log.info("--- ЦИКЛ СИНХРОНИЗАЦИИ УСПЕШНО ЗАВЕРШЕН ---")
|
||
|
||
except Exception as e:
|
||
log.critical(f"Критическая ошибка в цикле синхронизации: {e}", exc_info=True)
|
||
|
||
|
||
def _update_existing_frs(self):
|
||
"""Находит и обновляет ФР с отличающимися датами."""
|
||
log.info("Поиск ФР для обновления даты окончания ФН...")
|
||
query = """
|
||
SELECT
|
||
pos.serialNumber,
|
||
pos.dateTime_end AS pos_date,
|
||
pos.datetime_reg AS pos_reg_date,
|
||
pos.fn_serial AS pos_fn_serial,
|
||
pos.RNM AS pos_rnm,
|
||
pos.bootVersion AS pos_boot_version,
|
||
pos.organizationName AS pos_org_name,
|
||
pos.INN AS pos_inn,
|
||
sd.UUID AS sd_uuid,
|
||
sd.dateTime_end AS sd_date
|
||
FROM pos_fiscals pos
|
||
JOIN sd_fiscals sd ON pos.serialNumber = sd.serialNumber
|
||
WHERE pos.dateTime_end != sd.dateTime_end
|
||
"""
|
||
# Дополнительное условие по дате, как в старом коде, можно добавить сюда, если нужно
|
||
# WHERE date(pos.dateTime_end) > date(sd.dateTime_end, '-65 day') AND pos.dateTime_end != sd.dateTime_end
|
||
|
||
records_to_update = self.db._execute_query(query, fetch='all')
|
||
log.info(f"Найдено {len(records_to_update)} ФР для обновления.")
|
||
|
||
for rec in records_to_update:
|
||
# Проверка, что дата действительно новее (избегаем "старых" данных с FTP)
|
||
# Это простая проверка, можно усложнить, если нужно
|
||
if parser.parse(rec['pos_date']) < parser.parse(rec['sd_date']):
|
||
log.warning(f"Пропуск обновления для S/N {rec['serialNumber']}: дата с FTP ({rec['pos_date']}) "
|
||
f"старше, чем в SD ({rec['sd_date']}).")
|
||
continue
|
||
|
||
legal_name = f"{rec['pos_org_name']} ИНН:{rec['pos_inn']}" if rec['pos_org_name'] and rec['pos_inn'] else "ЗАКОНЧИЛСЯ ФН"
|
||
|
||
params_to_update = {
|
||
'FNNumber': rec['pos_fn_serial'],
|
||
'FNExpireDate': parser.parse(rec['pos_date']).strftime('%Y.%m.%d %H:%M:%S'),
|
||
'KKTRegDate': parser.parse(rec['pos_reg_date']).strftime('%Y.%m.%d %H:%M:%S'),
|
||
'LegalName': legal_name,
|
||
'RNKKT': rec['pos_rnm'],
|
||
'FRDownloader': rec['pos_boot_version']
|
||
}
|
||
try:
|
||
self.sd.update_fr(rec['sd_uuid'], params_to_update)
|
||
except ServiceDeskAPIError as e:
|
||
log.error(f"Не удалось обновить ФР с UUID {rec['sd_uuid']}: {e}")
|
||
|
||
def _create_new_frs(self):
|
||
"""Находит, определяет владельца и создает новые ФР."""
|
||
log.info("Поиск новых ФР для добавления в ServiceDesk...")
|
||
query = """
|
||
SELECT pos.*
|
||
FROM pos_fiscals pos
|
||
LEFT JOIN sd_fiscals sd ON pos.serialNumber = sd.serialNumber
|
||
WHERE sd.serialNumber IS NULL
|
||
"""
|
||
new_frs = self.db._execute_query(query, fetch='all')
|
||
log.info(f"Найдено {len(new_frs)} новых ФР.")
|
||
|
||
for fr in new_frs:
|
||
owner_uuid = self._find_owner_uuid(fr['anydesk_id'], fr['teamviewer_id'])
|
||
|
||
# Собираем данные для создания
|
||
model_uuid = self.lookup_cache.get('ModeliFR', {}).get(fr['modelName'])
|
||
|
||
srok_fn_str = _extract_srok_fn_from_execution(fr['fnExecution'])
|
||
srok_fn_uuid = self.lookup_cache.get('SrokiFN', {}).get(srok_fn_str) if srok_fn_str else None
|
||
|
||
ffd_version_str = _map_ffd_version(fr['ffdVersion'])
|
||
ffd_uuid = self.lookup_cache.get('FFD', {}).get(ffd_version_str) if ffd_version_str else None
|
||
|
||
if not all([model_uuid, srok_fn_uuid, ffd_uuid]):
|
||
log.error(f"Не удалось создать ФР с S/N {fr['serialNumber']}: не найдены UUID для справочников. "
|
||
f"Model: {fr['modelName']}({model_uuid}), SrokFN: {srok_fn_str}({srok_fn_uuid}), "
|
||
f"FFD: {ffd_version_str}({ffd_uuid})")
|
||
continue
|
||
|
||
legal_name = f"{fr['organizationName']} ИНН:{fr['INN']}" if fr['organizationName'] and fr['INN'] else ""
|
||
|
||
creation_data = {
|
||
"RNKKT": fr['RNM'],
|
||
"KKTRegDate": parser.parse(fr['datetime_reg']).strftime('%Y.%m.%d %H:%M:%S'),
|
||
"FRSerialNumber": fr['serialNumber'],
|
||
"LegalName": legal_name,
|
||
"FNExpireDate": parser.parse(fr['dateTime_end']).strftime('%Y.%m.%d %H:%M:%S'),
|
||
"FNNumber": fr['fn_serial'],
|
||
"ModelKKT": model_uuid,
|
||
"SrokFN": srok_fn_uuid,
|
||
"FFD": ffd_uuid,
|
||
"owner": owner_uuid
|
||
}
|
||
|
||
try:
|
||
self.sd.create_fr(creation_data)
|
||
except ServiceDeskAPIError as e:
|
||
log.error(f"Не удалось создать ФР с S/N {fr['serialNumber']}: {e}")
|
||
|
||
def _find_owner_uuid(self, anydesk_id: Optional[str], teamviewer_id: Optional[str]) -> str:
|
||
"""Ищет владельца по ID. Возвращает UUID владельца или UUID по умолчанию."""
|
||
|
||
def query_owner(id_value, id_type_column):
|
||
if not id_value:
|
||
return None
|
||
query = f"SELECT owner_uuid FROM workstations WHERE {id_type_column} = ?"
|
||
result = self.db._execute_query(query, (id_value,), fetch='one')
|
||
return result['owner_uuid'] if result else None
|
||
|
||
owner_by_anydesk = query_owner(anydesk_id, 'clean_anydesk_id')
|
||
owner_by_teamviewer = query_owner(teamviewer_id, 'clean_teamviewer_id')
|
||
|
||
if owner_by_anydesk and owner_by_teamviewer and owner_by_anydesk == owner_by_teamviewer:
|
||
log.info(f"Владелец {owner_by_anydesk} найден по обоим ID: Anydesk={anydesk_id}, TV={teamviewer_id}.")
|
||
return owner_by_anydesk
|
||
|
||
if owner_by_anydesk:
|
||
log.info(f"Владелец {owner_by_anydesk} найден по Anydesk ID: {anydesk_id}.")
|
||
return owner_by_anydesk
|
||
|
||
if owner_by_teamviewer:
|
||
log.info(f"Владелец {owner_by_teamviewer} найден по TeamViewer ID: {teamviewer_id}.")
|
||
return owner_by_teamviewer
|
||
|
||
log.warning(f"Владелец не найден для Anydesk={anydesk_id}, TV={teamviewer_id}. "
|
||
f"Будет назначен владелец по умолчанию: {config.DEFAULT_OWNER_UUID}")
|
||
return config.DEFAULT_OWNER_UUID |