package service import ( "bytes" "context" "encoding/json" "fmt" "log/slog" "math" "net/http" "net/url" "strings" "time" "github.com/TopherMayor/unified-media-manager/internal/db" "github.com/jackc/pgx/v5" ) type NotificationChannel struct { ID int64 `json:"id"` Name string `json:"name"` Type string `json:"type"` Enabled bool `json:"enabled"` Config json.RawMessage `json:"config"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` // EventTypes populated by JOIN (not a DB column) EventTypes []string `json:"event_types,omitempty"` } type QueueEntry struct { ID int64 `json:"id"` ChannelID int64 `json:"channel_id"` EventType string `json:"event_type"` Title string `json:"title"` Message json.RawMessage `json:"message"` Status string `json:"status"` Attempts int `json:"attempts"` MaxAttempts int `json:"max_attempts"` LastError *string `json:"last_error,omitempty"` NextRetryAt *time.Time `json:"next_retry_at,omitempty"` CreatedAt time.Time `json:"created_at"` DeliveredAt *time.Time `json:"delivered_at,omitempty"` } type NotificationService struct { db *db.DB http *http.Client telegramBaseURL string // override for testing done chan struct{} } func NewNotificationService(database *db.DB) *NotificationService { return &NotificationService{ db: database, http: &http.Client{Timeout: 10 * time.Second}, telegramBaseURL: "https://api.telegram.org", done: make(chan struct{}), } } // ValidateChannelConfig checks config has required fields for the channel type. func (s *NotificationService) ValidateChannelConfig(channelType string, config json.RawMessage) error { var m map[string]interface{} if err := json.Unmarshal(config, &m); err != nil { return fmt.Errorf("invalid config JSON: %w", err) } switch channelType { case "webhook": urlStr, _ := m["url"].(string) if urlStr == "" { return fmt.Errorf("webhook config requires 'url' field") } u, err := url.Parse(urlStr) if err != nil { return fmt.Errorf("webhook url is invalid: %w", err) } if u.Scheme != "http" && u.Scheme != "https" { return fmt.Errorf("webhook url must use http or https scheme") } case "telegram": botToken, _ := m["bot_token"].(string) chatID, _ := m["chat_id"].(string) if botToken == "" { return fmt.Errorf("telegram config requires 'bot_token' field") } if chatID == "" { return fmt.Errorf("telegram config requires 'chat_id' field") } default: return fmt.Errorf("unknown channel type: %s", channelType) } return nil } // ListChannels returns all channels with masked configs and their event subscriptions. func (s *NotificationService) ListChannels(ctx context.Context) ([]NotificationChannel, error) { rows, err := s.db.Pool.Query(ctx, `SELECT c.id, c.name, c.type, c.enabled, c.config, c.created_at, c.updated_at, COALESCE(json_agg(s.event_type) FILTER (WHERE s.event_type IS NOT NULL), '[]') AS event_types FROM notification_channels c LEFT JOIN notification_subscriptions s ON c.id = s.channel_id GROUP BY c.id, c.name, c.type, c.enabled, c.config, c.created_at, c.updated_at ORDER BY c.name`) if err != nil { return nil, fmt.Errorf("list notification channels: %w", err) } defer rows.Close() var channels []NotificationChannel for rows.Next() { var ch NotificationChannel var eventTypesJSON []byte if err := rows.Scan(&ch.ID, &ch.Name, &ch.Type, &ch.Enabled, &ch.Config, &ch.CreatedAt, &ch.UpdatedAt, &eventTypesJSON); err != nil { slog.Error("failed to scan notification channel", "error", err) continue } ch.Config = maskConfig(ch.Type, ch.Config) var types []string if err := json.Unmarshal(eventTypesJSON, &types); err == nil { ch.EventTypes = types } channels = append(channels, ch) } return channels, nil } // CreateChannel creates a new notification channel and returns its ID. func (s *NotificationService) CreateChannel(ctx context.Context, name, channelType string, config json.RawMessage) (int64, error) { if err := s.ValidateChannelConfig(channelType, config); err != nil { return 0, err } var id int64 err := s.db.Pool.QueryRow(ctx, `INSERT INTO notification_channels (name, type, config) VALUES ($1, $2, $3) RETURNING id`, name, channelType, config).Scan(&id) if err != nil { return 0, fmt.Errorf("create notification channel: %w", err) } slog.Info("created notification channel", "name", name, "type", channelType) return id, nil } // UpdateChannel updates a notification channel's fields. func (s *NotificationService) UpdateChannel(ctx context.Context, id int64, name *string, enabled *bool, config json.RawMessage) error { qb := NewQueryBuilder(1) setClauses := []string{} if name != nil { setClauses = append(setClauses, fmt.Sprintf("name = $%d", qb.Idx())) qb.Add("", *name) } if enabled != nil { setClauses = append(setClauses, fmt.Sprintf("enabled = $%d", qb.Idx())) qb.Add("", *enabled) } if config != nil { if err := s.ValidateChannelConfig("", config); err != nil { // Skip type-specific validation on update since we don't know the type here // The channel type doesn't change, just validate JSON is valid } setClauses = append(setClauses, fmt.Sprintf("config = $%d", qb.Idx())) qb.Add("", config) } if len(setClauses) == 0 { return nil } setClauses = append(setClauses, "updated_at = NOW()") query := fmt.Sprintf("UPDATE notification_channels SET %s WHERE id = $%d", strings.Join(setClauses, ", "), qb.Idx()) qb.Add("", id) tag, err := s.db.Pool.Exec(ctx, query, qb.Args()...) if err != nil { return fmt.Errorf("update notification channel: %w", err) } if tag.RowsAffected() == 0 { return fmt.Errorf("notification channel not found: %d", id) } return nil } // DeleteChannel removes a notification channel and its subscriptions. func (s *NotificationService) DeleteChannel(ctx context.Context, id int64) error { tag, err := s.db.Pool.Exec(ctx, `DELETE FROM notification_channels WHERE id = $1`, id) if err != nil { return fmt.Errorf("delete notification channel: %w", err) } if tag.RowsAffected() == 0 { return fmt.Errorf("notification channel not found: %d", id) } slog.Info("deleted notification channel", "id", id) return nil } // GetChannelWithConfig returns a channel with full unmasked config for delivery. func (s *NotificationService) GetChannelWithConfig(ctx context.Context, id int64) (*NotificationChannel, error) { var ch NotificationChannel err := s.db.Pool.QueryRow(ctx, `SELECT id, name, type, enabled, config, created_at, updated_at FROM notification_channels WHERE id = $1`, id). Scan(&ch.ID, &ch.Name, &ch.Type, &ch.Enabled, &ch.Config, &ch.CreatedAt, &ch.UpdatedAt) if err != nil { return nil, fmt.Errorf("get notification channel: %w", err) } return &ch, nil } // ListSubscriptions returns event types subscribed by a channel. func (s *NotificationService) ListSubscriptions(ctx context.Context, channelID int64) ([]string, error) { rows, err := s.db.Pool.Query(ctx, `SELECT event_type FROM notification_subscriptions WHERE channel_id = $1`, channelID) if err != nil { return nil, fmt.Errorf("list subscriptions: %w", err) } defer rows.Close() var types []string for rows.Next() { var t string if err := rows.Scan(&t); err != nil { continue } types = append(types, t) } return types, nil } // UpdateSubscriptions replaces all subscriptions for a channel. func (s *NotificationService) UpdateSubscriptions(ctx context.Context, channelID int64, eventTypes []string) error { tx, err := s.db.Pool.Begin(ctx) if err != nil { return fmt.Errorf("begin transaction: %w", err) } defer tx.Rollback(ctx) if _, err := tx.Exec(ctx, `DELETE FROM notification_subscriptions WHERE channel_id = $1`, channelID); err != nil { return fmt.Errorf("delete subscriptions: %w", err) } for _, et := range eventTypes { if _, err := tx.Exec(ctx, `INSERT INTO notification_subscriptions (channel_id, event_type) VALUES ($1, $2)`, channelID, et); err != nil { return fmt.Errorf("insert subscription: %w", err) } } if err := tx.Commit(ctx); err != nil { return fmt.Errorf("commit subscriptions: %w", err) } return nil } // GetSubscribersForEvent returns enabled channels subscribed to an event type. func (s *NotificationService) GetSubscribersForEvent(ctx context.Context, eventType string) ([]NotificationChannel, error) { rows, err := s.db.Pool.Query(ctx, `SELECT DISTINCT c.id, c.name, c.type, c.enabled, c.config, c.created_at, c.updated_at FROM notification_channels c JOIN notification_subscriptions s ON c.id = s.channel_id WHERE s.event_type = $1 AND c.enabled = true`, eventType) if err != nil { return nil, fmt.Errorf("get subscribers for event: %w", err) } defer rows.Close() var channels []NotificationChannel for rows.Next() { var ch NotificationChannel if err := rows.Scan(&ch.ID, &ch.Name, &ch.Type, &ch.Enabled, &ch.Config, &ch.CreatedAt, &ch.UpdatedAt); err != nil { continue } channels = append(channels, ch) } return channels, nil } // DeliverWebhook sends an HTTP POST with JSON payload. func (s *NotificationService) DeliverWebhook(ctx context.Context, webhookURL string, payload map[string]interface{}) error { body, err := json.Marshal(payload) if err != nil { return fmt.Errorf("marshal webhook payload: %w", err) } req, err := http.NewRequestWithContext(ctx, "POST", webhookURL, bytes.NewReader(body)) if err != nil { return fmt.Errorf("create webhook request: %w", err) } req.Header.Set("Content-Type", "application/json") resp, err := s.http.Do(req) if err != nil { return fmt.Errorf("webhook delivery failed: %w", err) } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { return fmt.Errorf("webhook returned status %d", resp.StatusCode) } return nil } // DeliverTelegram sends a message via the Telegram Bot API. func (s *NotificationService) DeliverTelegram(ctx context.Context, botToken, chatID, text string) error { apiURL := fmt.Sprintf("%s/bot%s/sendMessage", s.telegramBaseURL, botToken) payload := map[string]interface{}{ "chat_id": chatID, "text": text, "parse_mode": "HTML", } body, err := json.Marshal(payload) if err != nil { return fmt.Errorf("marshal telegram payload: %w", err) } req, err := http.NewRequestWithContext(ctx, "POST", apiURL, bytes.NewReader(body)) if err != nil { return fmt.Errorf("create telegram request: %w", err) } req.Header.Set("Content-Type", "application/json") resp, err := s.http.Do(req) if err != nil { return fmt.Errorf("telegram delivery failed: %w", err) } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { return fmt.Errorf("telegram returned status %d", resp.StatusCode) } return nil } // calculateBackoff returns exponential backoff: 30s * 2^attempt, capped at 480s. func calculateBackoff(attempt int) time.Duration { d := 30 * time.Second * time.Duration(math.Pow(2, float64(attempt))) if d > 480*time.Second { return 480 * time.Second } return d } // maskConfig masks sensitive fields in channel config for API responses. func maskConfig(channelType string, raw json.RawMessage) json.RawMessage { var m map[string]interface{} if err := json.Unmarshal(raw, &m); err != nil { return raw } switch channelType { case "telegram": if _, ok := m["bot_token"]; ok { m["bot_token"] = "***" } } masked, err := json.Marshal(m) if err != nil { return raw } return masked } // ListQueue returns paginated notification queue entries. func (s *NotificationService) ListQueue(ctx context.Context, status string, page, pageSize int) ([]QueueEntry, int, error) { qb := NewQueryBuilder(1) if status != "" { qb.Add("status = $%d", status) } where := qb.Where() var total int countQuery := fmt.Sprintf("SELECT COUNT(*) FROM notification_queue%s", where) if err := s.db.Pool.QueryRow(ctx, countQuery, qb.Args()...).Scan(&total); err != nil { return nil, 0, fmt.Errorf("count notification queue: %w", err) } offset := (page - 1) * pageSize dataQuery := fmt.Sprintf( `SELECT id, channel_id, event_type, title, message, status, attempts, max_attempts, last_error, next_retry_at, created_at, delivered_at FROM notification_queue%s ORDER BY created_at DESC LIMIT $%d OFFSET $%d`, where, qb.Idx(), qb.Idx()+1) args := append(qb.Args(), pageSize, offset) rows, err := s.db.Pool.Query(ctx, dataQuery, args...) if err != nil { return nil, 0, fmt.Errorf("list notification queue: %w", err) } defer rows.Close() var entries []QueueEntry for rows.Next() { var e QueueEntry var lastError, nextRetry, deliveredAt interface{} // nullable if err := rows.Scan(&e.ID, &e.ChannelID, &e.EventType, &e.Title, &e.Message, &e.Status, &e.Attempts, &e.MaxAttempts, &lastError, &nextRetry, &e.CreatedAt, &deliveredAt); err != nil { continue } if le, ok := lastError.(*string); ok && le != nil { e.LastError = le } else if le, ok := lastError.(string); ok && le != "" { e.LastError = &le } if nr, ok := nextRetry.(*time.Time); ok && nr != nil { e.NextRetryAt = nr } if da, ok := deliveredAt.(*time.Time); ok && da != nil { e.DeliveredAt = da } entries = append(entries, e) } return entries, total, nil } // StartDispatcher launches the notification dispatcher goroutines. func (s *NotificationService) StartDispatcher(ctx context.Context) { go s.pollActivityEvents(ctx) go s.processQueue(ctx) } // StopDispatcher signals both dispatcher goroutines to stop. func (s *NotificationService) StopDispatcher() { close(s.done) } func (s *NotificationService) pollActivityEvents(ctx context.Context) { slog.Info("notification event poller started") defer slog.Info("notification event poller stopped") ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-s.done: return case <-ctx.Done(): return case <-ticker.C: s.pollOnce(ctx) } } } func (s *NotificationService) pollOnce(ctx context.Context) { pollCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() // Get cursor var lastID int64 var lastCreatedAt *time.Time err := s.db.Pool.QueryRow(pollCtx, `SELECT last_event_id, last_event_created_at FROM notification_state WHERE id = 1`). Scan(&lastID, &lastCreatedAt) if err != nil { slog.Error("failed to read notification state", "error", err) return } type eventRow struct { ID int64 EventType string Title string CreatedAt time.Time } var events []eventRow var rows pgx.Rows if lastCreatedAt != nil { rows, err = s.db.Pool.Query(pollCtx, `SELECT id, event_type, title, created_at FROM activity_events WHERE (created_at, id) > ($1, $2) ORDER BY created_at, id LIMIT 100`, *lastCreatedAt, lastID) } else { rows, err = s.db.Pool.Query(pollCtx, `SELECT id, event_type, title, created_at FROM activity_events WHERE id > $1 ORDER BY created_at, id LIMIT 100`, lastID) } if err != nil { slog.Error("failed to poll activity events", "error", err) return } defer rows.Close() for rows.Next() { var e eventRow if err := rows.Scan(&e.ID, &e.EventType, &e.Title, &e.CreatedAt); err != nil { continue } events = append(events, e) } if len(events) == 0 { return } // For each event, find subscribers and create queue entries var maxID int64 var maxCreatedAt time.Time for _, e := range events { subscribers, subErr := s.GetSubscribersForEvent(pollCtx, e.EventType) if subErr != nil { slog.Error("failed to get subscribers", "error", subErr, "event_type", e.EventType) continue } message, _ := json.Marshal(map[string]interface{}{ "event_type": e.EventType, "title": e.Title, }) for _, ch := range subscribers { _, qErr := s.db.Pool.Exec(pollCtx, `INSERT INTO notification_queue (channel_id, event_type, title, message) VALUES ($1, $2, $3, $4)`, ch.ID, e.EventType, e.Title, message) if qErr != nil { slog.Error("failed to enqueue notification", "error", qErr, "channel", ch.Name) } } if e.ID > maxID { maxID = e.ID maxCreatedAt = e.CreatedAt } } // Update cursor _, err = s.db.Pool.Exec(pollCtx, `UPDATE notification_state SET last_event_id = $1, last_event_created_at = $2 WHERE id = 1`, maxID, maxCreatedAt) if err != nil { slog.Error("failed to update notification state", "error", err) } } func (s *NotificationService) processQueue(ctx context.Context) { slog.Info("notification queue processor started") defer slog.Info("notification queue processor stopped") ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-s.done: return case <-ctx.Done(): return case <-ticker.C: s.processQueueBatch(ctx) } } } func (s *NotificationService) processQueueBatch(ctx context.Context) { batchCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() rows, err := s.db.Pool.Query(batchCtx, `SELECT q.id, q.channel_id, q.event_type, q.title, q.message, q.status, q.attempts, c.name AS channel_name, c.type AS channel_type, c.config AS channel_config FROM notification_queue q JOIN notification_channels c ON q.channel_id = c.id WHERE q.status IN ('pending', 'failed') AND (q.next_retry_at IS NULL OR q.next_retry_at <= NOW()) AND q.attempts < q.max_attempts LIMIT 50`) if err != nil { slog.Error("failed to query notification queue", "error", err) return } defer rows.Close() type queueItem struct { ID int64 ChannelID int64 EventType string Title string Message json.RawMessage Status string Attempts int ChannelName string ChannelType string ChannelConfig json.RawMessage } var items []queueItem for rows.Next() { var q queueItem if err := rows.Scan(&q.ID, &q.ChannelID, &q.EventType, &q.Title, &q.Message, &q.Status, &q.Attempts, &q.ChannelName, &q.ChannelType, &q.ChannelConfig); err != nil { continue } items = append(items, q) } for _, q := range items { deliverCtx, deliverCancel := context.WithTimeout(batchCtx, 15*time.Second) var deliverErr error switch q.ChannelType { case "webhook": var configMap map[string]interface{} json.Unmarshal(q.ChannelConfig, &configMap) webhookURL, _ := configMap["url"].(string) var payload map[string]interface{} json.Unmarshal(q.Message, &payload) deliverErr = s.DeliverWebhook(deliverCtx, webhookURL, payload) case "telegram": var configMap map[string]interface{} json.Unmarshal(q.ChannelConfig, &configMap) botToken, _ := configMap["bot_token"].(string) chatID, _ := configMap["chat_id"].(string) var msg map[string]interface{} json.Unmarshal(q.Message, &msg) title, _ := msg["title"].(string) text := fmt.Sprintf("%s\n%s", q.EventType, title) deliverErr = s.DeliverTelegram(deliverCtx, botToken, chatID, text) } deliverCancel() newAttempts := q.Attempts + 1 if deliverErr == nil { _, err := s.db.Pool.Exec(batchCtx, `UPDATE notification_queue SET status = 'delivered', attempts = $1, delivered_at = NOW() WHERE id = $2`, newAttempts, q.ID) if err != nil { slog.Error("failed to update queue entry", "error", err) } slog.Info("notification delivered", "channel", q.ChannelName, "event", q.EventType) } else { var nextRetry *time.Time var newStatus string = "failed" errMsg := deliverErr.Error() if newAttempts >= 5 { newStatus = "dead" slog.Warn("notification dead-lettered", "channel", q.ChannelName, "attempts", newAttempts) } else { backoff := calculateBackoff(newAttempts) t := time.Now().Add(backoff) nextRetry = &t } _, err := s.db.Pool.Exec(batchCtx, `UPDATE notification_queue SET status = $1, attempts = $2, last_error = $3, next_retry_at = $4 WHERE id = $5`, newStatus, newAttempts, errMsg, nextRetry, q.ID) if err != nil { slog.Error("failed to update queue entry", "error", err) } slog.Error("notification delivery failed", "channel", q.ChannelName, "type", q.ChannelType, "attempts", newAttempts) } } }