167 lines
5.3 KiB
TypeScript
167 lines
5.3 KiB
TypeScript
import { dispatchTask } from "@/lib/dispatch";
|
|
import { findAgentByAssignmentKey } from "@/lib/agents";
|
|
import { syncOpenClawTasks } from "@/lib/openclaw-sync";
|
|
import { appendTaskEvent, findTask, listTasksForAssignee } from "@/lib/tasks";
|
|
import type { FleetAgent, TaskRecord } from "@/lib/types";
|
|
|
|
/**
 * Compact view of a task as returned in a heartbeat response.
 *
 * Carries a fixed subset of `TaskRecord` fields plus the dependency-blocking
 * state that heartbeat processing computes for the task.
 */
type HeartbeatTaskSummary = Pick<
  TaskRecord,
  "id" | "title" | "status" | "dispatch_state" | "priority" | "assignee" | "target_host"
> & {
  /** True when `blockedBy` contains at least one task ID. */
  blocked: boolean;
  /** IDs of dependency tasks that were not found or whose status is not "Done". */
  blockedBy: number[];
};
|
|
|
|
/**
 * Payload produced by `processAgentHeartbeat` for a single agent.
 *
 * The counters and the `tasks` list are computed before any auto-dispatch
 * happens; the task dispatched by this heartbeat (if any) is reported
 * separately in `dispatched_task`.
 */
type AgentHeartbeatResult = {
  /** Identity and status fields copied from the resolved fleet agent. */
  agent: {
    slug: string;
    assignmentKey: string;
    family: FleetAgent["family"];
    host: string;
    heartbeatAt: string | null;
    currentTask: string | null;
  };
  /** Number of listed tasks whose status is not "Done". */
  pending_tasks: number;
  /** Number of listed tasks that are active and not blocked by a dependency. */
  active_tasks: number;
  /** Number of listed tasks with at least one unfinished dependency. */
  blocked_tasks: number;
  /** Summary of the task dispatched by this heartbeat, or `null` if none was. */
  dispatched_task: HeartbeatTaskSummary | null;
  /** Summaries of the agent's assigned tasks, in dispatch-priority order. */
  tasks: HeartbeatTaskSummary[];
  /** Human-readable remarks describing what the heartbeat did or found. */
  notes: string[];
};
|
|
|
|
/**
 * Collects the task IDs a task depends on, as declared in its tags.
 *
 * Tags starting with `depends-on:` or `dependency:` are considered. Everything
 * after the prefix is split on `:`, `|` and `,`, so a single tag may list
 * several IDs (e.g. `depends-on:3,4|5`).
 *
 * @param task - Task whose `tags` are inspected; a missing `tags` value is treated as empty.
 * @returns Unique positive integer IDs in first-seen order.
 */
function extractDependencyIds(task: TaskRecord): number[] {
  const ids: number[] = (task.tags ?? [])
    .filter((tag: string) => tag.startsWith("depends-on:") || tag.startsWith("dependency:"))
    .flatMap((tag: string) => tag.split(":").slice(1))
    .flatMap((segment: string) => segment.split(/[|,]/))
    .map((segment: string) => segment.trim())
    // Only plain decimal digits count as an ID: Number() alone would also
    // accept spellings such as "0x10" or "1e3" and turn them into bogus IDs.
    .filter((segment: string) => /^\d+$/.test(segment))
    .map((segment: string) => Number(segment))
    // Reject 0 and values too large to be represented exactly.
    .filter((id: number) => Number.isSafeInteger(id) && id > 0);

  return [...new Set(ids)];
}
|
|
|
|
/**
 * Maps a task priority to a numeric sort weight; lower weights sort first.
 *
 * @param priority - Priority label of the task.
 * @returns 0 for "Critical", 1 for "High", 2 for "Medium", and 3 for any other value.
 */
function getTaskPriorityWeight(priority: TaskRecord["priority"]): number {
  switch (priority) {
    case "Critical":
      return 0;
    case "High":
      return 1;
    case "Medium":
      return 2;
    default:
      // Every remaining priority shares the lowest precedence.
      return 3;
  }
}
|
|
|
|
/**
 * Reports whether a task counts as currently being worked on.
 *
 * A task is active when its dispatch state is "dispatched" or "acknowledged",
 * or when its status is "In Progress" or "Review".
 *
 * @param task - Any object exposing the task's `dispatch_state` and `status`.
 * @returns `true` if either condition holds.
 */
function isActiveTask(task: Pick<TaskRecord, "dispatch_state" | "status">): boolean {
  const dispatchIsLive = ["dispatched", "acknowledged"].includes(task.dispatch_state);
  const statusIsLive = ["In Progress", "Review"].includes(task.status);
  return dispatchIsLive || statusIsLive;
}
|
|
|
|
/**
 * Reports whether a task is eligible to be dispatched.
 *
 * The task must still be waiting (status "Backlog" or "Todo") and its dispatch
 * state must be "planned", "assigned" or "failed".
 *
 * @param task - Any object exposing the task's `status` and `dispatch_state`.
 * @returns `true` only when both conditions hold.
 */
function isTaskRunnable(task: Pick<TaskRecord, "status" | "dispatch_state">): boolean {
  const isWaiting = ["Backlog", "Todo"].includes(task.status);
  const canBeDispatched = ["planned", "assigned", "failed"].includes(task.dispatch_state);
  return isWaiting && canBeDispatched;
}
|
|
|
|
/**
 * Returns the tasks in dispatch order without modifying the input array.
 *
 * Tasks are ordered by priority weight (see `getTaskPriorityWeight`, lower
 * first); tasks of equal weight are ordered by ascending `id`.
 *
 * @param tasks - Tasks to order; the array itself is left untouched.
 * @returns A new, sorted array containing the same task objects.
 */
