Files
unified-media-manager/internal/service/indexer.go
2026-04-24 10:45:19 -07:00

558 lines
16 KiB
Go

package service
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/TopherMayor/unified-media-manager/internal/cardigann"
	"github.com/TopherMayor/unified-media-manager/internal/db"
)
// Indexer is the in-memory form of one row of the indexers table, with one
// field per column named in indexerColumns.
//
// APIKey carries the `json:"-"` tag, so it is never emitted when an Indexer is
// marshalled to JSON. LastSuccessAt and DisabledUntil are nil when the
// corresponding column is NULL and are left out of the JSON output in that case.
type Indexer struct {
	ID             int64           `json:"id"`
	Name           string          `json:"name"`
	Implementation string          `json:"implementation"`
	URL            string          `json:"url"`
	APIKey         *string         `json:"-"`
	Categories     json.RawMessage `json:"categories"`
	Settings       json.RawMessage `json:"settings"`
	Enabled        bool            `json:"enabled"`
	Priority       int             `json:"priority"`
	LastSuccessAt  *time.Time      `json:"last_success_at,omitempty"`
	FailureCount   int             `json:"failure_count"`
	DisabledUntil  *time.Time      `json:"disabled_until,omitempty"`
	CreatedAt      time.Time       `json:"created_at"`
	UpdatedAt      time.Time       `json:"updated_at"`
}
// IndexerResponse is the JSON-facing view of an Indexer. It carries the same
// fields as Indexer except APIKey, which has no counterpart here.
//
// LastSuccessAt and DisabledUntil are omitted from the JSON output when nil.
type IndexerResponse struct {
	ID             int64           `json:"id"`
	Name           string          `json:"name"`
	Implementation string          `json:"implementation"`
	URL            string          `json:"url"`
	Categories     json.RawMessage `json:"categories"`
	Settings       json.RawMessage `json:"settings"`
	Enabled        bool            `json:"enabled"`
	Priority       int             `json:"priority"`
	LastSuccessAt  *time.Time      `json:"last_success_at,omitempty"`
	FailureCount   int             `json:"failure_count"`
	DisabledUntil  *time.Time      `json:"disabled_until,omitempty"`
	CreatedAt      time.Time       `json:"created_at"`
	UpdatedAt      time.Time       `json:"updated_at"`
}
// IndexerTestResult reports the outcome of an indexer connectivity test.
// Error and StatusCode are dropped from the JSON output when they hold their
// zero value.
type IndexerTestResult struct {
	Success    bool   `json:"success"`
	Error      string `json:"error,omitempty"`
	StatusCode int    `json:"status_code,omitempty"`
}
// IndexerStats summarises download activity for a single indexer.
//
// SuccessRate is a percentage in the range 0-100. LastSuccess is a formatted
// timestamp string and is omitted from the JSON output when empty.
type IndexerStats struct {
	ID           int64   `json:"id"`
	Name         string  `json:"name"`
	TotalGrabs   int     `json:"total_grabs"`
	TotalFailed  int     `json:"total_failed"`
	SuccessRate  float64 `json:"success_rate"`
	FailureCount int     `json:"failure_count"`
	LastSuccess  string  `json:"last_success,omitempty"`
}
// CreateIndexerRequest is the JSON payload used to create an indexer.
//
// APIKey, Enabled and Priority are pointers so that a field missing from the
// payload (nil) can be told apart from an explicit value. Categories and
// Settings are kept as raw JSON and are nil when absent.
type CreateIndexerRequest struct {
	Name           string          `json:"name"`
	Implementation string          `json:"implementation"`
	URL            string          `json:"url"`
	APIKey         *string         `json:"api_key,omitempty"`
	Categories     json.RawMessage `json:"categories,omitempty"`
	Settings       json.RawMessage `json:"settings,omitempty"`
	Enabled        *bool           `json:"enabled,omitempty"`
	Priority       *int            `json:"priority,omitempty"`
}
// UpdateIndexerRequest is the JSON payload for a partial indexer update.
//
// Every field is optional: pointer fields are nil and raw-JSON fields are nil
// when the key is missing from the payload.
type UpdateIndexerRequest struct {
	Name           *string         `json:"name,omitempty"`
	Implementation *string         `json:"implementation,omitempty"`
	URL            *string         `json:"url,omitempty"`
	APIKey         *string         `json:"api_key,omitempty"`
	Categories     json.RawMessage `json:"categories,omitempty"`
	Settings       json.RawMessage `json:"settings,omitempty"`
	Enabled        *bool           `json:"enabled,omitempty"`
	Priority       *int            `json:"priority,omitempty"`
}
// indexerColumns is the column list used by every SELECT that feeds
// scanIndexer; the order here must match the Scan destinations there.
const indexerColumns = "id, name, implementation, url, api_key, categories, settings,\n" +
	"enabled, priority, last_success_at, failure_count, disabled_until, created_at, updated_at"
