agent-swarm/server.mjs
OpSpawn 836f2edd9c Agent Swarm v1.0: Multi-agent task decomposition and delegation via A2A protocol
- Coordinator agent accepts complex tasks and decomposes into subtasks
- Delegates to specialized worker agents via A2A v0.3 JSON-RPC
- Handles x402 payment flows (paid skills return payment-required)
- Free skills execute end-to-end through delegation chain
- Web dashboard with real-time swarm monitoring
- 14 tests passing
- Worker agents: screenshot, PDF, HTML (via A2A gateway)
- Event log tracks all delegations and completions
2026-02-06 13:05:01 +00:00

517 lines
16 KiB
JavaScript

import http from 'node:http';
import { readFile } from 'node:fs/promises';
import { fileURLToPath } from 'node:url';
import { dirname, join } from 'node:path';
import { randomUUID } from 'node:crypto';
const __dirname = dirname(fileURLToPath(import.meta.url));
const PORT = parseInt(process.env.PORT || '4003');
// --- Agent Registry: Known worker agents ---
const WORKER_AGENTS = {
'screenshot': {
name: 'SnapAPI Screenshot Agent',
a2aUrl: process.env.SNAPAPI_A2A_URL || 'http://localhost:4002',
skills: ['screenshot', 'webpage-capture'],
costPerTask: '$0.01'
},
'pdf': {
name: 'SnapAPI PDF Agent',
a2aUrl: process.env.SNAPAPI_A2A_URL || 'http://localhost:4002',
skills: ['markdown-to-pdf', 'document-generation'],
costPerTask: '$0.005'
},
'html': {
name: 'SnapAPI HTML Agent',
a2aUrl: process.env.SNAPAPI_A2A_URL || 'http://localhost:4002',
skills: ['markdown-to-html'],
costPerTask: 'free'
}
};
// --- State ---
const swarms = new Map(); // swarmId -> Swarm
const tasks = new Map(); // taskId -> Task
const eventLog = []; // append-only event log
// --- Agent Card (A2A v0.3 compliant) ---
const agentCard = {
name: 'OpSpawn Agent Swarm',
description: 'Multi-agent task decomposition and delegation system. Accepts complex tasks, breaks them into subtasks, delegates to specialized worker agents via A2A, and assembles results.',
version: '1.0.0',
url: process.env.PUBLIC_URL || `http://localhost:${PORT}`,
provider: {
organization: 'OpSpawn',
url: 'https://opspawn.com'
},
capabilities: {
streaming: false,
pushNotifications: false,
extensions: [
{
uri: 'urn:x402:payment:v2',
description: 'x402 V2 micropayments for inter-agent commerce',
required: false
},
{
uri: 'urn:opspawn:swarm',
description: 'Multi-agent task decomposition and delegation',
required: false
}
]
},
skills: [
{
id: 'decompose-and-execute',
name: 'Task Decomposition & Execution',
description: 'Takes a complex task, decomposes it into subtasks, delegates to specialized worker agents, and assembles the results.',
tags: ['orchestration', 'multi-agent', 'decomposition'],
examples: [
'Create a visual report about opspawn.com including a screenshot and summary',
'Generate a PDF from this markdown with a screenshot of the rendered page',
'Capture screenshots of these 3 URLs and compile them into a report'
]
},
{
id: 'discover-agents',
name: 'Agent Discovery',
description: 'Discover available worker agents and their capabilities.',
tags: ['discovery', 'registry']
},
{
id: 'swarm-status',
name: 'Swarm Status',
description: 'Get the status of a running swarm task, including subtask progress.',
tags: ['monitoring', 'status']
}
],
defaultInputModes: ['text/plain'],
defaultOutputModes: ['application/json', 'text/plain']
};
// --- Task Planner: Decomposes complex tasks into subtasks ---
function planTask(instruction) {
const lower = instruction.toLowerCase();
const subtasks = [];
// URL detection
const urlMatch = instruction.match(/https?:\/\/[^\s"'<>]+/g);
// Screenshot requests
if (lower.includes('screenshot') || lower.includes('capture') || lower.includes('visual')) {
const urls = urlMatch || ['https://opspawn.com'];
for (const url of urls) {
subtasks.push({
type: 'screenshot',
agent: 'screenshot',
params: { url },
description: `Capture screenshot of ${url}`
});
}
}
// PDF generation
if (lower.includes('pdf') || lower.includes('report') || lower.includes('document')) {
subtasks.push({
type: 'pdf',
agent: 'pdf',
params: { markdown: instruction },
description: 'Generate PDF report from content'
});
}
// HTML rendering
if (lower.includes('html') || lower.includes('render') || lower.includes('preview')) {
subtasks.push({
type: 'html',
agent: 'html',
params: { markdown: instruction },
description: 'Render content as HTML'
});
}
// Compound tasks: if "report" is mentioned, include both screenshot + PDF
if (lower.includes('report') && !subtasks.some(s => s.type === 'screenshot')) {
const url = urlMatch?.[0] || 'https://opspawn.com';
subtasks.unshift({
type: 'screenshot',
agent: 'screenshot',
params: { url },
description: `Capture screenshot for report: ${url}`
});
}
// Default: if no subtasks identified, treat as HTML rendering
if (subtasks.length === 0) {
subtasks.push({
type: 'html',
agent: 'html',
params: { markdown: instruction },
description: 'Process text as HTML content'
});
}
return subtasks;
}
// --- Swarm Execution Engine ---
async function executeSwarm(swarmId) {
const swarm = swarms.get(swarmId);
if (!swarm) return;
swarm.state = 'running';
swarm.startedAt = new Date().toISOString();
logEvent('coordinator', 'swarm-started', { swarmId, subtaskCount: swarm.subtasks.length });
const results = [];
for (const subtask of swarm.subtasks) {
subtask.state = 'running';
subtask.startedAt = new Date().toISOString();
logEvent('coordinator', 'subtask-started', { swarmId, subtaskId: subtask.id, type: subtask.type });
try {
const result = await delegateToWorker(subtask);
subtask.state = 'completed';
subtask.result = result;
subtask.completedAt = new Date().toISOString();
results.push({ subtaskId: subtask.id, type: subtask.type, result });
logEvent('coordinator', 'subtask-completed', { swarmId, subtaskId: subtask.id, type: subtask.type });
} catch (err) {
subtask.state = 'failed';
subtask.error = err.message;
subtask.completedAt = new Date().toISOString();
results.push({ subtaskId: subtask.id, type: subtask.type, error: err.message });
logEvent('coordinator', 'subtask-failed', { swarmId, subtaskId: subtask.id, error: err.message });
}
}
// Assemble results
const completed = swarm.subtasks.filter(s => s.state === 'completed').length;
const failed = swarm.subtasks.filter(s => s.state === 'failed').length;
swarm.state = failed === swarm.subtasks.length ? 'failed' : 'completed';
swarm.completedAt = new Date().toISOString();
swarm.results = results;
swarm.summary = `Swarm completed: ${completed}/${swarm.subtasks.length} subtasks succeeded, ${failed} failed.`;
logEvent('coordinator', 'swarm-completed', { swarmId, completed, failed, total: swarm.subtasks.length });
return swarm;
}
// --- Worker Delegation via A2A ---
async function delegateToWorker(subtask) {
const worker = WORKER_AGENTS[subtask.agent];
if (!worker) throw new Error(`Unknown worker agent: ${subtask.agent}`);
// Build message text based on subtask type
let messageText;
switch (subtask.type) {
case 'screenshot':
messageText = `Take a screenshot of ${subtask.params.url}`;
break;
case 'pdf':
messageText = `Convert to PDF:\n\n${subtask.params.markdown}`;
break;
case 'html':
messageText = `Convert to HTML:\n\n${subtask.params.markdown}`;
break;
default:
messageText = subtask.params.markdown || subtask.description;
}
// A2A v0.3 JSON-RPC request — POST to root endpoint
const a2aRequest = {
jsonrpc: '2.0',
id: randomUUID(),
method: 'message/send',
params: {
message: {
messageId: randomUUID(),
role: 'user',
kind: 'message',
parts: [{ kind: 'text', text: messageText }]
}
}
};
// Send to worker agent root endpoint (A2A uses POST /)
const url = worker.a2aUrl;
const response = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(a2aRequest)
});
if (!response.ok) {
const text = await response.text();
throw new Error(`Worker ${subtask.agent} returned ${response.status}: ${text}`);
}
const result = await response.json();
// Handle JSON-RPC error
if (result.error) {
throw new Error(`Worker ${subtask.agent} error: ${result.error.message}`);
}
const taskResult = result.result;
const state = taskResult?.status?.state;
const message = taskResult?.status?.message;
// Check if payment is required (x402 flow)
if (state === 'input-required') {
// Look for x402 payment info in message parts or metadata
const dataPart = message?.parts?.find(p => p.kind === 'data' && p.data?.['x402.payment.required']);
const accepts = dataPart?.data?.['x402.accepts'] || taskResult?.metadata?.['x402.accepts'];
if (accepts || dataPart) {
return {
state: 'payment-required',
worker: worker.name,
cost: worker.costPerTask,
paymentInfo: accepts,
skill: dataPart?.data?.skill || subtask.type,
message: `Worker ${worker.name} requires payment of ${worker.costPerTask} to complete this task.`
};
}
}
// Task completed successfully
if (state === 'completed') {
const parts = message?.parts || [];
return {
state: 'completed',
worker: worker.name,
output: parts.map(p => p.text || (p.data ? JSON.stringify(p.data) : '[binary]')).join('\n')
};
}
return { state: state || 'unknown', raw: result };
}
// --- Event Logging ---
function logEvent(agentId, action, data = {}) {
const event = {
timestamp: new Date().toISOString(),
agentId,
action,
...data
};
eventLog.push(event);
// Keep last 1000 events
if (eventLog.length > 1000) eventLog.shift();
}
// --- HTTP Request Helpers ---
function parseBody(req) {
return new Promise((resolve, reject) => {
const chunks = [];
req.on('data', c => chunks.push(c));
req.on('end', () => {
try { resolve(JSON.parse(Buffer.concat(chunks).toString())); }
catch { resolve(null); }
});
req.on('error', reject);
});
}
function json(res, data, status = 200) {
res.writeHead(status, {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization'
});
res.end(JSON.stringify(data, null, 2));
}
function cors(res) {
res.writeHead(204, {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, POST, DELETE, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization'
});
res.end();
}
// --- HTTP Server ---
const server = http.createServer(async (req, res) => {
const url = new URL(req.url, `http://localhost:${PORT}`);
const path = url.pathname;
if (req.method === 'OPTIONS') return cors(res);
// --- A2A: Agent Card ---
if (path === '/.well-known/agent-card.json' && req.method === 'GET') {
return json(res, agentCard);
}
// --- Health ---
if (path === '/health' && req.method === 'GET') {
return json(res, {
status: 'ok',
service: 'agent-swarm',
version: '1.0.0',
uptime: process.uptime(),
activeSwarms: [...swarms.values()].filter(s => s.state === 'running').length,
totalSwarms: swarms.size,
totalEvents: eventLog.length,
workerAgents: Object.keys(WORKER_AGENTS).length
});
}
// --- A2A: message/send (create and execute swarm) ---
if (path.match(/^\/tasks\/[^/]+\/send$/) && req.method === 'POST') {
const contextId = path.split('/')[2];
const body = await parseBody(req);
if (!body?.params?.message?.parts?.[0]?.text) {
return json(res, {
jsonrpc: '2.0',
id: body?.id || null,
error: { code: -32600, message: 'Invalid request: missing message text' }
}, 400);
}
const instruction = body.params.message.parts[0].text;
const subtaskPlans = planTask(instruction);
// Create swarm
const swarmId = randomUUID();
const swarm = {
id: swarmId,
contextId,
instruction,
state: 'planned',
createdAt: new Date().toISOString(),
subtasks: subtaskPlans.map((plan, i) => ({
id: `${swarmId}-sub-${i}`,
...plan,
state: 'pending',
result: null
}))
};
swarms.set(swarmId, swarm);
logEvent('coordinator', 'swarm-created', {
swarmId,
instruction: instruction.substring(0, 200),
subtaskCount: subtaskPlans.length,
subtaskTypes: subtaskPlans.map(s => s.type)
});
// Execute asynchronously
executeSwarm(swarmId).catch(err => {
logEvent('coordinator', 'swarm-error', { swarmId, error: err.message });
});
// Return immediately with task status
return json(res, {
jsonrpc: '2.0',
id: body.id || null,
result: {
id: swarmId,
contextId,
status: {
state: 'working',
message: {
role: 'agent',
parts: [{
type: 'text',
text: `Swarm created with ${subtaskPlans.length} subtask(s): ${subtaskPlans.map(s => s.description).join(', ')}`
}],
metadata: {
'swarm.id': swarmId,
'swarm.subtaskCount': subtaskPlans.length,
'swarm.subtasks': subtaskPlans.map(s => ({
type: s.type,
agent: s.agent,
description: s.description
}))
}
},
timestamp: new Date().toISOString()
}
}
});
}
// --- Get swarm status ---
if (path.match(/^\/swarms\/[^/]+$/) && req.method === 'GET') {
const swarmId = path.split('/')[2];
const swarm = swarms.get(swarmId);
if (!swarm) return json(res, { error: 'Swarm not found' }, 404);
return json(res, swarm);
}
// --- List all swarms ---
if (path === '/swarms' && req.method === 'GET') {
const list = [...swarms.values()].map(s => ({
id: s.id,
state: s.state,
instruction: s.instruction.substring(0, 100),
subtaskCount: s.subtasks.length,
createdAt: s.createdAt,
completedAt: s.completedAt
}));
return json(res, { swarms: list, total: list.length });
}
// --- Discover worker agents ---
if (path === '/agents' && req.method === 'GET') {
return json(res, {
coordinator: {
name: agentCard.name,
version: agentCard.version,
skills: agentCard.skills.map(s => s.id)
},
workers: Object.entries(WORKER_AGENTS).map(([id, w]) => ({
id,
name: w.name,
skills: w.skills,
costPerTask: w.costPerTask,
a2aUrl: w.a2aUrl
}))
});
}
// --- Event log ---
if (path === '/events' && req.method === 'GET') {
const last = parseInt(url.searchParams.get('last') || '50');
return json(res, {
events: eventLog.slice(-last),
total: eventLog.length
});
}
// --- Dashboard ---
if (path === '/' && req.method === 'GET') {
try {
const html = await readFile(join(__dirname, 'dashboard.html'), 'utf8');
res.writeHead(200, { 'Content-Type': 'text/html' });
res.end(html);
} catch {
res.writeHead(200, { 'Content-Type': 'text/html' });
res.end('<html><body><h1>Agent Swarm</h1><p>Dashboard loading...</p></body></html>');
}
return;
}
// --- Favicon ---
if (path === '/favicon.ico') {
res.writeHead(204);
return res.end();
}
// --- 404 ---
json(res, { error: 'Not found' }, 404);
});
server.listen(PORT, () => {
console.log(`Agent Swarm coordinator running on port ${PORT}`);
console.log(`Agent card: http://localhost:${PORT}/.well-known/agent-card.json`);
console.log(`Dashboard: http://localhost:${PORT}/`);
console.log(`Workers: ${Object.keys(WORKER_AGENTS).length} registered`);
});
export { agentCard, WORKER_AGENTS, planTask };