function sortTasks(tasks: readonly TaskRecord[]): TaskRecord[] {
  // Copy first: Array.prototype.sort sorts in place.
  return [...tasks].sort((left, right) => {
    const priorityDelta = getTaskPriorityWeight(left.priority) - getTaskPriorityWeight(right.priority);
    return priorityDelta !== 0 ? priorityDelta : left.id - right.id;
  });
}
|
|
|
|
/**
 * Handles a heartbeat for the agent identified by `assignmentKey`.
 *
 * Steps:
 * 1. Resolve the agent; for agents of family "openclaw", run `syncOpenClawTasks` first.
 * 2. List the agent's tasks (requested with `includeDone: false`) in dispatch order
 *    and work out which of them are blocked by unfinished dependencies.
 * 3. If no unblocked task is active, dispatch the first runnable, unblocked task
 *    and record a task event for it.
 *
 * The counters and `tasks` in the result describe the state observed before the
 * dispatch; the dispatched task is reported in `dispatched_task`.
 *
 * @param assignmentKey - Key used to look up the agent.
 * @returns Agent details, task counters, task summaries and explanatory notes.
 * @throws Error with message "agent_not_found" when no agent matches the key.
 */
export async function processAgentHeartbeat(assignmentKey: string): Promise<AgentHeartbeatResult> {
  const agent = await findAgentByAssignmentKey(assignmentKey);
  if (!agent) {
    throw new Error("agent_not_found");
  }

  if (agent.family === "openclaw") {
    await syncOpenClawTasks();
  }

  const assignedTasks = sortTasks(await listTasksForAssignee(agent.aliases, { includeDone: false }));

  // Several tasks may depend on the same prerequisite, so each dependency is
  // looked up at most once per heartbeat and the pending lookup is shared.
  const dependencyChecks = new Map<number, Promise<boolean>>();
  const isDependencyDone = (dependencyId: number): Promise<boolean> => {
    let check = dependencyChecks.get(dependencyId);
    if (check === undefined) {
      // A dependency that cannot be found counts as not done.
      check = (async () => (await findTask(dependencyId))?.status === "Done")();
      dependencyChecks.set(dependencyId, check);
    }
    return check;
  };

  const taskSummaries = await Promise.all(
    assignedTasks.map(async (task) => {
      const dependencyIds = extractDependencyIds(task);
      const doneFlags = await Promise.all(dependencyIds.map((dependencyId) => isDependencyDone(dependencyId)));
      const blockedBy = dependencyIds.filter((_, index) => !doneFlags[index]);
      return toHeartbeatTaskSummary(task, blockedBy);
    }),
  );

  const activeTasks = taskSummaries.filter((task) => isActiveTask(task) && !task.blocked);
  // Summaries keep the priority order of assignedTasks, so the first match is
  // the most urgent runnable task.
  const runnableTask = taskSummaries.find((task) => isTaskRunnable(task) && !task.blocked) ?? null;
  const blockedTasks = taskSummaries.filter((task) => task.blocked);
  const notes: string[] = [];
  let dispatchedTask: HeartbeatTaskSummary | null = null;

  if (blockedTasks.length > 0) {
    notes.push(`${blockedTasks.length} blocked task(s) waiting on dependencies`);
  }

  if (activeTasks.length === 0 && runnableTask) {
    const updatedTask = await dispatchTask(runnableTask.id);
    dispatchedTask = toHeartbeatTaskSummary(updatedTask, []);
    notes.push(`Auto-dispatched task #${updatedTask.id}`);
    await appendTaskEvent({
      taskId: updatedTask.id,
      assignee: updatedTask.assignee,
      family: updatedTask.family,
      host: updatedTask.target_host,
      eventType: "updated",
      state: updatedTask.dispatch_state,
      summary: `Heartbeat auto-dispatched ${agent.name}`,
      detail: `Triggered from /api/heartbeat/${assignmentKey}`,
    });
  } else if (activeTasks.length > 0) {
    notes.push(`${activeTasks.length} active task(s) already in progress`);
  } else if (!runnableTask && assignedTasks.length > 0) {
    notes.push("No runnable tasks available for dispatch");
  }

  return {
    agent: {
      slug: agent.slug,
      assignmentKey: agent.assignmentKey,
      family: agent.family,
      host: agent.host,
      heartbeatAt: agent.heartbeatAt,
      currentTask: agent.currentTask,
    },
    pending_tasks: taskSummaries.filter((task) => task.status !== "Done").length,
    active_tasks: activeTasks.length,
    blocked_tasks: blockedTasks.length,
    dispatched_task: dispatchedTask,
    tasks: taskSummaries,
    notes,
  };
}

/** Builds the heartbeat summary for a task from its record and its unfinished dependency IDs. */
function toHeartbeatTaskSummary(
  task: Pick<TaskRecord, "id" | "title" | "status" | "dispatch_state" | "priority" | "assignee" | "target_host">,
  blockedBy: number[],
): HeartbeatTaskSummary {
  return {
    id: task.id,
    title: task.title,
    status: task.status,
    dispatch_state: task.dispatch_state,
    priority: task.priority,
    assignee: task.assignee,
    target_host: task.target_host,
    blocked: blockedBy.length > 0,
    blockedBy,
  };
}
|