[taskboard] add heartbeat task pickup
This commit is contained in:
19
README.md
19
README.md
@@ -39,6 +39,7 @@ It tracks and visualizes:
|
|||||||
- direct SSH dispatch for `pve`, `truenas`, and `panda`
|
- direct SSH dispatch for `pve`, `truenas`, and `panda`
|
||||||
- task callback API for remote completion/result sync
|
- task callback API for remote completion/result sync
|
||||||
- OpenClaw registry sync API for swarm task state reconciliation
|
- OpenClaw registry sync API for swarm task state reconciliation
|
||||||
|
- heartbeat pickup API at `/api/heartbeat/{agent}` for queue inspection and self-dispatch
|
||||||
- failure queue and dispatch history views
|
- failure queue and dispatch history views
|
||||||
- family-specific runtime views for OpenClaw and ZeroClaw plus unified direct-host visibility
|
- family-specific runtime views for OpenClaw and ZeroClaw plus unified direct-host visibility
|
||||||
- architecture documentation rendered directly from tracked config
|
- architecture documentation rendered directly from tracked config
|
||||||
@@ -99,6 +100,24 @@ It tracks and visualizes:
|
|||||||
- `DIRECT_SSH_KEY_PATH`
|
- `DIRECT_SSH_KEY_PATH`
|
||||||
- `DIRECT_SSH_TIMEOUT_MS`
|
- `DIRECT_SSH_TIMEOUT_MS`
|
||||||
|
|
||||||
|
## Heartbeat Pickup
|
||||||
|
|
||||||
|
Configured OpenClaw and ZeroClaw runtimes can hit:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -s http://127.0.0.1:8395/api/heartbeat/<agent>
|
||||||
|
```
|
||||||
|
|
||||||
|
The heartbeat endpoint will:
|
||||||
|
|
||||||
|
- sync OpenClaw swarm state before scheduling
|
||||||
|
- inspect the agent's assigned tasks
|
||||||
|
- skip tasks blocked by `depends-on:<task-id>` or `dependency:<task-id>` tags
|
||||||
|
- auto-dispatch the next runnable task when the agent does not already have an active unblocked task
|
||||||
|
- return queue state, blocked items, and any task that was dispatched during the heartbeat
|
||||||
|
|
||||||
|
This is the canonical path for agent-driven task pickup. Assignment alone does not start work; heartbeat pickup or an explicit dispatch does.
|
||||||
|
|
||||||
## Development
|
## Development
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
|||||||
19
app/api/heartbeat/[agent]/route.ts
Normal file
19
app/api/heartbeat/[agent]/route.ts
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
import { NextResponse } from "next/server";
|
||||||
|
|
||||||
|
import { processAgentHeartbeat } from "@/lib/heartbeat";
|
||||||
|
|
||||||
|
export async function GET(
|
||||||
|
_request: Request,
|
||||||
|
{ params }: { params: Promise<{ agent: string }> },
|
||||||
|
) {
|
||||||
|
const { agent } = await params;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const result = await processAgentHeartbeat(agent);
|
||||||
|
return NextResponse.json(result);
|
||||||
|
} catch (error) {
|
||||||
|
const message = error instanceof Error ? error.message : String(error);
|
||||||
|
const status = message === "agent_not_found" ? 404 : 500;
|
||||||
|
return NextResponse.json({ error: message }, { status });
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -96,7 +96,7 @@ function readWorkspaceAgent(agentRoot: string, fallbackName: string) {
|
|||||||
const heartbeatAt = deriveHeartbeatTimestamp(heartbeatPath, heartbeatMd);
|
const heartbeatAt = deriveHeartbeatTimestamp(heartbeatPath, heartbeatMd);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
files: ["AGENTS.md", "TOOLS.md", "IDENTITY.md"].filter((fileName) =>
|
files: ["AGENTS.md", "TOOLS.md", "IDENTITY.md", "HEARTBEAT.md"].filter((fileName) =>
|
||||||
fs.existsSync(path.join(workspaceRoot, fileName)),
|
fs.existsSync(path.join(workspaceRoot, fileName)),
|
||||||
),
|
),
|
||||||
tools,
|
tools,
|
||||||
|
|||||||
166
lib/heartbeat.ts
Normal file
166
lib/heartbeat.ts
Normal file
@@ -0,0 +1,166 @@
|
|||||||
|
import { dispatchTask } from "@/lib/dispatch";
|
||||||
|
import { findAgentByAssignmentKey } from "@/lib/agents";
|
||||||
|
import { syncOpenClawTasks } from "@/lib/openclaw-sync";
|
||||||
|
import { appendTaskEvent, findTask, listTasksForAssignee } from "@/lib/tasks";
|
||||||
|
import type { FleetAgent, TaskRecord } from "@/lib/types";
|
||||||
|
|
||||||
|
type HeartbeatTaskSummary = Pick<
|
||||||
|
TaskRecord,
|
||||||
|
"id" | "title" | "status" | "dispatch_state" | "priority" | "assignee" | "target_host"
|
||||||
|
> & {
|
||||||
|
blocked: boolean;
|
||||||
|
blockedBy: number[];
|
||||||
|
};
|
||||||
|
|
||||||
|
type AgentHeartbeatResult = {
|
||||||
|
agent: {
|
||||||
|
slug: string;
|
||||||
|
assignmentKey: string;
|
||||||
|
family: FleetAgent["family"];
|
||||||
|
host: string;
|
||||||
|
heartbeatAt: string | null;
|
||||||
|
currentTask: string | null;
|
||||||
|
};
|
||||||
|
pending_tasks: number;
|
||||||
|
active_tasks: number;
|
||||||
|
blocked_tasks: number;
|
||||||
|
dispatched_task: HeartbeatTaskSummary | null;
|
||||||
|
tasks: HeartbeatTaskSummary[];
|
||||||
|
notes: string[];
|
||||||
|
};
|
||||||
|
|
||||||
|
function extractDependencyIds(task: TaskRecord) {
|
||||||
|
const values = task.tags
|
||||||
|
.filter((tag) => tag.startsWith("depends-on:") || tag.startsWith("dependency:"))
|
||||||
|
.flatMap((tag) => tag.split(":").slice(1))
|
||||||
|
.flatMap((value) => value.split(/[|,]/))
|
||||||
|
.map((value) => Number(value.trim()))
|
||||||
|
.filter((value) => Number.isInteger(value) && value > 0);
|
||||||
|
|
||||||
|
return [...new Set(values)];
|
||||||
|
}
|
||||||
|
|
||||||
|
function getTaskPriorityWeight(priority: TaskRecord["priority"]) {
|
||||||
|
switch (priority) {
|
||||||
|
case "Critical":
|
||||||
|
return 0;
|
||||||
|
case "High":
|
||||||
|
return 1;
|
||||||
|
case "Medium":
|
||||||
|
return 2;
|
||||||
|
default:
|
||||||
|
return 3;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function isActiveTask(task: Pick<TaskRecord, "dispatch_state" | "status">) {
|
||||||
|
return ["dispatched", "acknowledged"].includes(task.dispatch_state) || ["In Progress", "Review"].includes(task.status);
|
||||||
|
}
|
||||||
|
|
||||||
|
function isTaskRunnable(task: Pick<TaskRecord, "status" | "dispatch_state">) {
|
||||||
|
return ["Backlog", "Todo"].includes(task.status) && ["planned", "assigned", "failed"].includes(task.dispatch_state);
|
||||||
|
}
|
||||||
|
|
||||||
|
function sortTasks(tasks: TaskRecord[]) {
|
||||||
|
return [...tasks].sort((left, right) => {
|
||||||
|
const priorityDelta = getTaskPriorityWeight(left.priority) - getTaskPriorityWeight(right.priority);
|
||||||
|
if (priorityDelta !== 0) {
|
||||||
|
return priorityDelta;
|
||||||
|
}
|
||||||
|
return left.id - right.id;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function processAgentHeartbeat(assignmentKey: string): Promise<AgentHeartbeatResult> {
|
||||||
|
const agent = await findAgentByAssignmentKey(assignmentKey);
|
||||||
|
if (!agent) {
|
||||||
|
throw new Error("agent_not_found");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (agent.family === "openclaw") {
|
||||||
|
await syncOpenClawTasks();
|
||||||
|
}
|
||||||
|
|
||||||
|
const assignedTasks = sortTasks(await listTasksForAssignee(agent.aliases, { includeDone: false }));
|
||||||
|
const taskSummaries = await Promise.all(
|
||||||
|
assignedTasks.map(async (task) => {
|
||||||
|
const blockedBy = (
|
||||||
|
await Promise.all(
|
||||||
|
extractDependencyIds(task).map(async (dependencyId) => {
|
||||||
|
const dependency = await findTask(dependencyId);
|
||||||
|
return dependency?.status === "Done" ? null : dependencyId;
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
).filter((dependencyId): dependencyId is number => dependencyId !== null);
|
||||||
|
|
||||||
|
return {
|
||||||
|
id: task.id,
|
||||||
|
title: task.title,
|
||||||
|
status: task.status,
|
||||||
|
dispatch_state: task.dispatch_state,
|
||||||
|
priority: task.priority,
|
||||||
|
assignee: task.assignee,
|
||||||
|
target_host: task.target_host,
|
||||||
|
blocked: blockedBy.length > 0,
|
||||||
|
blockedBy,
|
||||||
|
} satisfies HeartbeatTaskSummary;
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const activeTasks = taskSummaries.filter((task) => isActiveTask(task) && !task.blocked);
|
||||||
|
const runnableTask = taskSummaries.find((task) => isTaskRunnable(task) && !task.blocked) || null;
|
||||||
|
const blockedTasks = taskSummaries.filter((task) => task.blocked);
|
||||||
|
const notes: string[] = [];
|
||||||
|
let dispatchedTask: HeartbeatTaskSummary | null = null;
|
||||||
|
|
||||||
|
if (blockedTasks.length > 0) {
|
||||||
|
notes.push(`${blockedTasks.length} blocked task(s) waiting on dependencies`);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (activeTasks.length === 0 && runnableTask) {
|
||||||
|
const updatedTask = await dispatchTask(runnableTask.id);
|
||||||
|
dispatchedTask = {
|
||||||
|
id: updatedTask.id,
|
||||||
|
title: updatedTask.title,
|
||||||
|
status: updatedTask.status,
|
||||||
|
dispatch_state: updatedTask.dispatch_state,
|
||||||
|
priority: updatedTask.priority,
|
||||||
|
assignee: updatedTask.assignee,
|
||||||
|
target_host: updatedTask.target_host,
|
||||||
|
blocked: false,
|
||||||
|
blockedBy: [],
|
||||||
|
};
|
||||||
|
notes.push(`Auto-dispatched task #${updatedTask.id}`);
|
||||||
|
await appendTaskEvent({
|
||||||
|
taskId: updatedTask.id,
|
||||||
|
assignee: updatedTask.assignee,
|
||||||
|
family: updatedTask.family,
|
||||||
|
host: updatedTask.target_host,
|
||||||
|
eventType: "updated",
|
||||||
|
state: updatedTask.dispatch_state,
|
||||||
|
summary: `Heartbeat auto-dispatched ${agent.name}`,
|
||||||
|
detail: `Triggered from /api/heartbeat/${assignmentKey}`,
|
||||||
|
});
|
||||||
|
} else if (activeTasks.length > 0) {
|
||||||
|
notes.push(`${activeTasks.length} active task(s) already in progress`);
|
||||||
|
} else if (!runnableTask && assignedTasks.length > 0) {
|
||||||
|
notes.push("No runnable tasks available for dispatch");
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
agent: {
|
||||||
|
slug: agent.slug,
|
||||||
|
assignmentKey: agent.assignmentKey,
|
||||||
|
family: agent.family,
|
||||||
|
host: agent.host,
|
||||||
|
heartbeatAt: agent.heartbeatAt,
|
||||||
|
currentTask: agent.currentTask,
|
||||||
|
},
|
||||||
|
pending_tasks: taskSummaries.filter((task) => !["Done"].includes(task.status)).length,
|
||||||
|
active_tasks: activeTasks.length,
|
||||||
|
blocked_tasks: blockedTasks.length,
|
||||||
|
dispatched_task: dispatchedTask,
|
||||||
|
tasks: taskSummaries,
|
||||||
|
notes,
|
||||||
|
};
|
||||||
|
}
|
||||||
39
lib/tasks.ts
39
lib/tasks.ts
@@ -243,6 +243,45 @@ export async function listTaskEvents(taskId?: number, limit = 50) {
|
|||||||
return rows;
|
return rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function listTasksForAssignee(assigneeAliases: string[], options?: {
|
||||||
|
includeDone?: boolean;
|
||||||
|
}) {
|
||||||
|
if (assigneeAliases.length === 0) {
|
||||||
|
return [] as TaskRecord[];
|
||||||
|
}
|
||||||
|
|
||||||
|
const placeholders = assigneeAliases.map(() => "?").join(", ");
|
||||||
|
const params: unknown[] = [...assigneeAliases];
|
||||||
|
const clauses = [`assignee IN (${placeholders})`];
|
||||||
|
if (!options?.includeDone) {
|
||||||
|
clauses.push("status != 'Done'");
|
||||||
|
}
|
||||||
|
|
||||||
|
const rows = await all<DatabaseTaskRow>(
|
||||||
|
`SELECT * FROM tasks
|
||||||
|
WHERE ${clauses.join(" AND ")}
|
||||||
|
ORDER BY
|
||||||
|
CASE status
|
||||||
|
WHEN 'In Progress' THEN 0
|
||||||
|
WHEN 'Review' THEN 1
|
||||||
|
WHEN 'Todo' THEN 2
|
||||||
|
WHEN 'Backlog' THEN 3
|
||||||
|
ELSE 4
|
||||||
|
END,
|
||||||
|
CASE priority
|
||||||
|
WHEN 'Critical' THEN 0
|
||||||
|
WHEN 'High' THEN 1
|
||||||
|
WHEN 'Medium' THEN 2
|
||||||
|
ELSE 3
|
||||||
|
END,
|
||||||
|
created_at ASC,
|
||||||
|
id ASC`,
|
||||||
|
params,
|
||||||
|
);
|
||||||
|
|
||||||
|
return rows.map(normalizeTask);
|
||||||
|
}
|
||||||
|
|
||||||
export async function listTaskTemplates(): Promise<TaskTemplate[]> {
|
export async function listTaskTemplates(): Promise<TaskTemplate[]> {
|
||||||
return TASK_TEMPLATES;
|
return TASK_TEMPLATES;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user