import fs from "node:fs";
import path from "node:path";

import { all, get, run } from "@/lib/db";
import { TASK_TEMPLATES } from "@/lib/fleet-config";
import type {
  AgentFamily,
  DispatchMethod,
  DispatchState,
  TaskEvent,
  TaskEventType,
  TaskPriority,
  TaskRecord,
  TaskStatus,
  TaskTemplate,
} from "@/lib/types";

const VALID_STATUSES: TaskStatus[] = ["Backlog", "Todo", "In Progress", "Review", "Done"];
const VALID_PRIORITIES: TaskPriority[] = ["Low", "Medium", "High", "Critical"];
const VALID_FAMILIES: AgentFamily[] = ["openclaw", "zeroclaw"];
const VALID_DISPATCH_METHODS: DispatchMethod[] = ["manual", "openclaw-swarm", "zeroclaw-webhook"];
const VALID_DISPATCH_STATES: DispatchState[] = [
  "planned",
  "assigned",
  "dispatched",
  "acknowledged",
  "completed",
  "failed",
];

// Directory that receives one markdown page per completed task; created at module load.
const WIKI_DIR = process.env.WIKI_DIR || path.join(process.cwd(), "wiki");
fs.mkdirSync(WIKI_DIR, { recursive: true });

/** A `tasks` row as read from the database: `tags` is still a JSON-encoded string. */
type DatabaseTaskRow = Omit<TaskRecord, "tags"> & { tags: string };

/**
 * Decodes the JSON `tags` column.
 *
 * @returns The string entries of the decoded array; an empty array when the
 * value is not valid JSON or does not decode to an array.
 */
function parseTags(raw: string): string[] {
  try {
    const parsed: unknown = JSON.parse(raw || "[]");
    return Array.isArray(parsed)
      ? parsed.filter((tag): tag is string => typeof tag === "string")
      : [];
  } catch {
    // Best effort: an unparseable column is treated as "no tags".
    return [];
  }
}

/** Returns the remainder of the first tag that starts with `prefix`, or `null` when none does. */
function extractTagValue(tags: string[], prefix: string): string | null {
  const match = tags.find((tag) => tag.startsWith(prefix));
  return match ? match.slice(prefix.length) : null;
}

/**
 * Works out the dispatch state implied by a task payload.
 *
 * An explicit, valid `task.dispatch_state` always wins. Otherwise the state is
 * derived from the status (`task.status`, falling back to `existing.status`),
 * the previous dispatch state and whether an assignee is present.
 *
 * @param task Payload being created or applied.
 * @param existing Stored record, when updating.
 */
function deriveDispatchState(task: Partial<TaskRecord>, existing?: TaskRecord): DispatchState {
  if (task.dispatch_state && VALID_DISPATCH_STATES.includes(task.dispatch_state)) {
    return task.dispatch_state;
  }

  if (task.status === "Done") {
    return "completed";
  }

  const status = task.status ?? existing?.status;
  const priorState = existing?.dispatch_state ?? "planned";

  if (status === "In Progress" || status === "Review") {
    // A failed dispatch stays failed even though the card moved on the board.
    return priorState === "failed" ? priorState : "acknowledged";
  }

  if (status === "Todo" && (priorState === "planned" || priorState === "assigned")) {
    return existing?.assignee || task.assignee ? "assigned" : "planned";
  }

  return existing?.assignee || task.assignee
    ? priorState === "planned"
      ? "assigned"
      : priorState
    : "planned";
}

/**
 * Chooses the `acknowledged_at` value to store.
 *
 * @param nextState Dispatch state about to be stored.
 * @param existing Stored record, when updating.
 * @param explicitValue Caller-supplied value; anything other than `undefined`
 * (including `null`) is returned as is.
 * @returns The explicit value; otherwise, for "acknowledged"/"completed", the
 * existing timestamp or the current time; otherwise `null`.
 */
function deriveAcknowledgedAt(
  nextState: DispatchState,
  existing?: TaskRecord,
  explicitValue?: string | null,
): string | null {
  if (explicitValue !== undefined) {
    return explicitValue;
  }

  if (nextState === "acknowledged" || nextState === "completed") {
    return existing?.acknowledged_at || new Date().toISOString();
  }

  return null;
}

/** Trims a string value; anything that is not a non-blank string becomes `null`. */
function normalizeNullableString(value: unknown): string | null {
  return typeof value === "string" && value.trim().length > 0 ? value.trim() : null;
}

