diff --git a/src/backend-registry.js b/src/backend-registry.js index f83dfcd..f3faba3 100644 --- a/src/backend-registry.js +++ b/src/backend-registry.js @@ -1,6 +1,7 @@ class BackendRegistry { constructor() { this.backends = new Map(); + this.log = require('./event-bus').log; } register(serviceId, ws) { @@ -9,13 +10,13 @@ class BackendRegistry { connectedAt: new Date().toISOString(), lastPing: null, }); - console.log(`[registry] registered backend: ${serviceId}`); + this.log('info', `[registry] registered backend: ${serviceId}`); } unregister(serviceId) { if (this.backends.has(serviceId)) { this.backends.delete(serviceId); - console.log(`[registry] unregistered backend: ${serviceId}`); + this.log('info', `[registry] unregistered backend: ${serviceId}`); } } diff --git a/src/relay.js b/src/relay.js index 1cc4378..494ba2d 100644 --- a/src/relay.js +++ b/src/relay.js @@ -1,5 +1,7 @@ const { v4: uuidv4 } = require('uuid'); const registry = require('./backend-registry'); +const eventBus = require('./event-bus'); +const { log } = require('./event-bus'); const pendingRequests = new Map(); @@ -29,6 +31,14 @@ function sendToBackend(serviceId, message, clientSessionId, sessions) { payload: message, })); + let method = 'unknown'; + try { + if (typeof message === 'object' && message.method) { + method = message.method; + } + } catch {} + eventBus.emit('message', { serviceId, direction: 'in', method, ts: new Date().toISOString() }); + return requestId; } @@ -37,24 +47,32 @@ function handleBackendMessage(serviceId, data, sessions) { try { msg = JSON.parse(data); } catch (e) { - console.error('[relay] invalid JSON from backend ' + serviceId); + log('error', '[relay] invalid JSON from backend ' + serviceId); return; } if (msg.type === 'mcp-response') { const pending = pendingRequests.get(msg.requestId); if (!pending) { - console.warn('[relay] no pending request for requestId ' + msg.requestId); + log('warn', '[relay] no pending request for requestId ' + msg.requestId); return; } pendingRequests.delete(msg.requestId); const session = sessions.get(pending.clientSessionId); if (!session) { - console.warn('[relay] no session for clientSessionId ' + pending.clientSessionId); + log('warn', '[relay] no session for clientSessionId ' + pending.clientSessionId); return; } + let method = 'unknown'; + try { + if (typeof msg.payload === 'object' && msg.payload.method) { + method = msg.payload.method; + } + } catch {} + eventBus.emit('message', { serviceId, direction: 'out', method, ts: new Date().toISOString() }); + session.res.write('event: message\ndata: ' + JSON.stringify(msg.payload) + '\n\n'); } else if (msg.type === 'mcp-notification') { for (const [, session] of sessions) { diff --git a/src/routes/mcp-proxy.js b/src/routes/mcp-proxy.js index 973f04a..a8d0e0a 100644 --- a/src/routes/mcp-proxy.js +++ b/src/routes/mcp-proxy.js @@ -2,6 +2,8 @@ const { Router } = require('express'); const { v4: uuidv4 } = require('uuid'); const registry = require('../backend-registry'); const relay = require('../relay'); +const eventBus = require('../event-bus'); +const { log } = require('../event-bus'); const router = Router(); const sessions = new Map(); @@ -20,12 +22,14 @@ router.get('/:serviceId/sse', (req, res) => { const clientSessionId = uuidv4(); sessions.set(clientSessionId, { res, serviceId }); + eventBus.emit('session:opened', { serviceId, sessionId: clientSessionId, ts: new Date().toISOString() }); res.write('event: endpoint\ndata: /' + serviceId + '/message?sessionId=' + clientSessionId + '\n\n'); req.on('close', () => { sessions.delete(clientSessionId); - console.log('[sse] client disconnected: ' + clientSessionId + ' (' + serviceId + ')'); + eventBus.emit('session:closed', { serviceId, sessionId: clientSessionId, ts: new Date().toISOString() }); + log('info', '[sse] client disconnected: ' + clientSessionId + ' (' + serviceId + ')'); }); }); diff --git a/src/ws-server.js b/src/ws-server.js index 9f4be65..ed88955 100644 --- a/src/ws-server.js +++ b/src/ws-server.js @@ -43,7 +43,7 @@ function setupWsServer(httpServer) { const expectedSecret = config.getServiceSecret(msg.serviceId); if (expectedSecret === null || msg.secret !== expectedSecret) { - console.log(`[ws] auth failed for serviceId=${msg.serviceId} from ${req.socket.remoteAddress}`); + log('error', `[ws] auth failed for serviceId=${msg.serviceId} from ${req.socket.remoteAddress}`); ws.close(4001, 'unauthorized'); return; } @@ -51,6 +51,7 @@ function setupWsServer(httpServer) { serviceId = msg.serviceId; authenticated = true; registry.register(serviceId, ws); + eventBus.emit('backend:connected', { serviceId, ts: new Date().toISOString() }); ws.on('message', (payload) => { const { sessions } = require('./routes/mcp-proxy'); @@ -65,7 +66,7 @@ function setupWsServer(httpServer) { // Heartbeat pingTimer = setInterval(() => { if (missedPongs >= MAX_MISSED_PONGS) { - console.log(`[ws] backend ${serviceId} missed ${missedPongs} pongs, disconnecting`); + log('warn', `[ws] backend ${serviceId} missed ${missedPongs} pongs, disconnecting`); ws.terminate(); return; } @@ -80,6 +81,7 @@ function setupWsServer(httpServer) { if (pingTimer) clearInterval(pingTimer); if (authenticated && serviceId) { registry.unregister(serviceId); + eventBus.emit('backend:disconnected', { serviceId, ts: new Date().toISOString() }); const { cleanupBackend } = require('./relay'); const { sessions } = require('./routes/mcp-proxy'); cleanupBackend(serviceId, sessions); @@ -87,7 +89,7 @@ function setupWsServer(httpServer) { }); ws.on('error', (err) => { - console.error(`[ws] error on ${serviceId || 'unauthenticated'}:`, err.message); + log('error', `[ws] error on ${serviceId || 'unauthenticated'}: ${err.message}`); }); });