mirror of
https://github.com/serty2005/rmser.git
synced 2026-02-04 19:02:33 -06:00
пересчет поправил редактирование с перепроведением галка автопроведения работает рекомендации починил
137 lines
4.6 KiB
Go
137 lines
4.6 KiB
Go
package worker
|
||
|
||
import (
|
||
"context"
|
||
"time"
|
||
|
||
"github.com/google/uuid"
|
||
"go.uber.org/zap"
|
||
|
||
"rmser/internal/domain/account"
|
||
"rmser/internal/infrastructure/rms"
|
||
"rmser/internal/services/recommend"
|
||
)
|
||
|
||
// SyncService интерфейс для синхронизации данных
|
||
type SyncService interface {
|
||
// SyncAllDataForServer синхронизирует данные для конкретного сервера
|
||
SyncAllDataForServer(serverID uuid.UUID, force bool) error
|
||
}
|
||
|
||
// SyncWorker фоновый процесс для автоматической синхронизации данных с iiko серверами
|
||
type SyncWorker struct {
|
||
syncService SyncService // сервис для синхронизации
|
||
accountRepo account.Repository // репозиторий для работы с серверами
|
||
rmsFactory *rms.Factory // фабрика для создания клиентов RMS
|
||
recService *recommend.Service // сервис рекомендаций
|
||
logger *zap.Logger
|
||
tickerInterval time.Duration // интервал проверки (например, 1 минута)
|
||
idleThreshold time.Duration // порог простоя (10 минут)
|
||
}
|
||
|
||
// NewSyncWorker создает новый экземпляр SyncWorker
|
||
func NewSyncWorker(
|
||
syncService SyncService,
|
||
accountRepo account.Repository,
|
||
rmsFactory *rms.Factory,
|
||
recService *recommend.Service,
|
||
logger *zap.Logger,
|
||
) *SyncWorker {
|
||
return &SyncWorker{
|
||
syncService: syncService,
|
||
accountRepo: accountRepo,
|
||
rmsFactory: rmsFactory,
|
||
recService: recService,
|
||
logger: logger,
|
||
tickerInterval: 1 * time.Minute,
|
||
idleThreshold: 10 * time.Minute,
|
||
}
|
||
}
|
||
|
||
// Run запускает фоновый процесс синхронизации
|
||
func (w *SyncWorker) Run(ctx context.Context) {
|
||
w.logger.Info("Запуск SyncWorker",
|
||
zap.Duration("ticker_interval", w.tickerInterval),
|
||
zap.Duration("idle_threshold", w.idleThreshold))
|
||
|
||
ticker := time.NewTicker(w.tickerInterval)
|
||
defer ticker.Stop()
|
||
|
||
// Первый запуск сразу
|
||
w.processSync(ctx)
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
w.logger.Info("Остановка SyncWorker")
|
||
return
|
||
case <-ticker.C:
|
||
w.processSync(ctx)
|
||
}
|
||
}
|
||
}
|
||
|
||
// processSync обрабатывает синхронизацию для всех серверов, готовых к синхронизации
|
||
func (w *SyncWorker) processSync(ctx context.Context) {
|
||
// Получаем серверы, готовые для синхронизации
|
||
servers, err := w.accountRepo.GetServersForSync(w.idleThreshold)
|
||
if err != nil {
|
||
w.logger.Error("Ошибка получения серверов для синхронизации", zap.Error(err))
|
||
return
|
||
}
|
||
|
||
if len(servers) == 0 {
|
||
return
|
||
}
|
||
|
||
w.logger.Info("Найдены серверы для синхронизации",
|
||
zap.Int("count", len(servers)))
|
||
|
||
for _, server := range servers {
|
||
// Обрабатываем каждый сервер в отдельной горутине
|
||
go w.syncServer(ctx, server)
|
||
}
|
||
}
|
||
|
||
// syncServer выполняет синхронизацию для конкретного сервера
|
||
func (w *SyncWorker) syncServer(ctx context.Context, server account.RMSServer) {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
w.logger.Error("Паника при синхронизации сервера",
|
||
zap.String("server_id", server.ID.String()),
|
||
zap.Any("recover", r))
|
||
}
|
||
}()
|
||
|
||
w.logger.Info("Начало синхронизации сервера",
|
||
zap.String("server_id", server.ID.String()),
|
||
zap.String("server_name", server.Name))
|
||
|
||
// Вызываем синхронизацию через syncService
|
||
err := w.syncService.SyncAllDataForServer(server.ID, false)
|
||
if err != nil {
|
||
w.logger.Error("Ошибка синхронизации сервера",
|
||
zap.String("server_id", server.ID.String()),
|
||
zap.Error(err))
|
||
return
|
||
}
|
||
|
||
// Обновляем время последней синхронизации
|
||
err = w.accountRepo.UpdateLastSync(server.ID)
|
||
if err != nil {
|
||
w.logger.Error("Ошибка обновления времени синхронизации",
|
||
zap.String("server_id", server.ID.String()),
|
||
zap.Error(err))
|
||
}
|
||
|
||
// Обновляем рекомендации после успешной синхронизации
|
||
if err := w.recService.RefreshRecommendations(server.ID); err != nil {
|
||
w.logger.Error("Ошибка обновления рекомендаций",
|
||
zap.String("server_id", server.ID.String()),
|
||
zap.Error(err))
|
||
}
|
||
|
||
w.logger.Info("Синхронизация сервера завершена успешно",
|
||
zap.String("server_id", server.ID.String()))
|
||
}
|