Files
openclaw-taskboard/lib/dispatch.ts

711 lines
20 KiB
TypeScript

import fs from "node:fs";
import path from "node:path";
import { promisify } from "node:util";
import { execFile } from "node:child_process";
import {
DIRECT_SSH_KEY_PATH,
DIRECT_SSH_TIMEOUT_MS,
FLEET_CONFIG,
REPO_ACCESS_ROOTS,
SWARM_HOST_WORKTREES_DIR,
SWARM_REPO_MAP_FILE,
SWARM_TASKS_FILE,
SWARM_WORKTREES_DIR,
ZEROCLAW_WEBHOOK_TIMEOUT_MS,
} from "@/lib/fleet-config";
import { findAgentByAssignmentKey } from "@/lib/agents";
import { appendTaskEvent, applyTaskCallback, findTask, updateTask } from "@/lib/tasks";
import type { DispatchState, TaskCallbackPayload } from "@/lib/types";
// Promise-returning form of child_process.execFile; this file uses it for its ssh and git invocations.
const execFileAsync = promisify(execFile);
/**
 * Outcome produced by one of the dispatch backends in this file
 * (OpenClaw swarm, ZeroClaw webhook, or direct SSH).
 */
type DispatchResult = {
// Dispatch state the task should be moved to.
state: DispatchState;
// Short outcome line; dispatchTask records it as the event summary when no callback is supplied.
summary: string;
// Longer free-form output; dispatchTask records it as the event detail when no callback is supplied.
detail: string;
// Optional payload; when present, dispatchTask hands it to applyTaskCallback instead of calling updateTask.
callback?: TaskCallbackPayload;
};
/**
 * Picks the fallback model identifier for an agent.
 *
 * @param agent - Agent name, e.g. "opencode" or "gemini".
 * @returns The model string for the named agent; every other name gets "gpt-5.3-codex".
 */
function defaultModelForAgent(agent: string) {
  if (agent === "opencode") {
    return "zai-coding-plan/glm-4.7";
  }
  if (agent === "gemini") {
    return "gemini-3.1-pro";
  }
  // Anything unrecognised (including "codex") uses the codex model.
  return "gpt-5.3-codex";
}
/**
 * Loads the repo map stored at SWARM_REPO_MAP_FILE.
 *
 * @returns The parsed JSON, typed (but not validated) as a string-to-string record.
 * @throws Error `missing_repo_map:<path>` when the file does not exist; JSON syntax
 *         errors from `JSON.parse` propagate unchanged.
 */
function readRepoMap() {
  if (fs.existsSync(SWARM_REPO_MAP_FILE)) {
    const raw = fs.readFileSync(SWARM_REPO_MAP_FILE, "utf8");
    return JSON.parse(raw) as Record<string, string>;
  }
  throw new Error(`missing_repo_map:${SWARM_REPO_MAP_FILE}`);
}
/**
 * Resolves a repository path and verifies that it lies inside one of REPO_ACCESS_ROOTS.
 *
 * A path is accepted only when it equals an allowed root or sits beneath it on a
 * path-segment boundary. A plain string-prefix test would also accept siblings
 * such as "/srv/repos-private" for the root "/srv/repos".
 *
 * @param repoPath - Candidate repository path; resolved against the process cwd when relative.
 * @returns The absolute, normalised path.
 * @throws Error `repo_path_not_allowed:<resolved>` when the path is outside every allowed root.
 */
function ensureAllowedRepoPath(repoPath: string) {
  const resolved = path.resolve(repoPath);
  const allowed = REPO_ACCESS_ROOTS.some((root) => {
    const resolvedRoot = path.resolve(root);
    if (resolved === resolvedRoot) {
      return true;
    }
    // The filesystem root already ends with a separator; avoid doubling it.
    const rootPrefix = resolvedRoot.endsWith(path.sep) ? resolvedRoot : `${resolvedRoot}${path.sep}`;
    return resolved.startsWith(rootPrefix);
  });
  if (!allowed) {
    throw new Error(`repo_path_not_allowed:${resolved}`);
  }
  return resolved;
}
/**
 * Guarantees that the swarm task registry file exists.
 *
 * Creates the parent directory of SWARM_TASKS_FILE (recursively) and, when the
 * file itself is absent, writes an empty `{ "tasks": [] }` registry. An existing
 * file is left untouched.
 */
function ensureSwarmRegistry() {
  const registryDir = path.dirname(SWARM_TASKS_FILE);
  fs.mkdirSync(registryDir, { recursive: true });
  if (fs.existsSync(SWARM_TASKS_FILE)) {
    return;
  }
  const emptyRegistry = JSON.stringify({ tasks: [] }, null, 2);
  fs.writeFileSync(SWARM_TASKS_FILE, emptyRegistry);
}
/**
 * Returns the remainder of the first tag that begins with `prefix`.
 *
 * @param tags - Tags to search, in order.
 * @param prefix - Leading text to look for, e.g. "action:".
 * @returns The text after the prefix in the first matching tag, or `null` when no tag matches.
 */
