import fs from "node:fs";
import path from "node:path";
import { promisify } from "node:util";
import { execFile } from "node:child_process";

import {
  DIRECT_SSH_KEY_PATH,
  DIRECT_SSH_TIMEOUT_MS,
  FLEET_CONFIG,
  REPO_ACCESS_ROOTS,
  SWARM_HOST_WORKTREES_DIR,
  SWARM_REPO_MAP_FILE,
  SWARM_TASKS_FILE,
  SWARM_WORKTREES_DIR,
  ZEROCLAW_WEBHOOK_TIMEOUT_MS,
} from "@/lib/fleet-config";
import { findAgentByAssignmentKey } from "@/lib/agents";
import { appendTaskEvent, applyTaskCallback, findTask, updateTask } from "@/lib/tasks";
import type { DispatchState, TaskCallbackPayload } from "@/lib/types";

const execFileAsync = promisify(execFile);

/** Outcome returned by every family-specific dispatcher in this module. */
type DispatchResult = {
  state: DispatchState;
  summary: string;
  detail: string;
  /** When present, the caller applies it to the task instead of a plain state update. */
  callback?: TaskCallbackPayload;
};

/** Returns the model identifier used when a task carries no explicit model hint. */
function defaultModelForAgent(agent: string) {
  switch (agent) {
    case "opencode":
      return "zai-coding-plan/glm-4.7";
    case "gemini":
      return "gemini-3.1-pro";
    default:
      return "gpt-5.3-codex";
  }
}

/**
 * Loads the repo map JSON from `SWARM_REPO_MAP_FILE`.
 *
 * @returns The parsed file, treated as a slug -> path mapping (the content is not validated).
 * @throws Error `missing_repo_map:<file>` when the file does not exist.
 */
function readRepoMap() {
  if (!fs.existsSync(SWARM_REPO_MAP_FILE)) {
    throw new Error(`missing_repo_map:${SWARM_REPO_MAP_FILE}`);
  }
  return JSON.parse(fs.readFileSync(SWARM_REPO_MAP_FILE, "utf8")) as Record<string, string>;
}

/** True when `candidate` is `root` itself or lies underneath it (both already resolved). */
function isPathWithinRoot(root: string, candidate: string) {
  const relative = path.relative(root, candidate);
  if (relative === "") {
    return true;
  }
  // A relative path that climbs out ("..", "../x") or is absolute (other drive on
  // Windows) means the candidate is outside the root.
  return relative !== ".." && !relative.startsWith(`..${path.sep}`) && !path.isAbsolute(relative);
}

/**
 * Resolves `repoPath` and verifies it sits inside one of `REPO_ACCESS_ROOTS`.
 *
 * Containment is checked on path segments rather than with a raw string prefix, so
 * a sibling such as `/srv/repos-evil` is not accepted for the root `/srv/repos`.
 * Symlinks are not resolved.
 *
 * @returns The resolved absolute path.
 * @throws Error `repo_path_not_allowed:<resolved>` when no root contains the path.
 */
function ensureAllowedRepoPath(repoPath: string) {
  const resolved = path.resolve(repoPath);
  const allowed = REPO_ACCESS_ROOTS.some((root) => isPathWithinRoot(path.resolve(root), resolved));
  if (!allowed) {
    throw new Error(`repo_path_not_allowed:${resolved}`);
  }
  return resolved;
}

/** Creates the swarm registry file (and its directory) with an empty task list if missing. */
function ensureSwarmRegistry() {
  fs.mkdirSync(path.dirname(SWARM_TASKS_FILE), { recursive: true });
  if (!fs.existsSync(SWARM_TASKS_FILE)) {
    fs.writeFileSync(SWARM_TASKS_FILE, JSON.stringify({ tasks: [] }, null, 2));
  }
}

/** Returns the remainder of the first tag starting with `prefix`, or `null` when none matches. */
function extractTagValue(tags: string[], prefix: string) {
  const match = tags.find((tag) => tag.startsWith(prefix));
  return match ? match.slice(prefix.length) : null;
}

/**
 * Drops lines starting with "Warning: Permanently added ", trims the result and
 * caps it at `maxLength` characters, ending with a truncation marker when cut.
 */
