diff --git a/src/relay.js b/src/relay.js new file mode 100644 index 0000000..6155902 --- /dev/null +++ b/src/relay.js @@ -0,0 +1,56 @@ +const { v4: uuidv4 } = require('uuid'); +const registry = require('./backend-registry'); + +const pendingRequests = new Map(); + +function sendToBackend(serviceId, message, clientSessionId) { + const ws = registry.get(serviceId); + if (!ws) return null; + + const requestId = uuidv4(); + pendingRequests.set(requestId, { serviceId, clientSessionId }); + + ws.send(JSON.stringify({ + type: 'mcp-request', + requestId, + clientSessionId, + payload: message, + })); + + return requestId; +} + +function handleBackendMessage(serviceId, data, sessions) { + let msg; + try { + msg = JSON.parse(data); + } catch (e) { + console.error('[relay] invalid JSON from backend ' + serviceId); + return; + } + + if (msg.type === 'mcp-response') { + const pending = pendingRequests.get(msg.requestId); + if (!pending) { + console.warn('[relay] no pending request for requestId ' + msg.requestId); + return; + } + pendingRequests.delete(msg.requestId); + + const session = sessions.get(pending.clientSessionId); + if (!session) { + console.warn('[relay] no session for clientSessionId ' + pending.clientSessionId); + return; + } + + session.res.write('event: message\ndata: ' + JSON.stringify(msg.payload) + '\n\n'); + } else if (msg.type === 'mcp-notification') { + for (const [, session] of sessions) { + if (session.serviceId === serviceId) { + session.res.write('event: message\ndata: ' + JSON.stringify(msg.payload) + '\n\n'); + } + } + } +} + +module.exports = { sendToBackend, handleBackendMessage }; diff --git a/src/routes/mcp-proxy.js b/src/routes/mcp-proxy.js new file mode 100644 index 0000000..8e92b89 --- /dev/null +++ b/src/routes/mcp-proxy.js @@ -0,0 +1,51 @@ +const { Router } = require('express'); +const { v4: uuidv4 } = require('uuid'); +const registry = require('../backend-registry'); +const relay = require('../relay'); + +const router = Router(); +const sessions = new Map(); + +router.get('/:serviceId/sse', (req, res) => { + const { serviceId } = req.params; + + if (!registry.get(serviceId)) { + return res.status(502).json({ error: 'backend not connected' }); + } + + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + res.flushHeaders(); + + const clientSessionId = uuidv4(); + sessions.set(clientSessionId, { res, serviceId }); + + res.write('event: endpoint\ndata: /' + serviceId + '/message?sessionId=' + clientSessionId + '\n\n'); + + req.on('close', () => { + sessions.delete(clientSessionId); + console.log('[sse] client disconnected: ' + clientSessionId + ' (' + serviceId + ')'); + }); +}); + +router.post('/:serviceId/message', (req, res) => { + const { serviceId } = req.params; + const { sessionId } = req.query; + + const session = sessions.get(sessionId); + if (!session) { + return res.status(404).json({ error: 'session not found' }); + } + + if (!registry.get(serviceId)) { + return res.status(502).json({ error: 'backend not connected' }); + } + + relay.sendToBackend(serviceId, req.body, sessionId); + + return res.status(202).json({ status: 'accepted' }); +}); + +module.exports = router; +module.exports.sessions = sessions; diff --git a/src/server.js b/src/server.js index 5c5a31e..f5db210 100644 --- a/src/server.js +++ b/src/server.js @@ -1,5 +1,6 @@ const express = require('express'); const registry = require('./backend-registry'); +const mcpProxy = require('./routes/mcp-proxy'); const app = express(); @@ -8,7 +9,7 @@ app.use(express.json()); app.use((req, res, next) => { const start = Date.now(); res.on('finish', () => { - console.log(`${req.method} ${req.path} ${res.statusCode} ${Date.now() - start}ms`); + console.log(req.method + ' ' + req.path + ' ' + res.statusCode + ' ' + (Date.now() - start) + 'ms'); }); next(); }); @@ -21,4 +22,6 @@ app.get('/health', (req, res) => { }); }); +app.use('/', mcpProxy); + module.exports = app; diff --git a/src/ws-server.js b/src/ws-server.js index aed21c1..e3847ef 100644 --- a/src/ws-server.js +++ b/src/ws-server.js @@ -1,6 +1,7 @@ const WebSocket = require('ws'); const config = require('./config'); const registry = require('./backend-registry'); +const relay = require('./relay'); const PING_INTERVAL_MS = 30000; const MAX_MISSED_PONGS = 2; @@ -43,7 +44,8 @@ function setupWsServer(httpServer) { registry.register(serviceId, ws); ws.on('message', (payload) => { - // MCP responses — stored for relay (handled in next task) + const { sessions } = require('./routes/mcp-proxy'); + relay.handleBackendMessage(serviceId, payload, sessions); }); ws.on('pong', () => {