diff --git a/ecosystem.config.js b/ecosystem.config.js index b9dced1..8a804f8 100644 --- a/ecosystem.config.js +++ b/ecosystem.config.js @@ -3,6 +3,9 @@ require('dotenv').config({ path: __dirname + '/.env' }); const HUB_SECRET = process.env.HUB_SECRET; if (!HUB_SECRET) throw new Error('HUB_SECRET not set in .env'); +const OBSERVE_SECRET = process.env.OBSERVE_SECRET; +if (!OBSERVE_SECRET) throw new Error('OBSERVE_SECRET not set in .env'); + module.exports = { apps: [ { @@ -12,7 +15,11 @@ module.exports = { env: { NODE_ENV: 'development', PORT: 3000, - HUB_AUTH: JSON.stringify({ 'sample-mcp': HUB_SECRET, 'memory-mcp': HUB_SECRET }) + HUB_AUTH: JSON.stringify({ 'sample-mcp': HUB_SECRET, 'memory-mcp': HUB_SECRET }), + OBSERVE_SECRET: OBSERVE_SECRET, + GOOGLE_CLIENT_ID: process.env.GOOGLE_CLIENT_ID || '', + GOOGLE_CLIENT_SECRET: process.env.GOOGLE_CLIENT_SECRET || '', + OAUTH_ISSUER: process.env.OAUTH_ISSUER || 'https://mcp.arik.work' }, max_restarts: 10, restart_delay: 1000, diff --git a/src/config.js b/src/config.js index ed90dc8..f610d2a 100644 --- a/src/config.js +++ b/src/config.js @@ -24,4 +24,5 @@ function getServiceSecret(serviceId) { module.exports = { PORT: parseInt(process.env.PORT, 10) || 3000, getServiceSecret, + OBSERVE_SECRET: process.env.OBSERVE_SECRET, }; diff --git a/src/index.js b/src/index.js index 1d8638a..327c4ca 100644 --- a/src/index.js +++ b/src/index.js @@ -2,9 +2,11 @@ const http = require('http'); const app = require('./server'); const config = require('./config'); const setupWsServer = require('./ws-server'); +const setupObserveServer = require('./ws-observe'); const httpServer = http.createServer(app); setupWsServer(httpServer); +setupObserveServer(httpServer); httpServer.listen(config.PORT, () => { console.log(`MCP relay hub listening on port ${config.PORT}`); diff --git a/src/ws-observe.js b/src/ws-observe.js new file mode 100644 index 0000000..27e3d69 --- /dev/null +++ b/src/ws-observe.js @@ -0,0 +1,109 @@ +const WebSocket = require('ws'); +const config = require('./config'); +const registry = require('./backend-registry'); +const eventBus = require('./event-bus'); +const { log } = require('./event-bus'); + +function setupObserveServer(httpServer) { + const wss = new WebSocket.Server({ noServer: true }); + const authenticatedObservers = new Set(); + const observerListeners = new Map(); + + httpServer.on('upgrade', (req, socket, head) => { + if (req.url !== '/ws/observe') { + return; // Let other handlers process it + } + wss.handleUpgrade(req, socket, head, (ws) => { + wss.emit('connection', ws, req); + }); + }); + + wss.on('connection', (ws, req) => { + let authenticated = false; + + ws.once('message', (data) => { + let msg; + try { + msg = JSON.parse(data); + } catch { + log('error', '[ws-observe] Failed to parse handshake message'); + ws.close(4001, 'unauthorized'); + return; + } + + if (msg.type !== 'observe' || !msg.secret) { + log('error', '[ws-observe] Invalid handshake: missing type or secret'); + ws.close(4001, 'unauthorized'); + return; + } + + if (msg.secret !== config.OBSERVE_SECRET) { + log('error', `[ws-observe] auth failed from ${req.socket.remoteAddress}`); + ws.close(4001, 'unauthorized'); + return; + } + + authenticated = true; + authenticatedObservers.add(ws); + log('info', '[ws-observe] observer authenticated'); + + // Send initial snapshot + const snapshot = { + type: 'snapshot', + ts: new Date().toISOString(), + backends: registry.list(), + activeSessions: [] + }; + ws.send(JSON.stringify(snapshot)); + + // Subscribe to all eventBus events and forward to this observer + const eventTypes = ['backend:connected', 'backend:disconnected', 'log', 'session:created', 'session:ended']; + const listeners = {}; + + eventTypes.forEach((eventName) => { + const listener = (payload) => { + const frame = { type: eventName, ...payload }; + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify(frame)); + } else { + log('warn', `[ws-observe] ws not open for ${eventName}, state: ${ws.readyState}`); + } + }; + listeners[eventName] = listener; + eventBus.on(eventName, listener); + }); + + log('info', `[ws-observe] registered ${eventTypes.length} event listeners`); + + observerListeners.set(ws, { listeners, eventTypes }); + + // Ignore any subsequent messages + ws.on('message', () => { + // Silently ignore - observers are read-only after handshake + }); + }); + + ws.on('close', () => { + if (authenticated) { + authenticatedObservers.delete(ws); + const observerData = observerListeners.get(ws); + if (observerData) { + const { listeners, eventTypes } = observerData; + eventTypes.forEach((eventName) => { + eventBus.removeListener(eventName, listeners[eventName]); + }); + observerListeners.delete(ws); + } + log('info', '[ws-observe] observer disconnected'); + } + }); + + ws.on('error', (err) => { + log('error', `[ws-observe] error: ${err.message}`); + }); + }); + + return wss; +} + +module.exports = setupObserveServer; diff --git a/src/ws-server.js b/src/ws-server.js index ed88955..23bdc8f 100644 --- a/src/ws-server.js +++ b/src/ws-server.js @@ -13,8 +13,7 @@ function setupWsServer(httpServer) { httpServer.on('upgrade', (req, socket, head) => { if (req.url !== '/ws/register') { - socket.destroy(); - return; + return; // Let other handlers process it } wss.handleUpgrade(req, socket, head, (ws) => { wss.emit('connection', ws, req);