227 lines
7.0 KiB
Go
227 lines
7.0 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/TopherMayor/unified-media-manager/internal/db"
|
|
)
|
|
|
|
// QueueItem is one row of the download_queue table in the shape it is
// serialized to JSON.
//
// Pointer-typed fields map to nullable columns: scanQueueItem leaves them nil
// when the column is NULL, and their omitempty tags then drop them from the
// encoded output.
type QueueItem struct {
	// Row identity and the media entry the download belongs to.
	ID      int64 `json:"id"`
	MediaID int64 `json:"media_id"`

	// Description of the release being downloaded.
	ReleaseTitle   string          `json:"release_title"`
	ReleaseURL     *string         `json:"release_url,omitempty"`
	Indexer        string          `json:"indexer"`
	DownloadClient string          `json:"download_client"`
	Quality        json.RawMessage `json:"quality"` // raw JSON from the quality column, passed through undecoded
	Size           *int64          `json:"size,omitempty"`
	Protocol       string          `json:"protocol"`

	// Lifecycle state. The queries in this file write the statuses 'pending',
	// 'downloading' and 'cancelled', and match on 'failed' and 'imported'.
	Status       string  `json:"status"`
	Progress     float64 `json:"progress"`
	ErrorMessage *string `json:"error_message,omitempty"`

	// Grouping, ordering and retry bookkeeping. List orders by Priority
	// descending; RetryFailed compares RetryCount against MaxRetries.
	BatchID    *string `json:"batch_id,omitempty"`
	Priority   int     `json:"priority"`
	RetryCount int     `json:"retry_count"`
	MaxRetries int     `json:"max_retries"`

	// Row timestamps. StartedAt and CompletedAt are nil while their columns
	// are NULL.
	CreatedAt   time.Time  `json:"created_at"`
	StartedAt   *time.Time `json:"started_at,omitempty"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`
	UpdatedAt   time.Time  `json:"updated_at"`
}
|
|
|
|
// QueueFilters carries the filter and paging options accepted by
// QueueService.List.
type QueueFilters struct {
	// Status restricts the listing to rows with exactly this status.
	// The empty string disables the filter.
	Status string

	// Page is the 1-based page number; List turns it into an OFFSET of
	// (Page-1)*PageSize.
	Page int

	// PageSize is sent to the database as the LIMIT.
	PageSize int
}
|
|
|
|
// QueueBatchDeleteRequest selects the queue rows targeted by
// QueueService.BatchDelete.
//
// Every selector that is set is applied, and BatchDelete joins them with AND.
// BatchDelete rejects a request in which none of them is set.
type QueueBatchDeleteRequest struct {
	// Status, when non-nil, matches rows whose status equals the value.
	Status *string `json:"status,omitempty"`

	// BatchID, when non-nil, matches rows whose batch_id equals the value.
	BatchID *string `json:"batch_id,omitempty"`

	// IDs, when non-empty, matches rows whose id is in the list.
	IDs []int64 `json:"ids,omitempty"`
}
|
|
|
|
// queueColumns is the SELECT column list for download_queue reads.
// Its order must stay in lockstep with the Scan destinations in
// scanQueueItem, which decodes rows positionally.
const queueColumns = `id, media_id, release_title, release_url, indexer, download_client,
	quality, size, protocol, status, progress, error_message, batch_id, priority,
	retry_count, max_retries, created_at, started_at, completed_at, updated_at`