// IndexerService provides CRUD, connectivity testing and health bookkeeping
// for rows of the indexers table.
//
// cardigannEngine is optional; it stays nil until SetCardigannEngine is called.
type IndexerService struct {
	db              *db.DB
	cardigannEngine *cardigann.CardigannEngine
}
// NewIndexerService builds an IndexerService backed by database. The Cardigann
// engine is left unset.
func NewIndexerService(database *db.DB) *IndexerService {
	svc := new(IndexerService)
	svc.db = database
	return svc
}
// SetCardigannEngine installs the Cardigann engine used when testing
// Cardigann indexers. Passing nil clears it again.
func (s *IndexerService) SetCardigannEngine(engine *cardigann.CardigannEngine) {
	s.cardigannEngine = engine
}
// CardigannIndexerConfig is the shape of the settings JSON stored for a
// Cardigann indexer: the YAML definition text plus a string-to-string map of
// configuration values.
type CardigannIndexerConfig struct {
	YAML   string            `json:"yaml"`
	Config map[string]string `json:"config"`
}
// GetCardigannConfig decodes an indexer's settings JSON into a
// CardigannIndexerConfig.
//
// It returns an error when settings is empty, when it is not valid JSON for
// the config struct, or when the decoded config has an empty yaml field.
func (s *IndexerService) GetCardigannConfig(settings json.RawMessage) (*CardigannIndexerConfig, error) {
	if len(settings) == 0 {
		return nil, fmt.Errorf("no settings provided")
	}

	parsed := new(CardigannIndexerConfig)
	err := json.Unmarshal(settings, parsed)
	switch {
	case err != nil:
		return nil, fmt.Errorf("parse cardigann config: %w", err)
	case parsed.YAML == "":
		return nil, fmt.Errorf("cardigann settings missing yaml field")
	}
	return parsed, nil
}
// scanIndexer reads a single row from scanner into a new Indexer. The row
// must supply fourteen values in the order given by indexerColumns.
//
// NULL api_key, last_success_at and disabled_until values become nil
// pointers. Any Scan error is returned unchanged together with a nil Indexer.
func scanIndexer(scanner interface{ Scan(...interface{}) error }) (*Indexer, error) {
	var (
		rec               Indexer
		rawKey            sql.NullString
		rawCats, rawConf  []byte
		succeeded, paused sql.NullTime
	)

	dest := []interface{}{
		&rec.ID, &rec.Name, &rec.Implementation, &rec.URL, &rawKey,
		&rawCats, &rawConf, &rec.Enabled, &rec.Priority,
		&succeeded, &rec.FailureCount, &paused,
		&rec.CreatedAt, &rec.UpdatedAt,
	}
	if err := scanner.Scan(dest...); err != nil {
		return nil, err
	}

	rec.Categories = json.RawMessage(rawCats)
	rec.Settings = json.RawMessage(rawConf)

	// Nullable columns: copy the value out so the pointer does not alias the
	// scratch Null* variable.
	if rawKey.Valid {
		key := rawKey.String
		rec.APIKey = &key
	}
	if succeeded.Valid {
		at := succeeded.Time
		rec.LastSuccessAt = &at
	}
	if paused.Valid {
		until := paused.Time
		rec.DisabledUntil = &until
	}
	return &rec, nil
}
// indexerToResponse copies every field of idx except APIKey into an
// IndexerResponse. Slice and pointer fields are shared with idx, not cloned.
func indexerToResponse(idx *Indexer) IndexerResponse {
	var resp IndexerResponse

	resp.ID = idx.ID
	resp.Name = idx.Name
	resp.Implementation = idx.Implementation
	resp.URL = idx.URL

	resp.Categories = idx.Categories
	resp.Settings = idx.Settings
	resp.Enabled = idx.Enabled
	resp.Priority = idx.Priority

	resp.LastSuccessAt = idx.LastSuccessAt
	resp.FailureCount = idx.FailureCount
	resp.DisabledUntil = idx.DisabledUntil

	resp.CreatedAt = idx.CreatedAt
	resp.UpdatedAt = idx.UpdatedAt
	return resp
}
// List returns every indexer ordered by priority and then name, converted to
// the API response shape (which has no API key field).
//
// A row that fails to scan is logged and skipped. An error reported by the
// row iterator once iteration stops is returned to the caller, so a partially
// read result set is never presented as complete. The returned slice is nil
// when there are no rows.
func (s *IndexerService) List(ctx context.Context) ([]IndexerResponse, error) {
	rows, err := s.db.Pool.Query(ctx,
		fmt.Sprintf("SELECT %s FROM indexers ORDER BY priority, name", indexerColumns))
	if err != nil {
		return nil, fmt.Errorf("list indexers: %w", err)
	}
	defer rows.Close()

	var items []IndexerResponse
	for rows.Next() {
		idx, err := scanIndexer(rows)
		if err != nil {
			slog.Error("failed to scan indexer", "error", err)
			continue
		}
		items = append(items, indexerToResponse(idx))
	}
	// Next returning false can mean "no more rows" or "iteration failed";
	// only Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("list indexers: %w", err)
	}
	return items, nil
}
// Create inserts a new indexer row and returns its generated id.
//
// Defaults applied when the request leaves a field unset: categories "[]",
// settings "{}", enabled true, priority 0. For the "cardigann" implementation
// the settings must contain a parsable YAML definition; the stored URL is then
// taken from the definition's first link (when it has one) and an empty name
// falls back to the definition's name.
func (s *IndexerService) Create(ctx context.Context, req CreateIndexerRequest) (int64, error) {
	categories, settings := req.Categories, req.Settings
	if categories == nil {
		categories = json.RawMessage("[]")
	}
	if settings == nil {
		settings = json.RawMessage("{}")
	}

	enabled, priority := true, 0
	if req.Enabled != nil {
		enabled = *req.Enabled
	}
	if req.Priority != nil {
		priority = *req.Priority
	}

	// For Cardigann indexers the YAML definition is the source of truth for
	// the URL and, if the caller gave none, the name.
	name, baseURL := req.Name, req.URL
	if req.Implementation == "cardigann" {
		cfg, err := s.GetCardigannConfig(settings)
		if err != nil {
			return 0, fmt.Errorf("invalid cardigann settings: %w", err)
		}
		def, err := cardigann.ParseDefinition([]byte(cfg.YAML))
		if err != nil {
			return 0, fmt.Errorf("invalid cardigann YAML: %w", err)
		}
		if len(def.Links) > 0 {
			baseURL = def.Links[0]
		}
		if name == "" {
			name = def.Name
		}
	}

	const insertSQL = "INSERT INTO indexers (name, implementation, url, api_key, categories, settings, enabled, priority)\n" +
		"VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id"

	var newID int64
	row := s.db.Pool.QueryRow(ctx, insertSQL,
		name, req.Implementation, baseURL, req.APIKey, categories, settings, enabled, priority)
	if err := row.Scan(&newID); err != nil {
		return 0, fmt.Errorf("create indexer: %w", err)
	}
	return newID, nil
}
// Update applies a partial update to the indexer with the given id.
//
// Only fields present in req (non-nil) are written; updated_at is always
// refreshed alongside them. It returns an error when req sets no fields, when
// the statement fails, or when no row has that id.
func (s *IndexerService) Update(ctx context.Context, id int64, req UpdateIndexerRequest) error {
	type assignment struct {
		column string
		value  interface{}
	}

	var changes []assignment
	if req.Name != nil {
		changes = append(changes, assignment{"name", *req.Name})
	}
	if req.Implementation != nil {
		changes = append(changes, assignment{"implementation", *req.Implementation})
	}
	if req.URL != nil {
		changes = append(changes, assignment{"url", *req.URL})
	}
	if req.APIKey != nil {
		changes = append(changes, assignment{"api_key", *req.APIKey})
	}
	if req.Categories != nil {
		changes = append(changes, assignment{"categories", req.Categories})
	}
	if req.Settings != nil {
		changes = append(changes, assignment{"settings", req.Settings})
	}
	if req.Enabled != nil {
		changes = append(changes, assignment{"enabled", *req.Enabled})
	}
	if req.Priority != nil {
		changes = append(changes, assignment{"priority", *req.Priority})
	}
	if len(changes) == 0 {
		return fmt.Errorf("no fields to update")
	}
	changes = append(changes, assignment{"updated_at", time.Now()})

	// Placeholders are numbered in assignment order; the id takes the final slot.
	setClauses := make([]string, 0, len(changes))
	args := make([]interface{}, 0, len(changes)+1)
	for i, c := range changes {
		setClauses = append(setClauses, fmt.Sprintf("%s = $%d", c.column, i+1))
		args = append(args, c.value)
	}
	args = append(args, id)

	query := fmt.Sprintf("UPDATE indexers SET %s WHERE id = $%d",
		strings.Join(setClauses, ", "), len(args))
	tag, err := s.db.Pool.Exec(ctx, query, args...)
	switch {
	case err != nil:
		return fmt.Errorf("update indexer: %w", err)
	case tag.RowsAffected() == 0:
		return fmt.Errorf("indexer not found")
	}
	return nil
}
// Delete removes the indexer with the given id. It returns an error when the
// statement fails or when no row was deleted.
func (s *IndexerService) Delete(ctx context.Context, id int64) error {
	tag, err := s.db.Pool.Exec(ctx, "DELETE FROM indexers WHERE id = $1", id)
	switch {
	case err != nil:
		return fmt.Errorf("delete indexer: %w", err)
	case tag.RowsAffected() == 0:
		return fmt.Errorf("indexer not found")
	}
	return nil
}
// Test loads the indexer with the given id and checks that it is reachable.
//
// Cardigann indexers are delegated to testCardigannIndexer. For "newznab" and
// "torznab" the probe is a GET of <url>/api?t=caps, with the API key appended
// as the apikey query parameter when one is stored; any other implementation
// is probed with a GET of the bare URL. Trailing slashes on the stored URL are
// removed first.
//
// The returned error is non-nil only when the indexer row cannot be loaded.
// Probe failures are reported through the result, and the outcome is recorded
// in failure_count / last_success_at.
func (s *IndexerService) Test(ctx context.Context, id int64) (*IndexerTestResult, error) {
	row := s.db.Pool.QueryRow(ctx,
		fmt.Sprintf("SELECT %s FROM indexers WHERE id = $1", indexerColumns), id)
	idx, err := scanIndexer(row)
	if err != nil {
		return nil, fmt.Errorf("indexer not found")
	}

	// Cardigann indexers: parse YAML, perform connectivity check
	if idx.Implementation == "cardigann" {
		return s.testCardigannIndexer(ctx, idx)
	}

	// Trim before appending so a stored "http://host/" does not become
	// "http://host//api".
	testURL := strings.TrimRight(idx.URL, "/")
	switch idx.Implementation {
	case "newznab", "torznab":
		testURL += "/api?t=caps"
		if idx.APIKey != nil && *idx.APIKey != "" {
			// Escape the key: characters such as '&', '#' or '+' would
			// otherwise corrupt the query string.
			testURL += "&apikey=" + url.QueryEscape(*idx.APIKey)
		}
	}
	return s.probeIndexerURL(ctx, id, testURL), nil
}

