382 lines
10 KiB
JavaScript
382 lines
10 KiB
JavaScript
/**
|
|
* OpSpawn Orchestrator
|
|
*
|
|
* Lightweight agent coordination system. Manages shared state, task assignment,
|
|
* event logging, and resource locking across multiple sub-agents.
|
|
*
|
|
* Usage:
|
|
* const orc = require('./orchestrator');
|
|
*
|
|
* // Create a workstream
|
|
* orc.createWorkstream('bounty', { description: 'Bounty hunting', priority: 1 });
|
|
*
|
|
* // Add tasks
|
|
* orc.addTask('bounty', { title: 'Research Archestra', estimate: '2h' });
|
|
*
|
|
* // Register agent and claim work
|
|
* orc.registerAgent('agent-1', { type: 'research' });
|
|
* orc.claimTask('bounty', taskId, 'agent-1');
|
|
*
|
|
* // Log events
|
|
* orc.logEvent('agent-1', 'completed', { task: taskId, result: '...' });
|
|
*
|
|
* // View status
|
|
* orc.status();
|
|
*/
|
|
|
|
const fs = require('fs');
|
|
const path = require('path');
|
|
const crypto = require('crypto');
|
|
|
|
// Data directory: ORCHESTRATOR_DIR env var, or .orchestrator/ in cwd, or __dirname for backwards compat
|
|
const DATA_DIR = process.env.ORCHESTRATOR_DIR || (
|
|
fs.existsSync(path.join(__dirname, 'state.json')) ? __dirname :
|
|
path.join(process.cwd(), '.orchestrator')
|
|
);
|
|
|
|
const STATE_PATH = path.join(DATA_DIR, 'state.json');
|
|
const EVENTS_PATH = path.join(DATA_DIR, 'events.jsonl');
|
|
const KNOWLEDGE_DIR = path.join(DATA_DIR, 'knowledge');
|
|
|
|
// Ensure data directories exist
|
|
if (!fs.existsSync(DATA_DIR)) {
|
|
fs.mkdirSync(DATA_DIR, { recursive: true });
|
|
}
|
|
if (!fs.existsSync(KNOWLEDGE_DIR)) {
|
|
fs.mkdirSync(KNOWLEDGE_DIR, { recursive: true });
|
|
}
|
|
|
|
// Initialize state file if it doesn't exist
|
|
if (!fs.existsSync(STATE_PATH)) {
|
|
fs.writeFileSync(STATE_PATH, JSON.stringify({
|
|
version: 0,
|
|
updated_at: new Date().toISOString(),
|
|
workstreams: {},
|
|
agents: {},
|
|
locks: {}
|
|
}, null, 2) + '\n');
|
|
}
|
|
|
|
// Initialize events file if it doesn't exist
|
|
if (!fs.existsSync(EVENTS_PATH)) {
|
|
fs.writeFileSync(EVENTS_PATH, '');
|
|
}
|
|
|
|
function loadState() {
|
|
return JSON.parse(fs.readFileSync(STATE_PATH, 'utf8'));
|
|
}
|
|
|
|
function saveState(state) {
|
|
state.updated_at = new Date().toISOString();
|
|
state.version++;
|
|
fs.writeFileSync(STATE_PATH, JSON.stringify(state, null, 2) + '\n');
|
|
}
|
|
|
|
function genId() {
|
|
return crypto.randomBytes(4).toString('hex');
|
|
}
|
|
|
|
// --- Event Log ---
|
|
|
|
function logEvent(agentId, action, data = {}) {
|
|
const event = {
|
|
ts: new Date().toISOString(),
|
|
agent: agentId,
|
|
action,
|
|
...data
|
|
};
|
|
fs.appendFileSync(EVENTS_PATH, JSON.stringify(event) + '\n');
|
|
return event;
|
|
}
|
|
|
|
function getEvents(opts = {}) {
|
|
const raw = fs.readFileSync(EVENTS_PATH, 'utf8').trim();
|
|
if (!raw) return [];
|
|
let events = raw.split('\n').map(line => JSON.parse(line));
|
|
|
|
if (opts.agent) events = events.filter(e => e.agent === opts.agent);
|
|
if (opts.action) events = events.filter(e => e.action === opts.action);
|
|
if (opts.since) {
|
|
const since = new Date(opts.since).getTime();
|
|
events = events.filter(e => new Date(e.ts).getTime() >= since);
|
|
}
|
|
if (opts.last) events = events.slice(-opts.last);
|
|
|
|
return events;
|
|
}
|
|
|
|
// --- Workstreams ---
|
|
|
|
function createWorkstream(name, opts = {}) {
|
|
const state = loadState();
|
|
if (state.workstreams[name]) {
|
|
throw new Error(`Workstream "${name}" already exists`);
|
|
}
|
|
state.workstreams[name] = {
|
|
description: opts.description || '',
|
|
priority: opts.priority || 5,
|
|
status: 'active',
|
|
tasks: [],
|
|
created_at: new Date().toISOString()
|
|
};
|
|
saveState(state);
|
|
logEvent('system', 'workstream_created', { workstream: name });
|
|
return state.workstreams[name];
|
|
}
|
|
|
|
function listWorkstreams() {
|
|
const state = loadState();
|
|
return Object.entries(state.workstreams).map(([name, ws]) => ({
|
|
name,
|
|
...ws,
|
|
task_count: ws.tasks.length,
|
|
pending: ws.tasks.filter(t => t.status === 'pending').length,
|
|
in_progress: ws.tasks.filter(t => t.status === 'in_progress').length,
|
|
done: ws.tasks.filter(t => t.status === 'done').length
|
|
})).sort((a, b) => a.priority - b.priority);
|
|
}
|
|
|
|
// --- Tasks ---
|
|
|
|
function addTask(workstream, opts) {
|
|
const state = loadState();
|
|
const ws = state.workstreams[workstream];
|
|
if (!ws) throw new Error(`Workstream "${workstream}" not found`);
|
|
|
|
const task = {
|
|
id: genId(),
|
|
title: opts.title,
|
|
description: opts.description || '',
|
|
status: 'pending',
|
|
priority: opts.priority || 5,
|
|
estimate: opts.estimate || null,
|
|
assigned_to: null,
|
|
created_at: new Date().toISOString(),
|
|
updated_at: new Date().toISOString(),
|
|
result: null
|
|
};
|
|
ws.tasks.push(task);
|
|
saveState(state);
|
|
logEvent('system', 'task_created', { workstream, task_id: task.id, title: task.title });
|
|
return task;
|
|
}
|
|
|
|
function claimTask(workstream, taskId, agentId) {
|
|
const state = loadState();
|
|
const ws = state.workstreams[workstream];
|
|
if (!ws) throw new Error(`Workstream "${workstream}" not found`);
|
|
|
|
const task = ws.tasks.find(t => t.id === taskId);
|
|
if (!task) throw new Error(`Task "${taskId}" not found`);
|
|
if (task.status !== 'pending') throw new Error(`Task "${taskId}" is ${task.status}, not pending`);
|
|
|
|
task.status = 'in_progress';
|
|
task.assigned_to = agentId;
|
|
task.updated_at = new Date().toISOString();
|
|
saveState(state);
|
|
logEvent(agentId, 'task_claimed', { workstream, task_id: taskId });
|
|
return task;
|
|
}
|
|
|
|
function completeTask(workstream, taskId, result = null) {
|
|
const state = loadState();
|
|
const ws = state.workstreams[workstream];
|
|
if (!ws) throw new Error(`Workstream "${workstream}" not found`);
|
|
|
|
const task = ws.tasks.find(t => t.id === taskId);
|
|
if (!task) throw new Error(`Task "${taskId}" not found`);
|
|
|
|
task.status = 'done';
|
|
task.result = result;
|
|
task.updated_at = new Date().toISOString();
|
|
saveState(state);
|
|
logEvent(task.assigned_to || 'system', 'task_completed', { workstream, task_id: taskId, result });
|
|
return task;
|
|
}
|
|
|
|
function getNextTask(workstream, agentId) {
|
|
const state = loadState();
|
|
const ws = state.workstreams[workstream];
|
|
if (!ws) throw new Error(`Workstream "${workstream}" not found`);
|
|
|
|
const pending = ws.tasks
|
|
.filter(t => t.status === 'pending')
|
|
.sort((a, b) => a.priority - b.priority);
|
|
|
|
if (pending.length === 0) return null;
|
|
return claimTask(workstream, pending[0].id, agentId);
|
|
}
|
|
|
|
// --- Agents ---
|
|
|
|
function registerAgent(agentId, opts = {}) {
|
|
const state = loadState();
|
|
state.agents[agentId] = {
|
|
type: opts.type || 'general',
|
|
status: 'active',
|
|
capabilities: opts.capabilities || [],
|
|
registered_at: new Date().toISOString(),
|
|
last_seen: new Date().toISOString()
|
|
};
|
|
saveState(state);
|
|
logEvent(agentId, 'agent_registered', { type: opts.type });
|
|
return state.agents[agentId];
|
|
}
|
|
|
|
function heartbeat(agentId) {
|
|
const state = loadState();
|
|
if (state.agents[agentId]) {
|
|
state.agents[agentId].last_seen = new Date().toISOString();
|
|
state.agents[agentId].status = 'active';
|
|
saveState(state);
|
|
}
|
|
}
|
|
|
|
// --- Locks ---
|
|
|
|
function acquireLock(resource, agentId, ttlMs = 60000) {
|
|
const state = loadState();
|
|
const existing = state.locks[resource];
|
|
|
|
if (existing) {
|
|
const expiresAt = new Date(existing.acquired_at).getTime() + existing.ttl_ms;
|
|
if (Date.now() < expiresAt && existing.agent !== agentId) {
|
|
return false; // Lock held by another agent
|
|
}
|
|
}
|
|
|
|
state.locks[resource] = {
|
|
agent: agentId,
|
|
acquired_at: new Date().toISOString(),
|
|
ttl_ms: ttlMs
|
|
};
|
|
saveState(state);
|
|
logEvent(agentId, 'lock_acquired', { resource });
|
|
return true;
|
|
}
|
|
|
|
function releaseLock(resource, agentId) {
|
|
const state = loadState();
|
|
if (state.locks[resource] && state.locks[resource].agent === agentId) {
|
|
delete state.locks[resource];
|
|
saveState(state);
|
|
logEvent(agentId, 'lock_released', { resource });
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
// --- Knowledge Base ---
|
|
|
|
function writeKnowledge(topic, content, agentId = 'system') {
|
|
const filePath = path.join(KNOWLEDGE_DIR, `${topic}.md`);
|
|
const header = `<!-- Updated by ${agentId} at ${new Date().toISOString()} -->\n`;
|
|
fs.writeFileSync(filePath, header + content);
|
|
logEvent(agentId, 'knowledge_written', { topic });
|
|
}
|
|
|
|
function readKnowledge(topic) {
|
|
const filePath = path.join(KNOWLEDGE_DIR, `${topic}.md`);
|
|
if (!fs.existsSync(filePath)) return null;
|
|
return fs.readFileSync(filePath, 'utf8');
|
|
}
|
|
|
|
function listKnowledge() {
|
|
return fs.readdirSync(KNOWLEDGE_DIR)
|
|
.filter(f => f.endsWith('.md'))
|
|
.map(f => f.replace('.md', ''));
|
|
}
|
|
|
|
// --- Status Dashboard ---
|
|
|
|
function status() {
|
|
const state = loadState();
|
|
const workstreams = listWorkstreams();
|
|
const recentEvents = getEvents({ last: 10 });
|
|
|
|
const agents = Object.entries(state.agents).map(([id, a]) => ({
|
|
id,
|
|
...a,
|
|
stale: (Date.now() - new Date(a.last_seen).getTime()) > 300000 // 5 min
|
|
}));
|
|
|
|
const activeLocks = Object.entries(state.locks)
|
|
.filter(([, lock]) => {
|
|
const expiresAt = new Date(lock.acquired_at).getTime() + lock.ttl_ms;
|
|
return Date.now() < expiresAt;
|
|
})
|
|
.map(([resource, lock]) => ({ resource, ...lock }));
|
|
|
|
return {
|
|
version: state.version,
|
|
updated_at: state.updated_at,
|
|
workstreams,
|
|
agents,
|
|
active_locks: activeLocks,
|
|
recent_events: recentEvents,
|
|
knowledge_topics: listKnowledge()
|
|
};
|
|
}
|
|
|
|
function statusText() {
|
|
const s = status();
|
|
const lines = [];
|
|
|
|
lines.push('=== OpSpawn Orchestrator Status ===');
|
|
lines.push(`State version: ${s.version} | Updated: ${s.updated_at}`);
|
|
lines.push('');
|
|
|
|
lines.push('--- Workstreams ---');
|
|
for (const ws of s.workstreams) {
|
|
lines.push(`[P${ws.priority}] ${ws.name}: ${ws.pending} pending, ${ws.in_progress} active, ${ws.done} done`);
|
|
}
|
|
lines.push('');
|
|
|
|
lines.push('--- Agents ---');
|
|
for (const a of s.agents) {
|
|
lines.push(`${a.id} (${a.type}): ${a.status}${a.stale ? ' [STALE]' : ''}`);
|
|
}
|
|
if (s.agents.length === 0) lines.push('(none registered)');
|
|
lines.push('');
|
|
|
|
lines.push('--- Active Locks ---');
|
|
for (const lock of s.active_locks) {
|
|
lines.push(`${lock.resource}: held by ${lock.agent}`);
|
|
}
|
|
if (s.active_locks.length === 0) lines.push('(none)');
|
|
lines.push('');
|
|
|
|
lines.push('--- Recent Events ---');
|
|
for (const e of s.recent_events) {
|
|
const time = e.ts.split('T')[1].split('.')[0];
|
|
lines.push(`[${time}] ${e.agent}: ${e.action}`);
|
|
}
|
|
lines.push('');
|
|
|
|
lines.push('--- Knowledge Base ---');
|
|
lines.push(s.knowledge_topics.join(', ') || '(empty)');
|
|
|
|
return lines.join('\n');
|
|
}
|
|
|
|
module.exports = {
|
|
loadState,
|
|
logEvent,
|
|
getEvents,
|
|
createWorkstream,
|
|
listWorkstreams,
|
|
addTask,
|
|
claimTask,
|
|
completeTask,
|
|
getNextTask,
|
|
registerAgent,
|
|
heartbeat,
|
|
acquireLock,
|
|
releaseLock,
|
|
writeKnowledge,
|
|
readKnowledge,
|
|
listKnowledge,
|
|
status,
|
|
statusText
|
|
};
|