mcp-hub-010: EventBus — central event emitter for hub telemetry

This commit is contained in:
Agent 2026-03-13 11:52:48 +00:00
parent 3028e787ac
commit 84a804c57a
4 changed files with 34 additions and 9 deletions

View file

@ -1,6 +1,7 @@
class BackendRegistry { class BackendRegistry {
constructor() { constructor() {
this.backends = new Map(); this.backends = new Map();
this.log = require('./event-bus').log;
} }
register(serviceId, ws) { register(serviceId, ws) {
@ -9,13 +10,13 @@ class BackendRegistry {
connectedAt: new Date().toISOString(), connectedAt: new Date().toISOString(),
lastPing: null, lastPing: null,
}); });
console.log(`[registry] registered backend: ${serviceId}`); this.log('info', `[registry] registered backend: ${serviceId}`);
} }
unregister(serviceId) { unregister(serviceId) {
if (this.backends.has(serviceId)) { if (this.backends.has(serviceId)) {
this.backends.delete(serviceId); this.backends.delete(serviceId);
console.log(`[registry] unregistered backend: ${serviceId}`); this.log('info', `[registry] unregistered backend: ${serviceId}`);
} }
} }

View file

@ -1,5 +1,7 @@
const { v4: uuidv4 } = require('uuid'); const { v4: uuidv4 } = require('uuid');
const registry = require('./backend-registry'); const registry = require('./backend-registry');
const eventBus = require('./event-bus');
const { log } = require('./event-bus');
const pendingRequests = new Map(); const pendingRequests = new Map();
@ -29,6 +31,14 @@ function sendToBackend(serviceId, message, clientSessionId, sessions) {
payload: message, payload: message,
})); }));
let method = 'unknown';
try {
if (typeof message === 'object' && message.method) {
method = message.method;
}
} catch {}
eventBus.emit('message', { serviceId, direction: 'in', method, ts: new Date().toISOString() });
return requestId; return requestId;
} }
@ -37,24 +47,32 @@ function handleBackendMessage(serviceId, data, sessions) {
try { try {
msg = JSON.parse(data); msg = JSON.parse(data);
} catch (e) { } catch (e) {
console.error('[relay] invalid JSON from backend ' + serviceId); log('error', '[relay] invalid JSON from backend ' + serviceId);
return; return;
} }
if (msg.type === 'mcp-response') { if (msg.type === 'mcp-response') {
const pending = pendingRequests.get(msg.requestId); const pending = pendingRequests.get(msg.requestId);
if (!pending) { if (!pending) {
console.warn('[relay] no pending request for requestId ' + msg.requestId); log('warn', '[relay] no pending request for requestId ' + msg.requestId);
return; return;
} }
pendingRequests.delete(msg.requestId); pendingRequests.delete(msg.requestId);
const session = sessions.get(pending.clientSessionId); const session = sessions.get(pending.clientSessionId);
if (!session) { if (!session) {
console.warn('[relay] no session for clientSessionId ' + pending.clientSessionId); log('warn', '[relay] no session for clientSessionId ' + pending.clientSessionId);
return; return;
} }
let method = 'unknown';
try {
if (typeof msg.payload === 'object' && msg.payload.method) {
method = msg.payload.method;
}
} catch {}
eventBus.emit('message', { serviceId, direction: 'out', method, ts: new Date().toISOString() });
session.res.write('event: message\ndata: ' + JSON.stringify(msg.payload) + '\n\n'); session.res.write('event: message\ndata: ' + JSON.stringify(msg.payload) + '\n\n');
} else if (msg.type === 'mcp-notification') { } else if (msg.type === 'mcp-notification') {
for (const [, session] of sessions) { for (const [, session] of sessions) {

View file

@ -2,6 +2,8 @@ const { Router } = require('express');
const { v4: uuidv4 } = require('uuid'); const { v4: uuidv4 } = require('uuid');
const registry = require('../backend-registry'); const registry = require('../backend-registry');
const relay = require('../relay'); const relay = require('../relay');
const eventBus = require('../event-bus');
const { log } = require('../event-bus');
const router = Router(); const router = Router();
const sessions = new Map(); const sessions = new Map();
@ -20,12 +22,14 @@ router.get('/:serviceId/sse', (req, res) => {
const clientSessionId = uuidv4(); const clientSessionId = uuidv4();
sessions.set(clientSessionId, { res, serviceId }); sessions.set(clientSessionId, { res, serviceId });
eventBus.emit('session:opened', { serviceId, sessionId: clientSessionId, ts: new Date().toISOString() });
res.write('event: endpoint\ndata: /' + serviceId + '/message?sessionId=' + clientSessionId + '\n\n'); res.write('event: endpoint\ndata: /' + serviceId + '/message?sessionId=' + clientSessionId + '\n\n');
req.on('close', () => { req.on('close', () => {
sessions.delete(clientSessionId); sessions.delete(clientSessionId);
console.log('[sse] client disconnected: ' + clientSessionId + ' (' + serviceId + ')'); eventBus.emit('session:closed', { serviceId, sessionId: clientSessionId, ts: new Date().toISOString() });
log('info', '[sse] client disconnected: ' + clientSessionId + ' (' + serviceId + ')');
}); });
}); });

View file

@ -43,7 +43,7 @@ function setupWsServer(httpServer) {
const expectedSecret = config.getServiceSecret(msg.serviceId); const expectedSecret = config.getServiceSecret(msg.serviceId);
if (expectedSecret === null || msg.secret !== expectedSecret) { if (expectedSecret === null || msg.secret !== expectedSecret) {
console.log(`[ws] auth failed for serviceId=${msg.serviceId} from ${req.socket.remoteAddress}`); log('error', `[ws] auth failed for serviceId=${msg.serviceId} from ${req.socket.remoteAddress}`);
ws.close(4001, 'unauthorized'); ws.close(4001, 'unauthorized');
return; return;
} }
@ -51,6 +51,7 @@ function setupWsServer(httpServer) {
serviceId = msg.serviceId; serviceId = msg.serviceId;
authenticated = true; authenticated = true;
registry.register(serviceId, ws); registry.register(serviceId, ws);
eventBus.emit('backend:connected', { serviceId, ts: new Date().toISOString() });
ws.on('message', (payload) => { ws.on('message', (payload) => {
const { sessions } = require('./routes/mcp-proxy'); const { sessions } = require('./routes/mcp-proxy');
@ -65,7 +66,7 @@ function setupWsServer(httpServer) {
// Heartbeat // Heartbeat
pingTimer = setInterval(() => { pingTimer = setInterval(() => {
if (missedPongs >= MAX_MISSED_PONGS) { if (missedPongs >= MAX_MISSED_PONGS) {
console.log(`[ws] backend ${serviceId} missed ${missedPongs} pongs, disconnecting`); log('warn', `[ws] backend ${serviceId} missed ${missedPongs} pongs, disconnecting`);
ws.terminate(); ws.terminate();
return; return;
} }
@ -80,6 +81,7 @@ function setupWsServer(httpServer) {
if (pingTimer) clearInterval(pingTimer); if (pingTimer) clearInterval(pingTimer);
if (authenticated && serviceId) { if (authenticated && serviceId) {
registry.unregister(serviceId); registry.unregister(serviceId);
eventBus.emit('backend:disconnected', { serviceId, ts: new Date().toISOString() });
const { cleanupBackend } = require('./relay'); const { cleanupBackend } = require('./relay');
const { sessions } = require('./routes/mcp-proxy'); const { sessions } = require('./routes/mcp-proxy');
cleanupBackend(serviceId, sessions); cleanupBackend(serviceId, sessions);
@ -87,7 +89,7 @@ function setupWsServer(httpServer) {
}); });
ws.on('error', (err) => { ws.on('error', (err) => {
console.error(`[ws] error on ${serviceId || 'unauthenticated'}:`, err.message); log('error', `[ws] error on ${serviceId || 'unauthenticated'}: ${err.message}`);
}); });
}); });