// testCardigannIndexer tests a Cardigann indexer from its stored settings.
//
// Invalid settings or YAML produce a failed result without touching the
// database. When a Cardigann engine is configured its Test method decides the
// outcome; otherwise the definition's first link is probed with a plain GET.
// The returned error is always nil.
func (s *IndexerService) testCardigannIndexer(ctx context.Context, idx *Indexer) (*IndexerTestResult, error) {
	cfg, err := s.GetCardigannConfig(idx.Settings)
	if err != nil {
		return &IndexerTestResult{Success: false, Error: fmt.Sprintf("invalid cardigann config: %v", err)}, nil
	}
	def, err := cardigann.ParseDefinition([]byte(cfg.YAML))
	if err != nil {
		return &IndexerTestResult{Success: false, Error: fmt.Sprintf("invalid YAML: %v", err)}, nil
	}

	// Use CardigannEngine for full test if available
	if s.cardigannEngine != nil {
		result, err := s.cardigannEngine.Test(ctx, def, cfg.Config)
		if err != nil {
			return &IndexerTestResult{Success: false, Error: err.Error()}, nil
		}
		s.recordIndexerTestOutcome(ctx, idx.ID, result.Success)
		return &IndexerTestResult{
			Success: result.Success,
			Error:   result.Error,
		}, nil
	}

	// Fallback: basic connectivity check to first link
	if len(def.Links) == 0 {
		return &IndexerTestResult{Success: false, Error: "definition has no links"}, nil
	}
	return s.probeIndexerURL(ctx, idx.ID, def.Links[0]), nil
}

