diff --git a/package.json b/package.json index 5260d78..c43456e 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,7 @@ "version": "1.0.0", "main": "index.js", "scripts": { - "test": "echo \"Error: no test specified\" && exit 1", + "test": "node test/e2e.js", "start": "node src/index.js", "sample-mcp": "node sample-mcp/index.js" }, diff --git a/sample-mcp/index.js b/sample-mcp/index.js index dd3d59b..7eb3ef1 100644 --- a/sample-mcp/index.js +++ b/sample-mcp/index.js @@ -2,16 +2,19 @@ const WebSocket = require('ws'); -const HUB_URL = 'wss://mcp.arik.work/ws/register'; +const HUB_URL = process.env.HUB_URL || 'wss://mcp.arik.work/ws/register'; const HUB_URL_FALLBACK = 'ws://mcp.arik.work/ws/register'; const SERVICE_ID = 'sample-mcp'; const SECRET = 'dev-secret'; let reconnectDelay = 1000; let ws = null; -let useTLS = true; +// If HUB_URL is explicitly set via env, skip TLS fallback logic +const envOverride = !!process.env.HUB_URL; +let useTLS = !envOverride && HUB_URL.startsWith('wss://'); function getHubUrl() { + if (envOverride) return HUB_URL; return useTLS ? HUB_URL : HUB_URL_FALLBACK; } @@ -120,7 +123,7 @@ function connect() { ws.on('error', (err) => { console.error(`[sample-mcp] WebSocket error: ${err.message}`); - if (useTLS && err.message && (err.message.includes('ECONNREFUSED') || err.message.includes('certificate') || err.message.includes('connect'))) { + if (!envOverride && useTLS && err.message && (err.message.includes('ECONNREFUSED') || err.message.includes('certificate') || err.message.includes('connect'))) { console.log('[sample-mcp] TLS connection failed, will try ws:// on next attempt'); useTLS = false; } diff --git a/test/e2e.js b/test/e2e.js new file mode 100644 index 0000000..5dd31d0 --- /dev/null +++ b/test/e2e.js @@ -0,0 +1,283 @@ +'use strict'; + +const { spawn } = require('child_process'); +const http = require('http'); +const path = require('path'); + +const HUB_PORT = 3000; +const TOTAL_TIMEOUT_MS = 15000; +const POLL_INTERVAL_MS = 200; + +let hubProc = null; +let backendProc = null; +let sseReq = null; +let exitCalled = false; + +function cleanup() { + if (sseReq) { try { sseReq.destroy(); } catch (_) {} sseReq = null; } + if (backendProc) { try { backendProc.kill(); } catch (_) {} backendProc = null; } + if (hubProc) { try { hubProc.kill(); } catch (_) {} hubProc = null; } +} + +function die(msg, code = 1) { + if (exitCalled) return; + exitCalled = true; + console.error('[e2e] FAIL:', msg); + cleanup(); + process.exit(code); +} + +function pass(msg) { + if (exitCalled) return; + exitCalled = true; + console.log('[e2e] PASS:', msg); + cleanup(); + process.exit(0); +} + +const globalTimer = setTimeout(() => die('Total timeout exceeded (15s)'), TOTAL_TIMEOUT_MS); +globalTimer.unref(); + +function httpGet(url) { + return new Promise((resolve, reject) => { + http.get(url, (res) => { + let body = ''; + res.on('data', (d) => { body += d; }); + res.on('end', () => { + try { resolve({ status: res.statusCode, body: JSON.parse(body) }); } + catch (e) { resolve({ status: res.statusCode, body }); } + }); + }).on('error', reject); + }); +} + +function httpPost(url, payload) { + return new Promise((resolve, reject) => { + const data = JSON.stringify(payload); + const opts = { + method: 'POST', + headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(data) }, + }; + const u = new URL(url); + opts.hostname = u.hostname; + opts.port = u.port; + opts.path = u.pathname + u.search; + + const req = http.request(opts, (res) => { + let body = ''; + res.on('data', (d) => { body += d; }); + res.on('end', () => { + try { resolve({ status: res.statusCode, body: JSON.parse(body) }); } + catch (e) { resolve({ status: res.statusCode, body }); } + }); + }); + req.on('error', reject); + req.write(data); + req.end(); + }); +} + +function poll(fn, label, timeoutMs = 8000) { + return new Promise((resolve, reject) => { + const deadline = Date.now() + timeoutMs; + function attempt() { + if (Date.now() > deadline) return reject(new Error('Timeout waiting for: ' + label)); + fn().then((ok) => { + if (ok) resolve(); + else setTimeout(attempt, POLL_INTERVAL_MS); + }).catch(() => setTimeout(attempt, POLL_INTERVAL_MS)); + } + attempt(); + }); +} + +function spawnProc(name, cmd, args, env) { + const proc = spawn(cmd, args, { + env: { ...process.env, ...env }, + stdio: ['ignore', 'pipe', 'pipe'], + }); + proc.stdout.on('data', (d) => process.stdout.write(`[${name}] ${d}`)); + proc.stderr.on('data', (d) => process.stderr.write(`[${name}] ${d}`)); + proc.on('exit', (code) => { + if (!exitCalled && code !== null && code !== 0) { + die(`${name} exited with code ${code}`); + } + }); + return proc; +} + +// Open SSE connection and return a stream of parsed events {event, data} +function openSSE(url) { + return new Promise((resolve, reject) => { + const u = new URL(url); + const options = { hostname: u.hostname, port: u.port, path: u.pathname + u.search, headers: { Accept: 'text/event-stream' } }; + + const req = http.get(options, (res) => { + if (res.statusCode !== 200) { + return reject(new Error('SSE connect failed: ' + res.statusCode)); + } + + const listeners = []; + let buffer = ''; + + res.on('data', (chunk) => { + buffer += chunk.toString(); + const parts = buffer.split('\n\n'); + buffer = parts.pop(); // keep incomplete last chunk + for (const part of parts) { + if (!part.trim()) continue; + let event = 'message'; + let data = ''; + for (const line of part.split('\n')) { + if (line.startsWith('event: ')) event = line.slice(7).trim(); + else if (line.startsWith('data: ')) data = line.slice(6).trim(); + } + for (const fn of listeners) fn({ event, data }); + } + }); + + res.on('error', (err) => { if (!exitCalled) die('SSE stream error: ' + err.message); }); + res.on('end', () => { if (!exitCalled) die('SSE stream ended unexpectedly'); }); + + resolve({ addListener: (fn) => listeners.push(fn) }); + }); + + req.on('error', reject); + sseReq = req; + }); +} + +function waitForEvent(sse, eventName, timeoutMs = 8000) { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error('Timeout waiting for SSE event: ' + eventName)), timeoutMs); + sse.addListener(({ event, data }) => { + if (event === eventName) { + clearTimeout(timer); + resolve(data); + } + }); + }); +} + +function waitForSSEResponse(sse, id, timeoutMs = 8000) { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error('Timeout waiting for JSON-RPC response id=' + id)), timeoutMs); + sse.addListener(({ event, data }) => { + if (event !== 'message') return; + try { + const msg = JSON.parse(data); + if (msg.id === id) { + clearTimeout(timer); + resolve(msg); + } + } catch (_) {} + }); + }); +} + +async function run() { + // 1. Start hub + console.log('[e2e] Starting hub...'); + hubProc = spawnProc('hub', 'node', [path.join(__dirname, '../src/index.js')], {}); + + // 2. Wait for hub /health + await poll(async () => { + const r = await httpGet(`http://localhost:${HUB_PORT}/health`); + return r.status === 200; + }, 'hub /health'); + console.log('[e2e] Hub is ready'); + + // 3. Start sample-mcp with local hub URL + console.log('[e2e] Starting sample-mcp...'); + backendProc = spawnProc('sample-mcp', 'node', [path.join(__dirname, '../sample-mcp/index.js')], { + HUB_URL: `ws://localhost:${HUB_PORT}/ws/register`, + }); + + // 4. Wait for backend to register + await poll(async () => { + const r = await httpGet(`http://localhost:${HUB_PORT}/health`); + return r.status === 200 && r.body.connectedBackends >= 1; + }, 'backend registration'); + console.log('[e2e] Backend registered'); + + // 5. Open SSE connection + console.log('[e2e] Opening SSE connection...'); + const sse = await openSSE(`http://localhost:${HUB_PORT}/sample-mcp/sse`); + console.log('[e2e] SSE connected'); + + // 6. Wait for 'endpoint' event + const endpointData = await waitForEvent(sse, 'endpoint'); + console.log('[e2e] Got endpoint:', endpointData); + const messageUrl = `http://localhost:${HUB_PORT}${endpointData}`; + + // 7. Send initialize + const initId = 1; + const initResponseP = waitForSSEResponse(sse, initId); + await httpPost(messageUrl, { + jsonrpc: '2.0', + id: initId, + method: 'initialize', + params: { + protocolVersion: '2024-11-05', + capabilities: {}, + clientInfo: { name: 'test', version: '1.0.0' }, + }, + }); + + const initResponse = await initResponseP; + console.log('[e2e] initialize response:', JSON.stringify(initResponse)); + if (!initResponse.result || !initResponse.result.protocolVersion) { + die('initialize response missing protocolVersion'); + } + if (!initResponse.result.serverInfo) { + die('initialize response missing serverInfo'); + } + console.log('[e2e] ✓ initialize OK'); + + // 8. Send initialized notification (no response expected) + await httpPost(messageUrl, { + jsonrpc: '2.0', + method: 'notifications/initialized', + }); + console.log('[e2e] ✓ initialized notification sent'); + + // 9. Send tools/list + const listId = 2; + const listResponseP = waitForSSEResponse(sse, listId); + await httpPost(messageUrl, { + jsonrpc: '2.0', + id: listId, + method: 'tools/list', + params: {}, + }); + + const listResponse = await listResponseP; + console.log('[e2e] tools/list response:', JSON.stringify(listResponse)); + const tools = listResponse.result && listResponse.result.tools; + if (!tools || !tools.find((t) => t.name === 'echo')) { + die('tools/list response missing echo tool'); + } + console.log('[e2e] ✓ tools/list OK'); + + // 10. Send tools/call echo + const callId = 3; + const callResponseP = waitForSSEResponse(sse, callId); + await httpPost(messageUrl, { + jsonrpc: '2.0', + id: callId, + method: 'tools/call', + params: { name: 'echo', arguments: { message: 'hello hub' } }, + }); + + const callResponse = await callResponseP; + console.log('[e2e] tools/call response:', JSON.stringify(callResponse)); + const content = callResponse.result && callResponse.result.content; + if (!content || !content[0] || !content[0].text.includes('Echo: hello hub')) { + die('tools/call response missing "Echo: hello hub"'); + } + console.log('[e2e] ✓ tools/call OK'); + + pass('All checks passed'); +} + +run().catch((err) => die(err.message));