// Files: openclaw-taskboard/lib/tasks.ts (376 lines, 12 KiB, TypeScript)

import fs from "node:fs";
import path from "node:path";
import { all, get, run } from "@/lib/db";
import { TASK_TEMPLATES } from "@/lib/fleet-config";
import type {
AgentFamily,
DispatchMethod,
DispatchState,
TaskEvent,
TaskEventType,
TaskPriority,
TaskRecord,
TaskStatus,
TaskTemplate,
} from "@/lib/types";
const VALID_STATUSES: TaskStatus[] = ["Backlog", "Todo", "In Progress", "Review", "Done"];
const VALID_PRIORITIES: TaskPriority[] = ["Low", "Medium", "High", "Critical"];
const VALID_FAMILIES: AgentFamily[] = ["openclaw", "zeroclaw"];
const VALID_DISPATCH_METHODS: DispatchMethod[] = ["manual", "openclaw-swarm", "zeroclaw-webhook"];
const VALID_DISPATCH_STATES: DispatchState[] = [
"planned",
"assigned",
"dispatched",
"acknowledged",
"completed",
"failed",
];
const WIKI_DIR = process.env.WIKI_DIR || path.join(process.cwd(), "wiki");
fs.mkdirSync(WIKI_DIR, { recursive: true });
type DatabaseTaskRow = Omit<TaskRecord, "tags"> & { tags: string };
function parseTags(raw: string) {
try {
const parsed = JSON.parse(raw || "[]");
return Array.isArray(parsed) ? parsed.filter((tag) => typeof tag === "string") : [];
} catch {
return [];
}
}
function extractTagValue(tags: string[], prefix: string) {
const match = tags.find((tag) => tag.startsWith(prefix));
return match ? match.slice(prefix.length) : null;
}
function deriveDispatchState(task: Partial<TaskRecord>, existing?: TaskRecord): DispatchState {
if (task.dispatch_state && VALID_DISPATCH_STATES.includes(task.dispatch_state)) {
return task.dispatch_state;
}
if (task.status === "Done") {
return "completed";
}
const status = task.status ?? existing?.status;
const priorState = existing?.dispatch_state ?? "planned";
if (status === "In Progress" || status === "Review") {
return priorState === "failed" ? priorState : "acknowledged";
}
if (status === "Todo" && (priorState === "planned" || priorState === "assigned")) {
return existing?.assignee || task.assignee ? "assigned" : "planned";
}
return existing?.assignee || task.assignee ? priorState === "planned" ? "assigned" : priorState : "planned";
}
function deriveAcknowledgedAt(
nextState: DispatchState,
existing?: TaskRecord,
explicitValue?: string | null,
) {
if (explicitValue !== undefined) {
return explicitValue;
}
if (nextState === "acknowledged" || nextState === "completed") {
return existing?.acknowledged_at || new Date().toISOString();
}
return null;
}
function normalizeNullableString(value: unknown) {
return typeof value === "string" && value.trim().length > 0 ? value.trim() : null;
}
export function normalizeTask(row: DatabaseTaskRow): TaskRecord {
return {
...row,
tags: parseTags(row.tags),
family: row.family || null,
target_host: row.target_host || "",
target_channel: row.target_channel || "",
dispatch_method: row.dispatch_method || "manual",
dispatch_state: row.dispatch_state || "planned",
template_key: row.template_key || null,
repo_slug: row.repo_slug || null,
base_branch: row.base_branch || null,
preferred_agent: row.preferred_agent || null,
reasoning_effort: row.reasoning_effort || null,
model_hint: row.model_hint || null,
last_dispatch_at: row.last_dispatch_at || null,
acknowledged_at: row.acknowledged_at || null,
last_error: row.last_error || null,
};
}
export async function listTasks() {
const rows = await all<DatabaseTaskRow>(
`SELECT * FROM tasks
ORDER BY
CASE dispatch_state WHEN 'failed' THEN 0 ELSE 1 END,
CASE status
WHEN 'In Progress' THEN 0
WHEN 'Review' THEN 1
WHEN 'Todo' THEN 2
WHEN 'Backlog' THEN 3
ELSE 4
END,
id DESC`,
);
return rows.map(normalizeTask);
}
export async function listFailedTasks() {
const rows = await all<DatabaseTaskRow>(
"SELECT * FROM tasks WHERE dispatch_state = 'failed' ORDER BY updated_at DESC",
);
return rows.map(normalizeTask);
}
export async function findTask(id: number) {
const row = await get<DatabaseTaskRow>("SELECT * FROM tasks WHERE id = ?", [id]);
return row ? normalizeTask(row) : null;
}
export function validateTaskPayload(payload: Partial<TaskRecord> & { tags?: unknown }, partial = false) {
const errors: string[] = [];
if (!partial || payload.title !== undefined) {
if (typeof payload.title !== "string" || payload.title.trim().length === 0) {
errors.push("title is required");
}
}
if (payload.status !== undefined && !VALID_STATUSES.includes(payload.status)) {
errors.push(`status must be one of: ${VALID_STATUSES.join(", ")}`);
}
if (payload.priority !== undefined && !VALID_PRIORITIES.includes(payload.priority)) {
errors.push(`priority must be one of: ${VALID_PRIORITIES.join(", ")}`);
}
if (payload.family !== undefined && payload.family !== null && !VALID_FAMILIES.includes(payload.family)) {
errors.push(`family must be one of: ${VALID_FAMILIES.join(", ")}`);
}
if (payload.dispatch_method !== undefined && !VALID_DISPATCH_METHODS.includes(payload.dispatch_method)) {
errors.push(`dispatch_method must be one of: ${VALID_DISPATCH_METHODS.join(", ")}`);
}
if (payload.dispatch_state !== undefined && !VALID_DISPATCH_STATES.includes(payload.dispatch_state)) {
errors.push(`dispatch_state must be one of: ${VALID_DISPATCH_STATES.join(", ")}`);
}
if (payload.tags !== undefined && !Array.isArray(payload.tags)) {
errors.push("tags must be an array of strings");
}
return errors;
}
function buildWikiMarkdown(task: TaskRecord) {
const renderedTags = task.tags.length ? task.tags.join(", ") : "None";
return `# ${task.title}
- Task ID: ${task.id}
- Assignee: ${task.assignee || "Unassigned"}
- Priority: ${task.priority}
- Status: ${task.status}
- Dispatch: ${task.dispatch_method} / ${task.dispatch_state}
- Host: ${task.target_host || "n/a"}
- Channel: ${task.target_channel || "n/a"}
- Tags: ${renderedTags}
- Created: ${task.created_at}
- Completed: ${task.completed_at || new Date().toISOString()}
## Description
${task.description || "No description provided."}
`;
}
async function writeWikiForTask(task: TaskRecord) {
const safeTitle = task.title
.toLowerCase()
.replace(/[^a-z0-9]+/g, "-")
.replace(/^-+|-+$/g, "")
.slice(0, 80);
const fileName = `${new Date().toISOString().slice(0, 10)}-task-${task.id}-${safeTitle || `task-${task.id}`}.md`;
fs.writeFileSync(path.join(WIKI_DIR, fileName), buildWikiMarkdown(task), "utf8");
}
export async function appendTaskEvent(input: {
taskId: number;
assignee?: string;
family?: AgentFamily | null;
host?: string;
eventType: TaskEventType;
state?: DispatchState | null;
summary: string;
detail?: string;
}) {
await run(
`INSERT INTO task_events (task_id, assignee, family, host, event_type, state, summary, detail)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
[
input.taskId,
input.assignee || "",
input.family || null,
input.host || "",
input.eventType,
input.state || null,
input.summary,
input.detail || "",
],
);
}
export async function listTaskEvents(taskId?: number, limit = 50) {
const rows = taskId
? await all<TaskEvent>(
"SELECT * FROM task_events WHERE task_id = ? ORDER BY created_at DESC LIMIT ?",
[taskId, limit],
)
: await all<TaskEvent>("SELECT * FROM task_events ORDER BY created_at DESC LIMIT ?", [limit]);
return rows;
}
export async function listTaskTemplates(): Promise<TaskTemplate[]> {
return TASK_TEMPLATES;
}
export async function createTask(input: Partial<TaskRecord>) {
const tags = Array.isArray(input.tags) ? input.tags.filter((tag) => typeof tag === "string") : [];
const dispatchState = deriveDispatchState(input);
const result = await run(
`INSERT INTO tasks (
title, description, assignee, family, target_host, target_channel,
dispatch_method, dispatch_state, template_key, repo_slug, base_branch,
preferred_agent, reasoning_effort, model_hint, priority, status, tags,
last_dispatch_at, acknowledged_at, last_error
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
[
input.title?.trim() || "",
input.description || "",
input.assignee || "",
input.family || null,
input.target_host || "",
input.target_channel || "",
input.dispatch_method || "manual",
dispatchState,
normalizeNullableString(input.template_key),
normalizeNullableString(input.repo_slug) || extractTagValue(tags, "repo:"),
normalizeNullableString(input.base_branch) || extractTagValue(tags, "base:"),
normalizeNullableString(input.preferred_agent) || extractTagValue(tags, "agent:"),
normalizeNullableString(input.reasoning_effort) || extractTagValue(tags, "reasoning:"),
normalizeNullableString(input.model_hint) || extractTagValue(tags, "model:"),
input.priority || "Medium",
input.status || "Backlog",
JSON.stringify(tags),
input.last_dispatch_at || null,
deriveAcknowledgedAt(dispatchState),
normalizeNullableString(input.last_error),
],
);
const task = await findTask(result.lastID);
if (!task) {
throw new Error("failed_to_fetch_created_task");
}
await appendTaskEvent({
taskId: task.id,
assignee: task.assignee,
family: task.family,
host: task.target_host,
eventType: "created",
state: task.dispatch_state,
summary: `Task created for ${task.assignee || "unassigned"} flow`,
detail: task.description,
});
return task;
}
export async function updateTask(id: number, input: Partial<TaskRecord>) {
const existing = await findTask(id);
if (!existing) {
return null;
}
const hasField = <K extends keyof TaskRecord>(field: K) =>
Object.prototype.hasOwnProperty.call(input, field);
const mergedTags = Array.isArray(input.tags) ? input.tags.filter((tag) => typeof tag === "string") : existing.tags;
const nextStatus = input.status ?? existing.status;
const nextDispatchState = deriveDispatchState({ ...existing, ...input, tags: mergedTags }, existing);
const completedAt = nextStatus === "Done" ? existing.completed_at || new Date().toISOString() : null;
const acknowledgedAt = deriveAcknowledgedAt(nextDispatchState, existing, input.acknowledged_at);
await run(
`UPDATE tasks
SET title = ?, description = ?, assignee = ?, family = ?, target_host = ?, target_channel = ?,
dispatch_method = ?, dispatch_state = ?, template_key = ?, repo_slug = ?, base_branch = ?,
preferred_agent = ?, reasoning_effort = ?, model_hint = ?, priority = ?, status = ?, tags = ?,
last_dispatch_at = ?, acknowledged_at = ?, last_error = ?, completed_at = ?, updated_at = datetime('now')
WHERE id = ?`,
[
input.title?.trim() || existing.title,
input.description ?? existing.description,
input.assignee ?? existing.assignee,
input.family ?? existing.family,
input.target_host ?? existing.target_host,
input.target_channel ?? existing.target_channel,
input.dispatch_method ?? existing.dispatch_method,
nextDispatchState,
input.template_key ?? existing.template_key,
input.repo_slug ?? existing.repo_slug,
input.base_branch ?? existing.base_branch,
input.preferred_agent ?? existing.preferred_agent,
input.reasoning_effort ?? existing.reasoning_effort,
input.model_hint ?? existing.model_hint,
input.priority ?? existing.priority,
nextStatus,
JSON.stringify(mergedTags),
hasField("last_dispatch_at") ? input.last_dispatch_at ?? null : existing.last_dispatch_at,
acknowledgedAt,
hasField("last_error") ? input.last_error ?? null : existing.last_error,
completedAt,
id,
],
);
const updated = await findTask(id);
if (!updated) {
throw new Error("failed_to_fetch_updated_task");
}
const eventType: TaskEventType =
updated.dispatch_state === "acknowledged" && existing.dispatch_state !== "acknowledged"
? "acknowledged"
: updated.status !== existing.status
? "status_changed"
: "updated";
await appendTaskEvent({
taskId: updated.id,
assignee: updated.assignee,
family: updated.family,
host: updated.target_host,
eventType,
state: updated.dispatch_state,
summary: `${eventType.replace(/_/g, " ")} -> ${updated.status} / ${updated.dispatch_state}`,
detail: updated.description,
});
if (nextStatus === "Done" && existing.status !== "Done") {
await writeWikiForTask(updated);
}
return updated;
}