From 431a68d690b9ae582e762ca67662ff12510cf691 Mon Sep 17 00:00:00 2001 From: jango-blockchained Date: Sat, 1 Feb 2025 07:50:54 +0100 Subject: [PATCH] Refactor SSE Endpoint with Enhanced Token Validation and Client Management - Replaced direct token comparison with TokenManager validation - Implemented robust SSE client connection and event subscription workflow - Added detailed client authentication and connection status reporting - Improved SSE endpoint with flexible event, entity, and domain subscription - Enhanced error handling and client disconnect management --- src/index.ts | 70 ++++++++++++++++++++++++++++++++++++++---------- src/sse/index.ts | 4 +-- 2 files changed, 58 insertions(+), 16 deletions(-) diff --git a/src/index.ts b/src/index.ts index 348ac2c..de98a95 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,7 +5,7 @@ import { v4 as uuidv4 } from 'uuid'; import { sseManager } from './sse/index.js'; import { ILogger } from "@digital-alchemy/core"; import express from 'express'; -import { rateLimiter, securityHeaders, validateRequest, sanitizeInput, errorHandler } from './security/index.js'; +import { rateLimiter, securityHeaders, validateRequest, sanitizeInput, errorHandler, TokenManager } from './security/index.js'; import { MCP_SCHEMA } from './mcp/schema.js'; // Load environment variables based on NODE_ENV @@ -139,28 +139,70 @@ app.get('/subscribe_events', (req, res) => { // Get token from query parameter const token = req.query.token?.toString(); - if (!token || token !== HASS_TOKEN) { + if (!token || !TokenManager.validateToken(token)) { return res.status(401).json({ success: false, message: 'Unauthorized - Invalid token' }); } - const tool = tools.find(t => t.name === 'subscribe_events'); - if (!tool) { - return res.status(404).json({ - success: false, - message: 'Tool not found' - }); + // Set SSE headers + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'Access-Control-Allow-Origin': '*' + }); + + // Send initial connection message + res.write(`data: ${JSON.stringify({ + type: 'connection', + status: 'connected', + timestamp: new Date().toISOString() + })}\n\n`); + + const clientId = uuidv4(); + const client = { + id: clientId, + send: (data: string) => { + res.write(`data: ${data}\n\n`); + } + }; + + // Add client to SSE manager + const sseClient = sseManager.addClient(client, token); + if (!sseClient || !sseClient.authenticated) { + res.write(`data: ${JSON.stringify({ + type: 'error', + message: sseClient ? 'Authentication failed' : 'Maximum client limit reached', + timestamp: new Date().toISOString() + })}\n\n`); + return res.end(); } - tool.execute({ - token, - events: req.query.events?.toString().split(','), - entity_id: req.query.entity_id?.toString(), - domain: req.query.domain?.toString(), - response: res + // Subscribe to events if specified + const events = req.query.events?.toString().split(',').filter(Boolean); + if (events?.length) { + events.forEach(event => sseManager.subscribeToEvent(clientId, event)); + } + + // Subscribe to entity if specified + const entityId = req.query.entity_id?.toString(); + if (entityId) { + sseManager.subscribeToEntity(clientId, entityId); + } + + // Subscribe to domain if specified + const domain = req.query.domain?.toString(); + if (domain) { + sseManager.subscribeToDomain(clientId, domain); + } + + // Handle client disconnect + req.on('close', () => { + sseManager.removeClient(clientId); }); + } catch (error) { res.status(500).json({ success: false, diff --git a/src/sse/index.ts b/src/sse/index.ts index 582762a..11f4e48 100644 --- a/src/sse/index.ts +++ b/src/sse/index.ts @@ -1,5 +1,6 @@ import { EventEmitter } from 'events'; import { HassEntity, HassEvent, StateChangedEvent } from '../types/hass.js'; +import { TokenManager } from '../security/index.js'; interface RateLimit { count: number; @@ -264,8 +265,7 @@ export class SSEManager extends EventEmitter { private validateToken(token?: string): boolean { if (!token) return false; - // Compare with HASS_TOKEN from environment - return token === process.env.HASS_TOKEN; + return TokenManager.validateToken(token); } // Utility methods