package service import ( "context" "database/sql" "encoding/json" "fmt" "log/slog" "strings" "time" "github.com/TopherMayor/unified-media-manager/internal/db" ) type QueueItem struct { ID int64 `json:"id"` MediaID int64 `json:"media_id"` ReleaseTitle string `json:"release_title"` ReleaseURL *string `json:"release_url,omitempty"` Indexer string `json:"indexer"` DownloadClient string `json:"download_client"` Quality json.RawMessage `json:"quality"` Size *int64 `json:"size,omitempty"` Protocol string `json:"protocol"` Status string `json:"status"` Progress float64 `json:"progress"` ErrorMessage *string `json:"error_message,omitempty"` BatchID *string `json:"batch_id,omitempty"` Priority int `json:"priority"` RetryCount int `json:"retry_count"` MaxRetries int `json:"max_retries"` CreatedAt time.Time `json:"created_at"` StartedAt *time.Time `json:"started_at,omitempty"` CompletedAt *time.Time `json:"completed_at,omitempty"` UpdatedAt time.Time `json:"updated_at"` } type QueueFilters struct { Status string Page int PageSize int } type QueueBatchDeleteRequest struct { Status *string `json:"status,omitempty"` BatchID *string `json:"batch_id,omitempty"` IDs []int64 `json:"ids,omitempty"` } const queueColumns = `id, media_id, release_title, release_url, indexer, download_client, quality, size, protocol, status, progress, error_message, batch_id, priority, retry_count, max_retries, created_at, started_at, completed_at, updated_at` type QueueService struct { db *db.DB } func NewQueueService(database *db.DB) *QueueService { return &QueueService{db: database} } func scanQueueItem(scanner interface{ Scan(...interface{}) error }) (*QueueItem, error) { var item QueueItem var releaseURL, errorMsg, batchID sql.NullString var size sql.NullInt64 var startedAt, completedAt sql.NullTime err := scanner.Scan(&item.ID, &item.MediaID, &item.ReleaseTitle, &releaseURL, &item.Indexer, &item.DownloadClient, &item.Quality, &size, &item.Protocol, &item.Status, &item.Progress, &errorMsg, &batchID, &item.Priority, &item.RetryCount, &item.MaxRetries, &item.CreatedAt, &startedAt, &completedAt, &item.UpdatedAt) if err != nil { return nil, err } if releaseURL.Valid { item.ReleaseURL = &releaseURL.String } if errorMsg.Valid { item.ErrorMessage = &errorMsg.String } if batchID.Valid { item.BatchID = &batchID.String } if size.Valid { item.Size = &size.Int64 } if startedAt.Valid { item.StartedAt = &startedAt.Time } if completedAt.Valid { item.CompletedAt = &completedAt.Time } return &item, nil } func (s *QueueService) List(ctx context.Context, filters QueueFilters) ([]QueueItem, int, error) { qb := NewQueryBuilder(1) if filters.Status != "" { qb.Add("status = $%d", filters.Status) } var total int if err := s.db.Pool.QueryRow(ctx, "SELECT COUNT(*) FROM download_queue"+qb.Where(), qb.Args()...).Scan(&total); err != nil { return nil, 0, fmt.Errorf("count queue: %w", err) } query := fmt.Sprintf("SELECT %s FROM download_queue%s ORDER BY priority DESC, created_at ASC LIMIT $%d OFFSET $%d", queueColumns, qb.Where(), qb.Idx(), qb.Idx()+1) args := append(qb.Args(), filters.PageSize, (filters.Page-1)*filters.PageSize) rows, err := s.db.Pool.Query(ctx, query, args...) if err != nil { return nil, 0, fmt.Errorf("list queue: %w", err) } defer rows.Close() var items []QueueItem for rows.Next() { item, err := scanQueueItem(rows) if err != nil { slog.Error("failed to scan queue item", "error", err) continue } items = append(items, *item) } return items, total, nil } func (s *QueueService) Delete(ctx context.Context, id int64) error { tag, err := s.db.Pool.Exec(ctx, "UPDATE download_queue SET status = 'cancelled', updated_at = NOW() WHERE id = $1", id) if err != nil { return fmt.Errorf("cancel queue item: %w", err) } if tag.RowsAffected() == 0 { return fmt.Errorf("queue item not found") } return nil } func (s *QueueService) BatchDelete(ctx context.Context, req QueueBatchDeleteRequest) (int64, error) { qb := NewQueryBuilder(1) if req.Status != nil { qb.Add("status = $%d", *req.Status) } if req.BatchID != nil { qb.Add("batch_id = $%d", *req.BatchID) } if len(req.IDs) > 0 { qb.Add("id = ANY($%d)", req.IDs) } if len(qb.conditions) == 0 { return 0, fmt.Errorf("must provide status, batch_id, or ids") } query := fmt.Sprintf("UPDATE download_queue SET status = 'cancelled', updated_at = NOW() WHERE %s", strings.Join(qb.conditions, " AND ")) tag, err := s.db.Pool.Exec(ctx, query, qb.Args()...) if err != nil { return 0, fmt.Errorf("batch cancel queue: %w", err) } return tag.RowsAffected(), nil } func (s *QueueService) Clear(ctx context.Context) (int64, error) { tag, err := s.db.Pool.Exec(ctx, "DELETE FROM download_queue WHERE status IN ('imported', 'failed', 'cancelled')") if err != nil { return 0, fmt.Errorf("clear queue: %w", err) } return tag.RowsAffected(), nil } func (s *QueueService) Retry(ctx context.Context, id int64) error { tag, err := s.db.Pool.Exec(ctx, `UPDATE download_queue SET status = 'pending', progress = 0, error_message = NULL, retry_count = retry_count + 1, updated_at = NOW() WHERE id = $1 AND status = 'failed'`, id) if err != nil { return fmt.Errorf("retry queue item: %w", err) } if tag.RowsAffected() == 0 { return fmt.Errorf("queue item not found or not failed") } return nil } func (s *QueueService) RetryFailed(ctx context.Context) (int64, error) { tag, err := s.db.Pool.Exec(ctx, `UPDATE download_queue SET status = 'pending', progress = 0, error_message = NULL, retry_count = retry_count + 1, updated_at = NOW() WHERE status = 'failed' AND retry_count < max_retries`) if err != nil { return 0, fmt.Errorf("retry all failed: %w", err) } return tag.RowsAffected(), nil } type CreateQueueEntryRequest struct { MediaID int64 `json:"media_id"` MediaType string `json:"media_type"` ReleaseTitle string `json:"release_title"` Indexer string `json:"indexer"` DownloadClient string `json:"download_client"` Quality json.RawMessage `json:"quality"` Protocol string `json:"protocol"` DownloadID string `json:"download_id"` } func (s *QueueService) CreateQueueEntry(ctx context.Context, req CreateQueueEntryRequest) (int64, error) { var id int64 err := s.db.Pool.QueryRow(ctx, `INSERT INTO download_queue (media_id, media_type, release_title, indexer, download_client, quality, protocol, status, progress, download_id) VALUES ($1, $2, $3, $4, $5, $6, $7, 'downloading', 0, $8) RETURNING id`, req.MediaID, req.MediaType, req.ReleaseTitle, req.Indexer, req.DownloadClient, req.Quality, req.Protocol, req.DownloadID).Scan(&id) if err != nil { return 0, fmt.Errorf("create queue entry: %w", err) } return id, nil }