/** Converts a database row into a `TaskRecord`: decodes `tags` and fills defaults for empty columns. */
export function normalizeTask(row: DatabaseTaskRow): TaskRecord {
  return {
    ...row,
    tags: parseTags(row.tags),
    family: row.family || null,
    target_host: row.target_host || "",
    target_channel: row.target_channel || "",
    dispatch_method: row.dispatch_method || "manual",
    dispatch_state: row.dispatch_state || "planned",
    template_key: row.template_key || null,
    repo_slug: row.repo_slug || null,
    base_branch: row.base_branch || null,
    preferred_agent: row.preferred_agent || null,
    reasoning_effort: row.reasoning_effort || null,
    model_hint: row.model_hint || null,
    last_dispatch_at: row.last_dispatch_at || null,
    acknowledged_at: row.acknowledged_at || null,
    last_error: row.last_error || null,
  };
}

/** Lists every task: failed dispatches first, then by board status, newest id first. */
export async function listTasks(): Promise<TaskRecord[]> {
  const rows: DatabaseTaskRow[] = await all(
    "SELECT * FROM tasks ORDER BY " +
      "CASE dispatch_state WHEN 'failed' THEN 0 ELSE 1 END, " +
      "CASE status WHEN 'In Progress' THEN 0 WHEN 'Review' THEN 1 WHEN 'Todo' THEN 2 WHEN 'Backlog' THEN 3 ELSE 4 END, " +
      "id DESC",
  );
  return rows.map(normalizeTask);
}

/** Lists tasks whose dispatch state is "failed", most recently updated first. */
export async function listFailedTasks(): Promise<TaskRecord[]> {
  const rows: DatabaseTaskRow[] = await all(
    "SELECT * FROM tasks WHERE dispatch_state = 'failed' ORDER BY updated_at DESC",
  );
  return rows.map(normalizeTask);
}

/** Loads one task by id; resolves to `null` when no row matches. */
export async function findTask(id: number): Promise<TaskRecord | null> {
  const row: DatabaseTaskRow | undefined = await get("SELECT * FROM tasks WHERE id = ?", [id]);
  return row ? normalizeTask(row) : null;
}

/**
 * Validates a create/update payload.
 *
 * @param payload Candidate task fields.
 * @param partial When `true` (update), `title` is only checked if it is present.
 * @returns Human-readable error messages; empty when the payload is acceptable.
 */
export function validateTaskPayload(
  payload: Partial<Omit<TaskRecord, "tags">> & { tags?: unknown },
  partial = false,
): string[] {
  const errors: string[] = [];

  if (!partial || payload.title !== undefined) {
    if (typeof payload.title !== "string" || payload.title.trim().length === 0) {
      errors.push("title is required");
    }
  }

  if (payload.status !== undefined && !VALID_STATUSES.includes(payload.status)) {
    errors.push(`status must be one of: ${VALID_STATUSES.join(", ")}`);
  }

  if (payload.priority !== undefined && !VALID_PRIORITIES.includes(payload.priority)) {
    errors.push(`priority must be one of: ${VALID_PRIORITIES.join(", ")}`);
  }

  // `null` is accepted for family: it means "no family".
  if (
    payload.family !== undefined &&
    payload.family !== null &&
    !VALID_FAMILIES.includes(payload.family)
  ) {
    errors.push(`family must be one of: ${VALID_FAMILIES.join(", ")}`);
  }

  if (
    payload.dispatch_method !== undefined &&
    !VALID_DISPATCH_METHODS.includes(payload.dispatch_method)
  ) {
    errors.push(`dispatch_method must be one of: ${VALID_DISPATCH_METHODS.join(", ")}`);
  }

  if (
    payload.dispatch_state !== undefined &&
    !VALID_DISPATCH_STATES.includes(payload.dispatch_state)
  ) {
    errors.push(`dispatch_state must be one of: ${VALID_DISPATCH_STATES.join(", ")}`);
  }

  // Only the container is checked here; createTask/updateTask drop non-string entries.
  if (payload.tags !== undefined && !Array.isArray(payload.tags)) {
    errors.push("tags must be an array of strings");
  }

  return errors;
}

/**
 * Renders the wiki page for a task as markdown: a title heading, a bullet list
 * of task metadata and a description section. `Completed` falls back to the
 * current time when the record has no `completed_at`.
 */
