'use strict'; require('dotenv').config({ path: __dirname + '/../.env' }); const { spawn } = require('child_process'); const http = require('http'); const path = require('path'); if (process.env.GOOGLE_CLIENT_ID && process.env.GOOGLE_CLIENT_ID !== 'your-google-client-id-here') { console.log('[test] OAuth enabled — skipping E2E (requires browser flow).'); process.exit(0); } const HUB_PORT = 3000; const TOTAL_TIMEOUT_MS = 15000; const POLL_INTERVAL_MS = 200; let hubProc = null; let backendProc = null; let sseReq = null; let exitCalled = false; function cleanup() { if (sseReq) { try { sseReq.destroy(); } catch (_) {} sseReq = null; } if (backendProc) { try { backendProc.kill(); } catch (_) {} backendProc = null; } if (hubProc) { try { hubProc.kill(); } catch (_) {} hubProc = null; } } function die(msg, code = 1) { if (exitCalled) return; exitCalled = true; console.error('[e2e] FAIL:', msg); cleanup(); process.exit(code); } function pass(msg) { if (exitCalled) return; exitCalled = true; console.log('[e2e] PASS:', msg); cleanup(); process.exit(0); } const globalTimer = setTimeout(() => die('Total timeout exceeded (15s)'), TOTAL_TIMEOUT_MS); globalTimer.unref(); function httpGet(url) { return new Promise((resolve, reject) => { http.get(url, (res) => { let body = ''; res.on('data', (d) => { body += d; }); res.on('end', () => { try { resolve({ status: res.statusCode, body: JSON.parse(body) }); } catch (e) { resolve({ status: res.statusCode, body }); } }); }).on('error', reject); }); } function httpPost(url, payload) { return new Promise((resolve, reject) => { const data = JSON.stringify(payload); const opts = { method: 'POST', headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(data) }, }; const u = new URL(url); opts.hostname = u.hostname; opts.port = u.port; opts.path = u.pathname + u.search; const req = http.request(opts, (res) => { let body = ''; res.on('data', (d) => { body += d; }); res.on('end', () => { try { resolve({ status: res.statusCode, body: JSON.parse(body) }); } catch (e) { resolve({ status: res.statusCode, body }); } }); }); req.on('error', reject); req.write(data); req.end(); }); } function poll(fn, label, timeoutMs = 8000) { return new Promise((resolve, reject) => { const deadline = Date.now() + timeoutMs; function attempt() { if (Date.now() > deadline) return reject(new Error('Timeout waiting for: ' + label)); fn().then((ok) => { if (ok) resolve(); else setTimeout(attempt, POLL_INTERVAL_MS); }).catch(() => setTimeout(attempt, POLL_INTERVAL_MS)); } attempt(); }); } function spawnProc(name, cmd, args, env) { const proc = spawn(cmd, args, { env: { ...process.env, ...env }, stdio: ['ignore', 'pipe', 'pipe'], }); proc.stdout.on('data', (d) => process.stdout.write(`[${name}] ${d}`)); proc.stderr.on('data', (d) => process.stderr.write(`[${name}] ${d}`)); proc.on('exit', (code) => { if (!exitCalled && code !== null && code !== 0) { die(`${name} exited with code ${code}`); } }); return proc; } // Open SSE connection and return a stream of parsed events {event, data} function openSSE(url) { return new Promise((resolve, reject) => { const u = new URL(url); const options = { hostname: u.hostname, port: u.port, path: u.pathname + u.search, headers: { Accept: 'text/event-stream' } }; const req = http.get(options, (res) => { if (res.statusCode !== 200) { return reject(new Error('SSE connect failed: ' + res.statusCode)); } const listeners = []; let buffer = ''; res.on('data', (chunk) => { buffer += chunk.toString(); const parts = buffer.split('\n\n'); buffer = parts.pop(); // keep incomplete last chunk for (const part of parts) { if (!part.trim()) continue; let event = 'message'; let data = ''; for (const line of part.split('\n')) { if (line.startsWith('event: ')) event = line.slice(7).trim(); else if (line.startsWith('data: ')) data = line.slice(6).trim(); } for (const fn of listeners) fn({ event, data }); } }); res.on('error', (err) => { if (!exitCalled) die('SSE stream error: ' + err.message); }); res.on('end', () => { if (!exitCalled) die('SSE stream ended unexpectedly'); }); resolve({ addListener: (fn) => listeners.push(fn) }); }); req.on('error', reject); sseReq = req; }); } function waitForEvent(sse, eventName, timeoutMs = 8000) { return new Promise((resolve, reject) => { const timer = setTimeout(() => reject(new Error('Timeout waiting for SSE event: ' + eventName)), timeoutMs); sse.addListener(({ event, data }) => { if (event === eventName) { clearTimeout(timer); resolve(data); } }); }); } function waitForSSEResponse(sse, id, timeoutMs = 8000) { return new Promise((resolve, reject) => { const timer = setTimeout(() => reject(new Error('Timeout waiting for JSON-RPC response id=' + id)), timeoutMs); sse.addListener(({ event, data }) => { if (event !== 'message') return; try { const msg = JSON.parse(data); if (msg.id === id) { clearTimeout(timer); resolve(msg); } } catch (_) {} }); }); } async function run() { // 1. Start hub console.log('[e2e] Starting hub...'); hubProc = spawnProc('hub', 'node', [path.join(__dirname, '../src/index.js')], {}); // 2. Wait for hub /health await poll(async () => { const r = await httpGet(`http://localhost:${HUB_PORT}/health`); return r.status === 200; }, 'hub /health'); console.log('[e2e] Hub is ready'); // 3. Start sample-mcp with local hub URL console.log('[e2e] Starting sample-mcp...'); backendProc = spawnProc('sample-mcp', 'node', [path.join(__dirname, '../sample-mcp/index.js')], { HUB_URL: `ws://localhost:${HUB_PORT}/ws/register`, }); // 4. Wait for backend to register await poll(async () => { const r = await httpGet(`http://localhost:${HUB_PORT}/health`); return r.status === 200 && r.body.connectedBackends >= 1; }, 'backend registration'); console.log('[e2e] Backend registered'); // 5. Open SSE connection console.log('[e2e] Opening SSE connection...'); const sse = await openSSE(`http://localhost:${HUB_PORT}/sample-mcp/sse`); console.log('[e2e] SSE connected'); // 6. Wait for 'endpoint' event const endpointData = await waitForEvent(sse, 'endpoint'); console.log('[e2e] Got endpoint:', endpointData); const messageUrl = `http://localhost:${HUB_PORT}${endpointData}`; // 7. Send initialize const initId = 1; const initResponseP = waitForSSEResponse(sse, initId); await httpPost(messageUrl, { jsonrpc: '2.0', id: initId, method: 'initialize', params: { protocolVersion: '2024-11-05', capabilities: {}, clientInfo: { name: 'test', version: '1.0.0' }, }, }); const initResponse = await initResponseP; console.log('[e2e] initialize response:', JSON.stringify(initResponse)); if (!initResponse.result || !initResponse.result.protocolVersion) { die('initialize response missing protocolVersion'); } if (!initResponse.result.serverInfo) { die('initialize response missing serverInfo'); } console.log('[e2e] ✓ initialize OK'); // 8. Send initialized notification (no response expected) await httpPost(messageUrl, { jsonrpc: '2.0', method: 'notifications/initialized', }); console.log('[e2e] ✓ initialized notification sent'); // 9. Send tools/list const listId = 2; const listResponseP = waitForSSEResponse(sse, listId); await httpPost(messageUrl, { jsonrpc: '2.0', id: listId, method: 'tools/list', params: {}, }); const listResponse = await listResponseP; console.log('[e2e] tools/list response:', JSON.stringify(listResponse)); const tools = listResponse.result && listResponse.result.tools; if (!tools || !tools.find((t) => t.name === 'echo')) { die('tools/list response missing echo tool'); } console.log('[e2e] ✓ tools/list OK'); // 10. Send tools/call echo const callId = 3; const callResponseP = waitForSSEResponse(sse, callId); await httpPost(messageUrl, { jsonrpc: '2.0', id: callId, method: 'tools/call', params: { name: 'echo', arguments: { message: 'hello hub' } }, }); const callResponse = await callResponseP; console.log('[e2e] tools/call response:', JSON.stringify(callResponse)); const content = callResponse.result && callResponse.result.content; if (!content || !content[0] || !content[0].text.includes('Echo: hello hub')) { die('tools/call response missing "Echo: hello hub"'); } console.log('[e2e] ✓ tools/call OK'); pass('All checks passed'); } run().catch((err) => die(err.message));