132 lines
3.4 KiB
Go
132 lines
3.4 KiB
Go
package worker
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/TopherMayor/unified-media-manager/internal/config"
|
|
"github.com/TopherMayor/unified-media-manager/internal/db"
|
|
"github.com/TopherMayor/unified-media-manager/internal/service"
|
|
)
|
|
|
|
type QueueProcessor struct {
|
|
database *db.DB
|
|
importSvc *service.ImportService
|
|
dcSvc *service.DownloadClientService
|
|
cfg *config.Config
|
|
activitySvc *service.ActivityService
|
|
}
|
|
|
|
func NewQueueProcessor(database *db.DB, importSvc *service.ImportService, dcSvc *service.DownloadClientService, cfg *config.Config, activitySvc *service.ActivityService) *QueueProcessor {
|
|
return &QueueProcessor{
|
|
database: database,
|
|
importSvc: importSvc,
|
|
dcSvc: dcSvc,
|
|
cfg: cfg,
|
|
activitySvc: activitySvc,
|
|
}
|
|
}
|
|
|
|
func (w *QueueProcessor) Name() string {
|
|
return "queue_processor"
|
|
}
|
|
|
|
func (w *QueueProcessor) CronExpr() string {
|
|
return w.cfg.WorkerQueueInterval
|
|
}
|
|
|
|
func (w *QueueProcessor) Run(ctx context.Context) error {
|
|
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer cancel()
|
|
|
|
progressUpdated := 0
|
|
errors := 0
|
|
|
|
nzbClients, err := w.dcSvc.GetAllEnabled(ctx, "nzb")
|
|
if err != nil {
|
|
slog.Error("failed to get nzb clients", "error", err)
|
|
}
|
|
torrentClients, err := w.dcSvc.GetAllEnabled(ctx, "torrent")
|
|
if err != nil {
|
|
slog.Error("failed to get torrent clients", "error", err)
|
|
}
|
|
|
|
allClients := append(nzbClients, torrentClients...)
|
|
|
|
for _, client := range allClients {
|
|
rows, err := w.database.Pool.Query(ctx,
|
|
`SELECT id, download_id FROM download_queue
|
|
WHERE status = 'downloading' AND download_client = $1 AND download_id IS NOT NULL AND download_id != ''`,
|
|
client.Config.Name)
|
|
if err != nil {
|
|
slog.Error("failed to query active downloads", "client", client.Config.Name, "error", err)
|
|
continue
|
|
}
|
|
|
|
type queueItem struct {
|
|
id int64
|
|
downloadID string
|
|
}
|
|
var items []queueItem
|
|
for rows.Next() {
|
|
var item queueItem
|
|
if err := rows.Scan(&item.id, &item.downloadID); err != nil {
|
|
slog.Error("failed to scan queue item", "error", err)
|
|
continue
|
|
}
|
|
items = append(items, item)
|
|
}
|
|
rows.Close()
|
|
|
|
for _, item := range items {
|
|
progress, err := client.Client.GetProgress(ctx, item.downloadID)
|
|
if err != nil {
|
|
slog.Error("failed to get download progress", "id", item.downloadID, "error", err)
|
|
errors++
|
|
continue
|
|
}
|
|
|
|
// Detect failed downloads
|
|
if progress.Status == "failed" || progress.Status == "error" {
|
|
if w.activitySvc != nil {
|
|
w.activitySvc.LogAsync(service.LogEntry{
|
|
EventType: "download_failed",
|
|
Title: fmt.Sprintf("Download failed: %s", item.downloadID),
|
|
Data: []byte(fmt.Sprintf(`{"queue_id":%d,"download_id":"%s"}`, item.id, item.downloadID)),
|
|
})
|
|
}
|
|
}
|
|
|
|
_, err = w.database.Pool.Exec(ctx,
|
|
"UPDATE download_queue SET progress = $1, updated_at = NOW() WHERE id = $2",
|
|
progress.Progress, item.id)
|
|
if err != nil {
|
|
slog.Error("failed to update progress", "id", item.id, "error", err)
|
|
errors++
|
|
continue
|
|
}
|
|
progressUpdated++
|
|
}
|
|
}
|
|
|
|
report, err := w.importSvc.ProcessCompleted(ctx)
|
|
if err != nil {
|
|
slog.Error("failed to process completed downloads", "error", err)
|
|
errors++
|
|
}
|
|
|
|
imported := 0
|
|
if report != nil {
|
|
imported = report.Imported
|
|
}
|
|
|
|
slog.Info("queue processor completed",
|
|
"progress_updated", progressUpdated,
|
|
"completed_imported", imported,
|
|
"errors", errors)
|
|
|
|
return nil
|
|
}
|