function extractTagValue(tags: string[], prefix: string) {
  for (const tag of tags) {
    if (!tag.startsWith(prefix)) {
      continue;
    }
    // Only the first match counts. An empty matching tag (possible only with an
    // empty prefix) is treated as "no value".
    return tag.length > 0 ? tag.slice(prefix.length) : null;
  }
  return null;
}
/**
 * Cleans command output and caps its length.
 *
 * Lines starting with "Warning: Permanently added " are dropped, surrounding
 * whitespace is trimmed, and text longer than `maxLength` is cut and suffixed
 * with "\n...[truncated]" so that the result is exactly `maxLength` characters.
 *
 * @param output - Raw text to clean.
 * @param maxLength - Maximum length of the returned string (default 4000).
 *        When it is smaller than the truncation marker itself, the text is
 *        hard-cut to `maxLength` characters without a marker, so the result
 *        never exceeds the limit.
 * @returns The cleaned, possibly truncated text.
 */
function truncateOutput(output: string, maxLength = 4000) {
  const marker = "\n...[truncated]";
  const trimmed = output
    .split("\n")
    .filter((line) => !line.startsWith("Warning: Permanently added "))
    .join("\n")
    .trim();
  if (trimmed.length <= maxLength) {
    return trimmed;
  }
  if (maxLength < marker.length) {
    // A negative slice end would count from the end of the string and return
    // nearly the whole input; clamp instead.
    return trimmed.slice(0, Math.max(0, maxLength));
  }
  return `${trimmed.slice(0, maxLength - marker.length)}${marker}`;
}
/**
 * One row of `zfs list -H -o name,used,avail,mountpoint` output as parsed by
 * runTrueNasDatasetAudit. All fields are kept as the raw tab-separated strings.
 */
type TrueNasDataset = {
// Dataset name column.
name: string;
// "used" column, unparsed.
used: string;
// "avail" column, unparsed.
avail: string;
// "mountpoint" column, unparsed.
mountpoint: string;
};
/**
 * A single hit produced by scanTrueNasSignals: a file whose contents contain
 * one of the patterns associated with a dataset.
 */
type DatasetSignal = {
// Path of the file in which the pattern was found.
source: string;
// Dataset name taken from the matching TRUENAS_SIGNAL_PATTERNS entry.
dataset: string;
// Category of the pattern entry that matched.
signalType: "active" | "legacy";
// The pattern string that was found in the file.
matchedText: string;
};
/**
 * Table mapping text patterns to the dataset they point at.
 *
 * scanTrueNasSignals looks for each pattern as a plain substring of a file's
 * contents and reports the first pattern of an entry that is present. Only
 * entries whose `signalType` equals the scan's signal type are considered.
 *
 * NOTE(review): bare patterns such as "TimeMachine" and "UserShares" match any
 * text containing those words, including mentions of child datasets like
 * "UserShares/RedVelvet" — confirm that this breadth is intended.
 */
