- Coordinator agent accepts complex tasks and decomposes into subtasks - Delegates to specialized worker agents via A2A v0.3 JSON-RPC - Handles x402 payment flows (paid skills return payment-required) - Free skills execute end-to-end through delegation chain - Web dashboard with real-time swarm monitoring - 14 tests passing - Worker agents: screenshot, PDF, HTML (via A2A gateway) - Event log tracks all delegations and completions
517 lines
16 KiB
JavaScript
517 lines
16 KiB
JavaScript
import http from 'node:http';
|
|
import { readFile } from 'node:fs/promises';
|
|
import { fileURLToPath } from 'node:url';
|
|
import { dirname, join } from 'node:path';
|
|
import { randomUUID } from 'node:crypto';
|
|
|
|
const __dirname = dirname(fileURLToPath(import.meta.url));
|
|
const PORT = parseInt(process.env.PORT || '4003');
|
|
|
|
// --- Agent Registry: Known worker agents ---
|
|
const WORKER_AGENTS = {
|
|
'screenshot': {
|
|
name: 'SnapAPI Screenshot Agent',
|
|
a2aUrl: process.env.SNAPAPI_A2A_URL || 'http://localhost:4002',
|
|
skills: ['screenshot', 'webpage-capture'],
|
|
costPerTask: '$0.01'
|
|
},
|
|
'pdf': {
|
|
name: 'SnapAPI PDF Agent',
|
|
a2aUrl: process.env.SNAPAPI_A2A_URL || 'http://localhost:4002',
|
|
skills: ['markdown-to-pdf', 'document-generation'],
|
|
costPerTask: '$0.005'
|
|
},
|
|
'html': {
|
|
name: 'SnapAPI HTML Agent',
|
|
a2aUrl: process.env.SNAPAPI_A2A_URL || 'http://localhost:4002',
|
|
skills: ['markdown-to-html'],
|
|
costPerTask: 'free'
|
|
}
|
|
};
|
|
|
|
// --- State ---
|
|
const swarms = new Map(); // swarmId -> Swarm
|
|
const tasks = new Map(); // taskId -> Task
|
|
const eventLog = []; // append-only event log
|
|
|
|
// --- Agent Card (A2A v0.3 compliant) ---
|
|
const agentCard = {
|
|
name: 'OpSpawn Agent Swarm',
|
|
description: 'Multi-agent task decomposition and delegation system. Accepts complex tasks, breaks them into subtasks, delegates to specialized worker agents via A2A, and assembles results.',
|
|
version: '1.0.0',
|
|
url: process.env.PUBLIC_URL || `http://localhost:${PORT}`,
|
|
provider: {
|
|
organization: 'OpSpawn',
|
|
url: 'https://opspawn.com'
|
|
},
|
|
capabilities: {
|
|
streaming: false,
|
|
pushNotifications: false,
|
|
extensions: [
|
|
{
|
|
uri: 'urn:x402:payment:v2',
|
|
description: 'x402 V2 micropayments for inter-agent commerce',
|
|
required: false
|
|
},
|
|
{
|
|
uri: 'urn:opspawn:swarm',
|
|
description: 'Multi-agent task decomposition and delegation',
|
|
required: false
|
|
}
|
|
]
|
|
},
|
|
skills: [
|
|
{
|
|
id: 'decompose-and-execute',
|
|
name: 'Task Decomposition & Execution',
|
|
description: 'Takes a complex task, decomposes it into subtasks, delegates to specialized worker agents, and assembles the results.',
|
|
tags: ['orchestration', 'multi-agent', 'decomposition'],
|
|
examples: [
|
|
'Create a visual report about opspawn.com including a screenshot and summary',
|
|
'Generate a PDF from this markdown with a screenshot of the rendered page',
|
|
'Capture screenshots of these 3 URLs and compile them into a report'
|
|
]
|
|
},
|
|
{
|
|
id: 'discover-agents',
|
|
name: 'Agent Discovery',
|
|
description: 'Discover available worker agents and their capabilities.',
|
|
tags: ['discovery', 'registry']
|
|
},
|
|
{
|
|
id: 'swarm-status',
|
|
name: 'Swarm Status',
|
|
description: 'Get the status of a running swarm task, including subtask progress.',
|
|
tags: ['monitoring', 'status']
|
|
}
|
|
],
|
|
defaultInputModes: ['text/plain'],
|
|
defaultOutputModes: ['application/json', 'text/plain']
|
|
};
|
|
|
|
// --- Task Planner: Decomposes complex tasks into subtasks ---
|
|
function planTask(instruction) {
|
|
const lower = instruction.toLowerCase();
|
|
const subtasks = [];
|
|
|
|
// URL detection
|
|
const urlMatch = instruction.match(/https?:\/\/[^\s"'<>]+/g);
|
|
|
|
// Screenshot requests
|
|
if (lower.includes('screenshot') || lower.includes('capture') || lower.includes('visual')) {
|
|
const urls = urlMatch || ['https://opspawn.com'];
|
|
for (const url of urls) {
|
|
subtasks.push({
|
|
type: 'screenshot',
|
|
agent: 'screenshot',
|
|
params: { url },
|
|
description: `Capture screenshot of ${url}`
|
|
});
|
|
}
|
|
}
|
|
|
|
// PDF generation
|
|
if (lower.includes('pdf') || lower.includes('report') || lower.includes('document')) {
|
|
subtasks.push({
|
|
type: 'pdf',
|
|
agent: 'pdf',
|
|
params: { markdown: instruction },
|
|
description: 'Generate PDF report from content'
|
|
});
|
|
}
|
|
|
|
// HTML rendering
|
|
if (lower.includes('html') || lower.includes('render') || lower.includes('preview')) {
|
|
subtasks.push({
|
|
type: 'html',
|
|
agent: 'html',
|
|
params: { markdown: instruction },
|
|
description: 'Render content as HTML'
|
|
});
|
|
}
|
|
|
|
// Compound tasks: if "report" is mentioned, include both screenshot + PDF
|
|
if (lower.includes('report') && !subtasks.some(s => s.type === 'screenshot')) {
|
|
const url = urlMatch?.[0] || 'https://opspawn.com';
|
|
subtasks.unshift({
|
|
type: 'screenshot',
|
|
agent: 'screenshot',
|
|
params: { url },
|
|
description: `Capture screenshot for report: ${url}`
|
|
});
|
|
}
|
|
|
|
// Default: if no subtasks identified, treat as HTML rendering
|
|
if (subtasks.length === 0) {
|
|
subtasks.push({
|
|
type: 'html',
|
|
agent: 'html',
|
|
params: { markdown: instruction },
|
|
description: 'Process text as HTML content'
|
|
});
|
|
}
|
|
|
|
return subtasks;
|
|
}
|
|
|
|
// --- Swarm Execution Engine ---
|
|
async function executeSwarm(swarmId) {
|
|
const swarm = swarms.get(swarmId);
|
|
if (!swarm) return;
|
|
|
|
swarm.state = 'running';
|
|
swarm.startedAt = new Date().toISOString();
|
|
logEvent('coordinator', 'swarm-started', { swarmId, subtaskCount: swarm.subtasks.length });
|
|
|
|
const results = [];
|
|
for (const subtask of swarm.subtasks) {
|
|
subtask.state = 'running';
|
|
subtask.startedAt = new Date().toISOString();
|
|
logEvent('coordinator', 'subtask-started', { swarmId, subtaskId: subtask.id, type: subtask.type });
|
|
|
|
try {
|
|
const result = await delegateToWorker(subtask);
|
|
subtask.state = 'completed';
|
|
subtask.result = result;
|
|
subtask.completedAt = new Date().toISOString();
|
|
results.push({ subtaskId: subtask.id, type: subtask.type, result });
|
|
logEvent('coordinator', 'subtask-completed', { swarmId, subtaskId: subtask.id, type: subtask.type });
|
|
} catch (err) {
|
|
subtask.state = 'failed';
|
|
subtask.error = err.message;
|
|
subtask.completedAt = new Date().toISOString();
|
|
results.push({ subtaskId: subtask.id, type: subtask.type, error: err.message });
|
|
logEvent('coordinator', 'subtask-failed', { swarmId, subtaskId: subtask.id, error: err.message });
|
|
}
|
|
}
|
|
|
|
// Assemble results
|
|
const completed = swarm.subtasks.filter(s => s.state === 'completed').length;
|
|
const failed = swarm.subtasks.filter(s => s.state === 'failed').length;
|
|
|
|
swarm.state = failed === swarm.subtasks.length ? 'failed' : 'completed';
|
|
swarm.completedAt = new Date().toISOString();
|
|
swarm.results = results;
|
|
swarm.summary = `Swarm completed: ${completed}/${swarm.subtasks.length} subtasks succeeded, ${failed} failed.`;
|
|
|
|
logEvent('coordinator', 'swarm-completed', { swarmId, completed, failed, total: swarm.subtasks.length });
|
|
return swarm;
|
|
}
|
|
|
|
// --- Worker Delegation via A2A ---
|
|
async function delegateToWorker(subtask) {
|
|
const worker = WORKER_AGENTS[subtask.agent];
|
|
if (!worker) throw new Error(`Unknown worker agent: ${subtask.agent}`);
|
|
|
|
// Build message text based on subtask type
|
|
let messageText;
|
|
switch (subtask.type) {
|
|
case 'screenshot':
|
|
messageText = `Take a screenshot of ${subtask.params.url}`;
|
|
break;
|
|
case 'pdf':
|
|
messageText = `Convert to PDF:\n\n${subtask.params.markdown}`;
|
|
break;
|
|
case 'html':
|
|
messageText = `Convert to HTML:\n\n${subtask.params.markdown}`;
|
|
break;
|
|
default:
|
|
messageText = subtask.params.markdown || subtask.description;
|
|
}
|
|
|
|
// A2A v0.3 JSON-RPC request — POST to root endpoint
|
|
const a2aRequest = {
|
|
jsonrpc: '2.0',
|
|
id: randomUUID(),
|
|
method: 'message/send',
|
|
params: {
|
|
message: {
|
|
messageId: randomUUID(),
|
|
role: 'user',
|
|
kind: 'message',
|
|
parts: [{ kind: 'text', text: messageText }]
|
|
}
|
|
}
|
|
};
|
|
|
|
// Send to worker agent root endpoint (A2A uses POST /)
|
|
const url = worker.a2aUrl;
|
|
const response = await fetch(url, {
|
|
method: 'POST',
|
|
headers: { 'Content-Type': 'application/json' },
|
|
body: JSON.stringify(a2aRequest)
|
|
});
|
|
|
|
if (!response.ok) {
|
|
const text = await response.text();
|
|
throw new Error(`Worker ${subtask.agent} returned ${response.status}: ${text}`);
|
|
}
|
|
|
|
const result = await response.json();
|
|
|
|
// Handle JSON-RPC error
|
|
if (result.error) {
|
|
throw new Error(`Worker ${subtask.agent} error: ${result.error.message}`);
|
|
}
|
|
|
|
const taskResult = result.result;
|
|
const state = taskResult?.status?.state;
|
|
const message = taskResult?.status?.message;
|
|
|
|
// Check if payment is required (x402 flow)
|
|
if (state === 'input-required') {
|
|
// Look for x402 payment info in message parts or metadata
|
|
const dataPart = message?.parts?.find(p => p.kind === 'data' && p.data?.['x402.payment.required']);
|
|
const accepts = dataPart?.data?.['x402.accepts'] || taskResult?.metadata?.['x402.accepts'];
|
|
if (accepts || dataPart) {
|
|
return {
|
|
state: 'payment-required',
|
|
worker: worker.name,
|
|
cost: worker.costPerTask,
|
|
paymentInfo: accepts,
|
|
skill: dataPart?.data?.skill || subtask.type,
|
|
message: `Worker ${worker.name} requires payment of ${worker.costPerTask} to complete this task.`
|
|
};
|
|
}
|
|
}
|
|
|
|
// Task completed successfully
|
|
if (state === 'completed') {
|
|
const parts = message?.parts || [];
|
|
return {
|
|
state: 'completed',
|
|
worker: worker.name,
|
|
output: parts.map(p => p.text || (p.data ? JSON.stringify(p.data) : '[binary]')).join('\n')
|
|
};
|
|
}
|
|
|
|
return { state: state || 'unknown', raw: result };
|
|
}
|
|
|
|
// --- Event Logging ---
|
|
function logEvent(agentId, action, data = {}) {
|
|
const event = {
|
|
timestamp: new Date().toISOString(),
|
|
agentId,
|
|
action,
|
|
...data
|
|
};
|
|
eventLog.push(event);
|
|
// Keep last 1000 events
|
|
if (eventLog.length > 1000) eventLog.shift();
|
|
}
|
|
|
|
// --- HTTP Request Helpers ---
|
|
function parseBody(req) {
|
|
return new Promise((resolve, reject) => {
|
|
const chunks = [];
|
|
req.on('data', c => chunks.push(c));
|
|
req.on('end', () => {
|
|
try { resolve(JSON.parse(Buffer.concat(chunks).toString())); }
|
|
catch { resolve(null); }
|
|
});
|
|
req.on('error', reject);
|
|
});
|
|
}
|
|
|
|
function json(res, data, status = 200) {
|
|
res.writeHead(status, {
|
|
'Content-Type': 'application/json',
|
|
'Access-Control-Allow-Origin': '*',
|
|
'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
|
|
'Access-Control-Allow-Headers': 'Content-Type, Authorization'
|
|
});
|
|
res.end(JSON.stringify(data, null, 2));
|
|
}
|
|
|
|
function cors(res) {
|
|
res.writeHead(204, {
|
|
'Access-Control-Allow-Origin': '*',
|
|
'Access-Control-Allow-Methods': 'GET, POST, DELETE, OPTIONS',
|
|
'Access-Control-Allow-Headers': 'Content-Type, Authorization'
|
|
});
|
|
res.end();
|
|
}
|
|
|
|
// --- HTTP Server ---
|
|
const server = http.createServer(async (req, res) => {
|
|
const url = new URL(req.url, `http://localhost:${PORT}`);
|
|
const path = url.pathname;
|
|
|
|
if (req.method === 'OPTIONS') return cors(res);
|
|
|
|
// --- A2A: Agent Card ---
|
|
if (path === '/.well-known/agent-card.json' && req.method === 'GET') {
|
|
return json(res, agentCard);
|
|
}
|
|
|
|
// --- Health ---
|
|
if (path === '/health' && req.method === 'GET') {
|
|
return json(res, {
|
|
status: 'ok',
|
|
service: 'agent-swarm',
|
|
version: '1.0.0',
|
|
uptime: process.uptime(),
|
|
activeSwarms: [...swarms.values()].filter(s => s.state === 'running').length,
|
|
totalSwarms: swarms.size,
|
|
totalEvents: eventLog.length,
|
|
workerAgents: Object.keys(WORKER_AGENTS).length
|
|
});
|
|
}
|
|
|
|
// --- A2A: message/send (create and execute swarm) ---
|
|
if (path.match(/^\/tasks\/[^/]+\/send$/) && req.method === 'POST') {
|
|
const contextId = path.split('/')[2];
|
|
const body = await parseBody(req);
|
|
|
|
if (!body?.params?.message?.parts?.[0]?.text) {
|
|
return json(res, {
|
|
jsonrpc: '2.0',
|
|
id: body?.id || null,
|
|
error: { code: -32600, message: 'Invalid request: missing message text' }
|
|
}, 400);
|
|
}
|
|
|
|
const instruction = body.params.message.parts[0].text;
|
|
const subtaskPlans = planTask(instruction);
|
|
|
|
// Create swarm
|
|
const swarmId = randomUUID();
|
|
const swarm = {
|
|
id: swarmId,
|
|
contextId,
|
|
instruction,
|
|
state: 'planned',
|
|
createdAt: new Date().toISOString(),
|
|
subtasks: subtaskPlans.map((plan, i) => ({
|
|
id: `${swarmId}-sub-${i}`,
|
|
...plan,
|
|
state: 'pending',
|
|
result: null
|
|
}))
|
|
};
|
|
swarms.set(swarmId, swarm);
|
|
|
|
logEvent('coordinator', 'swarm-created', {
|
|
swarmId,
|
|
instruction: instruction.substring(0, 200),
|
|
subtaskCount: subtaskPlans.length,
|
|
subtaskTypes: subtaskPlans.map(s => s.type)
|
|
});
|
|
|
|
// Execute asynchronously
|
|
executeSwarm(swarmId).catch(err => {
|
|
logEvent('coordinator', 'swarm-error', { swarmId, error: err.message });
|
|
});
|
|
|
|
// Return immediately with task status
|
|
return json(res, {
|
|
jsonrpc: '2.0',
|
|
id: body.id || null,
|
|
result: {
|
|
id: swarmId,
|
|
contextId,
|
|
status: {
|
|
state: 'working',
|
|
message: {
|
|
role: 'agent',
|
|
parts: [{
|
|
type: 'text',
|
|
text: `Swarm created with ${subtaskPlans.length} subtask(s): ${subtaskPlans.map(s => s.description).join(', ')}`
|
|
}],
|
|
metadata: {
|
|
'swarm.id': swarmId,
|
|
'swarm.subtaskCount': subtaskPlans.length,
|
|
'swarm.subtasks': subtaskPlans.map(s => ({
|
|
type: s.type,
|
|
agent: s.agent,
|
|
description: s.description
|
|
}))
|
|
}
|
|
},
|
|
timestamp: new Date().toISOString()
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
// --- Get swarm status ---
|
|
if (path.match(/^\/swarms\/[^/]+$/) && req.method === 'GET') {
|
|
const swarmId = path.split('/')[2];
|
|
const swarm = swarms.get(swarmId);
|
|
if (!swarm) return json(res, { error: 'Swarm not found' }, 404);
|
|
return json(res, swarm);
|
|
}
|
|
|
|
// --- List all swarms ---
|
|
if (path === '/swarms' && req.method === 'GET') {
|
|
const list = [...swarms.values()].map(s => ({
|
|
id: s.id,
|
|
state: s.state,
|
|
instruction: s.instruction.substring(0, 100),
|
|
subtaskCount: s.subtasks.length,
|
|
createdAt: s.createdAt,
|
|
completedAt: s.completedAt
|
|
}));
|
|
return json(res, { swarms: list, total: list.length });
|
|
}
|
|
|
|
// --- Discover worker agents ---
|
|
if (path === '/agents' && req.method === 'GET') {
|
|
return json(res, {
|
|
coordinator: {
|
|
name: agentCard.name,
|
|
version: agentCard.version,
|
|
skills: agentCard.skills.map(s => s.id)
|
|
},
|
|
workers: Object.entries(WORKER_AGENTS).map(([id, w]) => ({
|
|
id,
|
|
name: w.name,
|
|
skills: w.skills,
|
|
costPerTask: w.costPerTask,
|
|
a2aUrl: w.a2aUrl
|
|
}))
|
|
});
|
|
}
|
|
|
|
// --- Event log ---
|
|
if (path === '/events' && req.method === 'GET') {
|
|
const last = parseInt(url.searchParams.get('last') || '50');
|
|
return json(res, {
|
|
events: eventLog.slice(-last),
|
|
total: eventLog.length
|
|
});
|
|
}
|
|
|
|
// --- Dashboard ---
|
|
if (path === '/' && req.method === 'GET') {
|
|
try {
|
|
const html = await readFile(join(__dirname, 'dashboard.html'), 'utf8');
|
|
res.writeHead(200, { 'Content-Type': 'text/html' });
|
|
res.end(html);
|
|
} catch {
|
|
res.writeHead(200, { 'Content-Type': 'text/html' });
|
|
res.end('<html><body><h1>Agent Swarm</h1><p>Dashboard loading...</p></body></html>');
|
|
}
|
|
return;
|
|
}
|
|
|
|
// --- Favicon ---
|
|
if (path === '/favicon.ico') {
|
|
res.writeHead(204);
|
|
return res.end();
|
|
}
|
|
|
|
// --- 404 ---
|
|
json(res, { error: 'Not found' }, 404);
|
|
});
|
|
|
|
server.listen(PORT, () => {
|
|
console.log(`Agent Swarm coordinator running on port ${PORT}`);
|
|
console.log(`Agent card: http://localhost:${PORT}/.well-known/agent-card.json`);
|
|
console.log(`Dashboard: http://localhost:${PORT}/`);
|
|
console.log(`Workers: ${Object.keys(WORKER_AGENTS).length} registered`);
|
|
});
|
|
|
|
export { agentCard, WORKER_AGENTS, planTask };
|