import fs from "node:fs"; import path from "node:path"; import { promisify } from "node:util"; import { execFile } from "node:child_process"; import { DIRECT_SSH_KEY_PATH, DIRECT_SSH_TIMEOUT_MS, FLEET_CONFIG, REPO_ACCESS_ROOTS, SWARM_HOST_WORKTREES_DIR, SWARM_REPO_MAP_FILE, SWARM_TASKS_FILE, SWARM_WORKTREES_DIR, ZEROCLAW_WEBHOOK_TIMEOUT_MS, } from "@/lib/fleet-config"; import { findAgentByAssignmentKey } from "@/lib/agents"; import { appendTaskEvent, applyTaskCallback, findTask, updateTask } from "@/lib/tasks"; import type { DispatchState, TaskCallbackPayload } from "@/lib/types"; const execFileAsync = promisify(execFile); type DispatchResult = { state: DispatchState; summary: string; detail: string; callback?: TaskCallbackPayload; }; function defaultModelForAgent(agent: string) { switch (agent) { case "opencode": return "zai-coding-plan/glm-4.7"; case "gemini": return "gemini-3.1-pro"; default: return "gpt-5.3-codex"; } } function readRepoMap() { if (!fs.existsSync(SWARM_REPO_MAP_FILE)) { throw new Error(`missing_repo_map:${SWARM_REPO_MAP_FILE}`); } return JSON.parse(fs.readFileSync(SWARM_REPO_MAP_FILE, "utf8")) as Record; } function ensureAllowedRepoPath(repoPath: string) { const resolved = path.resolve(repoPath); const allowed = REPO_ACCESS_ROOTS.some((root) => resolved.startsWith(path.resolve(root))); if (!allowed) { throw new Error(`repo_path_not_allowed:${resolved}`); } return resolved; } function ensureSwarmRegistry() { fs.mkdirSync(path.dirname(SWARM_TASKS_FILE), { recursive: true }); if (!fs.existsSync(SWARM_TASKS_FILE)) { fs.writeFileSync(SWARM_TASKS_FILE, JSON.stringify({ tasks: [] }, null, 2)); } } function extractTagValue(tags: string[], prefix: string) { const match = tags.find((tag) => tag.startsWith(prefix)); return match ? match.slice(prefix.length) : null; } function truncateOutput(output: string, maxLength = 4000) { const trimmed = output.trim(); if (trimmed.length <= maxLength) { return trimmed; } return `${trimmed.slice(0, maxLength - 15)}\n...[truncated]`; } async function dispatchOpenClawTask(taskId: number): Promise { const task = await findTask(taskId); if (!task) { throw new Error("task_not_found"); } if (!task.repo_slug) { throw new Error("repo_slug_required_for_openclaw_dispatch"); } const repoMap = readRepoMap(); const repoPath = ensureAllowedRepoPath(repoMap[task.repo_slug] || ""); if (!repoPath || !fs.existsSync(path.join(repoPath, ".git"))) { throw new Error(`repo_not_available:${task.repo_slug}`); } await execFileAsync("git", ["config", "--global", "--add", "safe.directory", repoPath]); const agentName = task.preferred_agent || "codex"; const taskKey = `taskboard-${task.id}`; const repoName = path.basename(repoPath); const worktree = path.join(SWARM_WORKTREES_DIR, repoName, taskKey); const hostWorktree = path.join(SWARM_HOST_WORKTREES_DIR, repoName, taskKey); const branch = `feat/taskboard-${task.id}`; const baseBranch = task.base_branch || "main"; fs.mkdirSync(path.dirname(worktree), { recursive: true }); if (!fs.existsSync(worktree)) { try { await execFileAsync("git", ["-C", repoPath, "fetch", "origin", baseBranch]); } catch { // Keep going. Many local repos already have the base branch available. } await execFileAsync("git", ["-C", repoPath, "worktree", "add", worktree, "-b", branch, `origin/${baseBranch}`]); } ensureSwarmRegistry(); const registry = JSON.parse(fs.readFileSync(SWARM_TASKS_FILE, "utf8")) as { tasks?: Array> }; const tasks = Array.isArray(registry.tasks) ? registry.tasks : []; const existing = tasks.find((entry) => entry.taskboardTaskId === task.id); if (!existing) { tasks.push({ id: taskKey, agent: agentName, repo: repoName, repoPath, repoSlug: task.repo_slug, worktree: hostWorktree, branch, baseBranch, tmuxSession: `${agentName}-${taskKey}`, description: task.description, prompt: `Taskboard task #${task.id}: ${task.title}\n\n${task.description}`, status: "queued", notifyOnComplete: true, attempts: 0, maxAttempts: 3, model: task.model_hint || defaultModelForAgent(agentName), reasoning: task.reasoning_effort || "high", taskboardTaskId: task.id, startedAt: null, completedAt: null, createdAt: Date.now(), checks: { prCreated: false, ciPassed: false, reviewPassed: false, }, }); } else { existing.repoPath = repoPath; existing.repoSlug = task.repo_slug; existing.worktree = hostWorktree; existing.branch = branch; existing.baseBranch = baseBranch; existing.agent = agentName; existing.tmuxSession = `${agentName}-${taskKey}`; } fs.writeFileSync(SWARM_TASKS_FILE, JSON.stringify({ tasks }, null, 2)); return { state: "dispatched" as const, summary: `Queued in OpenClaw swarm for ${agentName}`, detail: `${task.repo_slug} -> ${hostWorktree}`, }; } async function dispatchZeroClawTask(taskId: number): Promise { const task = await findTask(taskId); if (!task) { throw new Error("task_not_found"); } const agent = await findAgentByAssignmentKey(task.assignee); if (!agent) { throw new Error("assignee_not_found"); } const urlEnv = agent.slug === "grizzley-zeroclaw" ? "ZEROCLAW_GRIZZLEY_URL" : "ZEROCLAW_ICE_URL"; const tokenEnv = agent.slug === "grizzley-zeroclaw" ? "ZEROCLAW_GRIZZLEY_TOKEN" : "ZEROCLAW_ICE_TOKEN"; const baseUrl = process.env[urlEnv]; if (!baseUrl) { throw new Error(`missing_gateway_url:${urlEnv}`); } const controller = new AbortController(); const timeout = setTimeout(() => controller.abort(), ZEROCLAW_WEBHOOK_TIMEOUT_MS); const response = await fetch(`${baseUrl.replace(/\/$/, "")}/webhook`, { method: "POST", headers: { "Content-Type": "application/json", ...(process.env[tokenEnv] ? { Authorization: `Bearer ${process.env[tokenEnv]}` } : {}), }, body: JSON.stringify({ message: `Taskboard task #${task.id}: ${task.title}\nHost: ${task.target_host || agent.host}\nChannel: ${task.target_channel || "n/a"}\n\n${task.description}`, }), signal: controller.signal, }); clearTimeout(timeout); if (!response.ok) { const responseText = await response.text(); throw new Error(`webhook_failed:${response.status}:${responseText}`); } const responseText = await response.text(); return { state: "dispatched" as const, summary: `Posted to ${agent.name} webhook`, detail: responseText || `${agent.host} webhook accepted`, }; } function findDirectAgentDefinition(assignmentKey: string) { return ( FLEET_CONFIG.directAgents.find( (agent) => agent.assignmentKey === assignmentKey || agent.aliases.includes(assignmentKey), ) || null ); } async function dispatchDirectTask(taskId: number): Promise { const task = await findTask(taskId); if (!task) { throw new Error("task_not_found"); } const directAgent = findDirectAgentDefinition(task.assignee); if (!directAgent) { throw new Error("direct_target_not_found"); } const actionKey = extractTagValue(task.tags, "action:") || directAgent.dispatch.defaultAction; const action = directAgent.dispatch.actions.find((entry) => entry.key === actionKey); if (!action) { throw new Error(`unsupported_direct_action:${actionKey}`); } const sshArgs = [ "-F", "/dev/null", "-o", "BatchMode=yes", "-o", "StrictHostKeyChecking=accept-new", "-o", "ConnectTimeout=15", "-o", "IdentitiesOnly=yes", "-o", "UserKnownHostsFile=/tmp/taskboard_known_hosts", "-i", DIRECT_SSH_KEY_PATH, "-p", String(directAgent.dispatch.port), `${directAgent.dispatch.user}@${directAgent.dispatch.hostname}`, action.command, ]; try { const { stdout, stderr } = await execFileAsync("ssh", sshArgs, { timeout: DIRECT_SSH_TIMEOUT_MS, maxBuffer: 1024 * 1024, }); const detail = truncateOutput([stdout, stderr].filter(Boolean).join("\n")); return { state: "completed" as const, summary: `${action.successSummary}`, detail, callback: { status: "Done", dispatch_state: "completed", summary: action.successSummary, detail, completed_by: `direct-ssh:${directAgent.host}`, last_error: null, last_dispatch_at: new Date().toISOString(), }, }; } catch (error) { const execError = error as Error & { stdout?: string; stderr?: string }; const detail = truncateOutput([execError.stdout, execError.stderr, execError.message].filter(Boolean).join("\n")); throw new Error(`direct_ssh_failed:${directAgent.host}:${action.key}:${detail}`); } } export async function dispatchTask(taskId: number) { const task = await findTask(taskId); if (!task) { throw new Error("task_not_found"); } if (!task.assignee) { throw new Error("assignee_required"); } const agent = await findAgentByAssignmentKey(task.assignee); if (!agent) { throw new Error("assignee_not_found"); } await appendTaskEvent({ taskId, assignee: task.assignee, family: task.family, host: task.target_host, eventType: "dispatch_requested", state: task.dispatch_state, summary: `Dispatch requested for ${agent.name}`, detail: task.description, }); try { const result = agent.family === "openclaw" ? await dispatchOpenClawTask(taskId) : agent.family === "zeroclaw" ? await dispatchZeroClawTask(taskId) : await dispatchDirectTask(taskId); if (result.callback) { const updated = await applyTaskCallback(taskId, result.callback); if (!updated) { throw new Error("task_not_found_after_callback"); } return updated; } const updated = await updateTask(taskId, { status: task.status === "Backlog" ? "Todo" : task.status, dispatch_state: result.state, last_dispatch_at: new Date().toISOString(), last_error: null, }); if (!updated) { throw new Error("task_not_found_after_dispatch"); } await appendTaskEvent({ taskId, assignee: updated.assignee, family: updated.family, host: updated.target_host, eventType: "dispatch_succeeded", state: updated.dispatch_state, summary: result.summary, detail: result.detail, }); return updated; } catch (error) { const message = error instanceof Error ? error.message : String(error); const updated = await updateTask(taskId, { dispatch_state: "failed", last_error: message, }); await appendTaskEvent({ taskId, assignee: task.assignee, family: task.family, host: task.target_host, eventType: "dispatch_failed", state: "failed", summary: "Dispatch failed", detail: message, }); if (!updated) { throw error; } return updated; } }