Files
openclaw-taskboard/lib/dispatch.ts

364 lines
11 KiB
TypeScript

import fs from "node:fs";
import path from "node:path";
import { promisify } from "node:util";
import { execFile } from "node:child_process";
import {
DIRECT_SSH_KEY_PATH,
DIRECT_SSH_TIMEOUT_MS,
FLEET_CONFIG,
REPO_ACCESS_ROOTS,
SWARM_HOST_WORKTREES_DIR,
SWARM_REPO_MAP_FILE,
SWARM_TASKS_FILE,
SWARM_WORKTREES_DIR,
ZEROCLAW_WEBHOOK_TIMEOUT_MS,
} from "@/lib/fleet-config";
import { findAgentByAssignmentKey } from "@/lib/agents";
import { appendTaskEvent, applyTaskCallback, findTask, updateTask } from "@/lib/tasks";
import type { DispatchState, TaskCallbackPayload } from "@/lib/types";
// Promise-returning wrapper around child_process.execFile; used below for the git and ssh invocations.
const execFileAsync = promisify(execFile);
/** Outcome returned by each family-specific dispatcher to `dispatchTask`. */
type DispatchResult = {
// Dispatch state that dispatchTask stores on the task when no callback is supplied.
state: DispatchState;
// Short summary; dispatchTask uses it as the summary of the "dispatch_succeeded" event.
summary: string;
// Longer detail text; dispatchTask uses it as the detail of the "dispatch_succeeded" event.
detail: string;
// When present, dispatchTask applies it through applyTaskCallback instead of the plain updateTask path.
callback?: TaskCallbackPayload;
};
/**
 * Picks the model identifier used when a task carries no explicit model hint.
 *
 * @param agent - Agent name; only "opencode" and "gemini" have dedicated models.
 * @returns The model identifier; any other agent name yields the codex model.
 */
function defaultModelForAgent(agent: string) {
  if (agent === "opencode") {
    return "zai-coding-plan/glm-4.7";
  }
  if (agent === "gemini") {
    return "gemini-3.1-pro";
  }
  // Everything else falls back to the codex model.
  return "gpt-5.3-codex";
}
/**
 * Loads the repo map stored at SWARM_REPO_MAP_FILE.
 *
 * @returns The parsed JSON, typed as a string-to-string record. The content is
 *   not validated beyond being parseable JSON.
 * @throws Error `missing_repo_map:<file>` when the file does not exist; JSON
 *   syntax errors from `JSON.parse` propagate unchanged.
 */
function readRepoMap() {
  const mapFile = SWARM_REPO_MAP_FILE;
  if (fs.existsSync(mapFile)) {
    const rawJson = fs.readFileSync(mapFile, "utf8");
    return JSON.parse(rawJson) as Record<string, string>;
  }
  throw new Error(`missing_repo_map:${mapFile}`);
}
/**
 * Resolves `repoPath` and verifies that it is one of the allowed roots or lies
 * inside one of them.
 *
 * Containment is decided on whole path segments rather than by raw string
 * prefix, so a sibling such as `/srv/repos-evil` is NOT accepted for the root
 * `/srv/repos`.
 *
 * @param repoPath - Path to check; relative paths are resolved against the cwd.
 * @param allowedRoots - Directories under which repositories may live.
 *   Defaults to REPO_ACCESS_ROOTS.
 * @returns The absolute, normalized form of `repoPath`.
 * @throws Error `repo_path_not_allowed:<resolved>` when no root contains it.
 */
function ensureAllowedRepoPath(repoPath: string, allowedRoots: readonly string[] = REPO_ACCESS_ROOTS) {
  const resolved = path.resolve(repoPath);
  const allowed = allowedRoots.some((root) => {
    const relative = path.relative(path.resolve(root), resolved);
    if (relative === "") {
      // The path is the root itself.
      return true;
    }
    // Outside the root when the relative path climbs upward. An absolute result
    // means there is no relative route at all (e.g. another Windows drive).
    const escapesRoot = relative === ".." || relative.startsWith(`..${path.sep}`);
    return !escapesRoot && !path.isAbsolute(relative);
  });
  if (!allowed) {
    throw new Error(`repo_path_not_allowed:${resolved}`);
  }
  return resolved;
}
/**
 * Makes sure the swarm task registry file exists.
 *
 * Creates the parent directory of SWARM_TASKS_FILE (recursively) and, when the
 * file itself is absent, writes an empty `{ "tasks": [] }` document. An
 * existing file is left untouched.
 */