function truncateOutput(output: string, maxLength = 4000) {
  const trimmed = output
    .split("\n")
    .filter((line) => !line.startsWith("Warning: Permanently added "))
    .join("\n")
    .trim();
  if (trimmed.length <= maxLength) {
    return trimmed;
  }
  // The marker is 15 characters long, so the result is exactly maxLength characters.
  return `${trimmed.slice(0, maxLength - 15)}\n...[truncated]`;
}

/** One row of `zfs list -H -o name,used,avail,mountpoint`. */
type TrueNasDataset = {
  name: string;
  used: string;
  avail: string;
  mountpoint: string;
};

/** A file in which a pattern associated with a dataset was found. */
type DatasetSignal = {
  source: string;
  dataset: string;
  signalType: "active" | "legacy";
  matchedText: string;
};

/** Substrings whose presence in a scanned file counts as a reference to the given dataset. */
const TRUENAS_SIGNAL_PATTERNS: Array<{
  dataset: string;
  patterns: string[];
  signalType: "active" | "legacy";
}> = [
  {
    dataset: "TrueNAS/NetworkMediaShare",
    patterns: [
      "/mnt/truenas/mediadata",
      "/mnt/TrueNAS/NetworkMediaShare",
      "/mnt/TrueNAS/NetworkMediaShare/mediadata",
    ],
    signalType: "active",
  },
  {
    dataset: "RPiPool/PersonalMediaLibrary",
    patterns: [
      "/mnt/PersonalMediaLibrary",
      "/mnt/RPiPool/PersonalMediaLibrary",
    ],
    signalType: "active",
  },
  {
    dataset: "TrueNAS/backups",
    patterns: [
      "/mnt/truenas-backup",
      "/mnt/TrueNAS/backups",
    ],
    signalType: "active",
  },
  {
    dataset: "TrueNAS/container-config",
    patterns: ["/mnt/TrueNAS/container-config"],
    signalType: "active",
  },
  {
    dataset: "TrueNAS/databases",
    patterns: ["/mnt/TrueNAS/databases"],
    signalType: "active",
  },
  {
    dataset: "TrueNAS/homelab/databases",
    patterns: ["/mnt/TrueNAS/homelab/databases"],
    signalType: "active",
  },
  {
    dataset: "TrueNAS/homelab/hosts/ubuntu/docker-data",
    patterns: ["/mnt/TrueNAS/homelab/hosts/ubuntu/docker-data"],
    signalType: "active",
  },
  {
    dataset: "TrueNAS/homelab/hosts/grizzley/docker-data",
    patterns: ["/mnt/TrueNAS/homelab/hosts/grizzley/docker-data"],
    signalType: "active",
  },
  {
    dataset: "TrueNAS/homelab/hosts/ice/docker-data",
    patterns: ["/mnt/TrueNAS/homelab/hosts/ice/docker-data"],
    signalType: "active",
  },
  {
    dataset: "TrueNAS/RPiPool-backup",
    patterns: ["/mnt/TrueNAS/RPiPool-backup"],
    signalType: "legacy",
  },
  {
    dataset: "TrueNAS/PersonalMediaLibraryBackup",
    patterns: ["/mnt/TrueNAS/PersonalMediaLibraryBackup"],
    signalType: "legacy",
  },
  {
    dataset: "TrueNAS/TimeMachine",
    patterns: ["/mnt/TrueNAS/TimeMachine", "TimeMachine"],
    signalType: "legacy",
  },
  {
    dataset: "TrueNAS/UserShares",
    patterns: ["/mnt/TrueNAS/UserShares", "UserShares"],
    signalType: "legacy",
  },
  {
    dataset: "TrueNAS/UserShares/RedVelvet",
    patterns: ["/mnt/TrueNAS/UserShares/RedVelvet", "TrueNAS/UserShares/RedVelvet"],
    signalType: "legacy",
  },
  {
    dataset: "TrueNAS/UserShares/Vanilla",
    patterns: ["/mnt/TrueNAS/UserShares/Vanilla", "TrueNAS/UserShares/Vanilla"],
    signalType: "legacy",
  },
  {
    dataset: "TrueNAS/traefik-certs",
    patterns: ["/mnt/truenas/traefik-certs", "/mnt/TrueNAS/traefik-certs"],
    signalType: "active",
  },
];

/** Repo-relative paths scanned for "active" dataset references. */
const ACTIVE_TRUENAS_SCAN_PATHS = [
  "homelab/ubuntu",
  "homelab/grizzley",
  "homelab/inventory/ubuntu.json",
  "homelab/inventory/grizzley.json",
  "homelab/truenas/AGENTS.md",
  "homelab/AGENTS.md",
  "homelab/catalog",
  "ansible/playbooks",
];

