# sync_logic.py import re import logging from typing import Dict, List, Optional, Any from datetime import datetime, timedelta from dateutil import parser # Импортируем наши модули import config from database import DatabaseManager from sd_api import ServiceDeskClient, ServiceDeskAPIError import ftp_parser log = logging.getLogger(__name__) def _clean_sd_remote_id(raw_id: Optional[str]) -> Optional[str]: """ Очищает ID удаленного доступа, полученный из ServiceDesk.. """ if not raw_id or raw_id.lower() == 'none': return None no_spaces_id = re.sub(r'\s+', '', raw_id) match = re.search(r'\d+', no_spaces_id) return match.group(0) if match else None def _extract_srok_fn_from_execution(fn_execution: Optional[str]) -> Optional[str]: """Извлекает срок ФН (13, 15, 36) из строки fnExecution.""" if not fn_execution: return None # Ищем числа 13, 15 или 36 в строке match = re.search(r'(13|15|36)', fn_execution) return match.group(1) if match else None def _map_ffd_version(ffd_from_ftp: Optional[str]) -> Optional[str]: """Сопоставляет версию ФФД из FTP ('120') с версией в SD ('1.2').""" if ffd_from_ftp == '120': return '1.2' if ffd_from_ftp == '105': return '1.05' # Можно добавить другие сопоставления, если появятся return ffd_from_ftp # Возвращаем как есть, если нет сопоставления class Synchronizer: """ Класс, инкапсулирующий всю логику синхронизации данных. """ def __init__(self, db_manager: DatabaseManager, sd_client: ServiceDeskClient): self.db = db_manager self.sd = sd_client self.lookup_cache: Dict[str, Dict[str, str]] = {} def _prepare_data_for_db(self, fr_list: List[Dict], source: str) -> List[tuple]: """ Преобразует список словарей в список кортежей для массовой вставки в БД. Также логирует отсутствующие, но важные поля. """ prepared_data = [] if source == 'pos': # Ожидаемые колонки для pos_fiscals columns = ('serialNumber', 'modelName', 'RNM', 'organizationName', 'fn_serial', 'datetime_reg', 'dateTime_end', 'ofdName', 'bootVersion', 'ffdVersion', 'fnExecution', 'INN', 'anydesk_id', 'teamviewer_id', 'lastModifiedDate') for fr in fr_list: # Проверка на наличие обязательных полей if not all(fr.get(key) for key in ['serialNumber', 'modelName', 'dateTime_end']): log.warning(f"Пропущена запись из FTP: отсутствует одно из ключевых полей " f"(serialNumber, modelName, dateTime_end). Данные: {fr}") continue prepared_data.append(tuple(fr.get(col) for col in columns)) elif source == 'sd': # Ожидаемые колонки для sd_fiscals columns = ('UUID', 'serialNumber', 'modelName', 'RNM', 'organizationName', 'fn_serial', 'datetime_reg', 'dateTime_end', 'ofdName', 'bootVersion', 'ffdVersion', 'owner_uuid', 'lastModifiedDate') for fr in fr_list: if not fr.get('UUID') or not fr.get('FRSerialNumber'): log.warning(f"Пропущена запись из SD: отсутствует UUID или FRSerialNumber. Данные: {fr}") continue # Извлекаем и нормализуем данные owner = fr.get('owner') model = fr.get('ModelKKT') ofd = fr.get('OFDName') ffd = fr.get('FFD') prepared_data.append(( fr.get('UUID'), fr.get('FRSerialNumber'), model.get('title') if model else None, fr.get('RNKKT'), fr.get('LegalName'), fr.get('FNNumber'), fr.get('KKTRegDate'), fr.get('FNExpireDate'), ofd.get('title') if ofd else None, fr.get('FRDownloader'), ffd.get('title') if ffd else None, owner.get('UUID') if owner else None, fr.get('lastModifiedDate') )) return prepared_data, columns def _fetch_and_cache_lookups(self): """Запрашивает и кэширует UUID справочников из SD.""" log.info("Кэширование справочников из ServiceDesk...") metaclasses = ['ModeliFR', 'SrokiFN', 'FFD'] for mc in metaclasses: try: values = self.sd.get_lookup_values(mc) # Создаем словарь вида {'Название': 'UUID'} self.lookup_cache[mc] = {item['title']: item['UUID'] for item in values} except ServiceDeskAPIError as e: log.error(f"Не удалось загрузить справочник {mc}: {e}") # Если не удалось загрузить критически важные справочники, прерываем работу raise log.info("Справочники успешно закэшированы.") def run_full_sync(self): """Выполняет полный цикл синхронизации.""" log.info("--- НАЧАЛО ЦИКЛА СИНХРОНИЗАЦИИ ---") try: # 1. Кэшируем справочники self._fetch_and_cache_lookups() # 2. Очищаем таблицы перед заполнением self.db.clear_tables(['pos_fiscals', 'sd_fiscals', 'workstations']) # 3. Сбор и загрузка данных с FTP pos_frs_raw = ftp_parser.process_json_files(config.JSON_FILES_PATH) pos_frs_data, pos_cols = self._prepare_data_for_db(pos_frs_raw, 'pos') self.db.bulk_insert('pos_fiscals', pos_cols, pos_frs_data) # 4. Сбор и загрузка данных из ServiceDesk (ФР) sd_frs_raw = self.sd.get_all_frs() sd_frs_data, sd_cols = self._prepare_data_for_db(sd_frs_raw, 'sd') self.db.bulk_insert('sd_fiscals', sd_cols, sd_frs_data) # 5. Сбор и загрузка данных из ServiceDesk (Рабочие станции) workstations_raw = self.sd.get_all_workstations() workstations_data = [] for ws in workstations_raw: owner = ws.get('owner') if ws.get('UUID') and owner and owner.get('UUID'): workstations_data.append(( ws['UUID'], owner['UUID'], _clean_sd_remote_id(ws.get('AnyDesk')), _clean_sd_remote_id(ws.get('Teamviewer')), ws.get('lastModifiedDate') )) self.db.bulk_insert('workstations', ['uuid', 'owner_uuid', 'clean_anydesk_id', 'clean_teamviewer_id', 'lastModifiedDate'], workstations_data) # 6. Логика синхронизации self._update_existing_frs() self._create_new_frs() log.info("--- ЦИКЛ СИНХРОНИЗАЦИИ УСПЕШНО ЗАВЕРШЕН ---") except Exception as e: log.critical(f"Критическая ошибка в цикле синхронизации: {e}", exc_info=True) def _update_existing_frs(self): """ Находит и обновляет ФР с отличающимися датами окончания ФН. Пропускает обновление, если разница в датах более 10 лет. """ log.info("Поиск ФР для обновления...") # 1. SQL-запрос для поиска расхождений. # Он объединяет таблицы по serialNumber и выбирает все необходимые поля # сразу, чтобы избежать вложенных циклов. query = """ SELECT pos.serialNumber, pos.dateTime_end AS pos_date_end, pos.datetime_reg AS pos_date_reg, pos.fn_serial AS pos_fn_serial, pos.RNM AS pos_rnm, pos.bootVersion AS pos_boot_version, pos.organizationName AS pos_org_name, pos.INN AS pos_inn, sd.UUID AS sd_uuid, sd.dateTime_end AS sd_date_end FROM pos_fiscals pos JOIN sd_fiscals sd ON pos.serialNumber = sd.serialNumber """ records_to_check = self.db._execute_query(query, fetch='all') update_counter = 0 #Порог для сравнения дат archive_delta = timedelta(days=365*10) for rec in records_to_check: try: # 2. Приводим даты к одному "знаменателю" - объектам datetime # dateutil.parser отлично справляется с разными форматами # Проверяем, что обе даты существуют, прежде чем их парсить if not rec['pos_date_end'] or not rec['sd_date_end']: log.warning(f"Пропуск сравнения для S/N {rec['serialNumber']}: одна из дат отсутствует.") continue pos_date = parser.parse(rec['pos_date_end']) sd_date = parser.parse(rec['sd_date_end']) # 3. Сравниваем даты. Если они различаются, готовим обновление. # Проверяем на неравенство. Величина различия не важна. if pos_date != sd_date: # Проверяем, что разница не больше 10 лет if abs(pos_date - sd_date) > archive_delta: log.warning(f"Пропуск сравнения для S/N {rec['serialNumber']}: разница в датах больше 10 лет." f"FTP: {pos_date.date()}, SD: {sd_date.date()}.") continue log.info(f"Найдено расхождение в дате для S/N {rec['serialNumber']} (UUID: {rec['sd_uuid']}). " f"FTP: {pos_date}, SD: {sd_date}. Подготовка к обновлению.") # 4. Формируем данные для обновления # Проверяем, закончился ли ФН if rec['pos_rnm'] == '0000000000000000': legal_name = 'ЗАКОНЧИЛСЯ ФН' else: legal_name = f"{rec['pos_org_name']} ИНН:{rec['pos_inn']}" if rec['pos_org_name'] and rec['pos_inn'] else rec['pos_org_name'] # Форматируем даты для API ServiceDesk formatted_expire_date = pos_date.strftime('%Y.%m.%d %H:%M:%S') # Убеждаемся, что дата регистрации тоже есть, прежде чем форматировать formatted_reg_date = parser.parse(rec['pos_date_reg']).strftime('%Y.%m.%d %H:%M:%S') if rec['pos_date_reg'] else None params_to_update = { 'FNNumber': rec['pos_fn_serial'], 'FNExpireDate': formatted_expire_date, 'LegalName': legal_name, 'RNKKT': rec['pos_rnm'], 'FRDownloader': rec['pos_boot_version'], 'KKTRegDate': formatted_reg_date } # Удаляем ключи с None-значениями, чтобы не отправлять их в API, если они не обязательны params_to_update = {k: v for k, v in params_to_update.items() if v is not None} try: self.sd.update_fr(rec['sd_uuid'], params_to_update) update_counter += 1 except ServiceDeskAPIError as e: log.error(f"Не удалось обновить ФР с UUID {rec['sd_uuid']}: {e}") except (parser.ParserError, TypeError) as e: log.error(f"Ошибка парсинга даты для S/N {rec['serialNumber']}. " f"FTP_date='{rec['pos_date_end']}', SD_date='{rec['sd_date_end']}'. Ошибка: {e}") continue log.info(f"Проверка обновлений завершена. Обновлено записей: {update_counter}.") def _find_lookup_uuid_by_substring(self, lookup_type: str, substring: str) -> Optional[str]: """ Ищет UUID в кэше справочников по подстроке в ключе (title). Например, ищет '13' в ключе '13 мес.'. :param lookup_type: Тип справочника ('SrokiFN', 'FFD', etc.). :param substring: Подстрока для поиска (например, '13'). :return: UUID или None, если не найдено. """ if not substring or lookup_type not in self.lookup_cache: return None # Проходим по всем парам (title, uuid) в нужном справочнике for title, uuid in self.lookup_cache[lookup_type].items(): # Ищем точное вхождение подстроки, окруженное не-цифрами или границами строки, # чтобы '15' не совпало с '150'. if re.search(r'\b' + re.escape(substring) + r'\b', title): return uuid return None def _create_new_frs(self): """Находит, определяет владельца и создает новые ФР.""" log.info("Поиск новых ФР для добавления в ServiceDesk...") query = """ SELECT pos.* FROM pos_fiscals pos LEFT JOIN sd_fiscals sd ON pos.serialNumber = sd.serialNumber WHERE sd.serialNumber IS NULL """ new_frs = self.db._execute_query(query, fetch='all') log.info(f"Найдено {len(new_frs)} новых ФР.") for fr in new_frs: owner_uuid = self._find_owner_uuid(fr['anydesk_id'], fr['teamviewer_id']) # Собираем данные для создания model_uuid = self.lookup_cache.get('ModeliFR', {}).get(fr['modelName']) srok_fn_str = _extract_srok_fn_from_execution(fr['fnExecution']) if not srok_fn_str: srok_fn_str = '13' log.warning(f"Для S/N {fr['serialNumber']} не удалось определить срок ФН из fnExecution. " f"Установлен срок по умолчанию: {srok_fn_str} мес.") srok_fn_uuid = self._find_lookup_uuid_by_substring('SrokiFN', srok_fn_str) ffd_version_str = _map_ffd_version(fr['ffdVersion']) ffd_uuid = self._find_lookup_uuid_by_substring('FFD', ffd_version_str) if not all([model_uuid, srok_fn_uuid, ffd_uuid]): log.error(f"Не удалось создать ФР с S/N {fr['serialNumber']}: не найдены UUID для справочников. " f"Model: {fr['modelName']}({model_uuid}), SrokFN: {srok_fn_str}({srok_fn_uuid}), " f"FFD: {ffd_version_str}({ffd_uuid})") continue legal_name = f"{fr['organizationName']} ИНН:{fr['INN']}" if fr['organizationName'] and fr['INN'] else "" creation_data = { "RNKKT": fr['RNM'], "KKTRegDate": parser.parse(fr['datetime_reg']).strftime('%Y.%m.%d %H:%M:%S'), "FRSerialNumber": fr['serialNumber'], "LegalName": legal_name, "FNExpireDate": parser.parse(fr['dateTime_end']).strftime('%Y.%m.%d %H:%M:%S'), "FNNumber": fr['fn_serial'], "ModelKKT": model_uuid, "SrokFN": srok_fn_uuid, "FFD": ffd_uuid, "owner": owner_uuid } try: self.sd.create_fr(creation_data) except ServiceDeskAPIError as e: log.error(f"Не удалось создать ФР с S/N {fr['serialNumber']}: {e}") def _find_owner_uuid(self, anydesk_id: Optional[str], teamviewer_id: Optional[str]) -> str: """ Ищет владельца по ID. При наличии нескольких совпадений для одного ID, выбирает запись с самой свежей датой lastModifiedDate. Возвращает UUID владельца или UUID по умолчанию. """ def query_best_owner(id_value: Optional[str], id_type_column: str) -> Optional[str]: """ Запрашивает всех владельцев по ID и возвращает UUID самого "свежего". """ if not id_value: return None # Запрашиваем всех кандидатов, сортируя по дате в порядке убывания (самые свежие - первые) query = f""" SELECT owner_uuid, lastModifiedDate FROM workstations WHERE {id_type_column} = ? ORDER BY lastModifiedDate DESC """ results = self.db._execute_query(query, (id_value,), fetch='all') if not results: return None if len(results) > 1: log.warning(f"Найдено несколько ({len(results)}) рабочих станций с {id_type_column} = {id_value}. " f"Выбрана самая свежая запись от {results[0]['lastModifiedDate']}.") # Так как мы отсортировали по DESC, первая запись и есть самая свежая return results[0]['owner_uuid'] owner_by_anydesk = query_best_owner(anydesk_id, 'clean_anydesk_id') owner_by_teamviewer = query_best_owner(teamviewer_id, 'clean_teamviewer_id') # Логика выбора остается прежней, но теперь она работает с "лучшими" кандидатами if owner_by_anydesk and owner_by_teamviewer and owner_by_anydesk == owner_by_teamviewer: log.info(f"Владелец {owner_by_anydesk} уверенно найден по обоим ID: Anydesk={anydesk_id}, TV={teamviewer_id}.") return owner_by_anydesk if owner_by_anydesk: log.info(f"Владелец {owner_by_anydesk} найден по Anydesk ID: {anydesk_id}.") return owner_by_anydesk if owner_by_teamviewer: log.info(f"Владелец {owner_by_teamviewer} найден по TeamViewer ID: {teamviewer_id}.") return owner_by_teamviewer log.warning(f"Владелец не найден для Anydesk={anydesk_id}, TV={teamviewer_id}. " f"Будет назначен владелец по умолчанию: {config.DEFAULT_OWNER_UUID}") return config.DEFAULT_OWNER_UUID