package worker

import (
	"context"
	"fmt"
	"log/slog"
	"strings"
	"time"

	"github.com/TopherMayor/unified-media-manager/internal/config"
	"github.com/TopherMayor/unified-media-manager/internal/db"
)

// CleanupWorker removes aged data from the database: old download_history
// partitions and old task_executions rows.
type CleanupWorker struct {
	database *db.DB
	cfg      *config.Config
}

// NewCleanupWorker returns a CleanupWorker that uses the given database
// handle and configuration.
func NewCleanupWorker(database *db.DB, cfg *config.Config) *CleanupWorker {
	return &CleanupWorker{
		database: database,
		cfg:      cfg,
	}
}

// Name returns the worker's identifier, "cleanup".
func (w *CleanupWorker) Name() string {
	return "cleanup"
}

// CronExpr returns the configured WorkerCleanupInterval value.
func (w *CleanupWorker) CronExpr() string {
	return w.cfg.WorkerCleanupInterval
}

// Run performs one cleanup pass, bounded by a 30-second timeout:
//
//  1. Every child table of download_history named
//     download_history_YYYY_MM_DD whose date is more than 90 days in the
//     past is dropped.
//  2. task_executions rows with started_at older than 7 days are deleted.
//
// An error is returned only when the partition list cannot be read. A failure
// to drop an individual partition, or to delete old task executions, is
// logged and the pass continues.
func (w *CleanupWorker) Run(ctx context.Context) error {
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	cutoff := time.Now().AddDate(0, 0, -90)

	// The partition names are collected, and the result set closed, before
	// any DROP is issued. Dropping while the cursor is still open would need
	// a second pool connection per statement and can stall on a small pool.
	expired, err := w.expiredDownloadHistoryPartitions(ctx, cutoff)
	if err != nil {
		return err
	}

	partitionsDropped := 0
	for _, partName := range expired {
		// partName is the regclass text produced by the catalog query, not
		// caller-supplied input.
		if _, err := w.database.Pool.Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", partName)); err != nil {
			slog.Error("failed to drop partition", "partition", partName, "error", err)
			continue
		}
		partitionsDropped++
	}

	// Only read the command tag when the DELETE succeeded; on failure the
	// count is reported as zero.
	var executionsCleaned int64
	tag, err := w.database.Pool.Exec(ctx, "DELETE FROM task_executions WHERE started_at < NOW() - INTERVAL '7 days'")
	if err != nil {
		slog.Error("failed to clean old task executions", "error", err)
	} else {
		executionsCleaned = tag.RowsAffected()
	}

	slog.Info("cleanup completed", "partitions_dropped", partitionsDropped, "executions_cleaned", executionsCleaned)
	return nil
}

// expiredDownloadHistoryPartitions lists the child tables of download_history
// whose name (case-insensitively) is download_history_ followed by a
// YYYY_MM_DD date that lies before cutoff. Names that do not match that shape
// are skipped. The result set is fully consumed and closed before returning.
func (w *CleanupWorker) expiredDownloadHistoryPartitions(ctx context.Context, cutoff time.Time) ([]string, error) {
	rows, err := w.database.Pool.Query(ctx, `SELECT inhrelid::regclass::text FROM pg_inherits JOIN pg_class ON (inhrelid = oid) WHERE inhparent = 'download_history'::regclass`)
	if err != nil {
		return nil, fmt.Errorf("query download_history partitions: %w", err)
	}
	defer rows.Close()

	var expired []string
	for rows.Next() {
		var partName string
		if err := rows.Scan(&partName); err != nil {
			slog.Error("failed to scan partition name", "error", err)
			continue
		}

		dateStr, ok := strings.CutPrefix(strings.ToLower(partName), "download_history_")
		if !ok {
			continue
		}
		partTime, err := time.Parse("2006_01_02", dateStr)
		if err != nil {
			// Not a date-suffixed partition; leave it alone.
			continue
		}
		if partTime.Before(cutoff) {
			expired = append(expired, partName)
		}
	}

	// Next returning false can mean "no more rows" or "iteration failed";
	// only Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterate download_history partitions: %w", err)
	}
	return expired, nil
}