/** Repo-relative paths scanned for "legacy" dataset references. */
const LEGACY_TRUENAS_SCAN_PATHS = [
  "homelab/inventory/truenas.json",
  "homelab/proxmox/truenas",
  "obsidian-vault/homelab",
];

/**
 * Builds the `ssh` argument list (options plus `user@host`) shared by all direct
 * dispatches. The remote command is appended by the caller.
 */
function sshConnectionArgs(host: string, user: string, port: number) {
  return [
    "-F",
    "/dev/null",
    "-o",
    "BatchMode=yes",
    "-o",
    "StrictHostKeyChecking=accept-new",
    "-o",
    "ConnectTimeout=15",
    "-o",
    "IdentitiesOnly=yes",
    "-o",
    "UserKnownHostsFile=/tmp/taskboard_known_hosts",
    "-i",
    DIRECT_SSH_KEY_PATH,
    "-p",
    String(port),
    `${user}@${host}`,
  ];
}

/** Returns the de-duplicated candidate homelab roots that exist on this machine. */
function existingHomelabRoots() {
  const candidates = [
    process.env.HOMELAB_REPO_ROOT,
    "/home/bear/homelabagentroot",
    "/home/christopher/opencode-home",
  ].filter((entry): entry is string => Boolean(entry));
  return [...new Set(candidates)].filter((candidate) => fs.existsSync(candidate));
}

/**
 * Appends to `collected` every file at or below `targetPath` that is at most 1 MiB.
 *
 * Missing paths are ignored. Directory entries whose name starts with ".git", or is
 * "node_modules" or "code-server-ai", are skipped. Despite the name, files are
 * selected by size only, not by content type.
 */
function collectTextFiles(targetPath: string, collected: string[]) {
  if (!fs.existsSync(targetPath)) {
    return;
  }
  const stat = fs.statSync(targetPath);
  if (stat.isFile()) {
    if (stat.size <= 1024 * 1024) {
      collected.push(targetPath);
    }
    return;
  }
  for (const entry of fs.readdirSync(targetPath, { withFileTypes: true })) {
    if (entry.name.startsWith(".git") || entry.name === "node_modules" || entry.name === "code-server-ai") {
      continue;
    }
    collectTextFiles(path.join(targetPath, entry.name), collected);
  }
}

/**
 * Scans the files under `relativePaths` (relative to `repoRoot`) for the patterns
 * of the requested signal type.
 *
 * @returns At most one signal per (file, dataset) pair, carrying the first pattern
 *   of that dataset found in the file. Unreadable files are skipped.
 */
function scanTrueNasSignals(repoRoot: string, relativePaths: string[], signalType: "active" | "legacy") {
  const files: string[] = [];
  for (const relativePath of relativePaths) {
    collectTextFiles(path.join(repoRoot, relativePath), files);
  }

  // The relevant mappings do not depend on the file, so select them once.
  const mappings = TRUENAS_SIGNAL_PATTERNS.filter((entry) => entry.signalType === signalType);

  const signals: DatasetSignal[] = [];
  for (const filePath of files) {
    let content = "";
    try {
      content = fs.readFileSync(filePath, "utf8");
    } catch {
      // Best effort: a file that cannot be read contributes no signals.
      continue;
    }
    for (const mapping of mappings) {
      const matchedPattern = mapping.patterns.find((pattern) => content.includes(pattern));
      if (matchedPattern) {
        signals.push({
          source: filePath,
          dataset: mapping.dataset,
          signalType,
          matchedText: matchedPattern,
        });
      }
    }
  }
  return signals;
}

/** Strips trailing slashes so dataset names can be compared as hierarchy prefixes. */
function datasetHierarchyName(datasetName: string) {
  return datasetName.replace(/\/+$/, "");
}

/** Returns the signals that refer to exactly `dataset`. */
function summarizeSignals(signals: DatasetSignal[], dataset: string) {
  return signals.filter((signal) => signal.dataset === dataset);
}

/**
 * Lists the ZFS datasets on the remote host over SSH and cross-references them
 * with dataset references found in the local homelab repositories.
 *
 * A dataset is reported as a review candidate when it has no active reference,
 * no actively referenced descendant and no actively referenced ancestor. Pool
 * roots ("TrueNAS", "RPiPool"), "boot-pool*" and "/.system" datasets are never
 * candidates.
 *
 * @throws Error `task_not_found` when the task does not exist; rejections from the
 *   `ssh` invocation propagate unchanged.
 */