function buildWikiMarkdown(task: TaskRecord): string {
  const renderedTags = task.tags.length ? task.tags.join(", ") : "None";

  // One array entry per output line so headings and list items render as markdown blocks.
  return [
    `# ${task.title}`,
    "",
    `- Task ID: ${task.id}`,
    `- Assignee: ${task.assignee || "Unassigned"}`,
    `- Priority: ${task.priority}`,
    `- Status: ${task.status}`,
    `- Dispatch: ${task.dispatch_method} / ${task.dispatch_state}`,
    `- Host: ${task.target_host || "n/a"}`,
    `- Channel: ${task.target_channel || "n/a"}`,
    `- Tags: ${renderedTags}`,
    `- Created: ${task.created_at}`,
    `- Completed: ${task.completed_at || new Date().toISOString()}`,
    "",
    "## Description",
    "",
    task.description || "No description provided.",
    "",
  ].join("\n");
}

/**
 * Writes the wiki page for a task into `WIKI_DIR`.
 *
 * The file is named `<YYYY-MM-DD>-task-<id>-<slug>.md`, where the slug is the
 * lower-cased title reduced to `[a-z0-9-]` and capped at 80 characters
 * (`task-<id>` when the title yields an empty slug).
 */
async function writeWikiForTask(task: TaskRecord): Promise<void> {
  const safeTitle = task.title
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, "-")
    .replace(/^-+|-+$/g, "")
    .slice(0, 80);
  const fileName = `${new Date().toISOString().slice(0, 10)}-task-${task.id}-${safeTitle || `task-${task.id}`}.md`;

  // Non-blocking write: this runs inside an async request path.
  await fs.promises.writeFile(path.join(WIKI_DIR, fileName), buildWikiMarkdown(task), "utf8");
}

/** Inserts one row into `task_events`; omitted optional fields are stored as `""` or `null`. */
export async function appendTaskEvent(input: {
  taskId: number;
  assignee?: string;
  family?: AgentFamily | null;
  host?: string;
  eventType: TaskEventType;
  state?: DispatchState | null;
  summary: string;
  detail?: string;
}): Promise<void> {
  await run(
    "INSERT INTO task_events (task_id, assignee, family, host, event_type, state, summary, detail) " +
      "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
    [
      input.taskId,
      input.assignee || "",
      input.family || null,
      input.host || "",
      input.eventType,
      input.state || null,
      input.summary,
      input.detail || "",
    ],
  );
}

/**
 * Lists task events, newest first.
 *
 * @param taskId When truthy, only events of that task are returned.
 * @param limit Maximum number of rows (default 50).
 */
export async function listTaskEvents(taskId?: number, limit = 50): Promise<TaskEvent[]> {
  const rows: TaskEvent[] = taskId
    ? await all("SELECT * FROM task_events WHERE task_id = ? ORDER BY created_at DESC LIMIT ?", [
        taskId,
        limit,
      ])
    : await all("SELECT * FROM task_events ORDER BY created_at DESC LIMIT ?", [limit]);
  return rows;
}

/** Returns the configured task templates. */
export async function listTaskTemplates(): Promise<TaskTemplate[]> {
  return TASK_TEMPLATES;
}

/**
 * Inserts a new task and records a "created" event.
 *
 * `repo_slug`, `base_branch`, `preferred_agent`, `reasoning_effort` and
 * `model_hint` fall back to the value of a `repo:` / `base:` / `agent:` /
 * `reasoning:` / `model:` tag when the field itself is blank.
 *
 * @returns The stored task as read back from the database.
 * @throws Error("failed_to_fetch_created_task") when the inserted row cannot be read back.
 */
export async function createTask(input: Partial<TaskRecord>): Promise<TaskRecord> {
  const tags = Array.isArray(input.tags)
    ? input.tags.filter((tag) => typeof tag === "string")
    : [];
  const dispatchState = deriveDispatchState(input);

  const result = await run(
    "INSERT INTO tasks ( " +
      "title, description, assignee, family, target_host, target_channel, " +
      "dispatch_method, dispatch_state, template_key, repo_slug, base_branch, " +
      "preferred_agent, reasoning_effort, model_hint, priority, status, tags, " +
      "last_dispatch_at, acknowledged_at, last_error " +
      ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
    [
      input.title?.trim() || "",
      input.description || "",
      input.assignee || "",
      input.family || null,
      input.target_host || "",
      input.target_channel || "",
      input.dispatch_method || "manual",
      dispatchState,
      normalizeNullableString(input.template_key),
      normalizeNullableString(input.repo_slug) || extractTagValue(tags, "repo:"),
      normalizeNullableString(input.base_branch) || extractTagValue(tags, "base:"),
      normalizeNullableString(input.preferred_agent) || extractTagValue(tags, "agent:"),
      normalizeNullableString(input.reasoning_effort) || extractTagValue(tags, "reasoning:"),
      normalizeNullableString(input.model_hint) || extractTagValue(tags, "model:"),
      input.priority || "Medium",
      input.status || "Backlog",
      JSON.stringify(tags),
      input.last_dispatch_at || null,
      deriveAcknowledgedAt(dispatchState),
      normalizeNullableString(input.last_error),
    ],
  );

  const task = await findTask(result.lastID);
  if (!task) {
    throw new Error("failed_to_fetch_created_task");
  }

  await appendTaskEvent({
    taskId: task.id,
    assignee: task.assignee,
    family: task.family,
    host: task.target_host,
    eventType: "created",
    state: task.dispatch_state,
    summary: `Task created for ${task.assignee || "unassigned"} flow`,
    detail: task.description,
  });

  return task;
}

