# database.py import sqlite3 import logging # Настройка логирования для модуля БД log = logging.getLogger(__name__) class DatabaseManager: """ Класс для управления всеми операциями с базой данных SQLite. Обеспечивает централизованное подключение и выполнение запросов. """ def __init__(self, db_path): self.db_path = db_path self.conn = None def __enter__(self): """Открывает соединение с БД при входе в контекстный менеджер.""" try: self.conn = sqlite3.connect(self.db_path) # Устанавливаем row_factory для получения результатов в виде словарей self.conn.row_factory = sqlite3.Row log.info(f"Соединение с БД {self.db_path} установлено.") return self except sqlite3.Error as e: log.error(f"Ошибка подключения к БД: {e}") raise def __exit__(self, exc_type, exc_val, exc_tb): """Закрывает соединение при выходе из контекстного менеджера.""" if self.conn: self.conn.close() log.info("Соединение с БД закрыто.") def _execute_query(self, query, params=(), fetch=None): """ Приватный метод для выполнения SQL-запросов. Использует параметризацию для предотвращения SQL-инъекций. :param query: SQL-запрос. :param params: Кортеж с параметрами для запроса. :param fetch: 'one', 'all' или None для коммита. """ try: cursor = self.conn.cursor() cursor.execute(query, params) if fetch == 'one': return cursor.fetchone() elif fetch == 'all': return cursor.fetchall() else: self.conn.commit() return cursor.lastrowid except sqlite3.Error as e: log.error(f"Ошибка выполнения запроса: {query} | {e}") self.conn.rollback() # Откатываем транзакцию в случае ошибки raise def setup_tables(self): """Создает все необходимые таблицы, если они не существуют.""" log.info("Проверка и создание таблиц в БД...") # Таблица для данных с FTP self._execute_query(""" CREATE TABLE IF NOT EXISTS pos_fiscals ( serialNumber TEXT PRIMARY KEY, modelName TEXT, RNM TEXT, organizationName TEXT, fn_serial TEXT, datetime_reg TEXT, dateTime_end TEXT, ofdName TEXT, bootVersion TEXT, ffdVersion TEXT, fnExecution TEXT, INN TEXT, anydesk_id TEXT, teamviewer_id TEXT, lastModifiedDate TEXT )""") # Таблица для данных из ServiceDesk self._execute_query(""" CREATE TABLE IF NOT EXISTS sd_fiscals ( UUID TEXT PRIMARY KEY, serialNumber TEXT UNIQUE, modelName TEXT, RNM TEXT, organizationName TEXT, fn_serial TEXT, datetime_reg TEXT, dateTime_end TEXT, ofdName TEXT, bootVersion TEXT, ffdVersion TEXT, owner_uuid TEXT, lastModifiedDate TEXT )""") # Новая таблица для рабочих станций self._execute_query(""" CREATE TABLE IF NOT EXISTS workstations ( uuid TEXT PRIMARY KEY, owner_uuid TEXT, clean_anydesk_id TEXT, clean_teamviewer_id TEXT )""") # Новая таблица для кэширования UUID справочников self._execute_query(""" CREATE TABLE IF NOT EXISTS sd_lookups ( lookup_type TEXT, -- 'ModelKKT', 'FFD', etc. title TEXT, -- 'АТОЛ 25Ф', '1.2', etc. uuid TEXT, PRIMARY KEY (lookup_type, title) )""") log.info("Все таблицы успешно созданы/проверены.") def clear_tables(self, table_names: list): """Очищает указанные таблицы перед каждым циклом синхронизации.""" log.info(f"Очистка таблиц: {', '.join(table_names)}") for table in table_names: # Проверяем, что имя таблицы "безопасное" if table.isalnum(): self._execute_query(f"DELETE FROM {table}") else: log.warning(f"Попытка очистить таблицу с некорректным именем: {table}") def bulk_insert(self, table_name: str, columns: list, data: list): """ Выполняет массовую вставку данных в таблицу. :param table_name: Имя таблицы. :param columns: Список названий колонок. :param data: Список кортежей с данными для вставки. """ if not data: return placeholders = ', '.join(['?'] * len(columns)) cols = ', '.join(columns) query = f"INSERT OR REPLACE INTO {table_name} ({cols}) VALUES ({placeholders})" try: cursor = self.conn.cursor() cursor.executemany(query, data) self.conn.commit() log.info(f"Вставлено {len(data)} записей в таблицу {table_name}.") except sqlite3.Error as e: log.error(f"Ошибка массовой вставки в {table_name}: {e}") self.conn.rollback() raise