async function runTrueNasDatasetAudit(
  taskId: number,
  host: string,
  user: string,
  port: number,
): Promise<DispatchResult> {
  const task = await findTask(taskId);
  if (!task) {
    throw new Error("task_not_found");
  }

  const sshArgs = sshConnectionArgs(host, user, port);
  const { stdout } = await execFileAsync(
    "ssh",
    [...sshArgs, "zfs list -H -o name,used,avail,mountpoint"],
    {
      timeout: DIRECT_SSH_TIMEOUT_MS,
      maxBuffer: 1024 * 1024,
    },
  );

  // `zfs list -H` prints one tab-separated row per dataset without a header.
  const datasets = stdout
    .split("\n")
    .map((line) => line.trim())
    .filter(Boolean)
    .map((line) => {
      const [name, used, avail, mountpoint] = line.split("\t");
      return { name, used, avail, mountpoint } satisfies TrueNasDataset;
    });

  const repoRoots = existingHomelabRoots();
  const activeSignals = repoRoots.flatMap((repoRoot) =>
    scanTrueNasSignals(repoRoot, ACTIVE_TRUENAS_SCAN_PATHS, "active"),
  );
  const legacySignals = repoRoots.flatMap((repoRoot) =>
    scanTrueNasSignals(repoRoot, LEGACY_TRUENAS_SCAN_PATHS, "legacy"),
  );

  // Attach the matching signals to every dataset once; both report sections reuse them.
  const annotated = datasets.map((dataset) => ({
    dataset,
    activeRefs: summarizeSignals(activeSignals, dataset.name),
    legacyRefs: summarizeSignals(legacySignals, dataset.name),
  }));

  const activeDatasets = annotated.filter(({ activeRefs }) => activeRefs.length > 0);
  const activeDatasetNames = [...new Set(activeDatasets.map(({ dataset }) => dataset.name))];

  const reviewCandidates = annotated.filter(({ dataset, activeRefs }) => {
    if (
      dataset.name === "TrueNAS" ||
      dataset.name === "RPiPool" ||
      dataset.name.startsWith("boot-pool") ||
      dataset.name.includes("/.system")
    ) {
      return false;
    }
    if (activeRefs.length > 0) {
      return false;
    }
    const ownName = datasetHierarchyName(dataset.name);
    const hasActiveRelative = activeDatasetNames.some((activeName) => {
      if (activeName === dataset.name) {
        return false;
      }
      const activeHierarchy = datasetHierarchyName(activeName);
      // Active descendant (active lives below this dataset) or active ancestor.
      return activeHierarchy.startsWith(`${ownName}/`) || ownName.startsWith(`${activeHierarchy}/`);
    });
    return !hasActiveRelative;
  });

  // Source files are displayed relative to the first repo root, or the CWD when none exists.
  const displayRoot = repoRoots[0] || process.cwd();

  const detailSections = [
    `Task: #${task.id} ${task.title}`,
    "",
    "Active dependency signals",
    ...(activeDatasets.length > 0
      ? activeDatasets.map(
          ({ dataset, activeRefs }) =>
            `- ${dataset.name} (${dataset.used}, mount ${dataset.mountpoint}) <- ${activeRefs
              .slice(0, 4)
              .map((ref) => path.relative(displayRoot, ref.source))
              .join(", ")}`,
        )
      : ["- none detected"]),
    "",
    "Review candidates with no active dependency signal",
    ...(reviewCandidates.length > 0
      ? reviewCandidates.map(
          ({ dataset, legacyRefs }) =>
            `- ${dataset.name} (${dataset.used}, mount ${dataset.mountpoint})${
              legacyRefs.length > 0 ? ` [legacy refs: ${legacyRefs.length}]` : ""
            }`,
        )
      : ["- none"]),
    "",
    "Legacy-only references",
    ...(legacySignals.length > 0
      ? legacySignals.map((signal) => `- ${signal.dataset} <- ${path.relative(displayRoot, signal.source)}`)
      : ["- none"]),
    "",
    "Live datasets",
    ...datasets.map((dataset) => `- ${dataset.name} | used ${dataset.used} | mount ${dataset.mountpoint}`),
  ];

  const detail = truncateOutput(detailSections.join("\n"), 12000);

  return {
    state: "completed" as const,
    summary: "TrueNAS dataset dependency audit completed",
    detail,
    callback: {
      status: "Review",
      dispatch_state: "completed",
      summary: "TrueNAS dataset dependency audit completed",
      detail,
      completed_by: "direct-ssh:truenas-audit",
      last_error: null,
      last_dispatch_at: new Date().toISOString(),
    },
  };
}

