package worker import ( "context" "fmt" "log/slog" "time" "github.com/TopherMayor/unified-media-manager/internal/config" "github.com/TopherMayor/unified-media-manager/internal/db" "github.com/TopherMayor/unified-media-manager/internal/service" ) type QueueProcessor struct { database *db.DB importSvc *service.ImportService dcSvc *service.DownloadClientService cfg *config.Config activitySvc *service.ActivityService } func NewQueueProcessor(database *db.DB, importSvc *service.ImportService, dcSvc *service.DownloadClientService, cfg *config.Config, activitySvc *service.ActivityService) *QueueProcessor { return &QueueProcessor{ database: database, importSvc: importSvc, dcSvc: dcSvc, cfg: cfg, activitySvc: activitySvc, } } func (w *QueueProcessor) Name() string { return "queue_processor" } func (w *QueueProcessor) CronExpr() string { return w.cfg.WorkerQueueInterval } func (w *QueueProcessor) Run(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() progressUpdated := 0 errors := 0 nzbClients, err := w.dcSvc.GetAllEnabled(ctx, "nzb") if err != nil { slog.Error("failed to get nzb clients", "error", err) } torrentClients, err := w.dcSvc.GetAllEnabled(ctx, "torrent") if err != nil { slog.Error("failed to get torrent clients", "error", err) } allClients := append(nzbClients, torrentClients...) for _, client := range allClients { rows, err := w.database.Pool.Query(ctx, `SELECT id, download_id FROM download_queue WHERE status = 'downloading' AND download_client = $1 AND download_id IS NOT NULL AND download_id != ''`, client.Config.Name) if err != nil { slog.Error("failed to query active downloads", "client", client.Config.Name, "error", err) continue } type queueItem struct { id int64 downloadID string } var items []queueItem for rows.Next() { var item queueItem if err := rows.Scan(&item.id, &item.downloadID); err != nil { slog.Error("failed to scan queue item", "error", err) continue } items = append(items, item) } rows.Close() for _, item := range items { progress, err := client.Client.GetProgress(ctx, item.downloadID) if err != nil { slog.Error("failed to get download progress", "id", item.downloadID, "error", err) errors++ continue } // Detect failed downloads if progress.Status == "failed" || progress.Status == "error" { if w.activitySvc != nil { w.activitySvc.LogAsync(service.LogEntry{ EventType: "download_failed", Title: fmt.Sprintf("Download failed: %s", item.downloadID), Data: []byte(fmt.Sprintf(`{"queue_id":%d,"download_id":"%s"}`, item.id, item.downloadID)), }) } } _, err = w.database.Pool.Exec(ctx, "UPDATE download_queue SET progress = $1, updated_at = NOW() WHERE id = $2", progress.Progress, item.id) if err != nil { slog.Error("failed to update progress", "id", item.id, "error", err) errors++ continue } progressUpdated++ } } report, err := w.importSvc.ProcessCompleted(ctx) if err != nil { slog.Error("failed to process completed downloads", "error", err) errors++ } imported := 0 if report != nil { imported = report.Imported } slog.Info("queue processor completed", "progress_updated", progressUpdated, "completed_imported", imported, "errors", errors) return nil }