feat: add generic mcp-bridge for proxying upstream MCP SSE servers
- Add mcp-bridge/index.js: WebSocket backend for hub, relays requests to upstream MCP SSE servers via SSE+POST transport - Add mcp-bridge/package.json with 'ws' dependency - Add 'mcp-bridge-memory' entry to ecosystem.config.js (not started) - Add 'bridge' script to root package.json - Add mcp-bridge/node_modules to .gitignore Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
parent
85b3f5b6e2
commit
f2f64ec392
6 changed files with 350 additions and 0 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -1,3 +1,4 @@
|
|||
agent_gateway.db
|
||||
node_modules/
|
||||
.env
|
||||
mcp-bridge/node_modules
|
||||
|
|
|
|||
|
|
@ -19,6 +19,17 @@ module.exports = {
|
|||
restart_delay: 2000,
|
||||
log_date_format: 'YYYY-MM-DD HH:mm:ss Z',
|
||||
merge_logs: true
|
||||
},
|
||||
{
|
||||
name: 'mcp-bridge-memory',
|
||||
script: 'mcp-bridge/index.js',
|
||||
cwd: '/workspace',
|
||||
env: {
|
||||
BRIDGE_SERVICE_ID: 'memory-mcp',
|
||||
BRIDGE_SECRET: 'dev-secret',
|
||||
BRIDGE_HUB_URL: 'ws://localhost:3000/ws/register',
|
||||
BRIDGE_UPSTREAM_URL: 'https://memory-mcp.dbchat.ai/mcp/sse'
|
||||
}
|
||||
}
|
||||
]
|
||||
};
|
||||
|
|
|
|||
287
mcp-bridge/index.js
Normal file
287
mcp-bridge/index.js
Normal file
|
|
@ -0,0 +1,287 @@
|
|||
'use strict';
|
||||
|
||||
const WebSocket = require('ws');
|
||||
const http = require('http');
|
||||
const https = require('https');
|
||||
const { URL } = require('url');
|
||||
|
||||
const SERVICE_ID = process.env.BRIDGE_SERVICE_ID || 'memory-mcp';
|
||||
const SECRET = process.env.BRIDGE_SECRET || 'dev-secret';
|
||||
const HUB_URL = process.env.BRIDGE_HUB_URL || 'ws://localhost:3000/ws/register';
|
||||
const UPSTREAM_URL = process.env.BRIDGE_UPSTREAM_URL || 'https://memory-mcp.dbchat.ai/mcp/sse';
|
||||
|
||||
// Map of clientSessionId -> { req (IncomingMessage), postUrl, pendingRequests: Map<jsonrpcId -> requestId> }
|
||||
const sessions = new Map();
|
||||
|
||||
let ws = null;
|
||||
let reconnectDelay = 1000;
|
||||
|
||||
function log(...args) {
|
||||
console.log(new Date().toISOString(), '[mcp-bridge]', ...args);
|
||||
}
|
||||
|
||||
function sendToHub(obj) {
|
||||
if (ws && ws.readyState === WebSocket.OPEN) {
|
||||
ws.send(JSON.stringify(obj));
|
||||
}
|
||||
}
|
||||
|
||||
function closeSession(clientSessionId) {
|
||||
const session = sessions.get(clientSessionId);
|
||||
if (session) {
|
||||
log(`Closing upstream session for clientSessionId=${clientSessionId}`);
|
||||
try { session.req.destroy(); } catch (_) {}
|
||||
sessions.delete(clientSessionId);
|
||||
}
|
||||
}
|
||||
|
||||
function openUpstreamSession(clientSessionId, onReady) {
|
||||
const upstreamUrl = new URL(UPSTREAM_URL);
|
||||
const isHttps = upstreamUrl.protocol === 'https:';
|
||||
const lib = isHttps ? https : http;
|
||||
|
||||
const options = {
|
||||
hostname: upstreamUrl.hostname,
|
||||
port: upstreamUrl.port || (isHttps ? 443 : 80),
|
||||
path: upstreamUrl.pathname + upstreamUrl.search,
|
||||
method: 'GET',
|
||||
headers: {
|
||||
'Accept': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache'
|
||||
}
|
||||
};
|
||||
|
||||
log(`Opening upstream SSE connection for clientSessionId=${clientSessionId} -> ${UPSTREAM_URL}`);
|
||||
|
||||
const req = lib.request(options, (res) => {
|
||||
if (res.statusCode !== 200) {
|
||||
log(`Upstream SSE returned HTTP ${res.statusCode} for clientSessionId=${clientSessionId}`);
|
||||
req.destroy();
|
||||
onReady(new Error(`Upstream SSE HTTP ${res.statusCode}`));
|
||||
return;
|
||||
}
|
||||
|
||||
const session = {
|
||||
req,
|
||||
postUrl: null,
|
||||
pendingRequests: new Map()
|
||||
};
|
||||
sessions.set(clientSessionId, session);
|
||||
|
||||
let buffer = '';
|
||||
let currentEvent = null;
|
||||
|
||||
res.on('data', (chunk) => {
|
||||
buffer += chunk.toString();
|
||||
const lines = buffer.split('\n');
|
||||
buffer = lines.pop(); // keep incomplete last line
|
||||
|
||||
for (const line of lines) {
|
||||
if (line.startsWith('event:')) {
|
||||
currentEvent = line.slice(6).trim();
|
||||
} else if (line.startsWith('data:')) {
|
||||
const data = line.slice(5).trim();
|
||||
|
||||
if (currentEvent === 'endpoint') {
|
||||
// Resolve relative path against upstream origin
|
||||
let postUrl;
|
||||
if (data.startsWith('http://') || data.startsWith('https://')) {
|
||||
postUrl = data;
|
||||
} else {
|
||||
postUrl = `${upstreamUrl.protocol}//${upstreamUrl.host}${data}`;
|
||||
}
|
||||
session.postUrl = postUrl;
|
||||
log(`Got upstream POST URL for clientSessionId=${clientSessionId}: ${postUrl}`);
|
||||
onReady(null);
|
||||
} else if (currentEvent === 'message' || currentEvent === null) {
|
||||
// JSON-RPC response from upstream
|
||||
let msg;
|
||||
try {
|
||||
msg = JSON.parse(data);
|
||||
} catch (e) {
|
||||
log(`Failed to parse upstream SSE message: ${data}`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.id !== undefined && msg.id !== null) {
|
||||
const key = `${clientSessionId}:${msg.id}`;
|
||||
const requestId = session.pendingRequests.get(key);
|
||||
if (requestId) {
|
||||
session.pendingRequests.delete(key);
|
||||
log(`Relaying response to hub: requestId=${requestId} jsonrpcId=${msg.id}`);
|
||||
sendToHub({
|
||||
type: 'mcp-response',
|
||||
requestId,
|
||||
clientSessionId,
|
||||
payload: msg
|
||||
});
|
||||
} else {
|
||||
log(`No pending request for key=${key}, dropping response`);
|
||||
}
|
||||
} else {
|
||||
// Notification — no requestId needed, just log
|
||||
log(`Received upstream notification for clientSessionId=${clientSessionId}:`, JSON.stringify(msg));
|
||||
}
|
||||
}
|
||||
|
||||
currentEvent = null;
|
||||
} else if (line === '') {
|
||||
currentEvent = null;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
res.on('end', () => {
|
||||
log(`Upstream SSE stream ended for clientSessionId=${clientSessionId}`);
|
||||
closeSession(clientSessionId);
|
||||
});
|
||||
|
||||
res.on('error', (err) => {
|
||||
log(`Upstream SSE stream error for clientSessionId=${clientSessionId}:`, err.message);
|
||||
closeSession(clientSessionId);
|
||||
});
|
||||
});
|
||||
|
||||
req.on('error', (err) => {
|
||||
log(`Upstream SSE request error for clientSessionId=${clientSessionId}:`, err.message);
|
||||
sessions.delete(clientSessionId);
|
||||
onReady(err);
|
||||
});
|
||||
|
||||
req.end();
|
||||
}
|
||||
|
||||
function postToUpstream(postUrl, payload, callback) {
|
||||
const body = JSON.stringify(payload);
|
||||
const url = new URL(postUrl);
|
||||
const isHttps = url.protocol === 'https:';
|
||||
const lib = isHttps ? https : http;
|
||||
|
||||
const options = {
|
||||
hostname: url.hostname,
|
||||
port: url.port || (isHttps ? 443 : 80),
|
||||
path: url.pathname + url.search,
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'Content-Length': Buffer.byteLength(body)
|
||||
}
|
||||
};
|
||||
|
||||
const req = lib.request(options, (res) => {
|
||||
let data = '';
|
||||
res.on('data', (chunk) => { data += chunk; });
|
||||
res.on('end', () => {
|
||||
if (res.statusCode < 200 || res.statusCode >= 300) {
|
||||
callback(new Error(`POST to upstream returned HTTP ${res.statusCode}: ${data}`));
|
||||
} else {
|
||||
callback(null);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
req.on('error', callback);
|
||||
req.write(body);
|
||||
req.end();
|
||||
}
|
||||
|
||||
function handleMcpRequest(msg) {
|
||||
const { requestId, clientSessionId, payload } = msg;
|
||||
|
||||
const forwardRequest = () => {
|
||||
const session = sessions.get(clientSessionId);
|
||||
if (!session || !session.postUrl) {
|
||||
log(`No postUrl available for clientSessionId=${clientSessionId}, dropping request`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Track pending request if payload has an id
|
||||
if (payload.id !== undefined && payload.id !== null) {
|
||||
const key = `${clientSessionId}:${payload.id}`;
|
||||
session.pendingRequests.set(key, requestId);
|
||||
log(`Forwarding request: requestId=${requestId} clientSessionId=${clientSessionId} method=${payload.method} jsonrpcId=${payload.id}`);
|
||||
} else {
|
||||
log(`Forwarding notification: clientSessionId=${clientSessionId} method=${payload.method}`);
|
||||
}
|
||||
|
||||
postToUpstream(session.postUrl, payload, (err) => {
|
||||
if (err) {
|
||||
log(`Error posting to upstream for clientSessionId=${clientSessionId}:`, err.message);
|
||||
if (payload.id !== undefined && payload.id !== null) {
|
||||
const key = `${clientSessionId}:${payload.id}`;
|
||||
session.pendingRequests.delete(key);
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
if (!sessions.has(clientSessionId)) {
|
||||
openUpstreamSession(clientSessionId, (err) => {
|
||||
if (err) {
|
||||
log(`Failed to open upstream session for clientSessionId=${clientSessionId}:`, err.message);
|
||||
return;
|
||||
}
|
||||
forwardRequest();
|
||||
});
|
||||
} else {
|
||||
const session = sessions.get(clientSessionId);
|
||||
if (!session.postUrl) {
|
||||
// Still waiting for endpoint event — retry shortly
|
||||
log(`Session exists but postUrl not yet available for clientSessionId=${clientSessionId}, retrying...`);
|
||||
setTimeout(forwardRequest, 200);
|
||||
} else {
|
||||
forwardRequest();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function connect() {
|
||||
log(`Connecting to hub: ${HUB_URL}`);
|
||||
ws = new WebSocket(HUB_URL);
|
||||
|
||||
ws.on('open', () => {
|
||||
reconnectDelay = 1000;
|
||||
log(`Connected to hub, registering as serviceId=${SERVICE_ID}`);
|
||||
ws.send(JSON.stringify({ type: 'register', serviceId: SERVICE_ID, secret: SECRET }));
|
||||
});
|
||||
|
||||
ws.on('message', (data) => {
|
||||
let msg;
|
||||
try {
|
||||
msg = JSON.parse(data.toString());
|
||||
} catch (e) {
|
||||
log('Failed to parse hub message:', data.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === 'registered') {
|
||||
log(`Registered with hub as serviceId=${SERVICE_ID}`);
|
||||
} else if (msg.type === 'mcp-request') {
|
||||
handleMcpRequest(msg);
|
||||
} else {
|
||||
log('Unknown message from hub:', msg.type);
|
||||
}
|
||||
});
|
||||
|
||||
ws.on('close', (code, reason) => {
|
||||
log(`Hub WebSocket closed (code=${code}), cleaning up all sessions`);
|
||||
for (const clientSessionId of sessions.keys()) {
|
||||
closeSession(clientSessionId);
|
||||
}
|
||||
scheduleReconnect();
|
||||
});
|
||||
|
||||
ws.on('error', (err) => {
|
||||
log('Hub WebSocket error:', err.message);
|
||||
});
|
||||
}
|
||||
|
||||
function scheduleReconnect() {
|
||||
log(`Reconnecting in ${reconnectDelay}ms...`);
|
||||
setTimeout(() => {
|
||||
connect();
|
||||
}, reconnectDelay);
|
||||
reconnectDelay = Math.min(reconnectDelay * 2, 30000);
|
||||
}
|
||||
|
||||
connect();
|
||||
37
mcp-bridge/package-lock.json
generated
Normal file
37
mcp-bridge/package-lock.json
generated
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
{
|
||||
"name": "mcp-bridge",
|
||||
"version": "1.0.0",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "mcp-bridge",
|
||||
"version": "1.0.0",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"ws": "^8.19.0"
|
||||
}
|
||||
},
|
||||
"node_modules/ws": {
|
||||
"version": "8.19.0",
|
||||
"resolved": "https://registry.npmjs.org/ws/-/ws-8.19.0.tgz",
|
||||
"integrity": "sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=10.0.0"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"bufferutil": "^4.0.1",
|
||||
"utf-8-validate": ">=5.0.2"
|
||||
},
|
||||
"peerDependenciesMeta": {
|
||||
"bufferutil": {
|
||||
"optional": true
|
||||
},
|
||||
"utf-8-validate": {
|
||||
"optional": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
13
mcp-bridge/package.json
Normal file
13
mcp-bridge/package.json
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
{
|
||||
"name": "mcp-bridge",
|
||||
"version": "1.0.0",
|
||||
"description": "Generic MCP SSE bridge for the MCP hub",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
"start": "node index.js"
|
||||
},
|
||||
"dependencies": {
|
||||
"ws": "^8.19.0"
|
||||
},
|
||||
"license": "ISC"
|
||||
}
|
||||
|
|
@ -5,6 +5,7 @@
|
|||
"scripts": {
|
||||
"test": "node test/e2e.js",
|
||||
"start": "node src/index.js",
|
||||
"bridge": "node mcp-bridge/index.js",
|
||||
"sample-mcp": "node sample-mcp/index.js",
|
||||
"pm2:start": "pm2 start ecosystem.config.js",
|
||||
"pm2:stop": "pm2 stop all",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue