mcp-hub-002: WebSocket backend registration and health tracking
This commit is contained in:
parent
60d92af13b
commit
a93bcfffa7
4 changed files with 138 additions and 2 deletions
47
src/backend-registry.js
Normal file
47
src/backend-registry.js
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
class BackendRegistry {
|
||||
constructor() {
|
||||
this.backends = new Map();
|
||||
}
|
||||
|
||||
register(serviceId, ws) {
|
||||
this.backends.set(serviceId, {
|
||||
ws,
|
||||
connectedAt: new Date().toISOString(),
|
||||
lastPing: null,
|
||||
});
|
||||
console.log(`[registry] registered backend: ${serviceId}`);
|
||||
}
|
||||
|
||||
unregister(serviceId) {
|
||||
if (this.backends.has(serviceId)) {
|
||||
this.backends.delete(serviceId);
|
||||
console.log(`[registry] unregistered backend: ${serviceId}`);
|
||||
}
|
||||
}
|
||||
|
||||
get(serviceId) {
|
||||
const entry = this.backends.get(serviceId);
|
||||
return entry ? entry.ws : null;
|
||||
}
|
||||
|
||||
list() {
|
||||
return Array.from(this.backends.entries()).map(([serviceId, entry]) => ({
|
||||
serviceId,
|
||||
connectedAt: entry.connectedAt,
|
||||
lastPing: entry.lastPing,
|
||||
}));
|
||||
}
|
||||
|
||||
updatePing(serviceId) {
|
||||
const entry = this.backends.get(serviceId);
|
||||
if (entry) {
|
||||
entry.lastPing = new Date().toISOString();
|
||||
}
|
||||
}
|
||||
|
||||
count() {
|
||||
return this.backends.size;
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = new BackendRegistry();
|
||||
|
|
@ -1,6 +1,11 @@
|
|||
const http = require('http');
|
||||
const app = require('./server');
|
||||
const config = require('./config');
|
||||
const setupWsServer = require('./ws-server');
|
||||
|
||||
app.listen(config.PORT, () => {
|
||||
const httpServer = http.createServer(app);
|
||||
setupWsServer(httpServer);
|
||||
|
||||
httpServer.listen(config.PORT, () => {
|
||||
console.log(`MCP relay hub listening on port ${config.PORT}`);
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
const express = require('express');
|
||||
const registry = require('./backend-registry');
|
||||
|
||||
const app = express();
|
||||
|
||||
|
|
@ -16,7 +17,7 @@ app.get('/health', (req, res) => {
|
|||
res.json({
|
||||
status: 'ok',
|
||||
uptime: process.uptime(),
|
||||
connectedBackends: 0,
|
||||
connectedBackends: registry.count(),
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
|||
83
src/ws-server.js
Normal file
83
src/ws-server.js
Normal file
|
|
@ -0,0 +1,83 @@
|
|||
const WebSocket = require('ws');
|
||||
const config = require('./config');
|
||||
const registry = require('./backend-registry');
|
||||
|
||||
const PING_INTERVAL_MS = 30000;
|
||||
const MAX_MISSED_PONGS = 2;
|
||||
|
||||
function setupWsServer(httpServer) {
|
||||
const wss = new WebSocket.Server({ noServer: true });
|
||||
|
||||
httpServer.on('upgrade', (req, socket, head) => {
|
||||
if (req.url !== '/ws/register') {
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
wss.handleUpgrade(req, socket, head, (ws) => {
|
||||
wss.emit('connection', ws, req);
|
||||
});
|
||||
});
|
||||
|
||||
wss.on('connection', (ws) => {
|
||||
let serviceId = null;
|
||||
let authenticated = false;
|
||||
let missedPongs = 0;
|
||||
let pingTimer = null;
|
||||
|
||||
ws.once('message', (data) => {
|
||||
let msg;
|
||||
try {
|
||||
msg = JSON.parse(data);
|
||||
} catch {
|
||||
ws.close(4001, 'unauthorized');
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type !== 'register' || !msg.serviceId || msg.secret !== config.HUB_SECRET) {
|
||||
ws.close(4001, 'unauthorized');
|
||||
return;
|
||||
}
|
||||
|
||||
serviceId = msg.serviceId;
|
||||
authenticated = true;
|
||||
registry.register(serviceId, ws);
|
||||
|
||||
ws.on('message', (payload) => {
|
||||
// MCP responses — stored for relay (handled in next task)
|
||||
});
|
||||
|
||||
ws.on('pong', () => {
|
||||
missedPongs = 0;
|
||||
registry.updatePing(serviceId);
|
||||
});
|
||||
|
||||
// Heartbeat
|
||||
pingTimer = setInterval(() => {
|
||||
if (missedPongs >= MAX_MISSED_PONGS) {
|
||||
console.log(`[ws] backend ${serviceId} missed ${missedPongs} pongs, disconnecting`);
|
||||
ws.terminate();
|
||||
return;
|
||||
}
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
missedPongs++;
|
||||
ws.ping();
|
||||
}
|
||||
}, PING_INTERVAL_MS);
|
||||
});
|
||||
|
||||
ws.on('close', () => {
|
||||
if (pingTimer) clearInterval(pingTimer);
|
||||
if (authenticated && serviceId) {
|
||||
registry.unregister(serviceId);
|
||||
}
|
||||
});
|
||||
|
||||
ws.on('error', (err) => {
|
||||
console.error(`[ws] error on ${serviceId || 'unauthenticated'}:`, err.message);
|
||||
});
|
||||
});
|
||||
|
||||
return wss;
|
||||
}
|
||||
|
||||
module.exports = setupWsServer;
|
||||
Loading…
Add table
Add a link
Reference in a new issue