package worker

import (
	"context"
	"log/slog"
	"time"

	"github.com/TopherMayor/unified-media-manager/internal/config"
	"github.com/TopherMayor/unified-media-manager/internal/db"
	"github.com/TopherMayor/unified-media-manager/internal/service"
)

// RSSSyncWorker searches indexers for media that is missing or eligible for
// an upgrade and automatically grabs the best acceptable release for each.
type RSSSyncWorker struct {
	database   *db.DB
	mediaSvc   *service.MediaService
	searchSvc  *service.SearchService
	dcSvc      *service.DownloadClientService
	qualitySvc *service.QualityService
	cfg        *config.Config
}

// NewRSSSyncWorker returns an RSSSyncWorker wired to the given dependencies.
func NewRSSSyncWorker(database *db.DB, mediaSvc *service.MediaService, searchSvc *service.SearchService, dcSvc *service.DownloadClientService, qualitySvc *service.QualityService, cfg *config.Config) *RSSSyncWorker {
	return &RSSSyncWorker{
		database:   database,
		mediaSvc:   mediaSvc,
		searchSvc:  searchSvc,
		dcSvc:      dcSvc,
		qualitySvc: qualitySvc,
		cfg:        cfg,
	}
}

// Name returns the worker's identifier, "rss_sync".
func (w *RSSSyncWorker) Name() string { return "rss_sync" }

// CronExpr returns the configured WorkerRSSSyncInterval value.
func (w *RSSSyncWorker) CronExpr() string { return w.cfg.WorkerRSSSyncInterval }

// Run performs one sync pass, bounded by a five-minute timeout.
//
// For every missing or upgradable media item without an active download it
// records the search time, queries the indexers, drops blocklisted releases
// and releases outside the item's quality profile, and grabs the best
// remaining release. Per-item failures are logged and counted; they do not
// abort the pass. Run always returns nil.
func (w *RSSSyncWorker) Run(ctx context.Context) error {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
	defer cancel()

	items, errCount := w.collectCandidates(ctx)

	searched, grabs := 0, 0
	for i := range items {
		// Once the context is done every remaining query would fail, so stop
		// instead of logging one error per remaining item.
		if err := ctx.Err(); err != nil {
			slog.Warn("rss sync interrupted", "remaining", len(items)-i, "error", err)
			break
		}
		item := &items[i]

		pending, err := w.hasActiveDownload(ctx, item.ID)
		if err != nil {
			slog.Error("failed to check pending queue", "media_id", item.ID, "error", err)
			continue
		}
		if pending {
			continue
		}

		// Best effort: a failed timestamp update must not prevent the search.
		if _, err := w.database.Pool.Exec(ctx, "UPDATE media SET last_search_at = NOW() WHERE id = $1", item.ID); err != nil {
			slog.Error("failed to update last_search_at", "media_id", item.ID, "error", err)
		}

		results, err := w.searchSvc.Search(ctx, service.SearchRequest{
			Query:     item.Title,
			MediaType: item.MediaType,
		})
		searched++
		if err != nil {
			slog.Error("failed to search indexers", "media_id", item.ID, "title", item.Title, "error", err)
			errCount++
			continue
		}
		if len(results) == 0 {
			continue
		}

		candidates := w.filterResults(ctx, item, results)
		if len(candidates) == 0 {
			continue
		}
		best := w.pickBest(candidates)

		_, err = w.searchSvc.Grab(ctx, service.GrabRequest{
			DownloadURL: best.DownloadURL,
			Title:       best.Title,
			MediaType:   item.MediaType,
			Quality:     best.Quality,
			IndexerName: best.IndexerName,
			MediaID:     item.ID,
		}, w.dcSvc)
		if err != nil {
			slog.Error("failed to auto-grab release", "media_id", item.ID, "release", best.Title, "error", err)
			errCount++
			continue
		}

		status := "searching"
		if err := w.mediaSvc.Update(ctx, item.ID, item.MediaType, service.UpdateMediaRequest{Status: &status}); err != nil {
			slog.Error("failed to update media status", "media_id", item.ID, "error", err)
		}

		// QualityTier may be nil; fall back to the release's Quality field so
		// logging never dereferences a nil tier.
		var quality any = best.Quality
		if best.QualityTier != nil {
			quality = best.QualityTier.Name
		}
		slog.Info("auto-grabbed release", "media", item.Title, "release", best.Title, "quality", quality)
		grabs++
	}

	slog.Info("rss sync completed", "searched", searched, "grabs", grabs, "errors", errCount)
	return nil
}