const TRUENAS_SIGNAL_PATTERNS: Array<{
dataset: string;
patterns: string[];
signalType: "active" | "legacy";
}> = [
{
dataset: "TrueNAS/NetworkMediaShare",
patterns: [
"/mnt/truenas/mediadata",
"/mnt/TrueNAS/NetworkMediaShare",
"/mnt/TrueNAS/NetworkMediaShare/mediadata",
],
signalType: "active",
},
{
dataset: "RPiPool/PersonalMediaLibrary",
patterns: [
"/mnt/PersonalMediaLibrary",
"/mnt/RPiPool/PersonalMediaLibrary",
],
signalType: "active",
},
{
dataset: "TrueNAS/backups",
patterns: [
"/mnt/truenas-backup",
"/mnt/TrueNAS/backups",
],
signalType: "active",
},
{
dataset: "TrueNAS/container-config",
patterns: ["/mnt/TrueNAS/container-config"],
signalType: "active",
},
{
dataset: "TrueNAS/databases",
patterns: ["/mnt/TrueNAS/databases"],
signalType: "active",
},
{
dataset: "TrueNAS/homelab/databases",
patterns: ["/mnt/TrueNAS/homelab/databases"],
signalType: "active",
},
{
dataset: "TrueNAS/homelab/hosts/ubuntu/docker-data",
patterns: ["/mnt/TrueNAS/homelab/hosts/ubuntu/docker-data"],
signalType: "active",
},
{
dataset: "TrueNAS/homelab/hosts/grizzley/docker-data",
patterns: ["/mnt/TrueNAS/homelab/hosts/grizzley/docker-data"],
signalType: "active",
},
{
dataset: "TrueNAS/homelab/hosts/ice/docker-data",
patterns: ["/mnt/TrueNAS/homelab/hosts/ice/docker-data"],
signalType: "active",
},
{
dataset: "TrueNAS/RPiPool-backup",
patterns: ["/mnt/TrueNAS/RPiPool-backup"],
signalType: "legacy",
},
{
dataset: "TrueNAS/PersonalMediaLibraryBackup",
patterns: ["/mnt/TrueNAS/PersonalMediaLibraryBackup"],
signalType: "legacy",
},
{
dataset: "TrueNAS/TimeMachine",
patterns: ["/mnt/TrueNAS/TimeMachine", "TimeMachine"],
signalType: "legacy",
},
{
dataset: "TrueNAS/UserShares",
patterns: ["/mnt/TrueNAS/UserShares", "UserShares"],
signalType: "legacy",
},
{
dataset: "TrueNAS/UserShares/RedVelvet",
patterns: ["/mnt/TrueNAS/UserShares/RedVelvet", "TrueNAS/UserShares/RedVelvet"],
signalType: "legacy",
},
{
dataset: "TrueNAS/UserShares/Vanilla",
patterns: ["/mnt/TrueNAS/UserShares/Vanilla", "TrueNAS/UserShares/Vanilla"],
signalType: "legacy",
},
{
dataset: "TrueNAS/traefik-certs",
patterns: ["/mnt/truenas/traefik-certs", "/mnt/TrueNAS/traefik-certs"],
signalType: "active",
},
];
// Files and directories, relative to each homelab repo root, that
// runTrueNasDatasetAudit scans for "active" dataset signals.
const ACTIVE_TRUENAS_SCAN_PATHS = [
"homelab/ubuntu",
"homelab/grizzley",
"homelab/truenas/AGENTS.md",
"homelab/AGENTS.md",
"homelab/catalog",
"ansible/playbooks",
];
// Files and directories, relative to each homelab repo root, that
// runTrueNasDatasetAudit scans for "legacy" dataset signals.
const LEGACY_TRUENAS_SCAN_PATHS = [
"homelab/inventory/truenas.json",
"homelab/proxmox/truenas",
"obsidian-vault/homelab",
];
/**
 * Builds the leading `ssh` argument list for a non-interactive connection.
 *
 * The list ignores any user ssh config (`-F /dev/null`), sets the batch-mode,
 * host-key, timeout, identity and known-hosts options, selects
 * DIRECT_SSH_KEY_PATH as the identity file and ends with `user@host`.
 * Callers append the remote command.
 *
 * @param host - Remote host name or address.
 * @param user - Remote login name.
 * @param port - Remote SSH port.
 * @returns Argument array suitable for `execFile("ssh", ...)`.
 */
function sshConnectionArgs(host: string, user: string, port: number) {
  const sshOptions = [
    "BatchMode=yes",
    "StrictHostKeyChecking=accept-new",
    "ConnectTimeout=15",
    "IdentitiesOnly=yes",
    "UserKnownHostsFile=/tmp/taskboard_known_hosts",
  ];
  const destination = `${user}@${host}`;
  return [
    "-F",
    "/dev/null",
    // Each option is passed as its own "-o <option>" pair.
    ...sshOptions.flatMap((option) => ["-o", option]),
    "-i",
    DIRECT_SSH_KEY_PATH,
    "-p",
    String(port),
    destination,
  ];
}
/**
 * Lists the homelab repository roots that exist on this machine.
 *
 * Candidates are, in order, the HOMELAB_REPO_ROOT environment variable (when
 * set and non-empty) followed by two fixed paths. Duplicates are dropped and
 * only paths that exist on disk are returned.
 *
 * @returns Existing candidate roots in candidate order.
 */
function existingHomelabRoots() {
  const candidateRoots = [
    process.env.HOMELAB_REPO_ROOT,
    "/home/bear/homelabagentroot",
    "/home/christopher/opencode-home",
  ];
  const seen = new Set<string>();
  const roots: string[] = [];
  for (const candidate of candidateRoots) {
    if (!candidate || seen.has(candidate)) {
      continue;
    }
    seen.add(candidate);
    if (fs.existsSync(candidate)) {
      roots.push(candidate);
    }
  }
  return roots;
}
/**
 * Recursively gathers files of at most 1 MiB beneath `targetPath`.
 *
 * - A path that cannot be stat'ed (missing, dangling symlink, removed mid-scan)
 *   is ignored.
 * - A regular file is appended to `collected` when its size is <= 1 MiB.
 * - A directory is walked recursively, skipping entries whose name starts with
 *   ".git" and the directories "node_modules" and "code-server-ai".
 * - Anything else (device node, FIFO, socket) is ignored; without this guard
 *   such an entry would reach `readdirSync` and throw ENOTDIR.
 *
 * @param targetPath - File or directory to inspect.
 * @param collected - Output array; matching file paths are pushed onto it.
 */