// probeIndexerURL issues a GET to testURL with a 10-second client timeout and
// turns the outcome into an IndexerTestResult. A transport error or a status
// of 400 or above counts as a failure. The outcome is recorded for indexer id,
// except when the request itself cannot be constructed.
func (s *IndexerService) probeIndexerURL(ctx context.Context, id int64, testURL string) *IndexerTestResult {
	client := &http.Client{Timeout: 10 * time.Second}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, testURL, nil)
	if err != nil {
		return &IndexerTestResult{Success: false, Error: redactedRequestError(err)}
	}
	resp, err := client.Do(req)
	if err != nil {
		s.recordIndexerTestOutcome(ctx, id, false)
		return &IndexerTestResult{Success: false, Error: redactedRequestError(err)}
	}
	// Only the status line matters; the body is discarded unread.
	resp.Body.Close()

	if resp.StatusCode >= 400 {
		s.recordIndexerTestOutcome(ctx, id, false)
		return &IndexerTestResult{
			Success:    false,
			Error:      fmt.Sprintf("HTTP %d", resp.StatusCode),
			StatusCode: resp.StatusCode,
		}
	}
	s.recordIndexerTestOutcome(ctx, id, true)
	return &IndexerTestResult{
		Success:    true,
		StatusCode: resp.StatusCode,
	}
}