|
|
|
|
type QueueService struct {
|
|
db *db.DB
|
|
}
|
|
|
|
func NewQueueService(database *db.DB) *QueueService {
|
|
return &QueueService{db: database}
|
|
}
|
|
|
|
func scanQueueItem(scanner interface{ Scan(...interface{}) error }) (*QueueItem, error) {
|
|
var item QueueItem
|
|
var releaseURL, errorMsg, batchID sql.NullString
|
|
var size sql.NullInt64
|
|
var startedAt, completedAt sql.NullTime
|
|
|
|
err := scanner.Scan(&item.ID, &item.MediaID, &item.ReleaseTitle, &releaseURL, &item.Indexer,
|
|
&item.DownloadClient, &item.Quality, &size, &item.Protocol, &item.Status,
|
|
&item.Progress, &errorMsg, &batchID, &item.Priority, &item.RetryCount,
|
|
&item.MaxRetries, &item.CreatedAt, &startedAt, &completedAt, &item.UpdatedAt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if releaseURL.Valid {
|
|
item.ReleaseURL = &releaseURL.String
|
|
}
|
|
if errorMsg.Valid {
|
|
item.ErrorMessage = &errorMsg.String
|
|
}
|
|
if batchID.Valid {
|
|
item.BatchID = &batchID.String
|
|
}
|
|
if size.Valid {
|
|
item.Size = &size.Int64
|
|
}
|
|
if startedAt.Valid {
|
|
item.StartedAt = &startedAt.Time
|
|
}
|
|
if completedAt.Valid {
|
|
item.CompletedAt = &completedAt.Time
|
|
}
|
|
return &item, nil
|
|
}
|
|
|
|
func (s *QueueService) List(ctx context.Context, filters QueueFilters) ([]QueueItem, int, error) {
|
|
qb := NewQueryBuilder(1)
|
|
|
|
if filters.Status != "" {
|
|
qb.Add("status = $%d", filters.Status)
|
|
}
|
|
|
|
var total int
|
|
if err := s.db.Pool.QueryRow(ctx, "SELECT COUNT(*) FROM download_queue"+qb.Where(), qb.Args()...).Scan(&total); err != nil {
|
|
return nil, 0, fmt.Errorf("count queue: %w", err)
|
|
}
|
|
|
|
query := fmt.Sprintf("SELECT %s FROM download_queue%s ORDER BY priority DESC, created_at ASC LIMIT $%d OFFSET $%d",
|
|
queueColumns, qb.Where(), qb.Idx(), qb.Idx()+1)
|
|
args := append(qb.Args(), filters.PageSize, (filters.Page-1)*filters.PageSize)
|
|
|
|
rows, err := s.db.Pool.Query(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("list queue: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var items []QueueItem
|
|
for rows.Next() {
|
|
item, err := scanQueueItem(rows)
|
|
if err != nil {
|
|
slog.Error("failed to scan queue item", "error", err)
|
|
continue
|
|
}
|
|
items = append(items, *item)
|
|
}
|
|
|
|
return items, total, nil
|
|
}
|
|
|
|
func (s *QueueService) Delete(ctx context.Context, id int64) error {
|
|
tag, err := s.db.Pool.Exec(ctx,
|
|
"UPDATE download_queue SET status = 'cancelled', updated_at = NOW() WHERE id = $1", id)
|
|
if err != nil {
|
|
return fmt.Errorf("cancel queue item: %w", err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return fmt.Errorf("queue item not found")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *QueueService) BatchDelete(ctx context.Context, req QueueBatchDeleteRequest) (int64, error) {
|
|
qb := NewQueryBuilder(1)
|
|
|
|
if req.Status != nil {
|
|
qb.Add("status = $%d", *req.Status)
|
|
}
|
|
if req.BatchID != nil {
|
|
qb.Add("batch_id = $%d", *req.BatchID)
|
|
}
|
|
if len(req.IDs) > 0 {
|
|
qb.Add("id = ANY($%d)", req.IDs)
|
|
}
|
|
|
|
if len(qb.conditions) == 0 {
|
|
return 0, fmt.Errorf("must provide status, batch_id, or ids")
|
|
}
|
|
|
|
query := fmt.Sprintf("UPDATE download_queue SET status = 'cancelled', updated_at = NOW() WHERE %s",
|
|
strings.Join(qb.conditions, " AND "))
|
|
tag, err := s.db.Pool.Exec(ctx, query, qb.Args()...)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("batch cancel queue: %w", err)
|
|
}
|
|
|
|
return tag.RowsAffected(), nil
|
|
}
|
|
|
|
func (s *QueueService) Clear(ctx context.Context) (int64, error) {
|
|
tag, err := s.db.Pool.Exec(ctx,
|
|
"DELETE FROM download_queue WHERE status IN ('imported', 'failed', 'cancelled')")
|
|
if err != nil {
|
|
return 0, fmt.Errorf("clear queue: %w", err)
|
|
}
|
|
return tag.RowsAffected(), nil
|
|
}
|
|
|
|
func (s *QueueService) Retry(ctx context.Context, id int64) error {
|
|
tag, err := s.db.Pool.Exec(ctx,
|
|
`UPDATE download_queue SET status = 'pending', progress = 0, error_message = NULL,
|
|
retry_count = retry_count + 1, updated_at = NOW() WHERE id = $1 AND status = 'failed'`, id)
|
|
if err != nil {
|
|
return fmt.Errorf("retry queue item: %w", err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return fmt.Errorf("queue item not found or not failed")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *QueueService) RetryFailed(ctx context.Context) (int64, error) {
|
|
tag, err := s.db.Pool.Exec(ctx,
|
|
`UPDATE download_queue SET status = 'pending', progress = 0, error_message = NULL,
|
|
retry_count = retry_count + 1, updated_at = NOW()
|
|
WHERE status = 'failed' AND retry_count < max_retries`)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("retry all failed: %w", err)
|
|
}
|
|
return tag.RowsAffected(), nil
|
|
}
|
|
|
|
// CreateQueueEntryRequest holds the values QueueService.CreateQueueEntry
// writes into a new download_queue row. Each field is bound to the column of
// the same name in the INSERT statement.
type CreateQueueEntryRequest struct {
	// Media entry the download is for.
	MediaID   int64  `json:"media_id"`
	MediaType string `json:"media_type"`

	// Release and the indexer / download client it is associated with.
	ReleaseTitle   string          `json:"release_title"`
	Indexer        string          `json:"indexer"`
	DownloadClient string          `json:"download_client"`
	Quality        json.RawMessage `json:"quality"` // raw JSON, inserted without decoding
	Protocol       string          `json:"protocol"`

	// DownloadID is stored in the download_id column.
	// NOTE(review): presumably the download client's own identifier — confirm.
	DownloadID string `json:"download_id"`
}
|
|
|
|
func (s *QueueService) CreateQueueEntry(ctx context.Context, req CreateQueueEntryRequest) (int64, error) {
|
|
var id int64
|
|
err := s.db.Pool.QueryRow(ctx,
|
|
`INSERT INTO download_queue (media_id, media_type, release_title, indexer, download_client, quality, protocol, status, progress, download_id)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, 'downloading', 0, $8) RETURNING id`,
|
|
req.MediaID, req.MediaType, req.ReleaseTitle, req.Indexer, req.DownloadClient,
|
|
req.Quality, req.Protocol, req.DownloadID).Scan(&id)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("create queue entry: %w", err)
|
|
}
|
|
return id, nil
|
|
}
|