386 lines
20 KiB
Python
386 lines
20 KiB
Python
# sync_logic.py
|
||
|
||
import re
|
||
import logging
|
||
from typing import Dict, List, Optional, Any
|
||
from datetime import datetime
|
||
from dateutil import parser
|
||
|
||
# Импортируем наши модули
|
||
import config
|
||
from database import DatabaseManager
|
||
from sd_api import ServiceDeskClient, ServiceDeskAPIError
|
||
import ftp_parser
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
|
||
def _clean_sd_remote_id(raw_id: Optional[str]) -> Optional[str]:
|
||
"""
|
||
Очищает ID удаленного доступа, полученный из ServiceDesk..
|
||
"""
|
||
if not raw_id or raw_id.lower() == 'none':
|
||
return None
|
||
no_spaces_id = re.sub(r'\s+', '', raw_id)
|
||
match = re.search(r'\d+', no_spaces_id)
|
||
return match.group(0) if match else None
|
||
|
||
|
||
def _extract_srok_fn_from_execution(fn_execution: Optional[str]) -> Optional[str]:
|
||
"""Извлекает срок ФН (13, 15, 36) из строки fnExecution."""
|
||
if not fn_execution:
|
||
return None
|
||
# Ищем числа 13, 15 или 36 в строке
|
||
match = re.search(r'(13|15|36)', fn_execution)
|
||
return match.group(1) if match else None
|
||
|
||
|
||
def _map_ffd_version(ffd_from_ftp: Optional[str]) -> Optional[str]:
|
||
"""Сопоставляет версию ФФД из FTP ('120') с версией в SD ('1.2')."""
|
||
if ffd_from_ftp == '120':
|
||
return '1.2'
|
||
if ffd_from_ftp == '105':
|
||
return '1.05'
|
||
# Можно добавить другие сопоставления, если появятся
|
||
return ffd_from_ftp # Возвращаем как есть, если нет сопоставления
|
||
|
||
|
||
class Synchronizer:
|
||
"""
|
||
Класс, инкапсулирующий всю логику синхронизации данных.
|
||
"""
|
||
def __init__(self, db_manager: DatabaseManager, sd_client: ServiceDeskClient):
|
||
self.db = db_manager
|
||
self.sd = sd_client
|
||
self.lookup_cache: Dict[str, Dict[str, str]] = {}
|
||
|
||
def _prepare_data_for_db(self, fr_list: List[Dict], source: str) -> List[tuple]:
|
||
"""
|
||
Преобразует список словарей в список кортежей для массовой вставки в БД.
|
||
Также логирует отсутствующие, но важные поля.
|
||
"""
|
||
prepared_data = []
|
||
|
||
if source == 'pos':
|
||
# Ожидаемые колонки для pos_fiscals
|
||
columns = ('serialNumber', 'modelName', 'RNM', 'organizationName', 'fn_serial',
|
||
'datetime_reg', 'dateTime_end', 'ofdName', 'bootVersion', 'ffdVersion',
|
||
'fnExecution', 'INN', 'anydesk_id', 'teamviewer_id', 'lastModifiedDate')
|
||
|
||
for fr in fr_list:
|
||
# Проверка на наличие обязательных полей
|
||
if not all(fr.get(key) for key in ['serialNumber', 'modelName', 'dateTime_end']):
|
||
log.warning(f"Пропущена запись из FTP: отсутствует одно из ключевых полей "
|
||
f"(serialNumber, modelName, dateTime_end). Данные: {fr}")
|
||
continue
|
||
|
||
prepared_data.append(tuple(fr.get(col) for col in columns))
|
||
|
||
elif source == 'sd':
|
||
# Ожидаемые колонки для sd_fiscals
|
||
columns = ('UUID', 'serialNumber', 'modelName', 'RNM', 'organizationName', 'fn_serial',
|
||
'datetime_reg', 'dateTime_end', 'ofdName', 'bootVersion', 'ffdVersion',
|
||
'owner_uuid', 'lastModifiedDate')
|
||
|
||
for fr in fr_list:
|
||
if not fr.get('UUID') or not fr.get('FRSerialNumber'):
|
||
log.warning(f"Пропущена запись из SD: отсутствует UUID или FRSerialNumber. Данные: {fr}")
|
||
continue
|
||
|
||
# Извлекаем и нормализуем данные
|
||
owner = fr.get('owner')
|
||
model = fr.get('ModelKKT')
|
||
ofd = fr.get('OFDName')
|
||
ffd = fr.get('FFD')
|
||
|
||
prepared_data.append((
|
||
fr.get('UUID'),
|
||
fr.get('FRSerialNumber'),
|
||
model.get('title') if model else None,
|
||
fr.get('RNKKT'),
|
||
fr.get('LegalName'),
|
||
fr.get('FNNumber'),
|
||
fr.get('KKTRegDate'),
|
||
fr.get('FNExpireDate'),
|
||
ofd.get('title') if ofd else None,
|
||
fr.get('FRDownloader'),
|
||
ffd.get('title') if ffd else None,
|
||
owner.get('UUID') if owner else None,
|
||
fr.get('lastModifiedDate')
|
||
))
|
||
return prepared_data, columns
|
||
|
||
def _fetch_and_cache_lookups(self):
|
||
"""Запрашивает и кэширует UUID справочников из SD."""
|
||
log.info("Кэширование справочников из ServiceDesk...")
|
||
metaclasses = ['ModeliFR', 'SrokiFN', 'FFD']
|
||
for mc in metaclasses:
|
||
try:
|
||
values = self.sd.get_lookup_values(mc)
|
||
# Создаем словарь вида {'Название': 'UUID'}
|
||
self.lookup_cache[mc] = {item['title']: item['UUID'] for item in values}
|
||
except ServiceDeskAPIError as e:
|
||
log.error(f"Не удалось загрузить справочник {mc}: {e}")
|
||
# Если не удалось загрузить критически важные справочники, прерываем работу
|
||
raise
|
||
log.info("Справочники успешно закэшированы.")
|
||
|
||
def run_full_sync(self):
|
||
"""Выполняет полный цикл синхронизации."""
|
||
log.info("--- НАЧАЛО ЦИКЛА СИНХРОНИЗАЦИИ ---")
|
||
try:
|
||
# 1. Кэшируем справочники
|
||
self._fetch_and_cache_lookups()
|
||
|
||
# 2. Очищаем таблицы перед заполнением
|
||
self.db.clear_tables(['pos_fiscals', 'sd_fiscals', 'workstations'])
|
||
|
||
# 3. Сбор и загрузка данных с FTP
|
||
pos_frs_raw = ftp_parser.process_json_files(config.JSON_FILES_PATH)
|
||
pos_frs_data, pos_cols = self._prepare_data_for_db(pos_frs_raw, 'pos')
|
||
self.db.bulk_insert('pos_fiscals', pos_cols, pos_frs_data)
|
||
|
||
# 4. Сбор и загрузка данных из ServiceDesk (ФР)
|
||
sd_frs_raw = self.sd.get_all_frs()
|
||
sd_frs_data, sd_cols = self._prepare_data_for_db(sd_frs_raw, 'sd')
|
||
self.db.bulk_insert('sd_fiscals', sd_cols, sd_frs_data)
|
||
|
||
# 5. Сбор и загрузка данных из ServiceDesk (Рабочие станции)
|
||
workstations_raw = self.sd.get_all_workstations()
|
||
workstations_data = []
|
||
for ws in workstations_raw:
|
||
owner = ws.get('owner')
|
||
if ws.get('UUID') and owner and owner.get('UUID'):
|
||
workstations_data.append((
|
||
ws['UUID'],
|
||
owner['UUID'],
|
||
_clean_sd_remote_id(ws.get('AnyDesk')),
|
||
_clean_sd_remote_id(ws.get('Teamviewer')),
|
||
ws.get('lastModifiedDate')
|
||
))
|
||
self.db.bulk_insert('workstations', ['uuid', 'owner_uuid', 'clean_anydesk_id', 'clean_teamviewer_id', 'lastModifiedDate'], workstations_data)
|
||
|
||
# 6. Логика синхронизации
|
||
self._update_existing_frs()
|
||
self._create_new_frs()
|
||
|
||
log.info("--- ЦИКЛ СИНХРОНИЗАЦИИ УСПЕШНО ЗАВЕРШЕН ---")
|
||
|
||
except Exception as e:
|
||
log.critical(f"Критическая ошибка в цикле синхронизации: {e}", exc_info=True)
|
||
|
||
|
||
def _update_existing_frs(self):
|
||
"""
|
||
Находит и обновляет ФР с отличающимися датами окончания ФН,
|
||
воспроизводя проверенную логику в более производительном виде.
|
||
"""
|
||
log.info("Поиск ФР для обновления...")
|
||
|
||
# 1. SQL-запрос для поиска расхождений.
|
||
# Он объединяет таблицы по serialNumber и выбирает все необходимые поля
|
||
# сразу, чтобы избежать вложенных циклов.
|
||
query = """
|
||
SELECT
|
||
pos.serialNumber,
|
||
pos.dateTime_end AS pos_date_end,
|
||
pos.datetime_reg AS pos_date_reg,
|
||
pos.fn_serial AS pos_fn_serial,
|
||
pos.RNM AS pos_rnm,
|
||
pos.bootVersion AS pos_boot_version,
|
||
pos.organizationName AS pos_org_name,
|
||
pos.INN AS pos_inn,
|
||
sd.UUID AS sd_uuid,
|
||
sd.dateTime_end AS sd_date_end
|
||
FROM pos_fiscals pos
|
||
JOIN sd_fiscals sd ON pos.serialNumber = sd.serialNumber
|
||
"""
|
||
|
||
records_to_check = self.db._execute_query(query, fetch='all')
|
||
update_counter = 0
|
||
|
||
for rec in records_to_check:
|
||
try:
|
||
# 2. Приводим даты к одному "знаменателю" - объектам datetime
|
||
# dateutil.parser отлично справляется с разными форматами
|
||
# Проверяем, что обе даты существуют, прежде чем их парсить
|
||
if not rec['pos_date_end'] or not rec['sd_date_end']:
|
||
log.warning(f"Пропуск сравнения для S/N {rec['serialNumber']}: одна из дат отсутствует.")
|
||
continue
|
||
|
||
pos_date = parser.parse(rec['pos_date_end'])
|
||
sd_date = parser.parse(rec['sd_date_end'])
|
||
|
||
# 3. Сравниваем даты. Если они различаются, готовим обновление.
|
||
# Проверяем на неравенство. Величина различия не важна.
|
||
if pos_date != sd_date:
|
||
log.info(f"Найдено расхождение в дате для S/N {rec['serialNumber']} (UUID: {rec['sd_uuid']}). "
|
||
f"FTP: {pos_date}, SD: {sd_date}. Подготовка к обновлению.")
|
||
|
||
# 4. Формируем данные для обновления
|
||
|
||
# 5. Проверяем, закончился ли ФН
|
||
if rec['pos_rnm'] == '0000000000000000':
|
||
legal_name = 'ЗАКОНЧИЛСЯ ФН'
|
||
else:
|
||
legal_name = f"{rec['pos_org_name']} ИНН:{rec['pos_inn']}" if rec['pos_org_name'] and rec['pos_inn'] else rec['pos_org_name']
|
||
|
||
# Форматируем даты для API ServiceDesk
|
||
formatted_expire_date = pos_date.strftime('%Y.%m.%d %H:%M:%S')
|
||
# Убеждаемся, что дата регистрации тоже есть, прежде чем форматировать
|
||
formatted_reg_date = parser.parse(rec['pos_date_reg']).strftime('%Y.%m.%d %H:%M:%S') if rec['pos_date_reg'] else None
|
||
|
||
params_to_update = {
|
||
'FNNumber': rec['pos_fn_serial'],
|
||
'FNExpireDate': formatted_expire_date,
|
||
'LegalName': legal_name,
|
||
'RNKKT': rec['pos_rnm'],
|
||
'FRDownloader': rec['pos_boot_version'],
|
||
'KKTRegDate': formatted_reg_date
|
||
}
|
||
|
||
# Удаляем ключи с None-значениями, чтобы не отправлять их в API, если они не обязательны
|
||
params_to_update = {k: v for k, v in params_to_update.items() if v is not None}
|
||
|
||
try:
|
||
self.sd.update_fr(rec['sd_uuid'], params_to_update)
|
||
update_counter += 1
|
||
except ServiceDeskAPIError as e:
|
||
log.error(f"Не удалось обновить ФР с UUID {rec['sd_uuid']}: {e}")
|
||
|
||
except (parser.ParserError, TypeError) as e:
|
||
log.error(f"Ошибка парсинга даты для S/N {rec['serialNumber']}. "
|
||
f"FTP_date='{rec['pos_date_end']}', SD_date='{rec['sd_date_end']}'. Ошибка: {e}")
|
||
continue
|
||
|
||
log.info(f"Проверка обновлений завершена. Обновлено записей: {update_counter}.")
|
||
|
||
def _find_lookup_uuid_by_substring(self, lookup_type: str, substring: str) -> Optional[str]:
|
||
"""
|
||
Ищет UUID в кэше справочников по подстроке в ключе (title).
|
||
Например, ищет '13' в ключе '13 мес.'.
|
||
|
||
:param lookup_type: Тип справочника ('SrokiFN', 'FFD', etc.).
|
||
:param substring: Подстрока для поиска (например, '13').
|
||
:return: UUID или None, если не найдено.
|
||
"""
|
||
if not substring or lookup_type not in self.lookup_cache:
|
||
return None
|
||
|
||
# Проходим по всем парам (title, uuid) в нужном справочнике
|
||
for title, uuid in self.lookup_cache[lookup_type].items():
|
||
# Ищем точное вхождение подстроки, окруженное не-цифрами или границами строки,
|
||
# чтобы '15' не совпало с '150'.
|
||
if re.search(r'\b' + re.escape(substring) + r'\b', title):
|
||
return uuid
|
||
return None
|
||
|
||
|
||
def _create_new_frs(self):
|
||
"""Находит, определяет владельца и создает новые ФР."""
|
||
log.info("Поиск новых ФР для добавления в ServiceDesk...")
|
||
query = """
|
||
SELECT pos.*
|
||
FROM pos_fiscals pos
|
||
LEFT JOIN sd_fiscals sd ON pos.serialNumber = sd.serialNumber
|
||
WHERE sd.serialNumber IS NULL
|
||
"""
|
||
new_frs = self.db._execute_query(query, fetch='all')
|
||
log.info(f"Найдено {len(new_frs)} новых ФР.")
|
||
|
||
for fr in new_frs:
|
||
owner_uuid = self._find_owner_uuid(fr['anydesk_id'], fr['teamviewer_id'])
|
||
|
||
# Собираем данные для создания
|
||
model_uuid = self.lookup_cache.get('ModeliFR', {}).get(fr['modelName'])
|
||
|
||
srok_fn_str = _extract_srok_fn_from_execution(fr['fnExecution'])
|
||
if not srok_fn_str:
|
||
srok_fn_str = '13'
|
||
log.warning(f"Для S/N {fr['serialNumber']} не удалось определить срок ФН из fnExecution. "
|
||
f"Установлен срок по умолчанию: {srok_fn_str} мес.")
|
||
srok_fn_uuid = self._find_lookup_uuid_by_substring('SrokiFN', srok_fn_str)
|
||
|
||
ffd_version_str = _map_ffd_version(fr['ffdVersion'])
|
||
ffd_uuid = self._find_lookup_uuid_by_substring('FFD', ffd_version_str)
|
||
|
||
if not all([model_uuid, srok_fn_uuid, ffd_uuid]):
|
||
log.error(f"Не удалось создать ФР с S/N {fr['serialNumber']}: не найдены UUID для справочников. "
|
||
f"Model: {fr['modelName']}({model_uuid}), SrokFN: {srok_fn_str}({srok_fn_uuid}), "
|
||
f"FFD: {ffd_version_str}({ffd_uuid})")
|
||
continue
|
||
|
||
legal_name = f"{fr['organizationName']} ИНН:{fr['INN']}" if fr['organizationName'] and fr['INN'] else ""
|
||
|
||
creation_data = {
|
||
"RNKKT": fr['RNM'],
|
||
"KKTRegDate": parser.parse(fr['datetime_reg']).strftime('%Y.%m.%d %H:%M:%S'),
|
||
"FRSerialNumber": fr['serialNumber'],
|
||
"LegalName": legal_name,
|
||
"FNExpireDate": parser.parse(fr['dateTime_end']).strftime('%Y.%m.%d %H:%M:%S'),
|
||
"FNNumber": fr['fn_serial'],
|
||
"ModelKKT": model_uuid,
|
||
"SrokFN": srok_fn_uuid,
|
||
"FFD": ffd_uuid,
|
||
"owner": owner_uuid
|
||
}
|
||
|
||
try:
|
||
self.sd.create_fr(creation_data)
|
||
except ServiceDeskAPIError as e:
|
||
log.error(f"Не удалось создать ФР с S/N {fr['serialNumber']}: {e}")
|
||
|
||
def _find_owner_uuid(self, anydesk_id: Optional[str], teamviewer_id: Optional[str]) -> str:
|
||
"""
|
||
Ищет владельца по ID. При наличии нескольких совпадений для одного ID,
|
||
выбирает запись с самой свежей датой lastModifiedDate.
|
||
Возвращает UUID владельца или UUID по умолчанию.
|
||
"""
|
||
|
||
def query_best_owner(id_value: Optional[str], id_type_column: str) -> Optional[str]:
|
||
"""
|
||
Запрашивает всех владельцев по ID и возвращает UUID самого "свежего".
|
||
"""
|
||
if not id_value:
|
||
return None
|
||
|
||
# Запрашиваем всех кандидатов, сортируя по дате в порядке убывания (самые свежие - первые)
|
||
query = f"""
|
||
SELECT owner_uuid, lastModifiedDate
|
||
FROM workstations
|
||
WHERE {id_type_column} = ?
|
||
ORDER BY lastModifiedDate DESC
|
||
"""
|
||
|
||
results = self.db._execute_query(query, (id_value,), fetch='all')
|
||
|
||
if not results:
|
||
return None
|
||
|
||
if len(results) > 1:
|
||
log.warning(f"Найдено несколько ({len(results)}) рабочих станций с {id_type_column} = {id_value}. "
|
||
f"Выбрана самая свежая запись от {results[0]['lastModifiedDate']}.")
|
||
|
||
# Так как мы отсортировали по DESC, первая запись и есть самая свежая
|
||
return results[0]['owner_uuid']
|
||
|
||
owner_by_anydesk = query_best_owner(anydesk_id, 'clean_anydesk_id')
|
||
owner_by_teamviewer = query_best_owner(teamviewer_id, 'clean_teamviewer_id')
|
||
|
||
# Логика выбора остается прежней, но теперь она работает с "лучшими" кандидатами
|
||
if owner_by_anydesk and owner_by_teamviewer and owner_by_anydesk == owner_by_teamviewer:
|
||
log.info(f"Владелец {owner_by_anydesk} уверенно найден по обоим ID: Anydesk={anydesk_id}, TV={teamviewer_id}.")
|
||
return owner_by_anydesk
|
||
|
||
if owner_by_anydesk:
|
||
log.info(f"Владелец {owner_by_anydesk} найден по Anydesk ID: {anydesk_id}.")
|
||
return owner_by_anydesk
|
||
|
||
if owner_by_teamviewer:
|
||
log.info(f"Владелец {owner_by_teamviewer} найден по TeamViewer ID: {teamviewer_id}.")
|
||
return owner_by_teamviewer
|
||
|
||
log.warning(f"Владелец не найден для Anydesk={anydesk_id}, TV={teamviewer_id}. "
|
||
f"Будет назначен владелец по умолчанию: {config.DEFAULT_OWNER_UUID}")
|
||
return config.DEFAULT_OWNER_UUID
|
||
|