function collectTextFiles(targetPath: string, collected: string[]) {
  const maxFileSize = 1024 * 1024;
  let stat: fs.Stats;
  try {
    // statSync alone covers the "does it exist" question and avoids the
    // check-then-use race of a separate existsSync call.
    stat = fs.statSync(targetPath);
  } catch {
    return;
  }
  if (stat.isFile()) {
    if (stat.size <= maxFileSize) {
      collected.push(targetPath);
    }
    return;
  }
  if (!stat.isDirectory()) {
    return;
  }
  for (const entry of fs.readdirSync(targetPath, { withFileTypes: true })) {
    if (entry.name.startsWith(".git") || entry.name === "node_modules" || entry.name === "code-server-ai") {
      continue;
    }
    collectTextFiles(path.join(targetPath, entry.name), collected);
  }
}
/**
 * Searches files under the given repo-relative paths for dataset patterns.
 *
 * Files are gathered with collectTextFiles, read as UTF-8 (unreadable files are
 * skipped), and checked against every TRUENAS_SIGNAL_PATTERNS entry of the
 * requested signal type. Each file/entry pair yields at most one signal,
 * carrying the first pattern of that entry found in the file.
 *
 * @param repoRoot - Root directory that `relativePaths` are joined onto.
 * @param relativePaths - Files or directories to scan, relative to `repoRoot`.
 * @param signalType - Which category of pattern entries to apply.
 * @returns Signals in file order, then pattern-table order.
 */
function scanTrueNasSignals(repoRoot: string, relativePaths: string[], signalType: "active" | "legacy") {
  const candidateFiles: string[] = [];
  relativePaths.forEach((relativePath) => {
    collectTextFiles(path.join(repoRoot, relativePath), candidateFiles);
  });

  const mappings = TRUENAS_SIGNAL_PATTERNS.filter((entry) => entry.signalType === signalType);

  return candidateFiles.flatMap((filePath): DatasetSignal[] => {
    let text: string;
    try {
      text = fs.readFileSync(filePath, "utf8");
    } catch {
      // Unreadable file: contributes no signals.
      return [];
    }
    const hits: DatasetSignal[] = [];
    for (const { dataset, patterns } of mappings) {
      const matchedText = patterns.find((pattern) => text.includes(pattern));
      if (matchedText) {
        hits.push({ source: filePath, dataset, signalType, matchedText });
      }
    }
    return hits;
  });
}
/**
 * Strips every trailing "/" from a dataset name.
 *
 * @param datasetName - Dataset name, possibly ending in one or more slashes.
 * @returns The name without trailing slashes (an all-slash name becomes "").
 */
function datasetHierarchyName(datasetName: string) {
  let end = datasetName.length;
  while (end > 0 && datasetName[end - 1] === "/") {
    end -= 1;
  }
  return datasetName.slice(0, end);
}
/**
 * Selects the signals that refer to one dataset.
 *
 * @param signals - Signals to search.
 * @param dataset - Exact dataset name to match.
 * @returns A new array with the matching signals, in their original order.
 */
function summarizeSignals(signals: DatasetSignal[], dataset: string) {
  const matching: DatasetSignal[] = [];
  for (const signal of signals) {
    if (signal.dataset === dataset) {
      matching.push(signal);
    }
  }
  return matching;
}
/**
 * Audits which live ZFS datasets are still referenced by the homelab repositories.
 *
 * Steps:
 * 1. Loads the task (needed for the report header).
 * 2. Runs `zfs list -H -o name,used,avail,mountpoint` on the target over SSH.
 * 3. Scans every existing homelab root for "active" and "legacy" pattern hits.
 * 4. Builds a text report with four sections: datasets with active references,
 *    review candidates (no active reference on the dataset or on any dataset
 *    nested beneath it; pool roots and ".system" datasets excluded), all legacy
 *    hits, and the full live dataset list.
 *
 * File paths in the report are shown relative to the first existing homelab
 * root, or to the process cwd when none exists.
 *
 * @param taskId - Taskboard task that requested the audit.
 * @param host - SSH host of the TrueNAS machine.
 * @param user - SSH user.
 * @param port - SSH port.
 * @returns A completed result whose callback moves the task to "Review".
 * @throws Error `task_not_found` when the task does not exist; errors from the
 *         SSH invocation propagate unchanged.
 */
