From 10bf5919e45d514c13853762fae713ad2c42f8c3 Mon Sep 17 00:00:00 2001 From: jango-blockchained Date: Mon, 3 Feb 2025 22:29:41 +0100 Subject: [PATCH] refactor: enhance middleware and security with advanced protection mechanisms - Upgraded rate limiter configuration with more granular control and detailed headers - Improved authentication middleware with enhanced token validation and error responses - Implemented advanced input sanitization using sanitize-html with comprehensive XSS protection - Replaced manual security headers with helmet for robust web security configuration - Enhanced error handling middleware with more detailed logging and specific error type handling - Updated SSE rate limiting with burst and window-based restrictions - Improved token validation with more precise signature and claim verification --- src/middleware/index.ts | 226 ++++++++++++++++++++---------- src/security/index.ts | 101 ++++++++------ src/sse/index.ts | 294 ++++++++++++++++++---------------------- 3 files changed, 352 insertions(+), 269 deletions(-) diff --git a/src/middleware/index.ts b/src/middleware/index.ts index c415709..beb6590 100644 --- a/src/middleware/index.ts +++ b/src/middleware/index.ts @@ -2,32 +2,52 @@ import { Request, Response, NextFunction } from 'express'; import { HASS_CONFIG, RATE_LIMIT_CONFIG } from '../config/index.js'; import rateLimit from 'express-rate-limit'; import { TokenManager } from '../security/index.js'; +import sanitizeHtml from 'sanitize-html'; +import helmet from 'helmet'; -// Rate limiter middleware +// Rate limiter middleware with enhanced configuration export const rateLimiter = rateLimit({ windowMs: 60 * 1000, // 1 minute max: RATE_LIMIT_CONFIG.REGULAR, + standardHeaders: true, // Return rate limit info in the `RateLimit-*` headers + legacyHeaders: false, // Disable the `X-RateLimit-*` headers message: { success: false, message: 'Too many requests, please try again later.', reset_time: new Date(Date.now() + 60 * 1000).toISOString() - } + }, + skipSuccessfulRequests: false, // Count all requests + keyGenerator: (req) => req.ip || req.socket.remoteAddress || 'unknown' // Use IP for rate limiting }); -// WebSocket rate limiter middleware +// WebSocket rate limiter middleware with enhanced configuration export const wsRateLimiter = rateLimit({ windowMs: 60 * 1000, // 1 minute max: RATE_LIMIT_CONFIG.WEBSOCKET, + standardHeaders: true, + legacyHeaders: false, message: { success: false, message: 'Too many WebSocket connections, please try again later.', reset_time: new Date(Date.now() + 60 * 1000).toISOString() - } + }, + skipSuccessfulRequests: false, + keyGenerator: (req) => req.ip || req.socket.remoteAddress || 'unknown' }); -// Authentication middleware +// Authentication middleware with enhanced security export const authenticate = (req: Request, res: Response, next: NextFunction) => { - const token = req.headers.authorization?.replace('Bearer ', '') || ''; + const authHeader = req.headers.authorization; + if (!authHeader || !authHeader.startsWith('Bearer ')) { + return res.status(401).json({ + success: false, + message: 'Unauthorized', + error: 'Missing or invalid authorization header', + timestamp: new Date().toISOString() + }); + } + + const token = authHeader.replace('Bearer ', ''); const clientIp = req.ip || req.socket.remoteAddress || ''; const validationResult = TokenManager.validateToken(token, clientIp); @@ -44,18 +64,40 @@ export const authenticate = (req: Request, res: Response, next: NextFunction) => next(); }; -// Enhanced security headers middleware -export const securityHeaders = (_req: Request, res: Response, next: NextFunction) => { - // Set strict security headers - res.setHeader('X-Content-Type-Options', 'nosniff'); - res.setHeader('X-Frame-Options', 'DENY'); - res.setHeader('X-XSS-Protection', '1; mode=block'); - res.setHeader('Strict-Transport-Security', 'max-age=31536000; includeSubDomains; preload'); - res.setHeader('Content-Security-Policy', "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; connect-src 'self' wss: https:;"); - res.setHeader('Referrer-Policy', 'strict-origin-when-cross-origin'); - res.setHeader('Permissions-Policy', 'geolocation=(), microphone=(), camera=()'); - next(); -}; +// Enhanced security headers middleware using helmet +export const securityHeaders = helmet({ + contentSecurityPolicy: { + directives: { + defaultSrc: ["'self'"], + scriptSrc: ["'self'", "'unsafe-inline'"], + styleSrc: ["'self'", "'unsafe-inline'"], + imgSrc: ["'self'", 'data:', 'https:'], + connectSrc: ["'self'", 'wss:', 'https:'], + frameSrc: ["'none'"], + objectSrc: ["'none'"], + baseUri: ["'self'"], + formAction: ["'self'"], + frameAncestors: ["'none'"] + } + }, + crossOriginEmbedderPolicy: true, + crossOriginOpenerPolicy: { policy: 'same-origin' }, + crossOriginResourcePolicy: { policy: 'same-origin' }, + dnsPrefetchControl: { allow: false }, + frameguard: { action: 'deny' }, + hidePoweredBy: true, + hsts: { + maxAge: 31536000, + includeSubDomains: true, + preload: true + }, + ieNoOpen: true, + noSniff: true, + originAgentCluster: true, + permittedCrossDomainPolicies: { permittedPolicies: 'none' }, + referrerPolicy: { policy: 'strict-origin-when-cross-origin' }, + xssFilter: true +}); // Enhanced request validation middleware export const validateRequest = (req: Request, res: Response, next: NextFunction) => { @@ -65,13 +107,16 @@ export const validateRequest = (req: Request, res: Response, next: NextFunction) } // Validate content type for POST/PUT/PATCH requests - if (['POST', 'PUT', 'PATCH'].includes(req.method) && !req.is('application/json')) { - return res.status(415).json({ - success: false, - message: 'Unsupported Media Type', - error: 'Content-Type must be application/json', - timestamp: new Date().toISOString() - }); + if (['POST', 'PUT', 'PATCH'].includes(req.method)) { + const contentType = req.headers['content-type']; + if (!contentType || !contentType.includes('application/json')) { + return res.status(415).json({ + success: false, + message: 'Unsupported Media Type', + error: 'Content-Type must be application/json', + timestamp: new Date().toISOString() + }); + } } // Validate request body size @@ -101,23 +146,42 @@ export const validateRequest = (req: Request, res: Response, next: NextFunction) next(); }; -// Input sanitization middleware +// Enhanced input sanitization middleware export const sanitizeInput = (req: Request, _res: Response, next: NextFunction) => { if (req.body) { - // Recursively sanitize object - const sanitizeObject = (obj: any): any => { + const sanitizeValue = (value: unknown): unknown => { + if (typeof value === 'string') { + // Sanitize HTML content + return sanitizeHtml(value, { + allowedTags: [], // Remove all HTML tags + allowedAttributes: {}, // Remove all attributes + textFilter: (text) => { + // Remove potential XSS patterns + return text.replace(/javascript:/gi, '') + .replace(/data:/gi, '') + .replace(/vbscript:/gi, '') + .replace(/on\w+=/gi, '') + .replace(/\b(alert|confirm|prompt|exec|eval|setTimeout|setInterval)\b/gi, ''); + } + }); + } + return value; + }; + + const sanitizeObject = (obj: unknown): unknown => { if (typeof obj !== 'object' || obj === null) { - return obj; + return sanitizeValue(obj); } if (Array.isArray(obj)) { return obj.map(item => sanitizeObject(item)); } - const sanitized: any = {}; - for (const [key, value] of Object.entries(obj)) { - // Remove any potentially dangerous characters from keys - const sanitizedKey = key.replace(/[<>]/g, ''); + const sanitized: Record = {}; + for (const [key, value] of Object.entries(obj as Record)) { + // Sanitize keys + const sanitizedKey = typeof key === 'string' ? sanitizeValue(key) as string : key; + // Recursively sanitize values sanitized[sanitizedKey] = sanitizeObject(value); } @@ -131,44 +195,68 @@ export const sanitizeInput = (req: Request, _res: Response, next: NextFunction) }; // Enhanced error handling middleware -export const errorHandler = (err: Error, _req: Request, res: Response, _next: NextFunction) => { - console.error('Error:', err); - - // Handle specific error types - if (err.name === 'ValidationError') { - return res.status(400).json({ - success: false, - message: 'Validation Error', - error: err.message, - timestamp: new Date().toISOString() - }); - } - - if (err.name === 'UnauthorizedError') { - return res.status(401).json({ - success: false, - message: 'Unauthorized', - error: err.message, - timestamp: new Date().toISOString() - }); - } - - if (err.name === 'ForbiddenError') { - return res.status(403).json({ - success: false, - message: 'Forbidden', - error: err.message, - timestamp: new Date().toISOString() - }); - } - - // Default error response - res.status(500).json({ - success: false, - message: 'Internal Server Error', - error: process.env.NODE_ENV === 'development' ? err.message : 'An unexpected error occurred', +export const errorHandler = (err: Error, req: Request, res: Response, _next: NextFunction) => { + // Log error with request context + console.error('Error:', { + error: err.message, + stack: err.stack, + method: req.method, + path: req.path, + ip: req.ip, timestamp: new Date().toISOString() }); + + // Handle specific error types + switch (err.name) { + case 'ValidationError': + return res.status(400).json({ + success: false, + message: 'Validation Error', + error: err.message, + timestamp: new Date().toISOString() + }); + + case 'UnauthorizedError': + return res.status(401).json({ + success: false, + message: 'Unauthorized', + error: err.message, + timestamp: new Date().toISOString() + }); + + case 'ForbiddenError': + return res.status(403).json({ + success: false, + message: 'Forbidden', + error: err.message, + timestamp: new Date().toISOString() + }); + + case 'NotFoundError': + return res.status(404).json({ + success: false, + message: 'Not Found', + error: err.message, + timestamp: new Date().toISOString() + }); + + case 'ConflictError': + return res.status(409).json({ + success: false, + message: 'Conflict', + error: err.message, + timestamp: new Date().toISOString() + }); + + default: + // Default error response + return res.status(500).json({ + success: false, + message: 'Internal Server Error', + error: process.env.NODE_ENV === 'development' ? err.message : 'An unexpected error occurred', + timestamp: new Date().toISOString() + }); + } }; // Export all middleware diff --git a/src/security/index.ts b/src/security/index.ts index ca58d89..000915e 100644 --- a/src/security/index.ts +++ b/src/security/index.ts @@ -129,6 +129,7 @@ export class TokenManager { * Validates a JWT token with enhanced security checks */ static validateToken(token: string, ip?: string): { valid: boolean; error?: string } { + // Check basic token format if (!token || typeof token !== 'string') { return { valid: false, error: 'Invalid token format' }; } @@ -139,34 +140,35 @@ export class TokenManager { } // Check for rate limiting - if (ip) { - const attempts = failedAttempts.get(ip); - if (attempts) { - const timeSinceLastAttempt = Date.now() - attempts.lastAttempt; - if (attempts.count >= SECURITY_CONFIG.MAX_FAILED_ATTEMPTS) { - if (timeSinceLastAttempt < SECURITY_CONFIG.LOCKOUT_DURATION) { - return { valid: false, error: 'Too many failed attempts. Please try again later.' }; - } - // Reset after lockout period - failedAttempts.delete(ip); - } - } + if (ip && this.isRateLimited(ip)) { + return { valid: false, error: 'Too many failed attempts. Please try again later.' }; + } + + // Get JWT secret + const secret = process.env.JWT_SECRET; + if (!secret) { + return { valid: false, error: 'JWT secret not configured' }; } try { - const decoded = jwt.decode(token); + // Verify token signature and decode + const decoded = jwt.verify(token, secret) as jwt.JwtPayload; + + // Verify token structure if (!decoded || typeof decoded !== 'object') { this.recordFailedAttempt(ip); return { valid: false, error: 'Invalid token structure' }; } - // Enhanced expiration checks + // Check required claims if (!decoded.exp || !decoded.iat) { this.recordFailedAttempt(ip); return { valid: false, error: 'Token missing required claims' }; } const now = Math.floor(Date.now() / 1000); + + // Check expiration if (decoded.exp <= now) { this.recordFailedAttempt(ip); return { valid: false, error: 'Token has expired' }; @@ -179,47 +181,61 @@ export class TokenManager { return { valid: false, error: 'Token exceeds maximum age limit' }; } - // Verify signature - const secret = process.env.JWT_SECRET; - if (!secret) { - return { valid: false, error: 'JWT secret not configured' }; + // Reset failed attempts on successful validation + if (ip) { + failedAttempts.delete(ip); } - try { - jwt.verify(token, secret); - // Reset failed attempts on successful validation - if (ip) { - failedAttempts.delete(ip); - } - return { valid: true }; - } catch (error) { - this.recordFailedAttempt(ip); - return { valid: false, error: 'Invalid token signature' }; - } + return { valid: true }; } catch (error) { this.recordFailedAttempt(ip); + if (error instanceof jwt.JsonWebTokenError) { + return { valid: false, error: 'Invalid token signature' }; + } + if (error instanceof jwt.TokenExpiredError) { + return { valid: false, error: 'Token has expired' }; + } return { valid: false, error: 'Token validation failed' }; } } + /** + * Checks if an IP is rate limited + */ + private static isRateLimited(ip: string): boolean { + const attempts = failedAttempts.get(ip); + if (!attempts) return false; + + const now = Date.now(); + const timeSinceLastAttempt = now - attempts.lastAttempt; + + // Reset if outside lockout period + if (timeSinceLastAttempt >= SECURITY_CONFIG.LOCKOUT_DURATION) { + failedAttempts.delete(ip); + return false; + } + + return attempts.count >= SECURITY_CONFIG.MAX_FAILED_ATTEMPTS; + } + /** * Records a failed authentication attempt */ private static recordFailedAttempt(ip?: string): void { if (!ip) return; - const attempts = failedAttempts.get(ip) || { count: 0, lastAttempt: 0 }; const now = Date.now(); + const attempts = failedAttempts.get(ip); - // Reset count if last attempt was outside lockout period - if (now - attempts.lastAttempt > SECURITY_CONFIG.LOCKOUT_DURATION) { - attempts.count = 1; + if (!attempts || (now - attempts.lastAttempt) >= SECURITY_CONFIG.LOCKOUT_DURATION) { + // First attempt or reset after lockout + failedAttempts.set(ip, { count: 1, lastAttempt: now }); } else { + // Increment existing attempts attempts.count++; + attempts.lastAttempt = now; + failedAttempts.set(ip, attempts); } - - attempts.lastAttempt = now; - failedAttempts.set(ip, attempts); } /** @@ -231,15 +247,18 @@ export class TokenManager { throw new Error('JWT secret not configured'); } + // Ensure we don't override system claims + const sanitizedPayload = { ...payload }; + delete (sanitizedPayload as any).iat; + delete (sanitizedPayload as any).exp; + return jwt.sign( - { - ...payload, - iat: Math.floor(Date.now() / 1000), - }, + sanitizedPayload, secret, { expiresIn: Math.floor(expiresIn / 1000), - algorithm: 'HS256' + algorithm: 'HS256', + notBefore: 0 // Token is valid immediately } ); } diff --git a/src/sse/index.ts b/src/sse/index.ts index 2aabfab..7578c90 100644 --- a/src/sse/index.ts +++ b/src/sse/index.ts @@ -7,10 +7,17 @@ const DEFAULT_MAX_CLIENTS = 1000; const DEFAULT_PING_INTERVAL = 30000; // 30 seconds const DEFAULT_CLEANUP_INTERVAL = 60000; // 1 minute const DEFAULT_MAX_CONNECTION_AGE = 24 * 60 * 60 * 1000; // 24 hours +const DEFAULT_RATE_LIMIT = { + MAX_MESSAGES: 100, // messages + WINDOW_MS: 60000, // 1 minute + BURST_LIMIT: 10 // max messages per second +}; interface RateLimit { count: number; lastReset: number; + burstCount: number; + lastBurstReset: number; } export interface SSEClient { @@ -32,6 +39,8 @@ interface ClientStats { lastPingAt?: Date; subscriptionCount: number; connectionDuration: number; + messagesSent: number; + lastActivity: Date; } export class SSEManager extends EventEmitter { @@ -42,18 +51,21 @@ export class SSEManager extends EventEmitter { private readonly pingInterval: number; private readonly cleanupInterval: number; private readonly maxConnectionAge: number; + private readonly rateLimit: typeof DEFAULT_RATE_LIMIT; constructor(options: { maxClients?: number; pingInterval?: number; cleanupInterval?: number; maxConnectionAge?: number; + rateLimit?: Partial; } = {}) { super(); this.maxClients = options.maxClients || DEFAULT_MAX_CLIENTS; this.pingInterval = options.pingInterval || DEFAULT_PING_INTERVAL; this.cleanupInterval = options.cleanupInterval || DEFAULT_CLEANUP_INTERVAL; this.maxConnectionAge = options.maxConnectionAge || DEFAULT_MAX_CONNECTION_AGE; + this.rateLimit = { ...DEFAULT_RATE_LIMIT, ...options.rateLimit }; console.log('Initializing SSE Manager...'); this.startMaintenanceTasks(); @@ -63,15 +75,17 @@ export class SSEManager extends EventEmitter { // Send periodic pings to keep connections alive setInterval(() => { this.clients.forEach(client => { - try { - client.send(JSON.stringify({ - type: 'ping', - timestamp: new Date().toISOString() - })); - client.lastPingAt = new Date(); - } catch (error) { - console.error(`Failed to ping client ${client.id}:`, error); - this.removeClient(client.id); + if (!this.isRateLimited(client)) { + try { + client.send(JSON.stringify({ + type: 'ping', + timestamp: new Date().toISOString() + })); + client.lastPingAt = new Date(); + } catch (error) { + console.error(`Failed to ping client ${client.id}:`, error); + this.removeClient(client.id); + } } }); }, this.pingInterval); @@ -98,7 +112,7 @@ export class SSEManager extends EventEmitter { return SSEManager.instance; } - addClient(client: Omit, token: string): SSEClient | null { + addClient(client: Omit, token: string): SSEClient | null { // Validate token const validationResult = TokenManager.validateToken(token, client.ip); if (!validationResult.valid) { @@ -117,7 +131,13 @@ export class SSEManager extends EventEmitter { ...client, authenticated: true, subscriptions: new Set(), - lastPingAt: new Date() + lastPingAt: new Date(), + rateLimit: { + count: 0, + lastReset: Date.now(), + burstCount: 0, + lastBurstReset: Date.now() + } }; this.clients.set(client.id, newClient); @@ -126,22 +146,46 @@ export class SSEManager extends EventEmitter { return newClient; } - private startClientPing(clientId: string) { - const interval = setInterval(() => { - const client = this.clients.get(clientId); - if (!client) { - clearInterval(interval); - return; - } + private isRateLimited(client: SSEClient): boolean { + const now = Date.now(); - this.sendToClient(client, { - type: 'ping', - timestamp: new Date().toISOString() - }); - }, this.pingInterval); + // Reset window counters if needed + if (now - client.rateLimit.lastReset >= this.rateLimit.WINDOW_MS) { + client.rateLimit.count = 0; + client.rateLimit.lastReset = now; + } + + // Reset burst counters if needed (every second) + if (now - client.rateLimit.lastBurstReset >= 1000) { + client.rateLimit.burstCount = 0; + client.rateLimit.lastBurstReset = now; + } + + // Check both window and burst limits + return ( + client.rateLimit.count >= this.rateLimit.MAX_MESSAGES || + client.rateLimit.burstCount >= this.rateLimit.BURST_LIMIT + ); } - removeClient(clientId: string) { + private updateRateLimit(client: SSEClient): void { + const now = Date.now(); + client.rateLimit.count++; + client.rateLimit.burstCount++; + + // Update timestamps if needed + if (now - client.rateLimit.lastReset >= this.rateLimit.WINDOW_MS) { + client.rateLimit.lastReset = now; + client.rateLimit.count = 1; + } + + if (now - client.rateLimit.lastBurstReset >= 1000) { + client.rateLimit.lastBurstReset = now; + client.rateLimit.burstCount = 1; + } + } + + removeClient(clientId: string): void { if (this.clients.has(clientId)) { this.clients.delete(clientId); console.log(`SSE client disconnected: ${clientId}`); @@ -152,46 +196,55 @@ export class SSEManager extends EventEmitter { } } - subscribeToEntity(clientId: string, entityId: string) { + subscribeToEntity(clientId: string, entityId: string): void { const client = this.clients.get(clientId); - if (client?.authenticated) { - client.subscriptions.add(`entity:${entityId}`); - console.log(`Client ${clientId} subscribed to entity: ${entityId}`); + if (!client?.authenticated) { + console.warn(`Unauthenticated client ${clientId} attempted to subscribe to entity: ${entityId}`); + return; + } - // Send current state if available - const currentState = this.entityStates.get(entityId); - if (currentState) { - this.sendToClient(client, { - type: 'state_changed', - data: { - entity_id: currentState.entity_id, - state: currentState.state, - attributes: currentState.attributes, - last_changed: currentState.last_changed, - last_updated: currentState.last_updated - } - }); - } + client.subscriptions.add(`entity:${entityId}`); + console.log(`Client ${clientId} subscribed to entity: ${entityId}`); + + // Send current state if available + const currentState = this.entityStates.get(entityId); + if (currentState && !this.isRateLimited(client)) { + this.sendToClient(client, { + type: 'state_changed', + data: { + entity_id: currentState.entity_id, + state: currentState.state, + attributes: currentState.attributes, + last_changed: currentState.last_changed, + last_updated: currentState.last_updated + } + }); } } - subscribeToDomain(clientId: string, domain: string) { + subscribeToDomain(clientId: string, domain: string): void { const client = this.clients.get(clientId); - if (client?.authenticated) { - client.subscriptions.add(`domain:${domain}`); - console.log(`Client ${clientId} subscribed to domain: ${domain}`); + if (!client?.authenticated) { + console.warn(`Unauthenticated client ${clientId} attempted to subscribe to domain: ${domain}`); + return; } + + client.subscriptions.add(`domain:${domain}`); + console.log(`Client ${clientId} subscribed to domain: ${domain}`); } - subscribeToEvent(clientId: string, eventType: string) { + subscribeToEvent(clientId: string, eventType: string): void { const client = this.clients.get(clientId); - if (client?.authenticated) { - client.subscriptions.add(`event:${eventType}`); - console.log(`Client ${clientId} subscribed to event: ${eventType}`); + if (!client?.authenticated) { + console.warn(`Unauthenticated client ${clientId} attempted to subscribe to event: ${eventType}`); + return; } + + client.subscriptions.add(`event:${eventType}`); + console.log(`Client ${clientId} subscribed to event: ${eventType}`); } - broadcastStateChange(entity: HassEntity) { + broadcastStateChange(entity: HassEntity): void { // Update stored state this.entityStates.set(entity.entity_id, entity); @@ -211,8 +264,8 @@ export class SSEManager extends EventEmitter { console.log(`Broadcasting state change for ${entity.entity_id}`); // Send to relevant subscribers only - for (const client of this.clients.values()) { - if (!client.authenticated) continue; + this.clients.forEach(client => { + if (!client.authenticated || this.isRateLimited(client)) return; if ( client.subscriptions.has(`entity:${entity.entity_id}`) || @@ -221,10 +274,10 @@ export class SSEManager extends EventEmitter { ) { this.sendToClient(client, message); } - } + }); } - broadcastEvent(event: HassEvent) { + broadcastEvent(event: HassEvent): void { const message = { type: event.event_type, data: event.data, @@ -237,117 +290,36 @@ export class SSEManager extends EventEmitter { console.log(`Broadcasting event: ${event.event_type}`); // Send to relevant subscribers only - for (const client of this.clients.values()) { - if (!client.authenticated) continue; + this.clients.forEach(client => { + if (!client.authenticated || this.isRateLimited(client)) return; if (client.subscriptions.has(`event:${event.event_type}`)) { this.sendToClient(client, message); } - } + }); } - private sendToClient(client: SSEClient, data: any) { + private sendToClient(client: SSEClient, data: unknown): void { try { - // Check rate limit - const now = Date.now(); - if (now - client.rateLimit.lastReset > this.cleanupInterval) { - client.rateLimit.count = 0; - client.rateLimit.lastReset = now; - } - - if (client.rateLimit.count >= 1000) { - console.warn(`Rate limit exceeded for client ${client.id}`); - this.sendToClient(client, { - type: 'error', - error: 'rate_limit_exceeded', - message: 'Too many requests, please try again later', - timestamp: new Date().toISOString() - }); + if (!client.authenticated) { + console.warn(`Attempted to send message to unauthenticated client ${client.id}`); return; } - client.rateLimit.count++; - client.lastPingAt = new Date(); - client.send(JSON.stringify(data)); + if (this.isRateLimited(client)) { + console.warn(`Rate limit exceeded for client ${client.id}`); + return; + } + + const message = typeof data === 'string' ? data : JSON.stringify(data); + client.send(message); + this.updateRateLimit(client); } catch (error) { - console.error(`Error sending message to client ${client.id}:`, error); + console.error(`Failed to send message to client ${client.id}:`, error); this.removeClient(client.id); } } - private validateToken(token?: string): boolean { - if (!token) return false; - const validationResult = TokenManager.validateToken(token); - return validationResult.valid; - } - - // Utility methods - getConnectedClients(): number { - return this.clients.size; - } - - getClientSubscriptions(clientId: string) { - return this.clients.get(clientId)?.subscriptions; - } - - getEntityState(entityId: string): HassEntity | undefined { - return this.entityStates.get(entityId); - } - - // Add new event types - broadcastServiceCall(domain: string, service: string, data: any) { - const message = { - type: 'service_called', - data: { - domain, - service, - service_data: data - }, - timestamp: new Date().toISOString() - }; - - this.broadcastToSubscribers('service_called', message); - } - - broadcastAutomationTriggered(automationId: string, trigger: any) { - const message = { - type: 'automation_triggered', - data: { - automation_id: automationId, - trigger - }, - timestamp: new Date().toISOString() - }; - - this.broadcastToSubscribers('automation_triggered', message); - } - - broadcastScriptExecuted(scriptId: string, data: any) { - const message = { - type: 'script_executed', - data: { - script_id: scriptId, - execution_data: data - }, - timestamp: new Date().toISOString() - }; - - this.broadcastToSubscribers('script_executed', message); - } - - private broadcastToSubscribers(eventType: string, message: any) { - for (const client of this.clients.values()) { - if (!client.authenticated) continue; - - if (client.subscriptions.has(`event:${eventType}`) || - client.subscriptions.has(`entity:${eventType}`) || - client.subscriptions.has(`domain:${eventType.split('.')[0]}`)) { - this.sendToClient(client, message); - } - } - } - - // Add statistics methods getStatistics(): { totalClients: number; authenticatedClients: number; @@ -356,31 +328,35 @@ export class SSEManager extends EventEmitter { } { const now = Date.now(); const clientStats: ClientStats[] = []; - const subscriptionCounts: { [key: string]: number } = {}; + const subscriptionStats: { [key: string]: number } = {}; + let authenticatedClients = 0; this.clients.forEach(client => { - // Collect client statistics + if (client.authenticated) { + authenticatedClients++; + } + clientStats.push({ id: client.id, ip: client.ip, connectedAt: client.connectedAt, lastPingAt: client.lastPingAt, subscriptionCount: client.subscriptions.size, - connectionDuration: now - client.connectedAt.getTime() + connectionDuration: now - client.connectedAt.getTime(), + messagesSent: client.rateLimit.count, + lastActivity: new Date(client.rateLimit.lastReset) }); - // Count subscriptions by type client.subscriptions.forEach(sub => { - const [type] = sub.split(':'); - subscriptionCounts[type] = (subscriptionCounts[type] || 0) + 1; + subscriptionStats[sub] = (subscriptionStats[sub] || 0) + 1; }); }); return { totalClients: this.clients.size, - authenticatedClients: Array.from(this.clients.values()).filter(c => c.authenticated).length, + authenticatedClients, clientStats, - subscriptionStats: subscriptionCounts + subscriptionStats }; } }