Add WebSocket observer endpoint for hub state monitoring

- Add /ws/observe WebSocket path for real-time hub state observation
- Implement setupObserveServer(httpServer) function that:
  - Requires secret authentication via observe handshake
  - Sends immediate snapshot of backends on successful auth
  - Streams all EventBus events to connected observers
  - Maintains read-only connections (ignores post-handshake messages)
  - Properly cleans up listeners on disconnect
- Add OBSERVE_SECRET to .env (generate with crypto.randomBytes)
- Export OBSERVE_SECRET from config.js
- Wire setupObserveServer into index.js alongside existing setupWsServer
- Support multiple simultaneous observers
- Modified ws-server.js to allow other upgrade handlers (ws-observe, etc)
- Add OBSERVE_SECRET to ecosystem.config.js env for pm2

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Agent 2026-03-13 12:26:55 +00:00
parent 83cbe608a0
commit fc5fa4e16d
5 changed files with 121 additions and 3 deletions

View file

@ -3,6 +3,9 @@ require('dotenv').config({ path: __dirname + '/.env' });
const HUB_SECRET = process.env.HUB_SECRET;
if (!HUB_SECRET) throw new Error('HUB_SECRET not set in .env');
const OBSERVE_SECRET = process.env.OBSERVE_SECRET;
if (!OBSERVE_SECRET) throw new Error('OBSERVE_SECRET not set in .env');
module.exports = {
apps: [
{
@ -12,7 +15,11 @@ module.exports = {
env: {
NODE_ENV: 'development',
PORT: 3000,
HUB_AUTH: JSON.stringify({ 'sample-mcp': HUB_SECRET, 'memory-mcp': HUB_SECRET })
HUB_AUTH: JSON.stringify({ 'sample-mcp': HUB_SECRET, 'memory-mcp': HUB_SECRET }),
OBSERVE_SECRET: OBSERVE_SECRET,
GOOGLE_CLIENT_ID: process.env.GOOGLE_CLIENT_ID || '',
GOOGLE_CLIENT_SECRET: process.env.GOOGLE_CLIENT_SECRET || '',
OAUTH_ISSUER: process.env.OAUTH_ISSUER || 'https://mcp.arik.work'
},
max_restarts: 10,
restart_delay: 1000,

View file

@ -24,4 +24,5 @@ function getServiceSecret(serviceId) {
module.exports = {
PORT: parseInt(process.env.PORT, 10) || 3000,
getServiceSecret,
OBSERVE_SECRET: process.env.OBSERVE_SECRET,
};

View file

@ -2,9 +2,11 @@ const http = require('http');
const app = require('./server');
const config = require('./config');
const setupWsServer = require('./ws-server');
const setupObserveServer = require('./ws-observe');
const httpServer = http.createServer(app);
setupWsServer(httpServer);
setupObserveServer(httpServer);
httpServer.listen(config.PORT, () => {
console.log(`MCP relay hub listening on port ${config.PORT}`);

109
src/ws-observe.js Normal file
View file

@ -0,0 +1,109 @@
const WebSocket = require('ws');
const config = require('./config');
const registry = require('./backend-registry');
const eventBus = require('./event-bus');
const { log } = require('./event-bus');
function setupObserveServer(httpServer) {
const wss = new WebSocket.Server({ noServer: true });
const authenticatedObservers = new Set();
const observerListeners = new Map();
httpServer.on('upgrade', (req, socket, head) => {
if (req.url !== '/ws/observe') {
return; // Let other handlers process it
}
wss.handleUpgrade(req, socket, head, (ws) => {
wss.emit('connection', ws, req);
});
});
wss.on('connection', (ws, req) => {
let authenticated = false;
ws.once('message', (data) => {
let msg;
try {
msg = JSON.parse(data);
} catch {
log('error', '[ws-observe] Failed to parse handshake message');
ws.close(4001, 'unauthorized');
return;
}
if (msg.type !== 'observe' || !msg.secret) {
log('error', '[ws-observe] Invalid handshake: missing type or secret');
ws.close(4001, 'unauthorized');
return;
}
if (msg.secret !== config.OBSERVE_SECRET) {
log('error', `[ws-observe] auth failed from ${req.socket.remoteAddress}`);
ws.close(4001, 'unauthorized');
return;
}
authenticated = true;
authenticatedObservers.add(ws);
log('info', '[ws-observe] observer authenticated');
// Send initial snapshot
const snapshot = {
type: 'snapshot',
ts: new Date().toISOString(),
backends: registry.list(),
activeSessions: []
};
ws.send(JSON.stringify(snapshot));
// Subscribe to all eventBus events and forward to this observer
const eventTypes = ['backend:connected', 'backend:disconnected', 'log', 'session:created', 'session:ended'];
const listeners = {};
eventTypes.forEach((eventName) => {
const listener = (payload) => {
const frame = { type: eventName, ...payload };
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(frame));
} else {
log('warn', `[ws-observe] ws not open for ${eventName}, state: ${ws.readyState}`);
}
};
listeners[eventName] = listener;
eventBus.on(eventName, listener);
});
log('info', `[ws-observe] registered ${eventTypes.length} event listeners`);
observerListeners.set(ws, { listeners, eventTypes });
// Ignore any subsequent messages
ws.on('message', () => {
// Silently ignore - observers are read-only after handshake
});
});
ws.on('close', () => {
if (authenticated) {
authenticatedObservers.delete(ws);
const observerData = observerListeners.get(ws);
if (observerData) {
const { listeners, eventTypes } = observerData;
eventTypes.forEach((eventName) => {
eventBus.removeListener(eventName, listeners[eventName]);
});
observerListeners.delete(ws);
}
log('info', '[ws-observe] observer disconnected');
}
});
ws.on('error', (err) => {
log('error', `[ws-observe] error: ${err.message}`);
});
});
return wss;
}
module.exports = setupObserveServer;

View file

@ -13,8 +13,7 @@ function setupWsServer(httpServer) {
httpServer.on('upgrade', (req, socket, head) => {
if (req.url !== '/ws/register') {
socket.destroy();
return;
return; // Let other handlers process it
}
wss.handleUpgrade(req, socket, head, (ws) => {
wss.emit('connection', ws, req);