async function runTrueNasDatasetAudit(taskId: number, host: string, user: string, port: number): Promise<DispatchResult> {
  const task = await findTask(taskId);
  if (!task) {
    throw new Error("task_not_found");
  }

  const sshArgs = sshConnectionArgs(host, user, port);
  const { stdout } = await execFileAsync(
    "ssh",
    [...sshArgs, "zfs list -H -o name,used,avail,mountpoint"],
    {
      timeout: DIRECT_SSH_TIMEOUT_MS,
      maxBuffer: 1024 * 1024,
    },
  );

  // `zfs list -H` prints one tab-separated row per dataset with no header.
  const datasets = stdout
    .split("\n")
    .map((line) => line.trim())
    .filter(Boolean)
    .map((line) => {
      const [name, used, avail, mountpoint] = line.split("\t");
      return { name, used, avail, mountpoint } satisfies TrueNasDataset;
    });

  const repoRoots = existingHomelabRoots();
  const activeSignals = repoRoots.flatMap((repoRoot) =>
    scanTrueNasSignals(repoRoot, ACTIVE_TRUENAS_SCAN_PATHS, "active"),
  );
  const legacySignals = repoRoots.flatMap((repoRoot) =>
    scanTrueNasSignals(repoRoot, LEGACY_TRUENAS_SCAN_PATHS, "legacy"),
  );

  // Resolve each dataset's references once; both report sections reuse them.
  const annotated = datasets.map((dataset) => ({
    dataset,
    activeRefs: summarizeSignals(activeSignals, dataset.name),
    legacyRefs: summarizeSignals(legacySignals, dataset.name),
  }));

  const activeDatasets = annotated.filter(({ activeRefs }) => activeRefs.length > 0);

  const reviewCandidates = annotated.filter(({ dataset, activeRefs }) => {
    if (dataset.name === "TrueNAS" || dataset.name === "RPiPool" || dataset.name.includes("/.system")) {
      return false;
    }
    if (activeRefs.length > 0) {
      return false;
    }
    // A parent is not a review candidate while any dataset nested under it is
    // still actively referenced.
    const childPrefix = `${datasetHierarchyName(dataset.name)}/`;
    const hasActiveChild = activeDatasets.some(
      ({ dataset: candidate }) =>
        candidate.name !== dataset.name && datasetHierarchyName(candidate.name).startsWith(childPrefix),
    );
    return !hasActiveChild;
  });

  const reportBase = repoRoots[0] || process.cwd();
  const displayPath = (source: string) => path.relative(reportBase, source);

  const detailSections = [
    `Task: #${task.id} ${task.title}`,
    "",
    "Active dependency signals",
    ...(
      activeDatasets.length > 0
        ? activeDatasets.map(({ dataset, activeRefs }) =>
            `- ${dataset.name} (${dataset.used}, mount ${dataset.mountpoint}) <- ${activeRefs
              .slice(0, 4)
              .map((ref) => displayPath(ref.source))
              .join(", ")}`,
          )
        : ["- none detected"]
    ),
    "",
    "Review candidates with no active dependency signal",
    ...(
      reviewCandidates.length > 0
        ? reviewCandidates.map(({ dataset, legacyRefs }) =>
            `- ${dataset.name} (${dataset.used}, mount ${dataset.mountpoint})${
              legacyRefs.length > 0 ? ` [legacy refs: ${legacyRefs.length}]` : ""
            }`,
          )
        : ["- none"]
    ),
    "",
    "Legacy-only references",
    ...(
      legacySignals.length > 0
        ? legacySignals.map((signal) => `- ${signal.dataset} <- ${displayPath(signal.source)}`)
        : ["- none"]
    ),
    "",
    "Live datasets",
    ...datasets.map((dataset) => `- ${dataset.name} | used ${dataset.used} | mount ${dataset.mountpoint}`),
  ];

  const summary = "TrueNAS dataset dependency audit completed";
  const detail = truncateOutput(detailSections.join("\n"), 12000);

  return {
    state: "completed" as const,
    summary,
    detail,
    callback: {
      status: "Review",
      dispatch_state: "completed",
      summary,
      detail,
      completed_by: "direct-ssh:truenas-audit",
      last_error: null,
      last_dispatch_at: new Date().toISOString(),
    },
  };
}
/**
 * Queues a task for the OpenClaw swarm.
 *
 * Resolves the task's repository from the repo map, ensures a git worktree on
 * branch `feat/taskboard-<id>` exists, and adds or refreshes the task's entry
 * in the swarm registry file (SWARM_TASKS_FILE).
 *
 * Safeguards:
 * - A slug with no usable mapping is rejected up front. Passing an empty string
 *   on to path resolution would silently select the process working directory.
 * - A base branch starting with "-" is rejected so it cannot be parsed by git
 *   as a command-line option.
 * - The repository is added to git's global `safe.directory` list only when it
 *   is not already listed, so repeated dispatches do not grow the config.
 * - Top-level registry keys other than `tasks` are preserved when the file is
 *   rewritten.
 *
 * @param taskId - Taskboard task to dispatch.
 * @returns A "dispatched" result describing the queued worktree.
 * @throws Error `task_not_found`, `repo_slug_required_for_openclaw_dispatch`,
 *         `repo_not_available:<slug>`, `invalid_base_branch:<branch>`, or any
 *         error raised by the repo-map, path-allowlist, git, or file operations.
 */