/**
 * Applies a partial update to a task, records an event and, when the task
 * transitions to "Done", writes its wiki page.
 *
 * @returns The updated task, or `null` when no task has this id.
 * @throws Error("failed_to_fetch_updated_task") when the row cannot be read back.
 */
export async function updateTask(id: number, input: Partial<TaskRecord>): Promise<TaskRecord | null> {
  const existing = await findTask(id);
  if (!existing) {
    return null;
  }

  const mergedTags = Array.isArray(input.tags)
    ? input.tags.filter((tag) => typeof tag === "string")
    : existing.tags;
  const nextStatus = input.status ?? existing.status;

  // Only a dispatch_state supplied by the caller counts as an explicit override.
  // Passing the merged record unchanged would always carry the stored state and
  // short-circuit the derivation, so status/assignee changes could never move
  // the task to "assigned", "acknowledged" or "completed".
  const nextDispatchState = deriveDispatchState(
    { ...existing, ...input, tags: mergedTags, dispatch_state: input.dispatch_state },
    existing,
  );
  const completedAt =
    nextStatus === "Done" ? existing.completed_at || new Date().toISOString() : null;
  const acknowledgedAt = deriveAcknowledgedAt(nextDispatchState, existing, input.acknowledged_at);

  // NOTE(review): `??` treats `null` like "not provided", so nullable columns
  // (family, template_key, last_error, ...) cannot be cleared by passing null —
  // confirm whether that is intended.
  await run(
    "UPDATE tasks SET " +
      "title = ?, description = ?, assignee = ?, family = ?, target_host = ?, target_channel = ?, " +
      "dispatch_method = ?, dispatch_state = ?, template_key = ?, repo_slug = ?, base_branch = ?, " +
      "preferred_agent = ?, reasoning_effort = ?, model_hint = ?, priority = ?, status = ?, tags = ?, " +
      "last_dispatch_at = ?, acknowledged_at = ?, last_error = ?, completed_at = ?, " +
      "updated_at = datetime('now') WHERE id = ?",
    [
      input.title?.trim() || existing.title,
      input.description ?? existing.description,
      input.assignee ?? existing.assignee,
      input.family ?? existing.family,
      input.target_host ?? existing.target_host,
      input.target_channel ?? existing.target_channel,
      input.dispatch_method ?? existing.dispatch_method,
      nextDispatchState,
      input.template_key ?? existing.template_key,
      input.repo_slug ?? existing.repo_slug,
      input.base_branch ?? existing.base_branch,
      input.preferred_agent ?? existing.preferred_agent,
      input.reasoning_effort ?? existing.reasoning_effort,
      input.model_hint ?? existing.model_hint,
      input.priority ?? existing.priority,
      nextStatus,
      JSON.stringify(mergedTags),
      input.last_dispatch_at ?? existing.last_dispatch_at,
      acknowledgedAt,
      input.last_error ?? existing.last_error,
      completedAt,
      id,
    ],
  );

  const updated = await findTask(id);
  if (!updated) {
    throw new Error("failed_to_fetch_updated_task");
  }

  // A fresh acknowledgement outranks a plain status change when naming the event.
  const eventType: TaskEventType =
    updated.dispatch_state === "acknowledged" && existing.dispatch_state !== "acknowledged"
      ? "acknowledged"
      : updated.status !== existing.status
        ? "status_changed"
        : "updated";

  await appendTaskEvent({
    taskId: updated.id,
    assignee: updated.assignee,
    family: updated.family,
    host: updated.target_host,
    eventType,
    state: updated.dispatch_state,
    summary: `${eventType.replace(/_/g, " ")} -> ${updated.status} / ${updated.dispatch_state}`,
    detail: updated.description,
  });

  // Write the wiki page only on the transition into Done, not on later edits.
  if (nextStatus === "Done" && existing.status !== "Done") {
    await writeWikiForTask(updated);
  }

  return updated;
}