// collectCandidates loads the first page (100 entries) of missing media and
// of upgradable media and merges them into one list without duplicate IDs,
// missing items first. It returns the merged list and the number of lookups
// that failed; a failed lookup is logged and contributes no items.
func (w *RSSSyncWorker) collectCandidates(ctx context.Context) ([]service.Media, int) {
	const pageSize = 100
	errCount := 0

	missing, _, err := w.mediaSvc.SearchMissing(ctx, service.MediaFilters{PageSize: pageSize})
	if err != nil {
		slog.Error("failed to search missing media", "error", err)
		errCount++
	}
	upgrades, _, err := w.mediaSvc.SearchUpgrades(ctx, service.MediaFilters{PageSize: pageSize})
	if err != nil {
		slog.Error("failed to search upgrade media", "error", err)
		errCount++
	}

	// IDs are marked as seen only while merging. Marking the upgrade IDs up
	// front would cause every upgrade candidate to be skipped as a duplicate.
	seen := make(map[int64]bool, len(missing)+len(upgrades))
	items := make([]service.Media, 0, len(missing)+len(upgrades))
	for i := range missing {
		if seen[missing[i].ID] {
			continue
		}
		seen[missing[i].ID] = true
		items = append(items, missing[i])
	}
	for i := range upgrades {
		if seen[upgrades[i].ID] {
			continue
		}
		seen[upgrades[i].ID] = true
		items = append(items, upgrades[i])
	}
	return items, errCount
}

// hasActiveDownload reports whether download_queue holds a row for mediaID
// whose status is 'pending' or 'downloading'.
func (w *RSSSyncWorker) hasActiveDownload(ctx context.Context, mediaID int64) (bool, error) {
	var pending bool
	err := w.database.Pool.QueryRow(ctx, `SELECT EXISTS(SELECT 1 FROM download_queue WHERE media_id = $1 AND status IN ('pending', 'downloading'))`, mediaID).Scan(&pending)
	return pending, err
}

// filterResults returns the releases in results that are not blocklisted and,
// when item has a quality profile, whose quality tier is allowed by it.
//
// A release whose blocklist check fails is dropped. If the quality profile
// cannot be loaded, quality filtering is skipped and all non-blocklisted
// releases are kept. Releases without a quality tier always pass the quality
// filter. The input slice is not modified.
func (w *RSSSyncWorker) filterResults(ctx context.Context, item *service.Media, results []service.SearchResult) []service.SearchResult {
	unblocked := make([]service.SearchResult, 0, len(results))
	for _, r := range results {
		var blocked bool
		err := w.database.Pool.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM blocklist WHERE release_title = $1)", r.Title).Scan(&blocked)
		if err != nil {
			slog.Error("failed to check blocklist", "error", err)
			continue
		}
		if !blocked {
			unblocked = append(unblocked, r)
		}
	}
	if item.QualityProfileID == nil || len(unblocked) == 0 {
		return unblocked
	}

	// The profile depends only on the item, so load it once rather than once
	// per release.
	profile, err := w.qualitySvc.GetByID(ctx, *item.QualityProfileID)
	if err != nil {
		slog.Error("failed to get quality profile", "profile_id", *item.QualityProfileID, "error", err)
		return unblocked
	}

	allowed := make([]service.SearchResult, 0, len(unblocked))
	for _, r := range unblocked {
		if r.QualityTier != nil {
			ok := false
			for _, name := range profile.AllowedQualities {
				if name == r.QualityTier.Name {
					ok = true
					break
				}
			}
			if !ok {
				continue
			}
		}
		allowed = append(allowed, r)
	}
	return allowed
}

// pickBest returns the release with the highest quality-tier rank, breaking
// ties by larger Size; among fully equal releases the earliest one wins. A
// release without a quality tier ranks as 0. candidates must be non-empty.
func (w *RSSSyncWorker) pickBest(candidates []service.SearchResult) service.SearchResult {
	rankOf := func(r *service.SearchResult) int {
		if r.QualityTier == nil {
			return 0
		}
		return r.QualityTier.Rank
	}

	best := 0
	for i := 1; i < len(candidates); i++ {
		rank, bestRank := rankOf(&candidates[i]), rankOf(&candidates[best])
		if rank > bestRank || (rank == bestRank && candidates[i].Size > candidates[best].Size) {
			best = i
		}
	}
	return candidates[best]
}