async function dispatchOpenClawTask(taskId: number): Promise<DispatchResult> {
  const task = await findTask(taskId);
  if (!task) {
    throw new Error("task_not_found");
  }
  if (!task.repo_slug) {
    throw new Error("repo_slug_required_for_openclaw_dispatch");
  }

  const repoMap = readRepoMap();
  // Own-property lookup only: a slug such as "constructor" must not resolve to
  // something inherited from Object.prototype.
  const mappedPath = Object.prototype.hasOwnProperty.call(repoMap, task.repo_slug)
    ? repoMap[task.repo_slug]
    : undefined;
  if (typeof mappedPath !== "string" || mappedPath === "") {
    throw new Error(`repo_not_available:${task.repo_slug}`);
  }
  const repoPath = ensureAllowedRepoPath(mappedPath);
  if (!fs.existsSync(path.join(repoPath, ".git"))) {
    throw new Error(`repo_not_available:${task.repo_slug}`);
  }

  const baseBranch = task.base_branch || "main";
  if (baseBranch.startsWith("-")) {
    throw new Error(`invalid_base_branch:${baseBranch}`);
  }

  // `git config --add` appends unconditionally, so check the current list first.
  let knownSafeDirectories: string[] = [];
  try {
    const { stdout } = await execFileAsync("git", ["config", "--global", "--get-all", "safe.directory"]);
    knownSafeDirectories = stdout
      .split("\n")
      .map((line) => line.trim())
      .filter(Boolean);
  } catch {
    // git exits non-zero when the key has no values yet; treat that as an empty list.
  }
  if (!knownSafeDirectories.includes(repoPath)) {
    await execFileAsync("git", ["config", "--global", "--add", "safe.directory", repoPath]);
  }

  const agentName = task.preferred_agent || "codex";
  const taskKey = `taskboard-${task.id}`;
  const repoName = path.basename(repoPath);
  const worktree = path.join(SWARM_WORKTREES_DIR, repoName, taskKey);
  const hostWorktree = path.join(SWARM_HOST_WORKTREES_DIR, repoName, taskKey);
  const branch = `feat/taskboard-${task.id}`;

  fs.mkdirSync(path.dirname(worktree), { recursive: true });
  if (!fs.existsSync(worktree)) {
    try {
      await execFileAsync("git", ["-C", repoPath, "fetch", "origin", baseBranch]);
    } catch {
      // Keep going. Many local repos already have the base branch available.
    }
    await execFileAsync("git", ["-C", repoPath, "worktree", "add", worktree, "-b", branch, `origin/${baseBranch}`]);
  }

  ensureSwarmRegistry();
  const parsedRegistry: unknown = JSON.parse(fs.readFileSync(SWARM_TASKS_FILE, "utf8"));
  // Fall back to an empty registry when the file holds something other than a JSON object.
  const registry =
    parsedRegistry !== null && typeof parsedRegistry === "object" && !Array.isArray(parsedRegistry)
      ? (parsedRegistry as { tasks?: Array<Record<string, unknown>> })
      : {};
  const tasks = Array.isArray(registry.tasks) ? registry.tasks : [];
  const existing = tasks.find((entry) => entry.taskboardTaskId === task.id);
  if (!existing) {
    tasks.push({
      id: taskKey,
      agent: agentName,
      repo: repoName,
      repoPath,
      repoSlug: task.repo_slug,
      worktree: hostWorktree,
      branch,
      baseBranch,
      tmuxSession: `${agentName}-${taskKey}`,
      description: task.description,
      prompt: `Taskboard task #${task.id}: ${task.title}\n\n${task.description}`,
      status: "queued",
      notifyOnComplete: true,
      attempts: 0,
      maxAttempts: 3,
      model: task.model_hint || defaultModelForAgent(agentName),
      reasoning: task.reasoning_effort || "high",
      taskboardTaskId: task.id,
      startedAt: null,
      completedAt: null,
      createdAt: Date.now(),
      checks: {
        prCreated: false,
        ciPassed: false,
        reviewPassed: false,
      },
    });
  } else {
    // Re-dispatch: refresh routing fields, leave status and progress untouched.
    existing.repoPath = repoPath;
    existing.repoSlug = task.repo_slug;
    existing.worktree = hostWorktree;
    existing.branch = branch;
    existing.baseBranch = baseBranch;
    existing.agent = agentName;
    existing.tmuxSession = `${agentName}-${taskKey}`;
  }
  fs.writeFileSync(SWARM_TASKS_FILE, JSON.stringify({ ...registry, tasks }, null, 2));

  return {
    state: "dispatched" as const,
    summary: `Queued in OpenClaw swarm for ${agentName}`,
    detail: `${task.repo_slug} -> ${hostWorktree}`,
  };
}
/**
 * Posts a task to a ZeroClaw agent's `/webhook` endpoint.
 *
 * The gateway URL and optional bearer token are read from environment
 * variables: the GRIZZLEY pair when the agent slug is "grizzley-zeroclaw",
 * otherwise the ICE pair. The request, including reading the response body, is
 * aborted after ZEROCLAW_WEBHOOK_TIMEOUT_MS, and the timer is always cleared,
 * even when the request fails.
 *
 * @param taskId - Taskboard task to dispatch.
 * @returns A "dispatched" result whose detail is the response body, or a
 *          default acceptance message when the body is empty.
 * @throws Error `task_not_found`, `assignee_not_found`,
 *         `missing_gateway_url:<ENV>`, `webhook_failed:<status>:<body>` for a
 *         non-2xx response, or the fetch/abort error itself.
 */