function ensureSwarmRegistry() {
  const registryDir = path.dirname(SWARM_TASKS_FILE);
  fs.mkdirSync(registryDir, { recursive: true });
  if (fs.existsSync(SWARM_TASKS_FILE)) {
    return;
  }
  const emptyRegistry = JSON.stringify({ tasks: [] }, null, 2);
  fs.writeFileSync(SWARM_TASKS_FILE, emptyRegistry);
}
/**
 * Returns the remainder of the first tag that begins with `prefix`.
 *
 * @param tags - Tags to search, in order.
 * @param prefix - Leading text that marks the wanted tag (e.g. "action:").
 * @returns The text after the prefix (possibly empty), or `null` when no tag
 *   matches.
 */
function extractTagValue(tags: string[], prefix: string) {
  const tagged = tags.find((candidate) => candidate.startsWith(prefix));
  if (!tagged) {
    return null;
  }
  return tagged.substring(prefix.length);
}
/**
 * Trims `output` and caps it at `maxLength` characters.
 *
 * When the trimmed text is too long it is cut and a "\n...[truncated]" marker
 * is appended so that the total length equals `maxLength`. If `maxLength` is
 * too small to hold the marker, the text is hard-cut to `maxLength` characters
 * instead, so the result never exceeds the requested limit.
 *
 * @param output - Raw text, e.g. combined stdout/stderr of a command.
 * @param maxLength - Maximum length of the returned string (default 4000).
 * @returns The trimmed, possibly truncated text.
 */
function truncateOutput(output: string, maxLength = 4000) {
  const marker = "\n...[truncated]";
  const trimmed = output.trim();
  if (trimmed.length <= maxLength) {
    return trimmed;
  }
  if (maxLength < marker.length) {
    // No room for the marker: a negative slice end would count from the end of
    // the string and return far more than maxLength characters.
    return trimmed.slice(0, Math.max(0, maxLength));
  }
  return `${trimmed.slice(0, maxLength - marker.length)}${marker}`;
}
/**
 * Queues a task for the OpenClaw swarm.
 *
 * Steps, in order:
 * 1. Load the task and look up its repository path in the repo map.
 * 2. Check the path against the allow-list and require a `.git` entry in it.
 * 3. Register the repository as a git `safe.directory` (global config).
 * 4. Create a git worktree on branch `feat/taskboard-<id>` from
 *    `origin/<baseBranch>` unless the worktree directory already exists.
 * 5. Insert or refresh the task's entry in the swarm registry file.
 *
 * @param taskId - Taskboard task id.
 * @returns A "dispatched" result naming the agent and the host worktree.
 * @throws Error `task_not_found`, `repo_slug_required_for_openclaw_dispatch`,
 *   `repo_not_available:<slug>`, `repo_path_not_allowed:<path>`,
 *   `missing_repo_map:<file>`, or any error raised by git / the file system.
 */
