package worker import ( "context" "time" "github.com/google/uuid" "go.uber.org/zap" "rmser/internal/domain/account" "rmser/internal/infrastructure/rms" "rmser/internal/services/recommend" ) // SyncService интерфейс для синхронизации данных type SyncService interface { // SyncAllDataForServer синхронизирует данные для конкретного сервера SyncAllDataForServer(serverID uuid.UUID, force bool) error } // SyncWorker фоновый процесс для автоматической синхронизации данных с iiko серверами type SyncWorker struct { syncService SyncService // сервис для синхронизации accountRepo account.Repository // репозиторий для работы с серверами rmsFactory *rms.Factory // фабрика для создания клиентов RMS recService *recommend.Service // сервис рекомендаций logger *zap.Logger tickerInterval time.Duration // интервал проверки (например, 1 минута) idleThreshold time.Duration // порог простоя (10 минут) } // NewSyncWorker создает новый экземпляр SyncWorker func NewSyncWorker( syncService SyncService, accountRepo account.Repository, rmsFactory *rms.Factory, recService *recommend.Service, logger *zap.Logger, ) *SyncWorker { return &SyncWorker{ syncService: syncService, accountRepo: accountRepo, rmsFactory: rmsFactory, recService: recService, logger: logger, tickerInterval: 1 * time.Minute, idleThreshold: 10 * time.Minute, } } // Run запускает фоновый процесс синхронизации func (w *SyncWorker) Run(ctx context.Context) { w.logger.Info("Запуск SyncWorker", zap.Duration("ticker_interval", w.tickerInterval), zap.Duration("idle_threshold", w.idleThreshold)) ticker := time.NewTicker(w.tickerInterval) defer ticker.Stop() // Первый запуск сразу w.processSync(ctx) for { select { case <-ctx.Done(): w.logger.Info("Остановка SyncWorker") return case <-ticker.C: w.processSync(ctx) } } } // processSync обрабатывает синхронизацию для всех серверов, готовых к синхронизации func (w *SyncWorker) processSync(ctx context.Context) { // Получаем серверы, готовые для синхронизации servers, err := w.accountRepo.GetServersForSync(w.idleThreshold) if err != nil { w.logger.Error("Ошибка получения серверов для синхронизации", zap.Error(err)) return } if len(servers) == 0 { return } w.logger.Info("Найдены серверы для синхронизации", zap.Int("count", len(servers))) for _, server := range servers { // Обрабатываем каждый сервер в отдельной горутине go w.syncServer(ctx, server) } } // syncServer выполняет синхронизацию для конкретного сервера func (w *SyncWorker) syncServer(ctx context.Context, server account.RMSServer) { defer func() { if r := recover(); r != nil { w.logger.Error("Паника при синхронизации сервера", zap.String("server_id", server.ID.String()), zap.Any("recover", r)) } }() w.logger.Info("Начало синхронизации сервера", zap.String("server_id", server.ID.String()), zap.String("server_name", server.Name)) // Вызываем синхронизацию через syncService err := w.syncService.SyncAllDataForServer(server.ID, false) if err != nil { w.logger.Error("Ошибка синхронизации сервера", zap.String("server_id", server.ID.String()), zap.Error(err)) return } // Обновляем время последней синхронизации err = w.accountRepo.UpdateLastSync(server.ID) if err != nil { w.logger.Error("Ошибка обновления времени синхронизации", zap.String("server_id", server.ID.String()), zap.Error(err)) } // Обновляем рекомендации после успешной синхронизации if err := w.recService.RefreshRecommendations(server.ID); err != nil { w.logger.Error("Ошибка обновления рекомендаций", zap.String("server_id", server.ID.String()), zap.Error(err)) } w.logger.Info("Синхронизация сервера завершена успешно", zap.String("server_id", server.ID.String())) }