async function dispatchZeroClawTask(taskId: number): Promise<DispatchResult> {
  const task = await findTask(taskId);
  if (!task) {
    throw new Error("task_not_found");
  }
  const agent = await findAgentByAssignmentKey(task.assignee);
  if (!agent) {
    throw new Error("assignee_not_found");
  }

  const urlEnv = agent.slug === "grizzley-zeroclaw" ? "ZEROCLAW_GRIZZLEY_URL" : "ZEROCLAW_ICE_URL";
  const tokenEnv = agent.slug === "grizzley-zeroclaw" ? "ZEROCLAW_GRIZZLEY_TOKEN" : "ZEROCLAW_ICE_TOKEN";
  const baseUrl = process.env[urlEnv];
  if (!baseUrl) {
    throw new Error(`missing_gateway_url:${urlEnv}`);
  }

  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), ZEROCLAW_WEBHOOK_TIMEOUT_MS);
  try {
    const response = await fetch(`${baseUrl.replace(/\/$/, "")}/webhook`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        ...(process.env[tokenEnv] ? { Authorization: `Bearer ${process.env[tokenEnv]}` } : {}),
      },
      body: JSON.stringify({
        message: `Taskboard task #${task.id}: ${task.title}\nHost: ${task.target_host || agent.host}\nChannel: ${task.target_channel || "n/a"}\n\n${task.description}`,
      }),
      signal: controller.signal,
    });
    // Read the body while the abort timer is still armed so that a stalled
    // response stream cannot hang the dispatch.
    const responseText = await response.text();
    if (!response.ok) {
      throw new Error(`webhook_failed:${response.status}:${responseText}`);
    }
    return {
      state: "dispatched" as const,
      summary: `Posted to ${agent.name} webhook`,
      detail: responseText || `${agent.host} webhook accepted`,
    };
  } finally {
    clearTimeout(timeout);
  }
}
/**
 * Looks up a direct-dispatch agent in FLEET_CONFIG.
 *
 * @param assignmentKey - Assignment key or alias to look up.
 * @returns The first agent whose `assignmentKey` equals the key or whose
 *          `aliases` contain it, or `null` when none matches.
 */
function findDirectAgentDefinition(assignmentKey: string) {
  for (const agent of FLEET_CONFIG.directAgents) {
    const matchesKey = agent.assignmentKey === assignmentKey;
    if (matchesKey || agent.aliases.includes(assignmentKey)) {
      return agent;
    }
  }
  return null;
}
/**
 * Runs a configured action for a direct (SSH-managed) target.
 *
 * The action key comes from the task's first "action:" tag, falling back to
 * the target's default action. The built-in "builtin:truenas-dataset-audit"
 * command is delegated to runTrueNasDatasetAudit; every other command is run
 * on the target over SSH.
 *
 * @param taskId - Taskboard task to dispatch.
 * @returns A completed result whose callback marks the task "Done" (or the
 *          audit's own result for the built-in action).
 * @throws Error `task_not_found`, `direct_target_not_found`,
 *         `unsupported_direct_action:<key>`, or
 *         `direct_ssh_failed:<host>:<action>:<output>` when the SSH command fails.
 */
