diff --git a/.gitignore b/.gitignore index e6c5e5d..021954c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ agent_gateway.db node_modules/ .env +mcp-bridge/node_modules diff --git a/ecosystem.config.js b/ecosystem.config.js index 87ebd57..608fc2f 100644 --- a/ecosystem.config.js +++ b/ecosystem.config.js @@ -19,6 +19,17 @@ module.exports = { restart_delay: 2000, log_date_format: 'YYYY-MM-DD HH:mm:ss Z', merge_logs: true + }, + { + name: 'mcp-bridge-memory', + script: 'mcp-bridge/index.js', + cwd: '/workspace', + env: { + BRIDGE_SERVICE_ID: 'memory-mcp', + BRIDGE_SECRET: 'dev-secret', + BRIDGE_HUB_URL: 'ws://localhost:3000/ws/register', + BRIDGE_UPSTREAM_URL: 'https://memory-mcp.dbchat.ai/mcp/sse' + } } ] }; diff --git a/mcp-bridge/index.js b/mcp-bridge/index.js new file mode 100644 index 0000000..d720347 --- /dev/null +++ b/mcp-bridge/index.js @@ -0,0 +1,287 @@ +'use strict'; + +const WebSocket = require('ws'); +const http = require('http'); +const https = require('https'); +const { URL } = require('url'); + +const SERVICE_ID = process.env.BRIDGE_SERVICE_ID || 'memory-mcp'; +const SECRET = process.env.BRIDGE_SECRET || 'dev-secret'; +const HUB_URL = process.env.BRIDGE_HUB_URL || 'ws://localhost:3000/ws/register'; +const UPSTREAM_URL = process.env.BRIDGE_UPSTREAM_URL || 'https://memory-mcp.dbchat.ai/mcp/sse'; + +// Map of clientSessionId -> { req (IncomingMessage), postUrl, pendingRequests: Map requestId> } +const sessions = new Map(); + +let ws = null; +let reconnectDelay = 1000; + +function log(...args) { + console.log(new Date().toISOString(), '[mcp-bridge]', ...args); +} + +function sendToHub(obj) { + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify(obj)); + } +} + +function closeSession(clientSessionId) { + const session = sessions.get(clientSessionId); + if (session) { + log(`Closing upstream session for clientSessionId=${clientSessionId}`); + try { session.req.destroy(); } catch (_) {} + sessions.delete(clientSessionId); + } +} + +function openUpstreamSession(clientSessionId, onReady) { + const upstreamUrl = new URL(UPSTREAM_URL); + const isHttps = upstreamUrl.protocol === 'https:'; + const lib = isHttps ? https : http; + + const options = { + hostname: upstreamUrl.hostname, + port: upstreamUrl.port || (isHttps ? 443 : 80), + path: upstreamUrl.pathname + upstreamUrl.search, + method: 'GET', + headers: { + 'Accept': 'text/event-stream', + 'Cache-Control': 'no-cache' + } + }; + + log(`Opening upstream SSE connection for clientSessionId=${clientSessionId} -> ${UPSTREAM_URL}`); + + const req = lib.request(options, (res) => { + if (res.statusCode !== 200) { + log(`Upstream SSE returned HTTP ${res.statusCode} for clientSessionId=${clientSessionId}`); + req.destroy(); + onReady(new Error(`Upstream SSE HTTP ${res.statusCode}`)); + return; + } + + const session = { + req, + postUrl: null, + pendingRequests: new Map() + }; + sessions.set(clientSessionId, session); + + let buffer = ''; + let currentEvent = null; + + res.on('data', (chunk) => { + buffer += chunk.toString(); + const lines = buffer.split('\n'); + buffer = lines.pop(); // keep incomplete last line + + for (const line of lines) { + if (line.startsWith('event:')) { + currentEvent = line.slice(6).trim(); + } else if (line.startsWith('data:')) { + const data = line.slice(5).trim(); + + if (currentEvent === 'endpoint') { + // Resolve relative path against upstream origin + let postUrl; + if (data.startsWith('http://') || data.startsWith('https://')) { + postUrl = data; + } else { + postUrl = `${upstreamUrl.protocol}//${upstreamUrl.host}${data}`; + } + session.postUrl = postUrl; + log(`Got upstream POST URL for clientSessionId=${clientSessionId}: ${postUrl}`); + onReady(null); + } else if (currentEvent === 'message' || currentEvent === null) { + // JSON-RPC response from upstream + let msg; + try { + msg = JSON.parse(data); + } catch (e) { + log(`Failed to parse upstream SSE message: ${data}`); + return; + } + + if (msg.id !== undefined && msg.id !== null) { + const key = `${clientSessionId}:${msg.id}`; + const requestId = session.pendingRequests.get(key); + if (requestId) { + session.pendingRequests.delete(key); + log(`Relaying response to hub: requestId=${requestId} jsonrpcId=${msg.id}`); + sendToHub({ + type: 'mcp-response', + requestId, + clientSessionId, + payload: msg + }); + } else { + log(`No pending request for key=${key}, dropping response`); + } + } else { + // Notification — no requestId needed, just log + log(`Received upstream notification for clientSessionId=${clientSessionId}:`, JSON.stringify(msg)); + } + } + + currentEvent = null; + } else if (line === '') { + currentEvent = null; + } + } + }); + + res.on('end', () => { + log(`Upstream SSE stream ended for clientSessionId=${clientSessionId}`); + closeSession(clientSessionId); + }); + + res.on('error', (err) => { + log(`Upstream SSE stream error for clientSessionId=${clientSessionId}:`, err.message); + closeSession(clientSessionId); + }); + }); + + req.on('error', (err) => { + log(`Upstream SSE request error for clientSessionId=${clientSessionId}:`, err.message); + sessions.delete(clientSessionId); + onReady(err); + }); + + req.end(); +} + +function postToUpstream(postUrl, payload, callback) { + const body = JSON.stringify(payload); + const url = new URL(postUrl); + const isHttps = url.protocol === 'https:'; + const lib = isHttps ? https : http; + + const options = { + hostname: url.hostname, + port: url.port || (isHttps ? 443 : 80), + path: url.pathname + url.search, + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(body) + } + }; + + const req = lib.request(options, (res) => { + let data = ''; + res.on('data', (chunk) => { data += chunk; }); + res.on('end', () => { + if (res.statusCode < 200 || res.statusCode >= 300) { + callback(new Error(`POST to upstream returned HTTP ${res.statusCode}: ${data}`)); + } else { + callback(null); + } + }); + }); + + req.on('error', callback); + req.write(body); + req.end(); +} + +function handleMcpRequest(msg) { + const { requestId, clientSessionId, payload } = msg; + + const forwardRequest = () => { + const session = sessions.get(clientSessionId); + if (!session || !session.postUrl) { + log(`No postUrl available for clientSessionId=${clientSessionId}, dropping request`); + return; + } + + // Track pending request if payload has an id + if (payload.id !== undefined && payload.id !== null) { + const key = `${clientSessionId}:${payload.id}`; + session.pendingRequests.set(key, requestId); + log(`Forwarding request: requestId=${requestId} clientSessionId=${clientSessionId} method=${payload.method} jsonrpcId=${payload.id}`); + } else { + log(`Forwarding notification: clientSessionId=${clientSessionId} method=${payload.method}`); + } + + postToUpstream(session.postUrl, payload, (err) => { + if (err) { + log(`Error posting to upstream for clientSessionId=${clientSessionId}:`, err.message); + if (payload.id !== undefined && payload.id !== null) { + const key = `${clientSessionId}:${payload.id}`; + session.pendingRequests.delete(key); + } + } + }); + }; + + if (!sessions.has(clientSessionId)) { + openUpstreamSession(clientSessionId, (err) => { + if (err) { + log(`Failed to open upstream session for clientSessionId=${clientSessionId}:`, err.message); + return; + } + forwardRequest(); + }); + } else { + const session = sessions.get(clientSessionId); + if (!session.postUrl) { + // Still waiting for endpoint event — retry shortly + log(`Session exists but postUrl not yet available for clientSessionId=${clientSessionId}, retrying...`); + setTimeout(forwardRequest, 200); + } else { + forwardRequest(); + } + } +} + +function connect() { + log(`Connecting to hub: ${HUB_URL}`); + ws = new WebSocket(HUB_URL); + + ws.on('open', () => { + reconnectDelay = 1000; + log(`Connected to hub, registering as serviceId=${SERVICE_ID}`); + ws.send(JSON.stringify({ type: 'register', serviceId: SERVICE_ID, secret: SECRET })); + }); + + ws.on('message', (data) => { + let msg; + try { + msg = JSON.parse(data.toString()); + } catch (e) { + log('Failed to parse hub message:', data.toString()); + return; + } + + if (msg.type === 'registered') { + log(`Registered with hub as serviceId=${SERVICE_ID}`); + } else if (msg.type === 'mcp-request') { + handleMcpRequest(msg); + } else { + log('Unknown message from hub:', msg.type); + } + }); + + ws.on('close', (code, reason) => { + log(`Hub WebSocket closed (code=${code}), cleaning up all sessions`); + for (const clientSessionId of sessions.keys()) { + closeSession(clientSessionId); + } + scheduleReconnect(); + }); + + ws.on('error', (err) => { + log('Hub WebSocket error:', err.message); + }); +} + +function scheduleReconnect() { + log(`Reconnecting in ${reconnectDelay}ms...`); + setTimeout(() => { + connect(); + }, reconnectDelay); + reconnectDelay = Math.min(reconnectDelay * 2, 30000); +} + +connect(); diff --git a/mcp-bridge/package-lock.json b/mcp-bridge/package-lock.json new file mode 100644 index 0000000..3ff6b5e --- /dev/null +++ b/mcp-bridge/package-lock.json @@ -0,0 +1,37 @@ +{ + "name": "mcp-bridge", + "version": "1.0.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "mcp-bridge", + "version": "1.0.0", + "license": "ISC", + "dependencies": { + "ws": "^8.19.0" + } + }, + "node_modules/ws": { + "version": "8.19.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.19.0.tgz", + "integrity": "sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + } + } +} diff --git a/mcp-bridge/package.json b/mcp-bridge/package.json new file mode 100644 index 0000000..a5e882a --- /dev/null +++ b/mcp-bridge/package.json @@ -0,0 +1,13 @@ +{ + "name": "mcp-bridge", + "version": "1.0.0", + "description": "Generic MCP SSE bridge for the MCP hub", + "main": "index.js", + "scripts": { + "start": "node index.js" + }, + "dependencies": { + "ws": "^8.19.0" + }, + "license": "ISC" +} diff --git a/package.json b/package.json index 1d7a7d2..e8d1180 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,7 @@ "scripts": { "test": "node test/e2e.js", "start": "node src/index.js", + "bridge": "node mcp-bridge/index.js", "sample-mcp": "node sample-mcp/index.js", "pm2:start": "pm2 start ecosystem.config.js", "pm2:stop": "pm2 stop all",