diff --git a/.gitignore b/.gitignore index e6c5e5d..4cd0ae1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ agent_gateway.db -node_modules/ .env +mcp-bridge/node_modules +node_modules/ diff --git a/.vscode/mcp.json b/.vscode/mcp.json new file mode 100644 index 0000000..7cc2ea9 --- /dev/null +++ b/.vscode/mcp.json @@ -0,0 +1,12 @@ +{ + "servers": { + "test-mcp": { + "url": "https://mcp.arik.work/sample-mcp/sse", + "type": "sse" + } + } + , + "inputs": [] +} + + diff --git a/ecosystem.config.js b/ecosystem.config.js new file mode 100644 index 0000000..b9dced1 --- /dev/null +++ b/ecosystem.config.js @@ -0,0 +1,51 @@ +require('dotenv').config({ path: __dirname + '/.env' }); + +const HUB_SECRET = process.env.HUB_SECRET; +if (!HUB_SECRET) throw new Error('HUB_SECRET not set in .env'); + +module.exports = { + apps: [ + { + name: 'mcp-hub', + script: 'src/index.js', + cwd: '/workspace', + env: { + NODE_ENV: 'development', + PORT: 3000, + HUB_AUTH: JSON.stringify({ 'sample-mcp': HUB_SECRET, 'memory-mcp': HUB_SECRET }) + }, + max_restarts: 10, + restart_delay: 1000, + log_date_format: 'YYYY-MM-DD HH:mm:ss Z', + merge_logs: true + }, + { + name: 'sample-mcp', + script: 'sample-mcp/index.js', + cwd: '/workspace', + env: { + NODE_ENV: 'development', + MCP_SECRET: HUB_SECRET + }, + max_restarts: 10, + restart_delay: 2000, + log_date_format: 'YYYY-MM-DD HH:mm:ss Z', + merge_logs: true + }, + { + name: 'mcp-bridge-memory', + script: 'mcp-bridge/index.js', + cwd: '/workspace', + env: { + BRIDGE_SERVICE_ID: 'memory-mcp', + BRIDGE_SECRET: HUB_SECRET, + BRIDGE_HUB_URL: 'ws://localhost:3000/ws/register', + BRIDGE_UPSTREAM_URL: 'https://memory-mcp.dbchat.ai/mcp/sse' + }, + max_restarts: 10, + restart_delay: 2000, + log_date_format: 'YYYY-MM-DD HH:mm:ss Z', + merge_logs: true + } + ] +}; diff --git a/mcp-bridge/index.js b/mcp-bridge/index.js new file mode 100644 index 0000000..d720347 --- /dev/null +++ b/mcp-bridge/index.js @@ -0,0 +1,287 @@ +'use strict'; + +const WebSocket = require('ws'); +const http = require('http'); +const https = require('https'); +const { URL } = require('url'); + +const SERVICE_ID = process.env.BRIDGE_SERVICE_ID || 'memory-mcp'; +const SECRET = process.env.BRIDGE_SECRET || 'dev-secret'; +const HUB_URL = process.env.BRIDGE_HUB_URL || 'ws://localhost:3000/ws/register'; +const UPSTREAM_URL = process.env.BRIDGE_UPSTREAM_URL || 'https://memory-mcp.dbchat.ai/mcp/sse'; + +// Map of clientSessionId -> { req (IncomingMessage), postUrl, pendingRequests: Map requestId> } +const sessions = new Map(); + +let ws = null; +let reconnectDelay = 1000; + +function log(...args) { + console.log(new Date().toISOString(), '[mcp-bridge]', ...args); +} + +function sendToHub(obj) { + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify(obj)); + } +} + +function closeSession(clientSessionId) { + const session = sessions.get(clientSessionId); + if (session) { + log(`Closing upstream session for clientSessionId=${clientSessionId}`); + try { session.req.destroy(); } catch (_) {} + sessions.delete(clientSessionId); + } +} + +function openUpstreamSession(clientSessionId, onReady) { + const upstreamUrl = new URL(UPSTREAM_URL); + const isHttps = upstreamUrl.protocol === 'https:'; + const lib = isHttps ? https : http; + + const options = { + hostname: upstreamUrl.hostname, + port: upstreamUrl.port || (isHttps ? 443 : 80), + path: upstreamUrl.pathname + upstreamUrl.search, + method: 'GET', + headers: { + 'Accept': 'text/event-stream', + 'Cache-Control': 'no-cache' + } + }; + + log(`Opening upstream SSE connection for clientSessionId=${clientSessionId} -> ${UPSTREAM_URL}`); + + const req = lib.request(options, (res) => { + if (res.statusCode !== 200) { + log(`Upstream SSE returned HTTP ${res.statusCode} for clientSessionId=${clientSessionId}`); + req.destroy(); + onReady(new Error(`Upstream SSE HTTP ${res.statusCode}`)); + return; + } + + const session = { + req, + postUrl: null, + pendingRequests: new Map() + }; + sessions.set(clientSessionId, session); + + let buffer = ''; + let currentEvent = null; + + res.on('data', (chunk) => { + buffer += chunk.toString(); + const lines = buffer.split('\n'); + buffer = lines.pop(); // keep incomplete last line + + for (const line of lines) { + if (line.startsWith('event:')) { + currentEvent = line.slice(6).trim(); + } else if (line.startsWith('data:')) { + const data = line.slice(5).trim(); + + if (currentEvent === 'endpoint') { + // Resolve relative path against upstream origin + let postUrl; + if (data.startsWith('http://') || data.startsWith('https://')) { + postUrl = data; + } else { + postUrl = `${upstreamUrl.protocol}//${upstreamUrl.host}${data}`; + } + session.postUrl = postUrl; + log(`Got upstream POST URL for clientSessionId=${clientSessionId}: ${postUrl}`); + onReady(null); + } else if (currentEvent === 'message' || currentEvent === null) { + // JSON-RPC response from upstream + let msg; + try { + msg = JSON.parse(data); + } catch (e) { + log(`Failed to parse upstream SSE message: ${data}`); + return; + } + + if (msg.id !== undefined && msg.id !== null) { + const key = `${clientSessionId}:${msg.id}`; + const requestId = session.pendingRequests.get(key); + if (requestId) { + session.pendingRequests.delete(key); + log(`Relaying response to hub: requestId=${requestId} jsonrpcId=${msg.id}`); + sendToHub({ + type: 'mcp-response', + requestId, + clientSessionId, + payload: msg + }); + } else { + log(`No pending request for key=${key}, dropping response`); + } + } else { + // Notification — no requestId needed, just log + log(`Received upstream notification for clientSessionId=${clientSessionId}:`, JSON.stringify(msg)); + } + } + + currentEvent = null; + } else if (line === '') { + currentEvent = null; + } + } + }); + + res.on('end', () => { + log(`Upstream SSE stream ended for clientSessionId=${clientSessionId}`); + closeSession(clientSessionId); + }); + + res.on('error', (err) => { + log(`Upstream SSE stream error for clientSessionId=${clientSessionId}:`, err.message); + closeSession(clientSessionId); + }); + }); + + req.on('error', (err) => { + log(`Upstream SSE request error for clientSessionId=${clientSessionId}:`, err.message); + sessions.delete(clientSessionId); + onReady(err); + }); + + req.end(); +} + +function postToUpstream(postUrl, payload, callback) { + const body = JSON.stringify(payload); + const url = new URL(postUrl); + const isHttps = url.protocol === 'https:'; + const lib = isHttps ? https : http; + + const options = { + hostname: url.hostname, + port: url.port || (isHttps ? 443 : 80), + path: url.pathname + url.search, + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(body) + } + }; + + const req = lib.request(options, (res) => { + let data = ''; + res.on('data', (chunk) => { data += chunk; }); + res.on('end', () => { + if (res.statusCode < 200 || res.statusCode >= 300) { + callback(new Error(`POST to upstream returned HTTP ${res.statusCode}: ${data}`)); + } else { + callback(null); + } + }); + }); + + req.on('error', callback); + req.write(body); + req.end(); +} + +function handleMcpRequest(msg) { + const { requestId, clientSessionId, payload } = msg; + + const forwardRequest = () => { + const session = sessions.get(clientSessionId); + if (!session || !session.postUrl) { + log(`No postUrl available for clientSessionId=${clientSessionId}, dropping request`); + return; + } + + // Track pending request if payload has an id + if (payload.id !== undefined && payload.id !== null) { + const key = `${clientSessionId}:${payload.id}`; + session.pendingRequests.set(key, requestId); + log(`Forwarding request: requestId=${requestId} clientSessionId=${clientSessionId} method=${payload.method} jsonrpcId=${payload.id}`); + } else { + log(`Forwarding notification: clientSessionId=${clientSessionId} method=${payload.method}`); + } + + postToUpstream(session.postUrl, payload, (err) => { + if (err) { + log(`Error posting to upstream for clientSessionId=${clientSessionId}:`, err.message); + if (payload.id !== undefined && payload.id !== null) { + const key = `${clientSessionId}:${payload.id}`; + session.pendingRequests.delete(key); + } + } + }); + }; + + if (!sessions.has(clientSessionId)) { + openUpstreamSession(clientSessionId, (err) => { + if (err) { + log(`Failed to open upstream session for clientSessionId=${clientSessionId}:`, err.message); + return; + } + forwardRequest(); + }); + } else { + const session = sessions.get(clientSessionId); + if (!session.postUrl) { + // Still waiting for endpoint event — retry shortly + log(`Session exists but postUrl not yet available for clientSessionId=${clientSessionId}, retrying...`); + setTimeout(forwardRequest, 200); + } else { + forwardRequest(); + } + } +} + +function connect() { + log(`Connecting to hub: ${HUB_URL}`); + ws = new WebSocket(HUB_URL); + + ws.on('open', () => { + reconnectDelay = 1000; + log(`Connected to hub, registering as serviceId=${SERVICE_ID}`); + ws.send(JSON.stringify({ type: 'register', serviceId: SERVICE_ID, secret: SECRET })); + }); + + ws.on('message', (data) => { + let msg; + try { + msg = JSON.parse(data.toString()); + } catch (e) { + log('Failed to parse hub message:', data.toString()); + return; + } + + if (msg.type === 'registered') { + log(`Registered with hub as serviceId=${SERVICE_ID}`); + } else if (msg.type === 'mcp-request') { + handleMcpRequest(msg); + } else { + log('Unknown message from hub:', msg.type); + } + }); + + ws.on('close', (code, reason) => { + log(`Hub WebSocket closed (code=${code}), cleaning up all sessions`); + for (const clientSessionId of sessions.keys()) { + closeSession(clientSessionId); + } + scheduleReconnect(); + }); + + ws.on('error', (err) => { + log('Hub WebSocket error:', err.message); + }); +} + +function scheduleReconnect() { + log(`Reconnecting in ${reconnectDelay}ms...`); + setTimeout(() => { + connect(); + }, reconnectDelay); + reconnectDelay = Math.min(reconnectDelay * 2, 30000); +} + +connect(); diff --git a/mcp-bridge/package-lock.json b/mcp-bridge/package-lock.json new file mode 100644 index 0000000..3ff6b5e --- /dev/null +++ b/mcp-bridge/package-lock.json @@ -0,0 +1,37 @@ +{ + "name": "mcp-bridge", + "version": "1.0.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "mcp-bridge", + "version": "1.0.0", + "license": "ISC", + "dependencies": { + "ws": "^8.19.0" + } + }, + "node_modules/ws": { + "version": "8.19.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.19.0.tgz", + "integrity": "sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + } + } +} diff --git a/mcp-bridge/package.json b/mcp-bridge/package.json new file mode 100644 index 0000000..a5e882a --- /dev/null +++ b/mcp-bridge/package.json @@ -0,0 +1,13 @@ +{ + "name": "mcp-bridge", + "version": "1.0.0", + "description": "Generic MCP SSE bridge for the MCP hub", + "main": "index.js", + "scripts": { + "start": "node index.js" + }, + "dependencies": { + "ws": "^8.19.0" + }, + "license": "ISC" +} diff --git a/package-lock.json b/package-lock.json index 6566ff3..5565d3c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "1.0.0", "license": "ISC", "dependencies": { + "dotenv": "^17.3.1", "express": "^5.2.1", "uuid": "^13.0.0", "ws": "^8.19.0" @@ -155,6 +156,18 @@ "node": ">= 0.8" } }, + "node_modules/dotenv": { + "version": "17.3.1", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-17.3.1.tgz", + "integrity": "sha512-IO8C/dzEb6O3F9/twg6ZLXz164a2fhTnEWb95H23Dm4OuN+92NmEAlTrupP9VW6Jm3sO26tQlqyvyi4CsnY9GA==", + "license": "BSD-2-Clause", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://dotenvx.com" + } + }, "node_modules/dunder-proto": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/dunder-proto/-/dunder-proto-1.0.1.tgz", diff --git a/package.json b/package.json index c43456e..b4d6406 100644 --- a/package.json +++ b/package.json @@ -5,13 +5,19 @@ "scripts": { "test": "node test/e2e.js", "start": "node src/index.js", - "sample-mcp": "node sample-mcp/index.js" + "bridge": "node mcp-bridge/index.js", + "sample-mcp": "node sample-mcp/index.js", + "pm2:start": "pm2 start ecosystem.config.js", + "pm2:stop": "pm2 stop all", + "pm2:logs": "pm2 logs", + "pm2:status": "pm2 status" }, "keywords": [], "author": "", "license": "ISC", "description": "", "dependencies": { + "dotenv": "^17.3.1", "express": "^5.2.1", "uuid": "^13.0.0", "ws": "^8.19.0" diff --git a/sample-mcp/index.js b/sample-mcp/index.js index 7eb3ef1..41c5aba 100644 --- a/sample-mcp/index.js +++ b/sample-mcp/index.js @@ -5,7 +5,7 @@ const WebSocket = require('ws'); const HUB_URL = process.env.HUB_URL || 'wss://mcp.arik.work/ws/register'; const HUB_URL_FALLBACK = 'ws://mcp.arik.work/ws/register'; const SERVICE_ID = 'sample-mcp'; -const SECRET = 'dev-secret'; +const SECRET = process.env.MCP_SECRET || 'dev-secret'; let reconnectDelay = 1000; let ws = null; diff --git a/src/config.js b/src/config.js index c34c9c5..ed90dc8 100644 --- a/src/config.js +++ b/src/config.js @@ -1,4 +1,27 @@ +const DEV_SECRET = 'dev-secret'; + +let serviceAuthMap = null; +if (process.env.HUB_AUTH) { + try { + serviceAuthMap = JSON.parse(process.env.HUB_AUTH); + } catch (e) { + console.error('[config] Failed to parse HUB_AUTH JSON:', e.message); + process.exit(1); + } +} else if (process.env.NODE_ENV === 'production') { + console.error('[config] HUB_AUTH must be set in production'); + process.exit(1); +} + +function getServiceSecret(serviceId) { + if (serviceAuthMap) { + return serviceAuthMap[serviceId] !== undefined ? serviceAuthMap[serviceId] : null; + } + // Dev fallback: accept dev-secret for any service + return DEV_SECRET; +} + module.exports = { PORT: parseInt(process.env.PORT, 10) || 3000, - HUB_SECRET: process.env.HUB_SECRET || 'dev-secret', + getServiceSecret, }; diff --git a/src/event-bus.js b/src/event-bus.js new file mode 100644 index 0000000..6d8e37a --- /dev/null +++ b/src/event-bus.js @@ -0,0 +1,12 @@ +const { EventEmitter } = require('events'); + +const eventBus = new EventEmitter(); +eventBus.setMaxListeners(50); + +function log(level, message) { + console[level]?.(message) ?? console.log(message); + eventBus.emit('log', { level, message, ts: new Date().toISOString() }); +} + +module.exports = eventBus; +module.exports.log = log; diff --git a/src/relay.js b/src/relay.js index 6155902..1cc4378 100644 --- a/src/relay.js +++ b/src/relay.js @@ -3,9 +3,21 @@ const registry = require('./backend-registry'); const pendingRequests = new Map(); -function sendToBackend(serviceId, message, clientSessionId) { +function sendToBackend(serviceId, message, clientSessionId, sessions) { const ws = registry.get(serviceId); - if (!ws) return null; + if (!ws) { + if (sessions && message.id !== undefined && message.id !== null) { + const session = sessions.get(clientSessionId); + if (session) { + session.res.write('event: message\ndata: ' + JSON.stringify({ + jsonrpc: '2.0', + id: message.id, + error: { code: -32603, message: 'Backend unavailable' } + }) + '\n\n'); + } + } + return null; + } const requestId = uuidv4(); pendingRequests.set(requestId, { serviceId, clientSessionId }); @@ -53,4 +65,37 @@ function handleBackendMessage(serviceId, data, sessions) { } } -module.exports = { sendToBackend, handleBackendMessage }; +function cleanupBackend(serviceId, sessions) { + // Clean up pending requests for this backend + for (const [requestId, pending] of pendingRequests) { + if (pending.serviceId === serviceId) { + const session = sessions.get(pending.clientSessionId); + if (session) { + // Write error notification event + session.res.write('event: message\ndata: ' + JSON.stringify({ + jsonrpc: '2.0', + id: null, + error: { code: -32603, message: 'Backend disconnected' } + }) + '\n\n'); + } + pendingRequests.delete(requestId); + } + } + + // Clean up active SSE sessions for this backend + for (const [clientSessionId, session] of sessions) { + if (session.serviceId === serviceId) { + // Write final error event + session.res.write('event: message\ndata: ' + JSON.stringify({ + jsonrpc: '2.0', + id: null, + error: { code: -32603, message: 'Backend disconnected' } + }) + '\n\n'); + // End the response + session.res.end(); + sessions.delete(clientSessionId); + } + } +} + +module.exports = { sendToBackend, handleBackendMessage, cleanupBackend }; diff --git a/src/routes/mcp-proxy.js b/src/routes/mcp-proxy.js index 8e92b89..973f04a 100644 --- a/src/routes/mcp-proxy.js +++ b/src/routes/mcp-proxy.js @@ -42,7 +42,7 @@ router.post('/:serviceId/message', (req, res) => { return res.status(502).json({ error: 'backend not connected' }); } - relay.sendToBackend(serviceId, req.body, sessionId); + relay.sendToBackend(serviceId, req.body, sessionId, sessions); return res.status(202).json({ status: 'accepted' }); }); diff --git a/src/ws-server.js b/src/ws-server.js index e3847ef..9f4be65 100644 --- a/src/ws-server.js +++ b/src/ws-server.js @@ -2,6 +2,8 @@ const WebSocket = require('ws'); const config = require('./config'); const registry = require('./backend-registry'); const relay = require('./relay'); +const eventBus = require('./event-bus'); +const { log } = require('./event-bus'); const PING_INTERVAL_MS = 30000; const MAX_MISSED_PONGS = 2; @@ -19,7 +21,7 @@ function setupWsServer(httpServer) { }); }); - wss.on('connection', (ws) => { + wss.on('connection', (ws, req) => { let serviceId = null; let authenticated = false; let missedPongs = 0; @@ -34,7 +36,14 @@ function setupWsServer(httpServer) { return; } - if (msg.type !== 'register' || !msg.serviceId || msg.secret !== config.HUB_SECRET) { + if (msg.type !== 'register' || !msg.serviceId) { + ws.close(4001, 'unauthorized'); + return; + } + + const expectedSecret = config.getServiceSecret(msg.serviceId); + if (expectedSecret === null || msg.secret !== expectedSecret) { + console.log(`[ws] auth failed for serviceId=${msg.serviceId} from ${req.socket.remoteAddress}`); ws.close(4001, 'unauthorized'); return; } @@ -71,6 +80,9 @@ function setupWsServer(httpServer) { if (pingTimer) clearInterval(pingTimer); if (authenticated && serviceId) { registry.unregister(serviceId); + const { cleanupBackend } = require('./relay'); + const { sessions } = require('./routes/mcp-proxy'); + cleanupBackend(serviceId, sessions); } });