import http from 'node:http'; import { readFile } from 'node:fs/promises'; import { fileURLToPath } from 'node:url'; import { dirname, join } from 'node:path'; import { randomUUID } from 'node:crypto'; const __dirname = dirname(fileURLToPath(import.meta.url)); const PORT = parseInt(process.env.PORT || '4003'); // --- Agent Registry: Known worker agents --- const WORKER_AGENTS = { 'screenshot': { name: 'SnapAPI Screenshot Agent', a2aUrl: process.env.SNAPAPI_A2A_URL || 'http://localhost:4002', skills: ['screenshot', 'webpage-capture'], costPerTask: '$0.01' }, 'pdf': { name: 'SnapAPI PDF Agent', a2aUrl: process.env.SNAPAPI_A2A_URL || 'http://localhost:4002', skills: ['markdown-to-pdf', 'document-generation'], costPerTask: '$0.005' }, 'html': { name: 'SnapAPI HTML Agent', a2aUrl: process.env.SNAPAPI_A2A_URL || 'http://localhost:4002', skills: ['markdown-to-html'], costPerTask: 'free' } }; // --- State --- const swarms = new Map(); // swarmId -> Swarm const tasks = new Map(); // taskId -> Task const eventLog = []; // append-only event log // --- Agent Card (A2A v0.3 compliant) --- const agentCard = { name: 'OpSpawn Agent Swarm', description: 'Multi-agent task decomposition and delegation system. Accepts complex tasks, breaks them into subtasks, delegates to specialized worker agents via A2A, and assembles results.', version: '1.0.0', url: process.env.PUBLIC_URL || `http://localhost:${PORT}`, provider: { organization: 'OpSpawn', url: 'https://opspawn.com' }, capabilities: { streaming: false, pushNotifications: false, extensions: [ { uri: 'urn:x402:payment:v2', description: 'x402 V2 micropayments for inter-agent commerce', required: false }, { uri: 'urn:opspawn:swarm', description: 'Multi-agent task decomposition and delegation', required: false } ] }, skills: [ { id: 'decompose-and-execute', name: 'Task Decomposition & Execution', description: 'Takes a complex task, decomposes it into subtasks, delegates to specialized worker agents, and assembles the results.', tags: ['orchestration', 'multi-agent', 'decomposition'], examples: [ 'Create a visual report about opspawn.com including a screenshot and summary', 'Generate a PDF from this markdown with a screenshot of the rendered page', 'Capture screenshots of these 3 URLs and compile them into a report' ] }, { id: 'discover-agents', name: 'Agent Discovery', description: 'Discover available worker agents and their capabilities.', tags: ['discovery', 'registry'] }, { id: 'swarm-status', name: 'Swarm Status', description: 'Get the status of a running swarm task, including subtask progress.', tags: ['monitoring', 'status'] } ], defaultInputModes: ['text/plain'], defaultOutputModes: ['application/json', 'text/plain'] }; // --- Task Planner: Decomposes complex tasks into subtasks --- function planTask(instruction) { const lower = instruction.toLowerCase(); const subtasks = []; // URL detection const urlMatch = instruction.match(/https?:\/\/[^\s"'<>]+/g); // Screenshot requests if (lower.includes('screenshot') || lower.includes('capture') || lower.includes('visual')) { const urls = urlMatch || ['https://opspawn.com']; for (const url of urls) { subtasks.push({ type: 'screenshot', agent: 'screenshot', params: { url }, description: `Capture screenshot of ${url}` }); } } // PDF generation if (lower.includes('pdf') || lower.includes('report') || lower.includes('document')) { subtasks.push({ type: 'pdf', agent: 'pdf', params: { markdown: instruction }, description: 'Generate PDF report from content' }); } // HTML rendering if (lower.includes('html') || lower.includes('render') || lower.includes('preview')) { subtasks.push({ type: 'html', agent: 'html', params: { markdown: instruction }, description: 'Render content as HTML' }); } // Compound tasks: if "report" is mentioned, include both screenshot + PDF if (lower.includes('report') && !subtasks.some(s => s.type === 'screenshot')) { const url = urlMatch?.[0] || 'https://opspawn.com'; subtasks.unshift({ type: 'screenshot', agent: 'screenshot', params: { url }, description: `Capture screenshot for report: ${url}` }); } // Default: if no subtasks identified, treat as HTML rendering if (subtasks.length === 0) { subtasks.push({ type: 'html', agent: 'html', params: { markdown: instruction }, description: 'Process text as HTML content' }); } return subtasks; } // --- Swarm Execution Engine --- async function executeSwarm(swarmId) { const swarm = swarms.get(swarmId); if (!swarm) return; swarm.state = 'running'; swarm.startedAt = new Date().toISOString(); logEvent('coordinator', 'swarm-started', { swarmId, subtaskCount: swarm.subtasks.length }); const results = []; for (const subtask of swarm.subtasks) { subtask.state = 'running'; subtask.startedAt = new Date().toISOString(); logEvent('coordinator', 'subtask-started', { swarmId, subtaskId: subtask.id, type: subtask.type }); try { const result = await delegateToWorker(subtask); subtask.state = 'completed'; subtask.result = result; subtask.completedAt = new Date().toISOString(); results.push({ subtaskId: subtask.id, type: subtask.type, result }); logEvent('coordinator', 'subtask-completed', { swarmId, subtaskId: subtask.id, type: subtask.type }); } catch (err) { subtask.state = 'failed'; subtask.error = err.message; subtask.completedAt = new Date().toISOString(); results.push({ subtaskId: subtask.id, type: subtask.type, error: err.message }); logEvent('coordinator', 'subtask-failed', { swarmId, subtaskId: subtask.id, error: err.message }); } } // Assemble results const completed = swarm.subtasks.filter(s => s.state === 'completed').length; const failed = swarm.subtasks.filter(s => s.state === 'failed').length; swarm.state = failed === swarm.subtasks.length ? 'failed' : 'completed'; swarm.completedAt = new Date().toISOString(); swarm.results = results; swarm.summary = `Swarm completed: ${completed}/${swarm.subtasks.length} subtasks succeeded, ${failed} failed.`; logEvent('coordinator', 'swarm-completed', { swarmId, completed, failed, total: swarm.subtasks.length }); return swarm; } // --- Worker Delegation via A2A --- async function delegateToWorker(subtask) { const worker = WORKER_AGENTS[subtask.agent]; if (!worker) throw new Error(`Unknown worker agent: ${subtask.agent}`); // Build message text based on subtask type let messageText; switch (subtask.type) { case 'screenshot': messageText = `Take a screenshot of ${subtask.params.url}`; break; case 'pdf': messageText = `Convert to PDF:\n\n${subtask.params.markdown}`; break; case 'html': messageText = `Convert to HTML:\n\n${subtask.params.markdown}`; break; default: messageText = subtask.params.markdown || subtask.description; } // A2A v0.3 JSON-RPC request — POST to root endpoint const a2aRequest = { jsonrpc: '2.0', id: randomUUID(), method: 'message/send', params: { message: { messageId: randomUUID(), role: 'user', kind: 'message', parts: [{ kind: 'text', text: messageText }] } } }; // Send to worker agent root endpoint (A2A uses POST /) const url = worker.a2aUrl; const response = await fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(a2aRequest) }); if (!response.ok) { const text = await response.text(); throw new Error(`Worker ${subtask.agent} returned ${response.status}: ${text}`); } const result = await response.json(); // Handle JSON-RPC error if (result.error) { throw new Error(`Worker ${subtask.agent} error: ${result.error.message}`); } const taskResult = result.result; const state = taskResult?.status?.state; const message = taskResult?.status?.message; // Check if payment is required (x402 flow) if (state === 'input-required') { // Look for x402 payment info in message parts or metadata const dataPart = message?.parts?.find(p => p.kind === 'data' && p.data?.['x402.payment.required']); const accepts = dataPart?.data?.['x402.accepts'] || taskResult?.metadata?.['x402.accepts']; if (accepts || dataPart) { return { state: 'payment-required', worker: worker.name, cost: worker.costPerTask, paymentInfo: accepts, skill: dataPart?.data?.skill || subtask.type, message: `Worker ${worker.name} requires payment of ${worker.costPerTask} to complete this task.` }; } } // Task completed successfully if (state === 'completed') { const parts = message?.parts || []; return { state: 'completed', worker: worker.name, output: parts.map(p => p.text || (p.data ? JSON.stringify(p.data) : '[binary]')).join('\n') }; } return { state: state || 'unknown', raw: result }; } // --- Event Logging --- function logEvent(agentId, action, data = {}) { const event = { timestamp: new Date().toISOString(), agentId, action, ...data }; eventLog.push(event); // Keep last 1000 events if (eventLog.length > 1000) eventLog.shift(); } // --- HTTP Request Helpers --- function parseBody(req) { return new Promise((resolve, reject) => { const chunks = []; req.on('data', c => chunks.push(c)); req.on('end', () => { try { resolve(JSON.parse(Buffer.concat(chunks).toString())); } catch { resolve(null); } }); req.on('error', reject); }); } function json(res, data, status = 200) { res.writeHead(status, { 'Content-Type': 'application/json', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'GET, POST, OPTIONS', 'Access-Control-Allow-Headers': 'Content-Type, Authorization' }); res.end(JSON.stringify(data, null, 2)); } function cors(res) { res.writeHead(204, { 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Methods': 'GET, POST, DELETE, OPTIONS', 'Access-Control-Allow-Headers': 'Content-Type, Authorization' }); res.end(); } // --- HTTP Server --- const server = http.createServer(async (req, res) => { const url = new URL(req.url, `http://localhost:${PORT}`); const path = url.pathname; if (req.method === 'OPTIONS') return cors(res); // --- A2A: Agent Card --- if (path === '/.well-known/agent-card.json' && req.method === 'GET') { return json(res, agentCard); } // --- Health --- if (path === '/health' && req.method === 'GET') { return json(res, { status: 'ok', service: 'agent-swarm', version: '1.0.0', uptime: process.uptime(), activeSwarms: [...swarms.values()].filter(s => s.state === 'running').length, totalSwarms: swarms.size, totalEvents: eventLog.length, workerAgents: Object.keys(WORKER_AGENTS).length }); } // --- A2A: message/send (create and execute swarm) --- if (path.match(/^\/tasks\/[^/]+\/send$/) && req.method === 'POST') { const contextId = path.split('/')[2]; const body = await parseBody(req); if (!body?.params?.message?.parts?.[0]?.text) { return json(res, { jsonrpc: '2.0', id: body?.id || null, error: { code: -32600, message: 'Invalid request: missing message text' } }, 400); } const instruction = body.params.message.parts[0].text; const subtaskPlans = planTask(instruction); // Create swarm const swarmId = randomUUID(); const swarm = { id: swarmId, contextId, instruction, state: 'planned', createdAt: new Date().toISOString(), subtasks: subtaskPlans.map((plan, i) => ({ id: `${swarmId}-sub-${i}`, ...plan, state: 'pending', result: null })) }; swarms.set(swarmId, swarm); logEvent('coordinator', 'swarm-created', { swarmId, instruction: instruction.substring(0, 200), subtaskCount: subtaskPlans.length, subtaskTypes: subtaskPlans.map(s => s.type) }); // Execute asynchronously executeSwarm(swarmId).catch(err => { logEvent('coordinator', 'swarm-error', { swarmId, error: err.message }); }); // Return immediately with task status return json(res, { jsonrpc: '2.0', id: body.id || null, result: { id: swarmId, contextId, status: { state: 'working', message: { role: 'agent', parts: [{ type: 'text', text: `Swarm created with ${subtaskPlans.length} subtask(s): ${subtaskPlans.map(s => s.description).join(', ')}` }], metadata: { 'swarm.id': swarmId, 'swarm.subtaskCount': subtaskPlans.length, 'swarm.subtasks': subtaskPlans.map(s => ({ type: s.type, agent: s.agent, description: s.description })) } }, timestamp: new Date().toISOString() } } }); } // --- Get swarm status --- if (path.match(/^\/swarms\/[^/]+$/) && req.method === 'GET') { const swarmId = path.split('/')[2]; const swarm = swarms.get(swarmId); if (!swarm) return json(res, { error: 'Swarm not found' }, 404); return json(res, swarm); } // --- List all swarms --- if (path === '/swarms' && req.method === 'GET') { const list = [...swarms.values()].map(s => ({ id: s.id, state: s.state, instruction: s.instruction.substring(0, 100), subtaskCount: s.subtasks.length, createdAt: s.createdAt, completedAt: s.completedAt })); return json(res, { swarms: list, total: list.length }); } // --- Discover worker agents --- if (path === '/agents' && req.method === 'GET') { return json(res, { coordinator: { name: agentCard.name, version: agentCard.version, skills: agentCard.skills.map(s => s.id) }, workers: Object.entries(WORKER_AGENTS).map(([id, w]) => ({ id, name: w.name, skills: w.skills, costPerTask: w.costPerTask, a2aUrl: w.a2aUrl })) }); } // --- Event log --- if (path === '/events' && req.method === 'GET') { const last = parseInt(url.searchParams.get('last') || '50'); return json(res, { events: eventLog.slice(-last), total: eventLog.length }); } // --- Dashboard --- if (path === '/' && req.method === 'GET') { try { const html = await readFile(join(__dirname, 'dashboard.html'), 'utf8'); res.writeHead(200, { 'Content-Type': 'text/html' }); res.end(html); } catch { res.writeHead(200, { 'Content-Type': 'text/html' }); res.end('
Dashboard loading...
'); } return; } // --- Favicon --- if (path === '/favicon.ico') { res.writeHead(204); return res.end(); } // --- 404 --- json(res, { error: 'Not found' }, 404); }); server.listen(PORT, () => { console.log(`Agent Swarm coordinator running on port ${PORT}`); console.log(`Agent card: http://localhost:${PORT}/.well-known/agent-card.json`); console.log(`Dashboard: http://localhost:${PORT}/`); console.log(`Workers: ${Object.keys(WORKER_AGENTS).length} registered`); }); export { agentCard, WORKER_AGENTS, planTask };