async function dispatchOpenClawTask(taskId: number): Promise<DispatchResult> {
  const task = await findTask(taskId);
  if (!task) {
    throw new Error("task_not_found");
  }
  if (!task.repo_slug) {
    throw new Error("repo_slug_required_for_openclaw_dispatch");
  }

  // Validate the mapped value BEFORE resolving it: path.resolve("") returns the
  // process cwd, so an unmapped slug must never reach ensureAllowedRepoPath.
  const repoMap = readRepoMap();
  const mappedPath = repoMap[task.repo_slug];
  if (typeof mappedPath !== "string" || !mappedPath) {
    throw new Error(`repo_not_available:${task.repo_slug}`);
  }
  const repoPath = ensureAllowedRepoPath(mappedPath);
  if (!fs.existsSync(path.join(repoPath, ".git"))) {
    throw new Error(`repo_not_available:${task.repo_slug}`);
  }

  // NOTE(review): `--add` appends a new entry on every call, so repeated
  // dispatches may accumulate duplicate safe.directory lines in the global
  // git config.
  await execFileAsync("git", ["config", "--global", "--add", "safe.directory", repoPath]);

  const agentName = task.preferred_agent || "codex";
  const taskKey = `taskboard-${task.id}`;
  const repoName = path.basename(repoPath);
  // The worktree is created under SWARM_WORKTREES_DIR, while the registry
  // records the matching path under SWARM_HOST_WORKTREES_DIR — presumably the
  // same directory as seen from the swarm host; confirm against deployment.
  const worktree = path.join(SWARM_WORKTREES_DIR, repoName, taskKey);
  const hostWorktree = path.join(SWARM_HOST_WORKTREES_DIR, repoName, taskKey);
  const branch = `feat/taskboard-${task.id}`;
  const baseBranch = task.base_branch || "main";

  fs.mkdirSync(path.dirname(worktree), { recursive: true });
  if (!fs.existsSync(worktree)) {
    try {
      await execFileAsync("git", ["-C", repoPath, "fetch", "origin", baseBranch]);
    } catch {
      // Keep going. Many local repos already have the base branch available.
    }
    await execFileAsync("git", ["-C", repoPath, "worktree", "add", worktree, "-b", branch, `origin/${baseBranch}`]);
  }

  ensureSwarmRegistry();
  const registry = JSON.parse(fs.readFileSync(SWARM_TASKS_FILE, "utf8")) as { tasks?: Array<Record<string, unknown>> };
  // A registry without a usable `tasks` array is treated as empty.
  const tasks = Array.isArray(registry.tasks) ? registry.tasks : [];
  const tmuxSession = `${agentName}-${taskKey}`;
  const existing = tasks.find((entry) => entry.taskboardTaskId === task.id);
  if (!existing) {
    tasks.push({
      id: taskKey,
      agent: agentName,
      repo: repoName,
      repoPath,
      repoSlug: task.repo_slug,
      worktree: hostWorktree,
      branch,
      baseBranch,
      tmuxSession,
      description: task.description,
      prompt: `Taskboard task #${task.id}: ${task.title}\n\n${task.description}`,
      status: "queued",
      notifyOnComplete: true,
      attempts: 0,
      maxAttempts: 3,
      model: task.model_hint || defaultModelForAgent(agentName),
      reasoning: task.reasoning_effort || "high",
      taskboardTaskId: task.id,
      startedAt: null,
      completedAt: null,
      createdAt: Date.now(),
      checks: {
        prCreated: false,
        ciPassed: false,
        reviewPassed: false,
      },
    });
  } else {
    // Re-dispatch: refresh only the routing fields; status, attempts, prompt
    // and the remaining fields of the existing entry are left as they are.
    existing.repoPath = repoPath;
    existing.repoSlug = task.repo_slug;
    existing.worktree = hostWorktree;
    existing.branch = branch;
    existing.baseBranch = baseBranch;
    existing.agent = agentName;
    existing.tmuxSession = tmuxSession;
  }
  // Only the `tasks` array is written back; other top-level keys are dropped.
  fs.writeFileSync(SWARM_TASKS_FILE, JSON.stringify({ tasks }, null, 2));

  return {
    state: "dispatched" as const,
    summary: `Queued in OpenClaw swarm for ${agentName}`,
    detail: `${task.repo_slug} -> ${hostWorktree}`,
  };
}
/**
 * Posts a task to the webhook of the ZeroClaw gateway that serves its assignee.
 *
 * The gateway URL and optional bearer token come from environment variables:
 * the agent with slug "grizzley-zeroclaw" uses ZEROCLAW_GRIZZLEY_URL / _TOKEN,
 * every other agent uses ZEROCLAW_ICE_URL / _TOKEN. The request is aborted if
 * no response arrives within ZEROCLAW_WEBHOOK_TIMEOUT_MS.
 *
 * @param taskId - Taskboard task id.
 * @returns A "dispatched" result whose detail is the webhook response body, or
 *   a generic acceptance note when the body is empty.
 * @throws Error `task_not_found`, `assignee_not_found`,
 *   `missing_gateway_url:<env>`, `webhook_failed:<status>:<body>`, or the
 *   rejection raised by `fetch` (including an abort on timeout).
 */
