Refactor SSE Endpoint with Enhanced Token Validation and Client Management

- Replaced direct token comparison with TokenManager validation
- Implemented robust SSE client connection and event subscription workflow
- Added detailed client authentication and connection status reporting
- Improved SSE endpoint with flexible event, entity, and domain subscription
- Enhanced error handling and client disconnect management
This commit is contained in:
jango-blockchained
2025-02-01 07:50:54 +01:00
parent d3da46d5d5
commit 431a68d690
2 changed files with 58 additions and 16 deletions

View File

@@ -5,7 +5,7 @@ import { v4 as uuidv4 } from 'uuid';
import { sseManager } from './sse/index.js';
import { ILogger } from "@digital-alchemy/core";
import express from 'express';
import { rateLimiter, securityHeaders, validateRequest, sanitizeInput, errorHandler } from './security/index.js';
import { rateLimiter, securityHeaders, validateRequest, sanitizeInput, errorHandler, TokenManager } from './security/index.js';
import { MCP_SCHEMA } from './mcp/schema.js';
// Load environment variables based on NODE_ENV
@@ -139,28 +139,70 @@ app.get('/subscribe_events', (req, res) => {
// Get token from query parameter
const token = req.query.token?.toString();
if (!token || token !== HASS_TOKEN) {
if (!token || !TokenManager.validateToken(token)) {
return res.status(401).json({
success: false,
message: 'Unauthorized - Invalid token'
});
}
const tool = tools.find(t => t.name === 'subscribe_events');
if (!tool) {
return res.status(404).json({
success: false,
message: 'Tool not found'
// Set SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*'
});
// Send initial connection message
res.write(`data: ${JSON.stringify({
type: 'connection',
status: 'connected',
timestamp: new Date().toISOString()
})}\n\n`);
const clientId = uuidv4();
const client = {
id: clientId,
send: (data: string) => {
res.write(`data: ${data}\n\n`);
}
};
// Add client to SSE manager
const sseClient = sseManager.addClient(client, token);
if (!sseClient || !sseClient.authenticated) {
res.write(`data: ${JSON.stringify({
type: 'error',
message: sseClient ? 'Authentication failed' : 'Maximum client limit reached',
timestamp: new Date().toISOString()
})}\n\n`);
return res.end();
}
tool.execute({
token,
events: req.query.events?.toString().split(','),
entity_id: req.query.entity_id?.toString(),
domain: req.query.domain?.toString(),
response: res
// Subscribe to events if specified
const events = req.query.events?.toString().split(',').filter(Boolean);
if (events?.length) {
events.forEach(event => sseManager.subscribeToEvent(clientId, event));
}
// Subscribe to entity if specified
const entityId = req.query.entity_id?.toString();
if (entityId) {
sseManager.subscribeToEntity(clientId, entityId);
}
// Subscribe to domain if specified
const domain = req.query.domain?.toString();
if (domain) {
sseManager.subscribeToDomain(clientId, domain);
}
// Handle client disconnect
req.on('close', () => {
sseManager.removeClient(clientId);
});
} catch (error) {
res.status(500).json({
success: false,

View File

@@ -1,5 +1,6 @@
import { EventEmitter } from 'events';
import { HassEntity, HassEvent, StateChangedEvent } from '../types/hass.js';
import { TokenManager } from '../security/index.js';
interface RateLimit {
count: number;
@@ -264,8 +265,7 @@ export class SSEManager extends EventEmitter {
private validateToken(token?: string): boolean {
if (!token) return false;
// Compare with HASS_TOKEN from environment
return token === process.env.HASS_TOKEN;
return TokenManager.validateToken(token);
}
// Utility methods