async function dispatchDirectTask(taskId: number): Promise<DispatchResult> {
  const task = await findTask(taskId);
  if (!task) {
    throw new Error("task_not_found");
  }

  const directAgent = findDirectAgentDefinition(task.assignee);
  if (!directAgent) {
    throw new Error("direct_target_not_found");
  }

  const target = directAgent.dispatch;
  const actionKey = extractTagValue(task.tags, "action:") || target.defaultAction;
  const action = target.actions.find((entry) => entry.key === actionKey);
  if (!action) {
    throw new Error(`unsupported_direct_action:${actionKey}`);
  }

  if (action.command === "builtin:truenas-dataset-audit") {
    return runTrueNasDatasetAudit(taskId, target.hostname, target.user, target.port);
  }

  // The remote command is the final ssh argument, after the connection options.
  const sshArgs = [...sshConnectionArgs(target.hostname, target.user, target.port), action.command];

  try {
    const { stdout, stderr } = await execFileAsync("ssh", sshArgs, {
      timeout: DIRECT_SSH_TIMEOUT_MS,
      maxBuffer: 1024 * 1024,
    });
    const combinedOutput = [stdout, stderr].filter(Boolean).join("\n");
    const detail = truncateOutput(combinedOutput);
    return {
      state: "completed" as const,
      summary: `${action.successSummary}`,
      detail,
      callback: {
        status: "Done",
        dispatch_state: "completed",
        summary: action.successSummary,
        detail,
        completed_by: `direct-ssh:${directAgent.host}`,
        last_error: null,
        last_dispatch_at: new Date().toISOString(),
      },
    };
  } catch (error) {
    // execFile rejections carry whatever output was captured before the failure.
    const execError = error as Error & { stdout?: string; stderr?: string };
    const failureOutput = [execError.stdout, execError.stderr, execError.message].filter(Boolean).join("\n");
    const detail = truncateOutput(failureOutput);
    throw new Error(`direct_ssh_failed:${directAgent.host}:${action.key}:${detail}`);
  }
}
/**
 * Dispatches a task to the backend that matches its assignee's agent family.
 *
 * Flow:
 * 1. Validates that the task, its assignee, and the assignee's agent exist.
 * 2. Records a "dispatch_requested" event.
 * 3. Routes to the OpenClaw, ZeroClaw, or direct backend by `agent.family`.
 * 4. If the backend returned a callback payload, applies it and returns the
 *    updated task. Otherwise it updates the task itself (promoting "Backlog"
 *    to "Todo") and records a "dispatch_succeeded" event.
 * 5. On any error inside steps 3-4, marks the task "failed", records a
 *    "dispatch_failed" event, and returns the updated task.
 *
 * @param taskId - Taskboard task to dispatch.
 * @returns The task as returned by the update or callback call.
 * @throws Error `task_not_found`, `assignee_required`, or `assignee_not_found`
 *         during validation; the original dispatch error is rethrown only when
 *         the failure update returns nothing.
 */
export async function dispatchTask(taskId: number) {
  const task = await findTask(taskId);
  if (!task) {
    throw new Error("task_not_found");
  }
  if (!task.assignee) {
    throw new Error("assignee_required");
  }

  const agent = await findAgentByAssignmentKey(task.assignee);
  if (!agent) {
    throw new Error("assignee_not_found");
  }

  await appendTaskEvent({
    taskId,
    assignee: task.assignee,
    family: task.family,
    host: task.target_host,
    eventType: "dispatch_requested",
    state: task.dispatch_state,
    summary: `Dispatch requested for ${agent.name}`,
    detail: task.description,
  });

  try {
    let result: DispatchResult;
    if (agent.family === "openclaw") {
      result = await dispatchOpenClawTask(taskId);
    } else if (agent.family === "zeroclaw") {
      result = await dispatchZeroClawTask(taskId);
    } else {
      result = await dispatchDirectTask(taskId);
    }

    if (result.callback) {
      // The backend already finished the work; its callback carries the final state.
      const afterCallback = await applyTaskCallback(taskId, result.callback);
      if (!afterCallback) {
        throw new Error("task_not_found_after_callback");
      }
      return afterCallback;
    }

    const nextStatus = task.status === "Backlog" ? "Todo" : task.status;
    const updated = await updateTask(taskId, {
      status: nextStatus,
      dispatch_state: result.state,
      last_dispatch_at: new Date().toISOString(),
      last_error: null,
    });
    if (!updated) {
      throw new Error("task_not_found_after_dispatch");
    }

    await appendTaskEvent({
      taskId,
      assignee: updated.assignee,
      family: updated.family,
      host: updated.target_host,
      eventType: "dispatch_succeeded",
      state: updated.dispatch_state,
      summary: result.summary,
      detail: result.detail,
    });
    return updated;
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    const failedTask = await updateTask(taskId, {
      dispatch_state: "failed",
      last_error: message,
    });
    await appendTaskEvent({
      taskId,
      assignee: task.assignee,
      family: task.family,
      host: task.target_host,
      eventType: "dispatch_failed",
      state: "failed",
      summary: "Dispatch failed",
      detail: message,
    });
    if (!failedTask) {
      // Nothing to return to the caller, so surface the original failure.
      throw error;
    }
    return failedTask;
  }
}