diff --git a/README.md b/README.md index a6d3545..b033a3b 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ It tracks and visualizes: - direct SSH dispatch for `pve`, `truenas`, and `panda` - task callback API for remote completion/result sync - OpenClaw registry sync API for swarm task state reconciliation +- heartbeat pickup API at `/api/heartbeat/{agent}` for queue inspection and self-dispatch - failure queue and dispatch history views - family-specific runtime views for OpenClaw and ZeroClaw plus unified direct-host visibility - architecture documentation rendered directly from tracked config @@ -99,6 +100,24 @@ It tracks and visualizes: - `DIRECT_SSH_KEY_PATH` - `DIRECT_SSH_TIMEOUT_MS` +## Heartbeat Pickup + +Configured OpenClaw and ZeroClaw runtimes can hit: + +```bash +curl -s http://127.0.0.1:8395/api/heartbeat/ +``` + +The heartbeat endpoint will: + +- sync OpenClaw swarm state before scheduling +- inspect the agent's assigned tasks +- skip tasks blocked by `depends-on:` or `dependency:` tags +- auto-dispatch the next runnable task when the agent does not already have an active unblocked task +- return queue state, blocked items, and any task that was dispatched during the heartbeat + +This is the canonical path for agent-driven task pickup. Assignment alone does not start work; heartbeat pickup or an explicit dispatch does. + ## Development ```bash diff --git a/app/api/heartbeat/[agent]/route.ts b/app/api/heartbeat/[agent]/route.ts new file mode 100644 index 0000000..67022ea --- /dev/null +++ b/app/api/heartbeat/[agent]/route.ts @@ -0,0 +1,19 @@ +import { NextResponse } from "next/server"; + +import { processAgentHeartbeat } from "@/lib/heartbeat"; + +export async function GET( + _request: Request, + { params }: { params: Promise<{ agent: string }> }, +) { + const { agent } = await params; + + try { + const result = await processAgentHeartbeat(agent); + return NextResponse.json(result); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + const status = message === "agent_not_found" ? 404 : 500; + return NextResponse.json({ error: message }, { status }); + } +} diff --git a/lib/agents.ts b/lib/agents.ts index 34eb1f7..be7880c 100644 --- a/lib/agents.ts +++ b/lib/agents.ts @@ -96,7 +96,7 @@ function readWorkspaceAgent(agentRoot: string, fallbackName: string) { const heartbeatAt = deriveHeartbeatTimestamp(heartbeatPath, heartbeatMd); return { - files: ["AGENTS.md", "TOOLS.md", "IDENTITY.md"].filter((fileName) => + files: ["AGENTS.md", "TOOLS.md", "IDENTITY.md", "HEARTBEAT.md"].filter((fileName) => fs.existsSync(path.join(workspaceRoot, fileName)), ), tools, diff --git a/lib/heartbeat.ts b/lib/heartbeat.ts new file mode 100644 index 0000000..bab4ff3 --- /dev/null +++ b/lib/heartbeat.ts @@ -0,0 +1,166 @@ +import { dispatchTask } from "@/lib/dispatch"; +import { findAgentByAssignmentKey } from "@/lib/agents"; +import { syncOpenClawTasks } from "@/lib/openclaw-sync"; +import { appendTaskEvent, findTask, listTasksForAssignee } from "@/lib/tasks"; +import type { FleetAgent, TaskRecord } from "@/lib/types"; + +type HeartbeatTaskSummary = Pick< + TaskRecord, + "id" | "title" | "status" | "dispatch_state" | "priority" | "assignee" | "target_host" +> & { + blocked: boolean; + blockedBy: number[]; +}; + +type AgentHeartbeatResult = { + agent: { + slug: string; + assignmentKey: string; + family: FleetAgent["family"]; + host: string; + heartbeatAt: string | null; + currentTask: string | null; + }; + pending_tasks: number; + active_tasks: number; + blocked_tasks: number; + dispatched_task: HeartbeatTaskSummary | null; + tasks: HeartbeatTaskSummary[]; + notes: string[]; +}; + +function extractDependencyIds(task: TaskRecord) { + const values = task.tags + .filter((tag) => tag.startsWith("depends-on:") || tag.startsWith("dependency:")) + .flatMap((tag) => tag.split(":").slice(1)) + .flatMap((value) => value.split(/[|,]/)) + .map((value) => Number(value.trim())) + .filter((value) => Number.isInteger(value) && value > 0); + + return [...new Set(values)]; +} + +function getTaskPriorityWeight(priority: TaskRecord["priority"]) { + switch (priority) { + case "Critical": + return 0; + case "High": + return 1; + case "Medium": + return 2; + default: + return 3; + } +} + +function isActiveTask(task: Pick) { + return ["dispatched", "acknowledged"].includes(task.dispatch_state) || ["In Progress", "Review"].includes(task.status); +} + +function isTaskRunnable(task: Pick) { + return ["Backlog", "Todo"].includes(task.status) && ["planned", "assigned", "failed"].includes(task.dispatch_state); +} + +function sortTasks(tasks: TaskRecord[]) { + return [...tasks].sort((left, right) => { + const priorityDelta = getTaskPriorityWeight(left.priority) - getTaskPriorityWeight(right.priority); + if (priorityDelta !== 0) { + return priorityDelta; + } + return left.id - right.id; + }); +} + +export async function processAgentHeartbeat(assignmentKey: string): Promise { + const agent = await findAgentByAssignmentKey(assignmentKey); + if (!agent) { + throw new Error("agent_not_found"); + } + + if (agent.family === "openclaw") { + await syncOpenClawTasks(); + } + + const assignedTasks = sortTasks(await listTasksForAssignee(agent.aliases, { includeDone: false })); + const taskSummaries = await Promise.all( + assignedTasks.map(async (task) => { + const blockedBy = ( + await Promise.all( + extractDependencyIds(task).map(async (dependencyId) => { + const dependency = await findTask(dependencyId); + return dependency?.status === "Done" ? null : dependencyId; + }), + ) + ).filter((dependencyId): dependencyId is number => dependencyId !== null); + + return { + id: task.id, + title: task.title, + status: task.status, + dispatch_state: task.dispatch_state, + priority: task.priority, + assignee: task.assignee, + target_host: task.target_host, + blocked: blockedBy.length > 0, + blockedBy, + } satisfies HeartbeatTaskSummary; + }), + ); + + const activeTasks = taskSummaries.filter((task) => isActiveTask(task) && !task.blocked); + const runnableTask = taskSummaries.find((task) => isTaskRunnable(task) && !task.blocked) || null; + const blockedTasks = taskSummaries.filter((task) => task.blocked); + const notes: string[] = []; + let dispatchedTask: HeartbeatTaskSummary | null = null; + + if (blockedTasks.length > 0) { + notes.push(`${blockedTasks.length} blocked task(s) waiting on dependencies`); + } + + if (activeTasks.length === 0 && runnableTask) { + const updatedTask = await dispatchTask(runnableTask.id); + dispatchedTask = { + id: updatedTask.id, + title: updatedTask.title, + status: updatedTask.status, + dispatch_state: updatedTask.dispatch_state, + priority: updatedTask.priority, + assignee: updatedTask.assignee, + target_host: updatedTask.target_host, + blocked: false, + blockedBy: [], + }; + notes.push(`Auto-dispatched task #${updatedTask.id}`); + await appendTaskEvent({ + taskId: updatedTask.id, + assignee: updatedTask.assignee, + family: updatedTask.family, + host: updatedTask.target_host, + eventType: "updated", + state: updatedTask.dispatch_state, + summary: `Heartbeat auto-dispatched ${agent.name}`, + detail: `Triggered from /api/heartbeat/${assignmentKey}`, + }); + } else if (activeTasks.length > 0) { + notes.push(`${activeTasks.length} active task(s) already in progress`); + } else if (!runnableTask && assignedTasks.length > 0) { + notes.push("No runnable tasks available for dispatch"); + } + + return { + agent: { + slug: agent.slug, + assignmentKey: agent.assignmentKey, + family: agent.family, + host: agent.host, + heartbeatAt: agent.heartbeatAt, + currentTask: agent.currentTask, + }, + pending_tasks: taskSummaries.filter((task) => !["Done"].includes(task.status)).length, + active_tasks: activeTasks.length, + blocked_tasks: blockedTasks.length, + dispatched_task: dispatchedTask, + tasks: taskSummaries, + notes, + }; +} diff --git a/lib/tasks.ts b/lib/tasks.ts index f165208..366b046 100644 --- a/lib/tasks.ts +++ b/lib/tasks.ts @@ -243,6 +243,45 @@ export async function listTaskEvents(taskId?: number, limit = 50) { return rows; } +export async function listTasksForAssignee(assigneeAliases: string[], options?: { + includeDone?: boolean; +}) { + if (assigneeAliases.length === 0) { + return [] as TaskRecord[]; + } + + const placeholders = assigneeAliases.map(() => "?").join(", "); + const params: unknown[] = [...assigneeAliases]; + const clauses = [`assignee IN (${placeholders})`]; + if (!options?.includeDone) { + clauses.push("status != 'Done'"); + } + + const rows = await all( + `SELECT * FROM tasks + WHERE ${clauses.join(" AND ")} + ORDER BY + CASE status + WHEN 'In Progress' THEN 0 + WHEN 'Review' THEN 1 + WHEN 'Todo' THEN 2 + WHEN 'Backlog' THEN 3 + ELSE 4 + END, + CASE priority + WHEN 'Critical' THEN 0 + WHEN 'High' THEN 1 + WHEN 'Medium' THEN 2 + ELSE 3 + END, + created_at ASC, + id ASC`, + params, + ); + + return rows.map(normalizeTask); +} + export async function listTaskTemplates(): Promise { return TASK_TEMPLATES; }