From a93bcfffa7237865f0d940d247997f4f219e9c96 Mon Sep 17 00:00:00 2001 From: Agent Date: Thu, 12 Mar 2026 16:46:30 +0000 Subject: [PATCH] mcp-hub-002: WebSocket backend registration and health tracking --- src/backend-registry.js | 47 +++++++++++++++++++++++ src/index.js | 7 +++- src/server.js | 3 +- src/ws-server.js | 83 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 138 insertions(+), 2 deletions(-) create mode 100644 src/backend-registry.js create mode 100644 src/ws-server.js diff --git a/src/backend-registry.js b/src/backend-registry.js new file mode 100644 index 0000000..f83dfcd --- /dev/null +++ b/src/backend-registry.js @@ -0,0 +1,47 @@ +class BackendRegistry { + constructor() { + this.backends = new Map(); + } + + register(serviceId, ws) { + this.backends.set(serviceId, { + ws, + connectedAt: new Date().toISOString(), + lastPing: null, + }); + console.log(`[registry] registered backend: ${serviceId}`); + } + + unregister(serviceId) { + if (this.backends.has(serviceId)) { + this.backends.delete(serviceId); + console.log(`[registry] unregistered backend: ${serviceId}`); + } + } + + get(serviceId) { + const entry = this.backends.get(serviceId); + return entry ? entry.ws : null; + } + + list() { + return Array.from(this.backends.entries()).map(([serviceId, entry]) => ({ + serviceId, + connectedAt: entry.connectedAt, + lastPing: entry.lastPing, + })); + } + + updatePing(serviceId) { + const entry = this.backends.get(serviceId); + if (entry) { + entry.lastPing = new Date().toISOString(); + } + } + + count() { + return this.backends.size; + } +} + +module.exports = new BackendRegistry(); diff --git a/src/index.js b/src/index.js index 3074451..1d8638a 100644 --- a/src/index.js +++ b/src/index.js @@ -1,6 +1,11 @@ +const http = require('http'); const app = require('./server'); const config = require('./config'); +const setupWsServer = require('./ws-server'); -app.listen(config.PORT, () => { +const httpServer = http.createServer(app); +setupWsServer(httpServer); + +httpServer.listen(config.PORT, () => { console.log(`MCP relay hub listening on port ${config.PORT}`); }); diff --git a/src/server.js b/src/server.js index 6d30c78..5c5a31e 100644 --- a/src/server.js +++ b/src/server.js @@ -1,4 +1,5 @@ const express = require('express'); +const registry = require('./backend-registry'); const app = express(); @@ -16,7 +17,7 @@ app.get('/health', (req, res) => { res.json({ status: 'ok', uptime: process.uptime(), - connectedBackends: 0, + connectedBackends: registry.count(), }); }); diff --git a/src/ws-server.js b/src/ws-server.js new file mode 100644 index 0000000..aed21c1 --- /dev/null +++ b/src/ws-server.js @@ -0,0 +1,83 @@ +const WebSocket = require('ws'); +const config = require('./config'); +const registry = require('./backend-registry'); + +const PING_INTERVAL_MS = 30000; +const MAX_MISSED_PONGS = 2; + +function setupWsServer(httpServer) { + const wss = new WebSocket.Server({ noServer: true }); + + httpServer.on('upgrade', (req, socket, head) => { + if (req.url !== '/ws/register') { + socket.destroy(); + return; + } + wss.handleUpgrade(req, socket, head, (ws) => { + wss.emit('connection', ws, req); + }); + }); + + wss.on('connection', (ws) => { + let serviceId = null; + let authenticated = false; + let missedPongs = 0; + let pingTimer = null; + + ws.once('message', (data) => { + let msg; + try { + msg = JSON.parse(data); + } catch { + ws.close(4001, 'unauthorized'); + return; + } + + if (msg.type !== 'register' || !msg.serviceId || msg.secret !== config.HUB_SECRET) { + ws.close(4001, 'unauthorized'); + return; + } + + serviceId = msg.serviceId; + authenticated = true; + registry.register(serviceId, ws); + + ws.on('message', (payload) => { + // MCP responses — stored for relay (handled in next task) + }); + + ws.on('pong', () => { + missedPongs = 0; + registry.updatePing(serviceId); + }); + + // Heartbeat + pingTimer = setInterval(() => { + if (missedPongs >= MAX_MISSED_PONGS) { + console.log(`[ws] backend ${serviceId} missed ${missedPongs} pongs, disconnecting`); + ws.terminate(); + return; + } + if (ws.readyState === WebSocket.OPEN) { + missedPongs++; + ws.ping(); + } + }, PING_INTERVAL_MS); + }); + + ws.on('close', () => { + if (pingTimer) clearInterval(pingTimer); + if (authenticated && serviceId) { + registry.unregister(serviceId); + } + }); + + ws.on('error', (err) => { + console.error(`[ws] error on ${serviceId || 'unauthenticated'}:`, err.message); + }); + }); + + return wss; +} + +module.exports = setupWsServer;