mcp-hub/mcp-bridge/index.js

288 lines
8.7 KiB
JavaScript
Raw Permalink Normal View History

'use strict';
const WebSocket = require('ws');
const http = require('http');
const https = require('https');
const { URL } = require('url');
const SERVICE_ID = process.env.BRIDGE_SERVICE_ID || 'memory-mcp';
const SECRET = process.env.BRIDGE_SECRET || 'dev-secret';
const HUB_URL = process.env.BRIDGE_HUB_URL || 'ws://localhost:3000/ws/register';
const UPSTREAM_URL = process.env.BRIDGE_UPSTREAM_URL || 'https://memory-mcp.dbchat.ai/mcp/sse';
// Map of clientSessionId -> { req (IncomingMessage), postUrl, pendingRequests: Map<jsonrpcId -> requestId> }
const sessions = new Map();
let ws = null;
let reconnectDelay = 1000;
function log(...args) {
console.log(new Date().toISOString(), '[mcp-bridge]', ...args);
}
function sendToHub(obj) {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(obj));
}
}
function closeSession(clientSessionId) {
const session = sessions.get(clientSessionId);
if (session) {
log(`Closing upstream session for clientSessionId=${clientSessionId}`);
try { session.req.destroy(); } catch (_) {}
sessions.delete(clientSessionId);
}
}
function openUpstreamSession(clientSessionId, onReady) {
const upstreamUrl = new URL(UPSTREAM_URL);
const isHttps = upstreamUrl.protocol === 'https:';
const lib = isHttps ? https : http;
const options = {
hostname: upstreamUrl.hostname,
port: upstreamUrl.port || (isHttps ? 443 : 80),
path: upstreamUrl.pathname + upstreamUrl.search,
method: 'GET',
headers: {
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache'
}
};
log(`Opening upstream SSE connection for clientSessionId=${clientSessionId} -> ${UPSTREAM_URL}`);
const req = lib.request(options, (res) => {
if (res.statusCode !== 200) {
log(`Upstream SSE returned HTTP ${res.statusCode} for clientSessionId=${clientSessionId}`);
req.destroy();
onReady(new Error(`Upstream SSE HTTP ${res.statusCode}`));
return;
}
const session = {
req,
postUrl: null,
pendingRequests: new Map()
};
sessions.set(clientSessionId, session);
let buffer = '';
let currentEvent = null;
res.on('data', (chunk) => {
buffer += chunk.toString();
const lines = buffer.split('\n');
buffer = lines.pop(); // keep incomplete last line
for (const line of lines) {
if (line.startsWith('event:')) {
currentEvent = line.slice(6).trim();
} else if (line.startsWith('data:')) {
const data = line.slice(5).trim();
if (currentEvent === 'endpoint') {
// Resolve relative path against upstream origin
let postUrl;
if (data.startsWith('http://') || data.startsWith('https://')) {
postUrl = data;
} else {
postUrl = `${upstreamUrl.protocol}//${upstreamUrl.host}${data}`;
}
session.postUrl = postUrl;
log(`Got upstream POST URL for clientSessionId=${clientSessionId}: ${postUrl}`);
onReady(null);
} else if (currentEvent === 'message' || currentEvent === null) {
// JSON-RPC response from upstream
let msg;
try {
msg = JSON.parse(data);
} catch (e) {
log(`Failed to parse upstream SSE message: ${data}`);
return;
}
if (msg.id !== undefined && msg.id !== null) {
const key = `${clientSessionId}:${msg.id}`;
const requestId = session.pendingRequests.get(key);
if (requestId) {
session.pendingRequests.delete(key);
log(`Relaying response to hub: requestId=${requestId} jsonrpcId=${msg.id}`);
sendToHub({
type: 'mcp-response',
requestId,
clientSessionId,
payload: msg
});
} else {
log(`No pending request for key=${key}, dropping response`);
}
} else {
// Notification — no requestId needed, just log
log(`Received upstream notification for clientSessionId=${clientSessionId}:`, JSON.stringify(msg));
}
}
currentEvent = null;
} else if (line === '') {
currentEvent = null;
}
}
});
res.on('end', () => {
log(`Upstream SSE stream ended for clientSessionId=${clientSessionId}`);
closeSession(clientSessionId);
});
res.on('error', (err) => {
log(`Upstream SSE stream error for clientSessionId=${clientSessionId}:`, err.message);
closeSession(clientSessionId);
});
});
req.on('error', (err) => {
log(`Upstream SSE request error for clientSessionId=${clientSessionId}:`, err.message);
sessions.delete(clientSessionId);
onReady(err);
});
req.end();
}
function postToUpstream(postUrl, payload, callback) {
const body = JSON.stringify(payload);
const url = new URL(postUrl);
const isHttps = url.protocol === 'https:';
const lib = isHttps ? https : http;
const options = {
hostname: url.hostname,
port: url.port || (isHttps ? 443 : 80),
path: url.pathname + url.search,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(body)
}
};
const req = lib.request(options, (res) => {
let data = '';
res.on('data', (chunk) => { data += chunk; });
res.on('end', () => {
if (res.statusCode < 200 || res.statusCode >= 300) {
callback(new Error(`POST to upstream returned HTTP ${res.statusCode}: ${data}`));
} else {
callback(null);
}
});
});
req.on('error', callback);
req.write(body);
req.end();
}
function handleMcpRequest(msg) {
const { requestId, clientSessionId, payload } = msg;
const forwardRequest = () => {
const session = sessions.get(clientSessionId);
if (!session || !session.postUrl) {
log(`No postUrl available for clientSessionId=${clientSessionId}, dropping request`);
return;
}
// Track pending request if payload has an id
if (payload.id !== undefined && payload.id !== null) {
const key = `${clientSessionId}:${payload.id}`;
session.pendingRequests.set(key, requestId);
log(`Forwarding request: requestId=${requestId} clientSessionId=${clientSessionId} method=${payload.method} jsonrpcId=${payload.id}`);
} else {
log(`Forwarding notification: clientSessionId=${clientSessionId} method=${payload.method}`);
}
postToUpstream(session.postUrl, payload, (err) => {
if (err) {
log(`Error posting to upstream for clientSessionId=${clientSessionId}:`, err.message);
if (payload.id !== undefined && payload.id !== null) {
const key = `${clientSessionId}:${payload.id}`;
session.pendingRequests.delete(key);
}
}
});
};
if (!sessions.has(clientSessionId)) {
openUpstreamSession(clientSessionId, (err) => {
if (err) {
log(`Failed to open upstream session for clientSessionId=${clientSessionId}:`, err.message);
return;
}
forwardRequest();
});
} else {
const session = sessions.get(clientSessionId);
if (!session.postUrl) {
// Still waiting for endpoint event — retry shortly
log(`Session exists but postUrl not yet available for clientSessionId=${clientSessionId}, retrying...`);
setTimeout(forwardRequest, 200);
} else {
forwardRequest();
}
}
}
function connect() {
log(`Connecting to hub: ${HUB_URL}`);
ws = new WebSocket(HUB_URL);
ws.on('open', () => {
reconnectDelay = 1000;
log(`Connected to hub, registering as serviceId=${SERVICE_ID}`);
ws.send(JSON.stringify({ type: 'register', serviceId: SERVICE_ID, secret: SECRET }));
});
ws.on('message', (data) => {
let msg;
try {
msg = JSON.parse(data.toString());
} catch (e) {
log('Failed to parse hub message:', data.toString());
return;
}
if (msg.type === 'registered') {
log(`Registered with hub as serviceId=${SERVICE_ID}`);
} else if (msg.type === 'mcp-request') {
handleMcpRequest(msg);
} else {
log('Unknown message from hub:', msg.type);
}
});
ws.on('close', (code, reason) => {
log(`Hub WebSocket closed (code=${code}), cleaning up all sessions`);
for (const clientSessionId of sessions.keys()) {
closeSession(clientSessionId);
}
scheduleReconnect();
});
ws.on('error', (err) => {
log('Hub WebSocket error:', err.message);
});
}
function scheduleReconnect() {
log(`Reconnecting in ${reconnectDelay}ms...`);
setTimeout(() => {
connect();
}, reconnectDelay);
reconnectDelay = Math.min(reconnectDelay * 2, 30000);
}
connect();