async function dispatchZeroClawTask(taskId: number): Promise<DispatchResult> {
  const task = await findTask(taskId);
  if (!task) {
    throw new Error("task_not_found");
  }
  const agent = await findAgentByAssignmentKey(task.assignee);
  if (!agent) {
    throw new Error("assignee_not_found");
  }

  const isGrizzley = agent.slug === "grizzley-zeroclaw";
  const urlEnv = isGrizzley ? "ZEROCLAW_GRIZZLEY_URL" : "ZEROCLAW_ICE_URL";
  const tokenEnv = isGrizzley ? "ZEROCLAW_GRIZZLEY_TOKEN" : "ZEROCLAW_ICE_TOKEN";
  const baseUrl = process.env[urlEnv];
  if (!baseUrl) {
    throw new Error(`missing_gateway_url:${urlEnv}`);
  }

  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), ZEROCLAW_WEBHOOK_TIMEOUT_MS);
  let response: Awaited<ReturnType<typeof fetch>>;
  try {
    // A single trailing slash on the base URL is dropped before appending the path.
    response = await fetch(`${baseUrl.replace(/\/$/, "")}/webhook`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        // The Authorization header is only sent when a token is configured.
        ...(process.env[tokenEnv] ? { Authorization: `Bearer ${process.env[tokenEnv]}` } : {}),
      },
      body: JSON.stringify({
        message: `Taskboard task #${task.id}: ${task.title}\nHost: ${task.target_host || agent.host}\nChannel: ${task.target_channel || "n/a"}\n\n${task.description}`,
      }),
      signal: controller.signal,
    });
  } finally {
    // Clear the timer on failure too; otherwise a rejected fetch leaves it
    // armed until it fires.
    clearTimeout(timeout);
  }

  const responseText = await response.text();
  if (!response.ok) {
    throw new Error(`webhook_failed:${response.status}:${responseText}`);
  }
  return {
    state: "dispatched" as const,
    summary: `Posted to ${agent.name} webhook`,
    detail: responseText || `${agent.host} webhook accepted`,
  };
}
/**
 * Looks up a direct-dispatch agent in FLEET_CONFIG.directAgents.
 *
 * @param assignmentKey - Value to match against each agent's `assignmentKey`
 *   or any entry in its `aliases`.
 * @returns The first matching agent definition, or `null` when none matches.
 */
function findDirectAgentDefinition(assignmentKey: string) {
  for (const candidate of FLEET_CONFIG.directAgents) {
    if (candidate.assignmentKey === assignmentKey) {
      return candidate;
    }
    if (candidate.aliases.includes(assignmentKey)) {
      return candidate;
    }
  }
  return null;
}
/**
 * Runs a configured action for a task on its direct agent over SSH.
 *
 * The action is chosen by the task's first `action:<key>` tag, falling back to
 * the agent's default action. The command runs synchronously from the caller's
 * point of view: a zero exit yields a "completed" result with a callback that
 * marks the task Done.
 *
 * @param taskId - Taskboard task id.
 * @returns A "completed" result carrying the (truncated) command output.
 * @throws Error `task_not_found`, `direct_target_not_found`,
 *   `unsupported_direct_action:<key>`, or
 *   `direct_ssh_failed:<host>:<action>:<output>` when ssh fails or times out.
 */