/**
 * Prepares a git worktree for the task and records (or refreshes) its entry in
 * the swarm registry file.
 *
 * @throws Error `task_not_found`, `repo_slug_required_for_openclaw_dispatch`,
 *   `repo_not_available:<slug>` (slug unmapped or mapped path is not a git
 *   checkout), `repo_path_not_allowed:<path>`; git failures other than the
 *   initial fetch propagate unchanged.
 */
async function dispatchOpenClawTask(taskId: number): Promise<DispatchResult> {
  const task = await findTask(taskId);
  if (!task) {
    throw new Error("task_not_found");
  }
  if (!task.repo_slug) {
    throw new Error("repo_slug_required_for_openclaw_dispatch");
  }

  const repoMap = readRepoMap();
  const mappedPath = repoMap[task.repo_slug];
  // Reject unmapped slugs before resolving: path.resolve("") is the process CWD,
  // which must never be picked up as the target repository by accident.
  if (!mappedPath) {
    throw new Error(`repo_not_available:${task.repo_slug}`);
  }
  const repoPath = ensureAllowedRepoPath(mappedPath);
  if (!fs.existsSync(path.join(repoPath, ".git"))) {
    throw new Error(`repo_not_available:${task.repo_slug}`);
  }

  // NOTE(review): `--add` appends a safe.directory entry on every dispatch, so the
  // global git config accumulates duplicates for the same repo — confirm this is acceptable.
  await execFileAsync("git", ["config", "--global", "--add", "safe.directory", repoPath]);

  const agentName = task.preferred_agent || "codex";
  const taskKey = `taskboard-${task.id}`;
  const repoName = path.basename(repoPath);
  const worktree = path.join(SWARM_WORKTREES_DIR, repoName, taskKey);
  const hostWorktree = path.join(SWARM_HOST_WORKTREES_DIR, repoName, taskKey);
  const branch = `feat/taskboard-${task.id}`;
  const baseBranch = task.base_branch || "main";

  fs.mkdirSync(path.dirname(worktree), { recursive: true });
  if (!fs.existsSync(worktree)) {
    try {
      await execFileAsync("git", ["-C", repoPath, "fetch", "origin", baseBranch]);
    } catch {
      // Keep going. Many local repos already have the base branch available.
    }
    await execFileAsync("git", ["-C", repoPath, "worktree", "add", worktree, "-b", branch, `origin/${baseBranch}`]);
  }

  ensureSwarmRegistry();
  const registry = JSON.parse(fs.readFileSync(SWARM_TASKS_FILE, "utf8")) as {
    tasks?: Array<Record<string, unknown>>;
  };
  const tasks = Array.isArray(registry.tasks) ? registry.tasks : [];
  const existing = tasks.find((entry) => entry.taskboardTaskId === task.id);

  if (!existing) {
    tasks.push({
      id: taskKey,
      agent: agentName,
      repo: repoName,
      repoPath,
      repoSlug: task.repo_slug,
      worktree: hostWorktree,
      branch,
      baseBranch,
      tmuxSession: `${agentName}-${taskKey}`,
      description: task.description,
      prompt: `Taskboard task #${task.id}: ${task.title}\n\n${task.description}`,
      status: "queued",
      notifyOnComplete: true,
      attempts: 0,
      maxAttempts: 3,
      model: task.model_hint || defaultModelForAgent(agentName),
      reasoning: task.reasoning_effort || "high",
      taskboardTaskId: task.id,
      startedAt: null,
      completedAt: null,
      createdAt: Date.now(),
      checks: {
        prCreated: false,
        ciPassed: false,
        reviewPassed: false,
      },
    });
  } else {
    // Re-dispatch: refresh routing fields only; status, attempts and checks are left untouched.
    existing.repoPath = repoPath;
    existing.repoSlug = task.repo_slug;
    existing.worktree = hostWorktree;
    existing.branch = branch;
    existing.baseBranch = baseBranch;
    existing.agent = agentName;
    existing.tmuxSession = `${agentName}-${taskKey}`;
  }

  // Only the `tasks` key is written back; any other top-level registry keys are dropped.
  fs.writeFileSync(SWARM_TASKS_FILE, JSON.stringify({ tasks }, null, 2));

  return {
    state: "dispatched" as const,
    summary: `Queued in OpenClaw swarm for ${agentName}`,
    detail: `${task.repo_slug} -> ${hostWorktree}`,
  };
}
/**
 * Posts the task to the ZeroClaw gateway webhook of the assigned agent.
 *
 * The gateway URL and optional bearer token are read from environment variables
 * chosen by the agent slug ("grizzley-zeroclaw" uses the GRIZZLEY pair, every
 * other slug uses the ICE pair).
 *
 * @throws Error `task_not_found`, `assignee_not_found`, `missing_gateway_url:<env>`,
 *   `webhook_failed:<status>:<body>`; a rejected or aborted `fetch` propagates unchanged.
 */
