Files
rmser/internal/services/worker/sync_worker.go
SERTY 88620f3fb6 0202-финиш перед десктопом
пересчет поправил
редактирование с перепроведением
галка автопроведения работает
рекомендации починил
2026-02-02 13:53:38 +03:00

137 lines
4.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package worker
import (
"context"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
"rmser/internal/domain/account"
"rmser/internal/infrastructure/rms"
"rmser/internal/services/recommend"
)
// SyncService интерфейс для синхронизации данных
type SyncService interface {
// SyncAllDataForServer синхронизирует данные для конкретного сервера
SyncAllDataForServer(serverID uuid.UUID, force bool) error
}
// SyncWorker фоновый процесс для автоматической синхронизации данных с iiko серверами
type SyncWorker struct {
syncService SyncService // сервис для синхронизации
accountRepo account.Repository // репозиторий для работы с серверами
rmsFactory *rms.Factory // фабрика для создания клиентов RMS
recService *recommend.Service // сервис рекомендаций
logger *zap.Logger
tickerInterval time.Duration // интервал проверки (например, 1 минута)
idleThreshold time.Duration // порог простоя (10 минут)
}
// NewSyncWorker создает новый экземпляр SyncWorker
func NewSyncWorker(
syncService SyncService,
accountRepo account.Repository,
rmsFactory *rms.Factory,
recService *recommend.Service,
logger *zap.Logger,
) *SyncWorker {
return &SyncWorker{
syncService: syncService,
accountRepo: accountRepo,
rmsFactory: rmsFactory,
recService: recService,
logger: logger,
tickerInterval: 1 * time.Minute,
idleThreshold: 10 * time.Minute,
}
}
// Run запускает фоновый процесс синхронизации
func (w *SyncWorker) Run(ctx context.Context) {
w.logger.Info("Запуск SyncWorker",
zap.Duration("ticker_interval", w.tickerInterval),
zap.Duration("idle_threshold", w.idleThreshold))
ticker := time.NewTicker(w.tickerInterval)
defer ticker.Stop()
// Первый запуск сразу
w.processSync(ctx)
for {
select {
case <-ctx.Done():
w.logger.Info("Остановка SyncWorker")
return
case <-ticker.C:
w.processSync(ctx)
}
}
}
// processSync обрабатывает синхронизацию для всех серверов, готовых к синхронизации
func (w *SyncWorker) processSync(ctx context.Context) {
// Получаем серверы, готовые для синхронизации
servers, err := w.accountRepo.GetServersForSync(w.idleThreshold)
if err != nil {
w.logger.Error("Ошибка получения серверов для синхронизации", zap.Error(err))
return
}
if len(servers) == 0 {
return
}
w.logger.Info("Найдены серверы для синхронизации",
zap.Int("count", len(servers)))
for _, server := range servers {
// Обрабатываем каждый сервер в отдельной горутине
go w.syncServer(ctx, server)
}
}
// syncServer выполняет синхронизацию для конкретного сервера
func (w *SyncWorker) syncServer(ctx context.Context, server account.RMSServer) {
defer func() {
if r := recover(); r != nil {
w.logger.Error("Паника при синхронизации сервера",
zap.String("server_id", server.ID.String()),
zap.Any("recover", r))
}
}()
w.logger.Info("Начало синхронизации сервера",
zap.String("server_id", server.ID.String()),
zap.String("server_name", server.Name))
// Вызываем синхронизацию через syncService
err := w.syncService.SyncAllDataForServer(server.ID, false)
if err != nil {
w.logger.Error("Ошибка синхронизации сервера",
zap.String("server_id", server.ID.String()),
zap.Error(err))
return
}
// Обновляем время последней синхронизации
err = w.accountRepo.UpdateLastSync(server.ID)
if err != nil {
w.logger.Error("Ошибка обновления времени синхронизации",
zap.String("server_id", server.ID.String()),
zap.Error(err))
}
// Обновляем рекомендации после успешной синхронизации
if err := w.recService.RefreshRecommendations(server.ID); err != nil {
w.logger.Error("Ошибка обновления рекомендаций",
zap.String("server_id", server.ID.String()),
zap.Error(err))
}
w.logger.Info("Синхронизация сервера завершена успешно",
zap.String("server_id", server.ID.String()))
}