diff --git a/.gitignore b/.gitignore index 4cd0ae1..e6c5e5d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ agent_gateway.db -.env -mcp-bridge/node_modules node_modules/ +.env diff --git a/.vscode/mcp.json b/.vscode/mcp.json deleted file mode 100644 index 7cc2ea9..0000000 --- a/.vscode/mcp.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "servers": { - "test-mcp": { - "url": "https://mcp.arik.work/sample-mcp/sse", - "type": "sse" - } - } - , - "inputs": [] -} - - diff --git a/ecosystem.config.js b/ecosystem.config.js deleted file mode 100644 index b9dced1..0000000 --- a/ecosystem.config.js +++ /dev/null @@ -1,51 +0,0 @@ -require('dotenv').config({ path: __dirname + '/.env' }); - -const HUB_SECRET = process.env.HUB_SECRET; -if (!HUB_SECRET) throw new Error('HUB_SECRET not set in .env'); - -module.exports = { - apps: [ - { - name: 'mcp-hub', - script: 'src/index.js', - cwd: '/workspace', - env: { - NODE_ENV: 'development', - PORT: 3000, - HUB_AUTH: JSON.stringify({ 'sample-mcp': HUB_SECRET, 'memory-mcp': HUB_SECRET }) - }, - max_restarts: 10, - restart_delay: 1000, - log_date_format: 'YYYY-MM-DD HH:mm:ss Z', - merge_logs: true - }, - { - name: 'sample-mcp', - script: 'sample-mcp/index.js', - cwd: '/workspace', - env: { - NODE_ENV: 'development', - MCP_SECRET: HUB_SECRET - }, - max_restarts: 10, - restart_delay: 2000, - log_date_format: 'YYYY-MM-DD HH:mm:ss Z', - merge_logs: true - }, - { - name: 'mcp-bridge-memory', - script: 'mcp-bridge/index.js', - cwd: '/workspace', - env: { - BRIDGE_SERVICE_ID: 'memory-mcp', - BRIDGE_SECRET: HUB_SECRET, - BRIDGE_HUB_URL: 'ws://localhost:3000/ws/register', - BRIDGE_UPSTREAM_URL: 'https://memory-mcp.dbchat.ai/mcp/sse' - }, - max_restarts: 10, - restart_delay: 2000, - log_date_format: 'YYYY-MM-DD HH:mm:ss Z', - merge_logs: true - } - ] -}; diff --git a/mcp-bridge/index.js b/mcp-bridge/index.js deleted file mode 100644 index d720347..0000000 --- a/mcp-bridge/index.js +++ /dev/null @@ -1,287 +0,0 @@ -'use strict'; - -const WebSocket = require('ws'); -const http = require('http'); -const https = require('https'); -const { URL } = require('url'); - -const SERVICE_ID = process.env.BRIDGE_SERVICE_ID || 'memory-mcp'; -const SECRET = process.env.BRIDGE_SECRET || 'dev-secret'; -const HUB_URL = process.env.BRIDGE_HUB_URL || 'ws://localhost:3000/ws/register'; -const UPSTREAM_URL = process.env.BRIDGE_UPSTREAM_URL || 'https://memory-mcp.dbchat.ai/mcp/sse'; - -// Map of clientSessionId -> { req (IncomingMessage), postUrl, pendingRequests: Map requestId> } -const sessions = new Map(); - -let ws = null; -let reconnectDelay = 1000; - -function log(...args) { - console.log(new Date().toISOString(), '[mcp-bridge]', ...args); -} - -function sendToHub(obj) { - if (ws && ws.readyState === WebSocket.OPEN) { - ws.send(JSON.stringify(obj)); - } -} - -function closeSession(clientSessionId) { - const session = sessions.get(clientSessionId); - if (session) { - log(`Closing upstream session for clientSessionId=${clientSessionId}`); - try { session.req.destroy(); } catch (_) {} - sessions.delete(clientSessionId); - } -} - -function openUpstreamSession(clientSessionId, onReady) { - const upstreamUrl = new URL(UPSTREAM_URL); - const isHttps = upstreamUrl.protocol === 'https:'; - const lib = isHttps ? https : http; - - const options = { - hostname: upstreamUrl.hostname, - port: upstreamUrl.port || (isHttps ? 443 : 80), - path: upstreamUrl.pathname + upstreamUrl.search, - method: 'GET', - headers: { - 'Accept': 'text/event-stream', - 'Cache-Control': 'no-cache' - } - }; - - log(`Opening upstream SSE connection for clientSessionId=${clientSessionId} -> ${UPSTREAM_URL}`); - - const req = lib.request(options, (res) => { - if (res.statusCode !== 200) { - log(`Upstream SSE returned HTTP ${res.statusCode} for clientSessionId=${clientSessionId}`); - req.destroy(); - onReady(new Error(`Upstream SSE HTTP ${res.statusCode}`)); - return; - } - - const session = { - req, - postUrl: null, - pendingRequests: new Map() - }; - sessions.set(clientSessionId, session); - - let buffer = ''; - let currentEvent = null; - - res.on('data', (chunk) => { - buffer += chunk.toString(); - const lines = buffer.split('\n'); - buffer = lines.pop(); // keep incomplete last line - - for (const line of lines) { - if (line.startsWith('event:')) { - currentEvent = line.slice(6).trim(); - } else if (line.startsWith('data:')) { - const data = line.slice(5).trim(); - - if (currentEvent === 'endpoint') { - // Resolve relative path against upstream origin - let postUrl; - if (data.startsWith('http://') || data.startsWith('https://')) { - postUrl = data; - } else { - postUrl = `${upstreamUrl.protocol}//${upstreamUrl.host}${data}`; - } - session.postUrl = postUrl; - log(`Got upstream POST URL for clientSessionId=${clientSessionId}: ${postUrl}`); - onReady(null); - } else if (currentEvent === 'message' || currentEvent === null) { - // JSON-RPC response from upstream - let msg; - try { - msg = JSON.parse(data); - } catch (e) { - log(`Failed to parse upstream SSE message: ${data}`); - return; - } - - if (msg.id !== undefined && msg.id !== null) { - const key = `${clientSessionId}:${msg.id}`; - const requestId = session.pendingRequests.get(key); - if (requestId) { - session.pendingRequests.delete(key); - log(`Relaying response to hub: requestId=${requestId} jsonrpcId=${msg.id}`); - sendToHub({ - type: 'mcp-response', - requestId, - clientSessionId, - payload: msg - }); - } else { - log(`No pending request for key=${key}, dropping response`); - } - } else { - // Notification — no requestId needed, just log - log(`Received upstream notification for clientSessionId=${clientSessionId}:`, JSON.stringify(msg)); - } - } - - currentEvent = null; - } else if (line === '') { - currentEvent = null; - } - } - }); - - res.on('end', () => { - log(`Upstream SSE stream ended for clientSessionId=${clientSessionId}`); - closeSession(clientSessionId); - }); - - res.on('error', (err) => { - log(`Upstream SSE stream error for clientSessionId=${clientSessionId}:`, err.message); - closeSession(clientSessionId); - }); - }); - - req.on('error', (err) => { - log(`Upstream SSE request error for clientSessionId=${clientSessionId}:`, err.message); - sessions.delete(clientSessionId); - onReady(err); - }); - - req.end(); -} - -function postToUpstream(postUrl, payload, callback) { - const body = JSON.stringify(payload); - const url = new URL(postUrl); - const isHttps = url.protocol === 'https:'; - const lib = isHttps ? https : http; - - const options = { - hostname: url.hostname, - port: url.port || (isHttps ? 443 : 80), - path: url.pathname + url.search, - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Content-Length': Buffer.byteLength(body) - } - }; - - const req = lib.request(options, (res) => { - let data = ''; - res.on('data', (chunk) => { data += chunk; }); - res.on('end', () => { - if (res.statusCode < 200 || res.statusCode >= 300) { - callback(new Error(`POST to upstream returned HTTP ${res.statusCode}: ${data}`)); - } else { - callback(null); - } - }); - }); - - req.on('error', callback); - req.write(body); - req.end(); -} - -function handleMcpRequest(msg) { - const { requestId, clientSessionId, payload } = msg; - - const forwardRequest = () => { - const session = sessions.get(clientSessionId); - if (!session || !session.postUrl) { - log(`No postUrl available for clientSessionId=${clientSessionId}, dropping request`); - return; - } - - // Track pending request if payload has an id - if (payload.id !== undefined && payload.id !== null) { - const key = `${clientSessionId}:${payload.id}`; - session.pendingRequests.set(key, requestId); - log(`Forwarding request: requestId=${requestId} clientSessionId=${clientSessionId} method=${payload.method} jsonrpcId=${payload.id}`); - } else { - log(`Forwarding notification: clientSessionId=${clientSessionId} method=${payload.method}`); - } - - postToUpstream(session.postUrl, payload, (err) => { - if (err) { - log(`Error posting to upstream for clientSessionId=${clientSessionId}:`, err.message); - if (payload.id !== undefined && payload.id !== null) { - const key = `${clientSessionId}:${payload.id}`; - session.pendingRequests.delete(key); - } - } - }); - }; - - if (!sessions.has(clientSessionId)) { - openUpstreamSession(clientSessionId, (err) => { - if (err) { - log(`Failed to open upstream session for clientSessionId=${clientSessionId}:`, err.message); - return; - } - forwardRequest(); - }); - } else { - const session = sessions.get(clientSessionId); - if (!session.postUrl) { - // Still waiting for endpoint event — retry shortly - log(`Session exists but postUrl not yet available for clientSessionId=${clientSessionId}, retrying...`); - setTimeout(forwardRequest, 200); - } else { - forwardRequest(); - } - } -} - -function connect() { - log(`Connecting to hub: ${HUB_URL}`); - ws = new WebSocket(HUB_URL); - - ws.on('open', () => { - reconnectDelay = 1000; - log(`Connected to hub, registering as serviceId=${SERVICE_ID}`); - ws.send(JSON.stringify({ type: 'register', serviceId: SERVICE_ID, secret: SECRET })); - }); - - ws.on('message', (data) => { - let msg; - try { - msg = JSON.parse(data.toString()); - } catch (e) { - log('Failed to parse hub message:', data.toString()); - return; - } - - if (msg.type === 'registered') { - log(`Registered with hub as serviceId=${SERVICE_ID}`); - } else if (msg.type === 'mcp-request') { - handleMcpRequest(msg); - } else { - log('Unknown message from hub:', msg.type); - } - }); - - ws.on('close', (code, reason) => { - log(`Hub WebSocket closed (code=${code}), cleaning up all sessions`); - for (const clientSessionId of sessions.keys()) { - closeSession(clientSessionId); - } - scheduleReconnect(); - }); - - ws.on('error', (err) => { - log('Hub WebSocket error:', err.message); - }); -} - -function scheduleReconnect() { - log(`Reconnecting in ${reconnectDelay}ms...`); - setTimeout(() => { - connect(); - }, reconnectDelay); - reconnectDelay = Math.min(reconnectDelay * 2, 30000); -} - -connect(); diff --git a/mcp-bridge/package-lock.json b/mcp-bridge/package-lock.json deleted file mode 100644 index 3ff6b5e..0000000 --- a/mcp-bridge/package-lock.json +++ /dev/null @@ -1,37 +0,0 @@ -{ - "name": "mcp-bridge", - "version": "1.0.0", - "lockfileVersion": 3, - "requires": true, - "packages": { - "": { - "name": "mcp-bridge", - "version": "1.0.0", - "license": "ISC", - "dependencies": { - "ws": "^8.19.0" - } - }, - "node_modules/ws": { - "version": "8.19.0", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.19.0.tgz", - "integrity": "sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==", - "license": "MIT", - "engines": { - "node": ">=10.0.0" - }, - "peerDependencies": { - "bufferutil": "^4.0.1", - "utf-8-validate": ">=5.0.2" - }, - "peerDependenciesMeta": { - "bufferutil": { - "optional": true - }, - "utf-8-validate": { - "optional": true - } - } - } - } -} diff --git a/mcp-bridge/package.json b/mcp-bridge/package.json deleted file mode 100644 index a5e882a..0000000 --- a/mcp-bridge/package.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "name": "mcp-bridge", - "version": "1.0.0", - "description": "Generic MCP SSE bridge for the MCP hub", - "main": "index.js", - "scripts": { - "start": "node index.js" - }, - "dependencies": { - "ws": "^8.19.0" - }, - "license": "ISC" -} diff --git a/package-lock.json b/package-lock.json index 5565d3c..6566ff3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,6 @@ "version": "1.0.0", "license": "ISC", "dependencies": { - "dotenv": "^17.3.1", "express": "^5.2.1", "uuid": "^13.0.0", "ws": "^8.19.0" @@ -156,18 +155,6 @@ "node": ">= 0.8" } }, - "node_modules/dotenv": { - "version": "17.3.1", - "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-17.3.1.tgz", - "integrity": "sha512-IO8C/dzEb6O3F9/twg6ZLXz164a2fhTnEWb95H23Dm4OuN+92NmEAlTrupP9VW6Jm3sO26tQlqyvyi4CsnY9GA==", - "license": "BSD-2-Clause", - "engines": { - "node": ">=12" - }, - "funding": { - "url": "https://dotenvx.com" - } - }, "node_modules/dunder-proto": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/dunder-proto/-/dunder-proto-1.0.1.tgz", diff --git a/package.json b/package.json index b4d6406..c43456e 100644 --- a/package.json +++ b/package.json @@ -5,19 +5,13 @@ "scripts": { "test": "node test/e2e.js", "start": "node src/index.js", - "bridge": "node mcp-bridge/index.js", - "sample-mcp": "node sample-mcp/index.js", - "pm2:start": "pm2 start ecosystem.config.js", - "pm2:stop": "pm2 stop all", - "pm2:logs": "pm2 logs", - "pm2:status": "pm2 status" + "sample-mcp": "node sample-mcp/index.js" }, "keywords": [], "author": "", "license": "ISC", "description": "", "dependencies": { - "dotenv": "^17.3.1", "express": "^5.2.1", "uuid": "^13.0.0", "ws": "^8.19.0" diff --git a/sample-mcp/index.js b/sample-mcp/index.js index 41c5aba..7eb3ef1 100644 --- a/sample-mcp/index.js +++ b/sample-mcp/index.js @@ -5,7 +5,7 @@ const WebSocket = require('ws'); const HUB_URL = process.env.HUB_URL || 'wss://mcp.arik.work/ws/register'; const HUB_URL_FALLBACK = 'ws://mcp.arik.work/ws/register'; const SERVICE_ID = 'sample-mcp'; -const SECRET = process.env.MCP_SECRET || 'dev-secret'; +const SECRET = 'dev-secret'; let reconnectDelay = 1000; let ws = null; diff --git a/src/config.js b/src/config.js index ed90dc8..c34c9c5 100644 --- a/src/config.js +++ b/src/config.js @@ -1,27 +1,4 @@ -const DEV_SECRET = 'dev-secret'; - -let serviceAuthMap = null; -if (process.env.HUB_AUTH) { - try { - serviceAuthMap = JSON.parse(process.env.HUB_AUTH); - } catch (e) { - console.error('[config] Failed to parse HUB_AUTH JSON:', e.message); - process.exit(1); - } -} else if (process.env.NODE_ENV === 'production') { - console.error('[config] HUB_AUTH must be set in production'); - process.exit(1); -} - -function getServiceSecret(serviceId) { - if (serviceAuthMap) { - return serviceAuthMap[serviceId] !== undefined ? serviceAuthMap[serviceId] : null; - } - // Dev fallback: accept dev-secret for any service - return DEV_SECRET; -} - module.exports = { PORT: parseInt(process.env.PORT, 10) || 3000, - getServiceSecret, + HUB_SECRET: process.env.HUB_SECRET || 'dev-secret', }; diff --git a/src/event-bus.js b/src/event-bus.js deleted file mode 100644 index 6d8e37a..0000000 --- a/src/event-bus.js +++ /dev/null @@ -1,12 +0,0 @@ -const { EventEmitter } = require('events'); - -const eventBus = new EventEmitter(); -eventBus.setMaxListeners(50); - -function log(level, message) { - console[level]?.(message) ?? console.log(message); - eventBus.emit('log', { level, message, ts: new Date().toISOString() }); -} - -module.exports = eventBus; -module.exports.log = log; diff --git a/src/relay.js b/src/relay.js index 1cc4378..6155902 100644 --- a/src/relay.js +++ b/src/relay.js @@ -3,21 +3,9 @@ const registry = require('./backend-registry'); const pendingRequests = new Map(); -function sendToBackend(serviceId, message, clientSessionId, sessions) { +function sendToBackend(serviceId, message, clientSessionId) { const ws = registry.get(serviceId); - if (!ws) { - if (sessions && message.id !== undefined && message.id !== null) { - const session = sessions.get(clientSessionId); - if (session) { - session.res.write('event: message\ndata: ' + JSON.stringify({ - jsonrpc: '2.0', - id: message.id, - error: { code: -32603, message: 'Backend unavailable' } - }) + '\n\n'); - } - } - return null; - } + if (!ws) return null; const requestId = uuidv4(); pendingRequests.set(requestId, { serviceId, clientSessionId }); @@ -65,37 +53,4 @@ function handleBackendMessage(serviceId, data, sessions) { } } -function cleanupBackend(serviceId, sessions) { - // Clean up pending requests for this backend - for (const [requestId, pending] of pendingRequests) { - if (pending.serviceId === serviceId) { - const session = sessions.get(pending.clientSessionId); - if (session) { - // Write error notification event - session.res.write('event: message\ndata: ' + JSON.stringify({ - jsonrpc: '2.0', - id: null, - error: { code: -32603, message: 'Backend disconnected' } - }) + '\n\n'); - } - pendingRequests.delete(requestId); - } - } - - // Clean up active SSE sessions for this backend - for (const [clientSessionId, session] of sessions) { - if (session.serviceId === serviceId) { - // Write final error event - session.res.write('event: message\ndata: ' + JSON.stringify({ - jsonrpc: '2.0', - id: null, - error: { code: -32603, message: 'Backend disconnected' } - }) + '\n\n'); - // End the response - session.res.end(); - sessions.delete(clientSessionId); - } - } -} - -module.exports = { sendToBackend, handleBackendMessage, cleanupBackend }; +module.exports = { sendToBackend, handleBackendMessage }; diff --git a/src/routes/mcp-proxy.js b/src/routes/mcp-proxy.js index 973f04a..8e92b89 100644 --- a/src/routes/mcp-proxy.js +++ b/src/routes/mcp-proxy.js @@ -42,7 +42,7 @@ router.post('/:serviceId/message', (req, res) => { return res.status(502).json({ error: 'backend not connected' }); } - relay.sendToBackend(serviceId, req.body, sessionId, sessions); + relay.sendToBackend(serviceId, req.body, sessionId); return res.status(202).json({ status: 'accepted' }); }); diff --git a/src/ws-server.js b/src/ws-server.js index 9f4be65..e3847ef 100644 --- a/src/ws-server.js +++ b/src/ws-server.js @@ -2,8 +2,6 @@ const WebSocket = require('ws'); const config = require('./config'); const registry = require('./backend-registry'); const relay = require('./relay'); -const eventBus = require('./event-bus'); -const { log } = require('./event-bus'); const PING_INTERVAL_MS = 30000; const MAX_MISSED_PONGS = 2; @@ -21,7 +19,7 @@ function setupWsServer(httpServer) { }); }); - wss.on('connection', (ws, req) => { + wss.on('connection', (ws) => { let serviceId = null; let authenticated = false; let missedPongs = 0; @@ -36,14 +34,7 @@ function setupWsServer(httpServer) { return; } - if (msg.type !== 'register' || !msg.serviceId) { - ws.close(4001, 'unauthorized'); - return; - } - - const expectedSecret = config.getServiceSecret(msg.serviceId); - if (expectedSecret === null || msg.secret !== expectedSecret) { - console.log(`[ws] auth failed for serviceId=${msg.serviceId} from ${req.socket.remoteAddress}`); + if (msg.type !== 'register' || !msg.serviceId || msg.secret !== config.HUB_SECRET) { ws.close(4001, 'unauthorized'); return; } @@ -80,9 +71,6 @@ function setupWsServer(httpServer) { if (pingTimer) clearInterval(pingTimer); if (authenticated && serviceId) { registry.unregister(serviceId); - const { cleanupBackend } = require('./relay'); - const { sessions } = require('./routes/mcp-proxy'); - cleanupBackend(serviceId, sessions); } });