async function dispatchZeroClawTask(taskId: number): Promise<DispatchResult> {
  const task = await findTask(taskId);
  if (!task) {
    throw new Error("task_not_found");
  }

  const agent = await findAgentByAssignmentKey(task.assignee);
  if (!agent) {
    throw new Error("assignee_not_found");
  }

  const isGrizzley = agent.slug === "grizzley-zeroclaw";
  const urlEnv = isGrizzley ? "ZEROCLAW_GRIZZLEY_URL" : "ZEROCLAW_ICE_URL";
  const tokenEnv = isGrizzley ? "ZEROCLAW_GRIZZLEY_TOKEN" : "ZEROCLAW_ICE_TOKEN";
  const baseUrl = process.env[urlEnv];
  if (!baseUrl) {
    throw new Error(`missing_gateway_url:${urlEnv}`);
  }

  const controller = new AbortController();
  const timeout = setTimeout(() => controller.abort(), ZEROCLAW_WEBHOOK_TIMEOUT_MS);

  let response: Response;
  try {
    response = await fetch(`${baseUrl.replace(/\/$/, "")}/webhook`, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        ...(process.env[tokenEnv] ? { Authorization: `Bearer ${process.env[tokenEnv]}` } : {}),
      },
      body: JSON.stringify({
        message: `Taskboard task #${task.id}: ${task.title}\nHost: ${task.target_host || agent.host}\nChannel: ${task.target_channel || "n/a"}\n\n${task.description}`,
      }),
      signal: controller.signal,
    });
  } finally {
    // Always release the timer, including when fetch rejects (network error or
    // abort); otherwise the pending timeout outlives the failed request.
    clearTimeout(timeout);
  }

  const responseText = await response.text();
  if (!response.ok) {
    throw new Error(`webhook_failed:${response.status}:${responseText}`);
  }

  return {
    state: "dispatched" as const,
    summary: `Posted to ${agent.name} webhook`,
    detail: responseText || `${agent.host} webhook accepted`,
  };
}

/** Looks up a direct agent by assignment key or alias; returns `null` when none matches. */
function findDirectAgentDefinition(assignmentKey: string) {
  return (
    FLEET_CONFIG.directAgents.find(
      (agent) => agent.assignmentKey === assignmentKey || agent.aliases.includes(assignmentKey),
    ) || null
  );
}

/**
 * Runs the task's action on the assigned direct agent over SSH.
 *
 * The action is selected by an "action:<key>" tag on the task, falling back to
 * the agent's default action. The command "builtin:truenas-dataset-audit" is
 * handled locally by `runTrueNasDatasetAudit` instead of being sent to the host.
 *
 * @throws Error `task_not_found`, `direct_target_not_found`,
 *   `unsupported_direct_action:<key>`, `direct_ssh_failed:<host>:<action>:<output>`.
 */
async function dispatchDirectTask(taskId: number): Promise<DispatchResult> {
  const task = await findTask(taskId);
  if (!task) {
    throw new Error("task_not_found");
  }

  const directAgent = findDirectAgentDefinition(task.assignee);
  if (!directAgent) {
    throw new Error("direct_target_not_found");
  }

  const actionKey = extractTagValue(task.tags, "action:") || directAgent.dispatch.defaultAction;
  const action = directAgent.dispatch.actions.find((entry) => entry.key === actionKey);
  if (!action) {
    throw new Error(`unsupported_direct_action:${actionKey}`);
  }

  const { hostname, user, port } = directAgent.dispatch;

  if (action.command === "builtin:truenas-dataset-audit") {
    return runTrueNasDatasetAudit(taskId, hostname, user, port);
  }

  const sshArgs = [...sshConnectionArgs(hostname, user, port), action.command];

  try {
    const { stdout, stderr } = await execFileAsync("ssh", sshArgs, {
      timeout: DIRECT_SSH_TIMEOUT_MS,
      maxBuffer: 1024 * 1024,
    });
    const detail = truncateOutput([stdout, stderr].filter(Boolean).join("\n"));
    return {
      state: "completed" as const,
      summary: `${action.successSummary}`,
      detail,
      callback: {
        status: "Done",
        dispatch_state: "completed",
        summary: action.successSummary,
        detail,
        completed_by: `direct-ssh:${directAgent.host}`,
        last_error: null,
        last_dispatch_at: new Date().toISOString(),
      },
    };
  } catch (error) {
    // execFile rejections carry whatever output was captured before the failure;
    // fold it into the message so it ends up in the task's last_error.
    const execError = error as Error & { stdout?: string; stderr?: string };
    const detail = truncateOutput(
      [execError.stdout, execError.stderr, execError.message].filter(Boolean).join("\n"),
    );
    throw new Error(`direct_ssh_failed:${directAgent.host}:${action.key}:${detail}`);
  }
}

/**
 * Dispatches a task to its assignee, routing by agent family ("openclaw",
 * "zeroclaw", anything else is treated as a direct SSH agent), and records the
 * outcome on the task.
 *
 * Failures inside the dispatch step are not rethrown: the task is marked
 * "failed" with the error message and the updated task is returned. The error is
 * rethrown only when that update returns nothing.
 *
 * @returns The updated task.
 * @throws Error `task_not_found`, `assignee_required`, `assignee_not_found` before
 *   any dispatch is attempted.
 */
export async function dispatchTask(taskId: number) {
  const task = await findTask(taskId);
  if (!task) {
    throw new Error("task_not_found");
  }
  if (!task.assignee) {
    throw new Error("assignee_required");
  }

  const agent = await findAgentByAssignmentKey(task.assignee);
  if (!agent) {
    throw new Error("assignee_not_found");
  }

  await appendTaskEvent({
    taskId,
    assignee: task.assignee,
    family: task.family,
    host: task.target_host,
    eventType: "dispatch_requested",
    state: task.dispatch_state,
    summary: `Dispatch requested for ${agent.name}`,
    detail: task.description,
  });

  try {
    let result: DispatchResult;
    if (agent.family === "openclaw") {
      result = await dispatchOpenClawTask(taskId);
    } else if (agent.family === "zeroclaw") {
      result = await dispatchZeroClawTask(taskId);
    } else {
      result = await dispatchDirectTask(taskId);
    }

    if (result.callback) {
      // Synchronous dispatchers finish the work in-line and report through the
      // callback path. NOTE(review): no "dispatch_succeeded" event is appended
      // here — presumably applyTaskCallback records its own event; confirm.
      const updated = await applyTaskCallback(taskId, result.callback);
      if (!updated) {
        throw new Error("task_not_found_after_callback");
      }
      return updated;
    }

    const updated = await updateTask(taskId, {
      // A dispatched backlog item is promoted to "Todo"; other statuses are kept.
      status: task.status === "Backlog" ? "Todo" : task.status,
      dispatch_state: result.state,
      last_dispatch_at: new Date().toISOString(),
      last_error: null,
    });
    if (!updated) {
      throw new Error("task_not_found_after_dispatch");
    }

    await appendTaskEvent({
      taskId,
      assignee: updated.assignee,
      family: updated.family,
      host: updated.target_host,
      eventType: "dispatch_succeeded",
      state: updated.dispatch_state,
      summary: result.summary,
      detail: result.detail,
    });
    return updated;
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    const updated = await updateTask(taskId, {
      dispatch_state: "failed",
      last_error: message,
    });
    await appendTaskEvent({
      taskId,
      assignee: task.assignee,
      family: task.family,
      host: task.target_host,
      eventType: "dispatch_failed",
      state: "failed",
      summary: "Dispatch failed",
      detail: message,
    });
    if (!updated) {
      throw error;
    }
    return updated;
  }
}