// recordIndexerTestOutcome persists the result of a connectivity test: success
// resets failure_count and stamps last_success_at, failure increments
// failure_count. The write is best-effort; a database error is logged and
// does not change the test result.
func (s *IndexerService) recordIndexerTestOutcome(ctx context.Context, id int64, success bool) {
	query := "UPDATE indexers SET failure_count = failure_count + 1, updated_at = NOW() WHERE id = $1"
	if success {
		query = "UPDATE indexers SET last_success_at = NOW(), failure_count = 0, updated_at = NOW() WHERE id = $1"
	}
	if _, err := s.db.Pool.Exec(ctx, query, id); err != nil {
		slog.Warn("failed to record indexer test outcome", "id", id, "success", success, "error", err)
	}
}

// redactedRequestError renders an HTTP request error without the request URL.
// Errors from net/http and net/url are *url.Error values whose message embeds
// the full URL, which here may carry the indexer's API key; only the
// operation and the underlying cause are kept.
func redactedRequestError(err error) string {
	var urlErr *url.Error
	if errors.As(err, &urlErr) && urlErr.Err != nil {
		return fmt.Sprintf("%s: %v", urlErr.Op, urlErr.Err)
	}
	return err.Error()
}
// ListEnabled returns the indexers that are enabled and not currently backed
// off (disabled_until is NULL or in the past), ordered by priority and then
// name. Unlike List it returns full Indexer values, including the API key.
//
// A row that fails to scan is logged and skipped. An error reported by the
// row iterator once iteration stops is returned to the caller, so a partially
// read result set is never presented as complete. The returned slice is nil
// when there are no rows.
func (s *IndexerService) ListEnabled(ctx context.Context) ([]Indexer, error) {
	rows, err := s.db.Pool.Query(ctx,
		fmt.Sprintf("SELECT %s FROM indexers WHERE enabled = true AND (disabled_until IS NULL OR disabled_until < NOW()) ORDER BY priority, name", indexerColumns))
	if err != nil {
		return nil, fmt.Errorf("list enabled indexers: %w", err)
	}
	defer rows.Close()

	var items []Indexer
	for rows.Next() {
		idx, err := scanIndexer(rows)
		if err != nil {
			slog.Error("failed to scan indexer", "error", err)
			continue
		}
		items = append(items, *idx)
	}
	// Next returning false can mean "no more rows" or "iteration failed";
	// only Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("list enabled indexers: %w", err)
	}
	return items, nil
}
// RecordSuccess marks the indexer as healthy: failure_count is reset to zero,
// last_success_at is set to the current time and any disabled_until back-off
// is cleared. No error is reported when the id matches no row.
func (s *IndexerService) RecordSuccess(ctx context.Context, id int64) error {
	const query = "UPDATE indexers SET failure_count = 0, last_success_at = NOW(), disabled_until = NULL, updated_at = NOW() WHERE id = $1"
	if _, err := s.db.Pool.Exec(ctx, query, id); err != nil {
		return fmt.Errorf("record indexer success: %w", err)
	}
	return nil
}
// RecordFailure increments the indexer's failure_count and, once the count
// reaches 5, sets disabled_until to back the indexer off: 32 minutes at five
// failures and 60 minutes at six or more.
//
// The increment is a single UPDATE ... RETURNING statement, so concurrent
// failures cannot overwrite each other's counts. An error is returned when the
// indexer does not exist or a statement fails; warnings are logged only after
// the corresponding write has succeeded.
func (s *IndexerService) RecordFailure(ctx context.Context, id int64) error {
	var failureCount int
	err := s.db.Pool.QueryRow(ctx,
		"UPDATE indexers SET failure_count = failure_count + 1, updated_at = NOW() WHERE id = $1 RETURNING failure_count",
		id).Scan(&failureCount)
	if err != nil {
		return fmt.Errorf("record indexer failure: %w", err)
	}

	if failureCount < 5 {
		slog.Warn("indexer search failed", "id", id, "failure_count", failureCount)
		return nil
	}

	// 2^min(count, 6) minutes, capped at one hour.
	backoffMinutes := 1 << min(failureCount, 6)
	if backoffMinutes > 60 {
		backoffMinutes = 60
	}
	disabledUntil := time.Now().Add(time.Duration(backoffMinutes) * time.Minute)
	_, err = s.db.Pool.Exec(ctx,
		"UPDATE indexers SET disabled_until = $1, updated_at = NOW() WHERE id = $2",
		disabledUntil, id)
	if err != nil {
		return fmt.Errorf("record indexer failure: %w", err)
	}
	slog.Warn("indexer auto-disabled after consecutive failures", "id", id, "failure_count", failureCount, "disabled_until", disabledUntil)
	return nil
}
// MediaTypeToCategory maps a media type name to a category ID string. The
// match is case-insensitive; an unrecognised media type yields "".
//
// NOTE(review): the IDs look like Newznab-style category numbers — confirm
// against the indexer search code that consumes them.
func MediaTypeToCategory(mediaType string) string {
	category := ""
	switch kind := strings.ToLower(mediaType); kind {
	case "movie":
		category = "2000"
	case "series", "episode":
		category = "5000"
	case "music", "album":
		category = "3000"
	case "audiobook":
		category = "3030"
	case "book":
		category = "7000"
	}
	return category
}
// Stats returns health and download statistics for the indexer with the
// given id.
//
// Grab and failure totals come from download_history rows whose indexer
// column equals the indexer's name: "grabbed" and "imported" actions count as
// grabs, "failed" actions as failures. SuccessRate is grabs as a percentage of
// grabs plus failures, or 0 when there is no history. LastSuccess is an
// RFC 3339 timestamp, left empty when the indexer has never succeeded.
//
// An error is returned only when the indexer row cannot be loaded. A failure
// to read the history counts is logged and reported as zero totals.
func (s *IndexerService) Stats(ctx context.Context, id int64) (*IndexerStats, error) {
	var name string
	var failureCount int
	var lastSuccessAt sql.NullTime
	err := s.db.Pool.QueryRow(ctx,
		"SELECT name, failure_count, last_success_at FROM indexers WHERE id = $1", id,
	).Scan(&name, &failureCount, &lastSuccessAt)
	if err != nil {
		return nil, fmt.Errorf("indexer not found")
	}

	var totalGrabs, totalFailed int
	err = s.db.Pool.QueryRow(ctx,
		`SELECT COUNT(*) FILTER (WHERE action IN ('grabbed', 'imported')),
COUNT(*) FILTER (WHERE action = 'failed')
FROM download_history WHERE indexer = $1`, name,
	).Scan(&totalGrabs, &totalFailed)
	if err != nil {
		// History is supplementary: keep serving the indexer's own health
		// fields, but make the failure visible instead of dropping it.
		slog.Warn("failed to load indexer download history counts", "id", id, "error", err)
		totalGrabs, totalFailed = 0, 0
	}

	successRate := 0.0
	if total := totalGrabs + totalFailed; total > 0 {
		successRate = float64(totalGrabs) / float64(total) * 100
	}

	result := &IndexerStats{
		ID:           id,
		Name:         name,
		TotalGrabs:   totalGrabs,
		TotalFailed:  totalFailed,
		SuccessRate:  successRate,
		FailureCount: failureCount,
	}
	if lastSuccessAt.Valid {
		result.LastSuccess = lastSuccessAt.Time.Format(time.RFC3339)
	}
	return result, nil
}