async function dispatchDirectTask(taskId: number): Promise<DispatchResult> {
  const task = await findTask(taskId);
  if (!task) {
    throw new Error("task_not_found");
  }

  const directAgent = findDirectAgentDefinition(task.assignee);
  if (!directAgent) {
    throw new Error("direct_target_not_found");
  }
  const target = directAgent.dispatch;

  const actionKey = extractTagValue(task.tags, "action:") || target.defaultAction;
  const action = target.actions.find((entry) => entry.key === actionKey);
  if (!action) {
    throw new Error(`unsupported_direct_action:${actionKey}`);
  }

  // Each option is passed to ssh as its own `-o <option>` pair.
  const sshOptions = ["BatchMode=yes", "StrictHostKeyChecking=accept-new", "ConnectTimeout=15"].flatMap(
    (option) => ["-o", option],
  );
  const destination = `${target.user}@${target.hostname}`;
  const sshArgs = [
    ...sshOptions,
    "-i",
    DIRECT_SSH_KEY_PATH,
    "-p",
    String(target.port),
    destination,
    action.command,
  ];

  try {
    const ran = await execFileAsync("ssh", sshArgs, {
      timeout: DIRECT_SSH_TIMEOUT_MS,
      maxBuffer: 1024 * 1024,
    });
    // stdout first, then stderr; empty streams are skipped.
    const combined = [ran.stdout, ran.stderr].filter(Boolean).join("\n");
    const detail = truncateOutput(combined);
    const callback = {
      status: "Done",
      dispatch_state: "completed",
      summary: action.successSummary,
      detail,
      completed_by: `direct-ssh:${directAgent.host}`,
      last_error: null,
      last_dispatch_at: new Date().toISOString(),
    } satisfies TaskCallbackPayload;
    return {
      state: "completed" as const,
      summary: `${action.successSummary}`,
      detail,
      callback,
    };
  } catch (error) {
    // execFile failures may carry whatever output was captured before the error.
    const failure = error as Error & { stdout?: string; stderr?: string };
    const parts = [failure.stdout, failure.stderr, failure.message].filter(Boolean);
    const detail = truncateOutput(parts.join("\n"));
    throw new Error(`direct_ssh_failed:${directAgent.host}:${action.key}:${detail}`);
  }
}
/**
 * Dispatches a task to the agent it is assigned to and records the outcome.
 *
 * A "dispatch_requested" event is appended first. The task is then routed by
 * the agent's family: "openclaw" and "zeroclaw" have dedicated dispatchers,
 * every other family goes through the direct (SSH) dispatcher.
 *
 * - If the dispatcher returns a callback payload, it is applied through
 *   `applyTaskCallback` and that result is returned.
 * - Otherwise the task is updated (a "Backlog" task moves to "Todo") and a
 *   "dispatch_succeeded" event is appended.
 * - Any error raised after the initial event marks the task as "failed",
 *   appends a "dispatch_failed" event and returns the updated task; the error
 *   is rethrown only when that update yields no task.
 *
 * @param taskId - Taskboard task id.
 * @returns The task as returned by the update / callback call.
 * @throws Error `task_not_found`, `assignee_required`, `assignee_not_found`
 *   before dispatch starts.
 */
export async function dispatchTask(taskId: number) {
  const task = await findTask(taskId);
  if (!task) {
    throw new Error("task_not_found");
  }
  if (!task.assignee) {
    throw new Error("assignee_required");
  }

  const agent = await findAgentByAssignmentKey(task.assignee);
  if (!agent) {
    throw new Error("assignee_not_found");
  }

  await appendTaskEvent({
    taskId,
    assignee: task.assignee,
    family: task.family,
    host: task.target_host,
    eventType: "dispatch_requested",
    state: task.dispatch_state,
    summary: `Dispatch requested for ${agent.name}`,
    detail: task.description,
  });

  try {
    let result: DispatchResult;
    if (agent.family === "openclaw") {
      result = await dispatchOpenClawTask(taskId);
    } else if (agent.family === "zeroclaw") {
      result = await dispatchZeroClawTask(taskId);
    } else {
      result = await dispatchDirectTask(taskId);
    }

    if (result.callback) {
      // The dispatcher already knows the final outcome; hand it to the
      // callback handler instead of doing the plain update below.
      const applied = await applyTaskCallback(taskId, result.callback);
      if (applied) {
        return applied;
      }
      throw new Error("task_not_found_after_callback");
    }

    const dispatched = await updateTask(taskId, {
      status: task.status === "Backlog" ? "Todo" : task.status,
      dispatch_state: result.state,
      last_dispatch_at: new Date().toISOString(),
      last_error: null,
    });
    if (!dispatched) {
      throw new Error("task_not_found_after_dispatch");
    }

    await appendTaskEvent({
      taskId,
      assignee: dispatched.assignee,
      family: dispatched.family,
      host: dispatched.target_host,
      eventType: "dispatch_succeeded",
      state: dispatched.dispatch_state,
      summary: result.summary,
      detail: result.detail,
    });
    return dispatched;
  } catch (error) {
    let message: string;
    if (error instanceof Error) {
      message = error.message;
    } else {
      message = String(error);
    }

    const failedTask = await updateTask(taskId, {
      dispatch_state: "failed",
      last_error: message,
    });
    await appendTaskEvent({
      taskId,
      assignee: task.assignee,
      family: task.family,
      host: task.target_host,
      eventType: "dispatch_failed",
      state: "failed",
      summary: "Dispatch failed",
      detail: message,
    });

    if (failedTask) {
      return failedTask;
    }
    // The task could not be updated, so the original error is surfaced.
    throw error;
  }
}