'use strict'; const WebSocket = require('ws'); const http = require('http'); const https = require('https'); const { URL } = require('url'); const SERVICE_ID = process.env.BRIDGE_SERVICE_ID || 'memory-mcp'; const SECRET = process.env.BRIDGE_SECRET || 'dev-secret'; const HUB_URL = process.env.BRIDGE_HUB_URL || 'ws://localhost:3000/ws/register'; const UPSTREAM_URL = process.env.BRIDGE_UPSTREAM_URL || 'https://memory-mcp.dbchat.ai/mcp/sse'; // Map of clientSessionId -> { req (IncomingMessage), postUrl, pendingRequests: Map requestId> } const sessions = new Map(); let ws = null; let reconnectDelay = 1000; function log(...args) { console.log(new Date().toISOString(), '[mcp-bridge]', ...args); } function sendToHub(obj) { if (ws && ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify(obj)); } } function closeSession(clientSessionId) { const session = sessions.get(clientSessionId); if (session) { log(`Closing upstream session for clientSessionId=${clientSessionId}`); try { session.req.destroy(); } catch (_) {} sessions.delete(clientSessionId); } } function openUpstreamSession(clientSessionId, onReady) { const upstreamUrl = new URL(UPSTREAM_URL); const isHttps = upstreamUrl.protocol === 'https:'; const lib = isHttps ? https : http; const options = { hostname: upstreamUrl.hostname, port: upstreamUrl.port || (isHttps ? 443 : 80), path: upstreamUrl.pathname + upstreamUrl.search, method: 'GET', headers: { 'Accept': 'text/event-stream', 'Cache-Control': 'no-cache' } }; log(`Opening upstream SSE connection for clientSessionId=${clientSessionId} -> ${UPSTREAM_URL}`); const req = lib.request(options, (res) => { if (res.statusCode !== 200) { log(`Upstream SSE returned HTTP ${res.statusCode} for clientSessionId=${clientSessionId}`); req.destroy(); onReady(new Error(`Upstream SSE HTTP ${res.statusCode}`)); return; } const session = { req, postUrl: null, pendingRequests: new Map() }; sessions.set(clientSessionId, session); let buffer = ''; let currentEvent = null; res.on('data', (chunk) => { buffer += chunk.toString(); const lines = buffer.split('\n'); buffer = lines.pop(); // keep incomplete last line for (const line of lines) { if (line.startsWith('event:')) { currentEvent = line.slice(6).trim(); } else if (line.startsWith('data:')) { const data = line.slice(5).trim(); if (currentEvent === 'endpoint') { // Resolve relative path against upstream origin let postUrl; if (data.startsWith('http://') || data.startsWith('https://')) { postUrl = data; } else { postUrl = `${upstreamUrl.protocol}//${upstreamUrl.host}${data}`; } session.postUrl = postUrl; log(`Got upstream POST URL for clientSessionId=${clientSessionId}: ${postUrl}`); onReady(null); } else if (currentEvent === 'message' || currentEvent === null) { // JSON-RPC response from upstream let msg; try { msg = JSON.parse(data); } catch (e) { log(`Failed to parse upstream SSE message: ${data}`); return; } if (msg.id !== undefined && msg.id !== null) { const key = `${clientSessionId}:${msg.id}`; const requestId = session.pendingRequests.get(key); if (requestId) { session.pendingRequests.delete(key); log(`Relaying response to hub: requestId=${requestId} jsonrpcId=${msg.id}`); sendToHub({ type: 'mcp-response', requestId, clientSessionId, payload: msg }); } else { log(`No pending request for key=${key}, dropping response`); } } else { // Notification — no requestId needed, just log log(`Received upstream notification for clientSessionId=${clientSessionId}:`, JSON.stringify(msg)); } } currentEvent = null; } else if (line === '') { currentEvent = null; } } }); res.on('end', () => { log(`Upstream SSE stream ended for clientSessionId=${clientSessionId}`); closeSession(clientSessionId); }); res.on('error', (err) => { log(`Upstream SSE stream error for clientSessionId=${clientSessionId}:`, err.message); closeSession(clientSessionId); }); }); req.on('error', (err) => { log(`Upstream SSE request error for clientSessionId=${clientSessionId}:`, err.message); sessions.delete(clientSessionId); onReady(err); }); req.end(); } function postToUpstream(postUrl, payload, callback) { const body = JSON.stringify(payload); const url = new URL(postUrl); const isHttps = url.protocol === 'https:'; const lib = isHttps ? https : http; const options = { hostname: url.hostname, port: url.port || (isHttps ? 443 : 80), path: url.pathname + url.search, method: 'POST', headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body) } }; const req = lib.request(options, (res) => { let data = ''; res.on('data', (chunk) => { data += chunk; }); res.on('end', () => { if (res.statusCode < 200 || res.statusCode >= 300) { callback(new Error(`POST to upstream returned HTTP ${res.statusCode}: ${data}`)); } else { callback(null); } }); }); req.on('error', callback); req.write(body); req.end(); } function handleMcpRequest(msg) { const { requestId, clientSessionId, payload } = msg; const forwardRequest = () => { const session = sessions.get(clientSessionId); if (!session || !session.postUrl) { log(`No postUrl available for clientSessionId=${clientSessionId}, dropping request`); return; } // Track pending request if payload has an id if (payload.id !== undefined && payload.id !== null) { const key = `${clientSessionId}:${payload.id}`; session.pendingRequests.set(key, requestId); log(`Forwarding request: requestId=${requestId} clientSessionId=${clientSessionId} method=${payload.method} jsonrpcId=${payload.id}`); } else { log(`Forwarding notification: clientSessionId=${clientSessionId} method=${payload.method}`); } postToUpstream(session.postUrl, payload, (err) => { if (err) { log(`Error posting to upstream for clientSessionId=${clientSessionId}:`, err.message); if (payload.id !== undefined && payload.id !== null) { const key = `${clientSessionId}:${payload.id}`; session.pendingRequests.delete(key); } } }); }; if (!sessions.has(clientSessionId)) { openUpstreamSession(clientSessionId, (err) => { if (err) { log(`Failed to open upstream session for clientSessionId=${clientSessionId}:`, err.message); return; } forwardRequest(); }); } else { const session = sessions.get(clientSessionId); if (!session.postUrl) { // Still waiting for endpoint event — retry shortly log(`Session exists but postUrl not yet available for clientSessionId=${clientSessionId}, retrying...`); setTimeout(forwardRequest, 200); } else { forwardRequest(); } } } function connect() { log(`Connecting to hub: ${HUB_URL}`); ws = new WebSocket(HUB_URL); ws.on('open', () => { reconnectDelay = 1000; log(`Connected to hub, registering as serviceId=${SERVICE_ID}`); ws.send(JSON.stringify({ type: 'register', serviceId: SERVICE_ID, secret: SECRET })); }); ws.on('message', (data) => { let msg; try { msg = JSON.parse(data.toString()); } catch (e) { log('Failed to parse hub message:', data.toString()); return; } if (msg.type === 'registered') { log(`Registered with hub as serviceId=${SERVICE_ID}`); } else if (msg.type === 'mcp-request') { handleMcpRequest(msg); } else { log('Unknown message from hub:', msg.type); } }); ws.on('close', (code, reason) => { log(`Hub WebSocket closed (code=${code}), cleaning up all sessions`); for (const clientSessionId of sessions.keys()) { closeSession(clientSessionId); } scheduleReconnect(); }); ws.on('error', (err) => { log('Hub WebSocket error:', err.message); }); } function scheduleReconnect() { log(`Reconnecting in ${reconnectDelay}ms...`); setTimeout(() => { connect(); }, reconnectDelay); reconnectDelay = Math.min(reconnectDelay * 2, 30000); } connect();