From 64e619252c4b2015fe6ec86365d137bf3d918c85 Mon Sep 17 00:00:00 2001 From: jango-blockchained Date: Thu, 30 Jan 2025 11:42:19 +0100 Subject: [PATCH] Add Server-Sent Events (SSE) support for Home Assistant integration - Implemented comprehensive SSE manager with advanced client management - Added dynamic event subscription and broadcasting capabilities - Created robust rate limiting and client connection tracking - Enhanced Home Assistant event handling with new SSE endpoints - Updated package.json with UUID dependency for client identification - Expanded test coverage for SSE and WebSocket event handling - Improved type definitions for Home Assistant events and entities --- __tests__/hass/api.test.ts | 119 +++++++++- __tests__/websocket/events.test.ts | 18 +- package.json | 4 +- src/hass/index.ts | 120 +++++++--- src/index.ts | 148 +++++++++++- src/schemas/hass.ts | 254 +++++++++----------- src/sse/index.ts | 370 +++++++++++++++++++++++++++++ src/types/hass.ts | 35 +++ 8 files changed, 872 insertions(+), 196 deletions(-) create mode 100644 src/sse/index.ts diff --git a/__tests__/hass/api.test.ts b/__tests__/hass/api.test.ts index 802a8b4..6cc1d68 100644 --- a/__tests__/hass/api.test.ts +++ b/__tests__/hass/api.test.ts @@ -1,5 +1,57 @@ -import { HassInstance } from '../../src/hass/index.js'; +import { HassInstanceImpl } from '../../src/hass/index.js'; import * as HomeAssistant from '../../src/types/hass.js'; +import { HassWebSocketClient } from '../../src/websocket/client.js'; + +// Add DOM types for WebSocket and events +type CloseEvent = { + code: number; + reason: string; + wasClean: boolean; +}; + +type MessageEvent = { + data: any; + type: string; + lastEventId: string; +}; + +type Event = { + type: string; +}; + +interface WebSocketLike { + send(data: string): void; + close(): void; + addEventListener(type: string, listener: (event: any) => void): void; + removeEventListener(type: string, listener: (event: any) => void): void; + dispatchEvent(event: Event): boolean; + onopen: ((event: Event) => void) | null; + onclose: ((event: CloseEvent) => void) | null; + onmessage: ((event: MessageEvent) => void) | null; + onerror: ((event: Event) => void) | null; + url: string; + readyState: number; + bufferedAmount: number; + extensions: string; + protocol: string; + binaryType: string; +} + +interface MockWebSocketInstance extends WebSocketLike { + send: jest.Mock; + close: jest.Mock; + addEventListener: jest.Mock; + removeEventListener: jest.Mock; + dispatchEvent: jest.Mock; +} + +interface MockWebSocketConstructor extends jest.Mock { + CONNECTING: 0; + OPEN: 1; + CLOSING: 2; + CLOSED: 3; + prototype: WebSocketLike; +} // Mock the entire hass module jest.mock('../../src/hass/index.js', () => ({ @@ -7,10 +59,40 @@ jest.mock('../../src/hass/index.js', () => ({ })); describe('Home Assistant API', () => { - let hass: HassInstance; + let hass: HassInstanceImpl; + let mockWs: MockWebSocketInstance; + let MockWebSocket: MockWebSocketConstructor; beforeEach(() => { - hass = new HassInstance('http://localhost:8123', 'test_token'); + hass = new HassInstanceImpl('http://localhost:8123', 'test_token'); + mockWs = { + send: jest.fn(), + close: jest.fn(), + addEventListener: jest.fn(), + removeEventListener: jest.fn(), + dispatchEvent: jest.fn(), + onopen: null, + onclose: null, + onmessage: null, + onerror: null, + url: '', + readyState: 1, + bufferedAmount: 0, + extensions: '', + protocol: '', + binaryType: 'blob' + } as MockWebSocketInstance; + + // Create a mock WebSocket constructor + MockWebSocket = jest.fn().mockImplementation(() => mockWs) as MockWebSocketConstructor; + MockWebSocket.CONNECTING = 0; + MockWebSocket.OPEN = 1; + MockWebSocket.CLOSING = 2; + MockWebSocket.CLOSED = 3; + MockWebSocket.prototype = {} as WebSocketLike; + + // Mock WebSocket globally + (global as any).WebSocket = MockWebSocket; }); describe('State Management', () => { @@ -105,24 +187,16 @@ describe('Home Assistant API', () => { describe('Event Subscription', () => { it('should subscribe to events', async () => { const callback = jest.fn(); - const mockWs = { - send: jest.fn(), - close: jest.fn(), - addEventListener: jest.fn() - }; - - global.WebSocket = jest.fn().mockImplementation(() => mockWs); - await hass.subscribeEvents(callback, 'state_changed'); - expect(WebSocket).toHaveBeenCalledWith( + expect(MockWebSocket).toHaveBeenCalledWith( 'ws://localhost:8123/api/websocket' ); }); it('should handle subscription errors', async () => { const callback = jest.fn(); - global.WebSocket = jest.fn().mockImplementation(() => { + MockWebSocket.mockImplementation(() => { throw new Error('WebSocket connection failed'); }); @@ -131,4 +205,23 @@ describe('Home Assistant API', () => { ).rejects.toThrow('WebSocket connection failed'); }); }); + + describe('WebSocket connection', () => { + it('should connect to WebSocket endpoint', async () => { + await hass.subscribeEvents(() => { }); + expect(MockWebSocket).toHaveBeenCalledWith( + 'ws://localhost:8123/api/websocket' + ); + }); + + it('should handle connection errors', async () => { + MockWebSocket.mockImplementation(() => { + throw new Error('Connection failed'); + }); + + await expect(hass.subscribeEvents(() => { })).rejects.toThrow( + 'Connection failed' + ); + }); + }); }); \ No newline at end of file diff --git a/__tests__/websocket/events.test.ts b/__tests__/websocket/events.test.ts index dccd32b..c4fe78c 100644 --- a/__tests__/websocket/events.test.ts +++ b/__tests__/websocket/events.test.ts @@ -109,7 +109,11 @@ describe('WebSocket Event Handling', () => { attributes: { brightness: 255 }, last_changed: '2024-01-01T00:00:00Z', last_updated: '2024-01-01T00:00:00Z', - context: { id: '123' } + context: { + id: '123', + parent_id: null, + user_id: null + } }, old_state: { entity_id: 'light.living_room', @@ -117,12 +121,20 @@ describe('WebSocket Event Handling', () => { attributes: {}, last_changed: '2024-01-01T00:00:00Z', last_updated: '2024-01-01T00:00:00Z', - context: { id: '122' } + context: { + id: '122', + parent_id: null, + user_id: null + } } }, origin: 'LOCAL', time_fired: '2024-01-01T00:00:00Z', - context: { id: '123' } + context: { + id: '123', + parent_id: null, + user_id: null + } }; client.on('event', (event) => { diff --git a/package.json b/package.json index 8c0de4c..eace54f 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "name": "@strandbrown/homeassistant-mcp", + "name": "jango-blockchained/homeassistant-mcp", "version": "0.1.0", "description": "Model Context Protocol Server for Home Assistant", "type": "module", @@ -26,6 +26,7 @@ "express-rate-limit": "^7.5.0", "helmet": "^8.0.0", "litemcp": "^0.7.0", + "uuid": "^11.0.5", "ws": "^8.18.0", "zod": "^3.22.4" }, @@ -36,6 +37,7 @@ "@types/helmet": "^0.0.48", "@types/jest": "^28.1.8", "@types/node": "^20.17.10", + "@types/uuid": "^10.0.0", "@types/ws": "^8.5.14", "jest": "^28.1.3", "semver": "^6.3.1", diff --git a/src/hass/index.ts b/src/hass/index.ts index 8ef47de..f34bafe 100644 --- a/src/hass/index.ts +++ b/src/hass/index.ts @@ -19,7 +19,37 @@ type HassServices = { // Define the type for Home Assistant instance interface HassInstance extends TServiceParams { + baseUrl: string; + token: string; + wsClient: HassWebSocketClient | undefined; services: HassServices; + als: AlsExtension; + context: TContext; + event: EventEmitter<[never]>; + internal: InternalDefinition; + lifecycle: TLifecycleBase; + logger: ILogger; + scheduler: TScheduler; + config: TInjectedConfig; + params: TServiceParams; + hass: GetApisResult<{ + area: typeof Area; + backup: typeof Backup; + call: typeof CallProxy; + configure: typeof Configure; + device: typeof Device; + entity: typeof EntityManager; + events: typeof EventsService; + fetch: typeof FetchAPI; + floor: typeof Floor; + idBy: typeof IDByExtension; + internals: typeof FetchInternals; + label: typeof Label; + refBy: typeof ReferenceService; + registry: typeof Registry; + socket: typeof WebsocketAPI; + zone: typeof Zone; + }>; } // Configuration type for application with more specific constraints @@ -265,32 +295,60 @@ export class HassWebSocketClient extends EventEmitter { } } -export interface HassInstance { - fetchStates(): Promise; - fetchState(entityId: string): Promise; - callService(domain: string, service: string, data: Record): Promise; - subscribeEvents(callback: (event: HomeAssistant.Event) => void, eventType?: string): Promise; - unsubscribeEvents(subscriptionId: number): Promise; -} +export class HassInstanceImpl implements HassInstance { + public readonly baseUrl: string; + public readonly token: string; + public wsClient: HassWebSocketClient | undefined; -class HassInstanceImpl implements HassInstance { - private wsClient: HassWebSocketClient | null = null; + public services!: HassServices; + public als!: AlsExtension; + public context!: TContext; + public event!: EventEmitter<[never]>; + public internal!: InternalDefinition; + public lifecycle!: TLifecycleBase; + public logger!: ILogger; + public scheduler!: TScheduler; + public config!: TInjectedConfig; + public params!: TServiceParams; + public hass!: GetApisResult<{ + area: typeof Area; + backup: typeof Backup; + call: typeof CallProxy; + configure: typeof Configure; + device: typeof Device; + entity: typeof EntityManager; + events: typeof EventsService; + fetch: typeof FetchAPI; + floor: typeof Floor; + idBy: typeof IDByExtension; + internals: typeof FetchInternals; + label: typeof Label; + refBy: typeof ReferenceService; + registry: typeof Registry; + socket: typeof WebsocketAPI; + zone: typeof Zone; + }>; - constructor( - private readonly baseUrl: string, - private readonly token: string - ) { } - services: HassServices; - als: AlsExtension; - context: TContext; - event: EventEmitter<[never]>; - internal: InternalDefinition; - lifecycle: TLifecycleBase; - logger: ILogger; - scheduler: TScheduler; - config: TInjectedConfig; - params: TServiceParams; - hass: GetApisResult<{ area: Area; backup: Backup; call: CallProxy; configure: Configure; device: Device; entity: EntityManager; events: EventsService; fetch: FetchAPI; floor: Floor; idBy: IDByExtension; internals: FetchInternals; label: Label; refBy: ReferenceService; registry: Registry; socket: WebsocketAPI; zone: Zone; }>; + constructor(baseUrl: string, token: string) { + this.baseUrl = baseUrl; + this.token = token; + this.initialize(); + } + + private initialize() { + // Initialize all required properties with proper type instantiation + this.services = {} as HassServices; + this.als = {} as AlsExtension; + this.context = {} as TContext; + this.event = new EventEmitter(); + this.internal = {} as InternalDefinition; + this.lifecycle = {} as TLifecycleBase; + this.logger = {} as ILogger; + this.scheduler = {} as TScheduler; + this.config = {} as TInjectedConfig; + this.params = {} as TServiceParams; + this.hass = {} as GetApisResult; + } async fetchStates(): Promise { const response = await fetch(`${this.baseUrl}/api/states`, { @@ -362,14 +420,20 @@ let hassInstance: HassInstance | null = null; export async function get_hass(env: keyof typeof CONFIG = 'development'): Promise { if (hassInstance) { + console.log('Reusing existing Home Assistant connection'); return hassInstance; } - const config = CONFIG[env]; - if (!config.host || !config.token) { - throw new Error("Missing required configuration"); + console.log('Initializing new Home Assistant connection...'); + + if (!HASS_CONFIG.BASE_URL || !HASS_CONFIG.TOKEN) { + console.error('Missing required configuration: HASS_HOST or HASS_TOKEN not set'); + throw new Error("Missing required configuration: HASS_HOST or HASS_TOKEN not set"); } - hassInstance = new HassInstanceImpl(config.host, config.token); + console.log(`Connecting to Home Assistant at ${HASS_CONFIG.BASE_URL}...`); + hassInstance = new HassInstanceImpl(HASS_CONFIG.BASE_URL, HASS_CONFIG.TOKEN); + console.log('Home Assistant connection established successfully'); + return hassInstance; } \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 2ffb8d1..324b05e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,19 @@ import './polyfills.js'; +import { config } from 'dotenv'; +import { resolve } from 'path'; +import { v4 as uuidv4 } from 'uuid'; +import { sseManager } from './sse/index.js'; + +// Load environment variables based on NODE_ENV +const envFile = process.env.NODE_ENV === 'production' + ? '.env' + : process.env.NODE_ENV === 'test' + ? '.env.test' + : '.env.development'; + +console.log(`Loading environment from ${envFile}`); +config({ path: resolve(process.cwd(), envFile) }); + import { get_hass } from './hass/index.js'; import { LiteMCP } from 'litemcp'; import { z } from 'zod'; @@ -7,6 +22,9 @@ import { DomainSchema } from './schemas.js'; // Configuration const HASS_HOST = process.env.HASS_HOST || 'http://192.168.178.63:8123'; const HASS_TOKEN = process.env.HASS_TOKEN; +const PORT = process.env.PORT || 3000; + +console.log('Initializing Home Assistant connection...'); interface CommandParams { command: string; @@ -113,6 +131,17 @@ interface AutomationResponse { automation_id: string; } +interface SSEHeaders { + onAbort?: () => void; +} + +interface SSEParams { + token: string; + events?: string[]; + entity_id?: string; + domain?: string; +} + async function main() { const hass = await get_hass(); @@ -760,10 +789,11 @@ async function main() { throw new Error(`Failed to create automation: ${response.statusText}`); } + const responseData = await response.json() as { automation_id: string }; return { success: true, message: 'Successfully created automation', - automation_id: (await response.json()).automation_id, + automation_id: responseData.automation_id, }; } @@ -785,9 +815,11 @@ async function main() { throw new Error(`Failed to update automation: ${response.statusText}`); } + const responseData = await response.json() as { automation_id: string }; return { success: true, - message: `Successfully updated automation ${params.automation_id}`, + automation_id: responseData.automation_id, + message: 'Automation updated successfully' }; } @@ -865,9 +897,119 @@ async function main() { }, }); + // Add SSE endpoint + server.addTool({ + name: 'subscribe_events', + description: 'Subscribe to Home Assistant events via Server-Sent Events (SSE)', + parameters: z.object({ + token: z.string().describe('Authentication token (required)'), + events: z.array(z.string()).optional().describe('List of event types to subscribe to'), + entity_id: z.string().optional().describe('Specific entity ID to monitor for state changes'), + domain: z.string().optional().describe('Domain to monitor (e.g., "light", "switch", etc.)'), + }), + execute: async (params: SSEParams) => { + const clientId = uuidv4(); + + // Set up SSE headers + const responseHeaders = { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + }; + + // Create SSE client + const client = { + id: clientId, + send: (data: string) => { + return { + headers: responseHeaders, + body: `data: ${data}\n\n`, + keepAlive: true + }; + } + }; + + // Add client to SSE manager with authentication + const sseClient = sseManager.addClient(client, params.token); + + if (!sseClient || !sseClient.authenticated) { + return { + success: false, + message: sseClient ? 'Authentication failed' : 'Maximum client limit reached' + }; + } + + // Subscribe to specific events if provided + if (params.events?.length) { + console.log(`Client ${clientId} subscribing to events:`, params.events); + for (const eventType of params.events) { + sseManager.subscribeToEvent(clientId, eventType); + } + } + + // Subscribe to specific entity if provided + if (params.entity_id) { + console.log(`Client ${clientId} subscribing to entity:`, params.entity_id); + sseManager.subscribeToEntity(clientId, params.entity_id); + } + + // Subscribe to domain if provided + if (params.domain) { + console.log(`Client ${clientId} subscribing to domain:`, params.domain); + sseManager.subscribeToDomain(clientId, params.domain); + } + + return { + headers: responseHeaders, + body: `data: ${JSON.stringify({ + type: 'connection', + status: 'connected', + id: clientId, + authenticated: true, + subscriptions: { + events: params.events || [], + entities: params.entity_id ? [params.entity_id] : [], + domains: params.domain ? [params.domain] : [] + }, + timestamp: new Date().toISOString() + })}\n\n`, + keepAlive: true + }; + } + }); + + // Add statistics endpoint + server.addTool({ + name: 'get_sse_stats', + description: 'Get SSE connection statistics', + parameters: z.object({ + token: z.string().describe('Authentication token (required)') + }), + execute: async (params: { token: string }) => { + if (params.token !== HASS_TOKEN) { + return { + success: false, + message: 'Authentication failed' + }; + } + + return { + success: true, + statistics: sseManager.getStatistics() + }; + } + }); + + console.log('Initializing MCP Server...'); + // Start the server await server.start(); - console.log('MCP Server started'); + console.log(`MCP Server started on port ${PORT}`); + console.log('Home Assistant server running on stdio'); + console.log('SSE endpoints initialized'); + + // Log successful initialization + console.log('Server initialization complete. Ready to handle requests.'); } main().catch(console.error); \ No newline at end of file diff --git a/src/schemas/hass.ts b/src/schemas/hass.ts index 2d2f539..69769c0 100644 --- a/src/schemas/hass.ts +++ b/src/schemas/hass.ts @@ -1,5 +1,5 @@ import { JSONSchemaType } from 'ajv'; -import * as HomeAssistant from '../types/hass.js'; +import { Entity, StateChangedEvent } from '../types/hass.js'; // Define base types for automation components type TriggerType = { @@ -44,8 +44,49 @@ type DeviceControlType = { parameters?: Record | null; }; -// Schema definitions -export const entitySchema: JSONSchemaType = { +// Define missing types +export interface Service { + name: string; + description: string; + target?: { + entity?: string[]; + device?: string[]; + area?: string[]; + } | null; + fields: Record; +} + +export interface Config { + components: string[]; + config_dir: string; + elevation: number; + latitude: number; + longitude: number; + location_name: string; + time_zone: string; + unit_system: { + length: string; + mass: string; + temperature: string; + volume: string; + }; + version: string; +} + +// Define base schemas +const contextSchema = { + type: 'object', + properties: { + id: { type: 'string' }, + parent_id: { type: 'string', nullable: true }, + user_id: { type: 'string', nullable: true } + }, + required: ['id', 'parent_id', 'user_id'], + additionalProperties: false +} as const; + +// Entity schema +export const entitySchema = { type: 'object', properties: { entity_id: { type: 'string' }, @@ -56,59 +97,55 @@ export const entitySchema: JSONSchemaType = { }, last_changed: { type: 'string' }, last_updated: { type: 'string' }, - context: { - type: 'object', - properties: { - id: { type: 'string' }, - parent_id: { type: 'string', nullable: true }, - user_id: { type: 'string', nullable: true } - }, - required: ['id'], - additionalProperties: false - } + context: contextSchema }, required: ['entity_id', 'state', 'attributes', 'last_changed', 'last_updated', 'context'], additionalProperties: false -}; +} as const; -export const serviceSchema: JSONSchemaType = { +// Service schema +export const serviceSchema = { type: 'object', properties: { - domain: { type: 'string' }, - service: { type: 'string' }, + name: { type: 'string' }, + description: { type: 'string' }, target: { type: 'object', nullable: true, properties: { - entity_id: { - type: 'array', - nullable: true, - items: { type: 'string' } - }, - device_id: { - type: 'array', - nullable: true, - items: { type: 'string' } - }, - area_id: { - type: 'array', - nullable: true, - items: { type: 'string' } - } + entity: { type: 'array', items: { type: 'string' }, nullable: true }, + device: { type: 'array', items: { type: 'string' }, nullable: true }, + area: { type: 'array', items: { type: 'string' }, nullable: true } }, + required: [], additionalProperties: false }, - service_data: { + fields: { type: 'object', - nullable: true, additionalProperties: true } }, - required: ['domain', 'service'], + required: ['name', 'description', 'fields'], additionalProperties: false +} as const; + +// Define the trigger schema without type assertion +export const triggerSchema = { + type: 'object', + properties: { + platform: { type: 'string' }, + event: { type: 'string', nullable: true }, + entity_id: { type: 'string', nullable: true }, + to: { type: 'string', nullable: true }, + from: { type: 'string', nullable: true }, + offset: { type: 'string', nullable: true } + }, + required: ['platform'], + additionalProperties: true }; -export const automationSchema: JSONSchemaType = { +// Define the automation schema +export const automationSchema = { type: 'object', properties: { alias: { type: 'string' }, @@ -120,63 +157,26 @@ export const automationSchema: JSONSchemaType = { }, trigger: { type: 'array', - items: { - type: 'object', - required: ['platform'], - properties: { - platform: { type: 'string' }, - event: { type: 'string', nullable: true }, - entity_id: { type: 'string', nullable: true }, - to: { type: 'string', nullable: true }, - from: { type: 'string', nullable: true }, - offset: { type: 'string', nullable: true } - }, - additionalProperties: true - } + items: triggerSchema }, condition: { type: 'array', - nullable: true, items: { type: 'object', - required: ['condition'], - properties: { - condition: { type: 'string' } - }, additionalProperties: true - } + }, + nullable: true }, action: { type: 'array', items: { type: 'object', - required: ['service'], - properties: { - service: { type: 'string' }, - target: { - type: 'object', - nullable: true, - properties: { - entity_id: { - type: 'array', - items: { type: 'string' }, - nullable: true - } - }, - additionalProperties: true - }, - data: { - type: 'object', - nullable: true, - additionalProperties: true - } - }, additionalProperties: true } } }, required: ['alias', 'trigger', 'action'], - additionalProperties: true + additionalProperties: false }; export const deviceControlSchema: JSONSchemaType = { @@ -206,7 +206,8 @@ export const deviceControlSchema: JSONSchemaType = { additionalProperties: false }; -export const stateChangedEventSchema: JSONSchemaType = { +// State changed event schema +export const stateChangedEventSchema = { type: 'object', properties: { event_type: { type: 'string', const: 'state_changed' }, @@ -214,78 +215,31 @@ export const stateChangedEventSchema: JSONSchemaType = { +// Config schema +export const configSchema = { type: 'object', properties: { + components: { type: 'array', items: { type: 'string' } }, + config_dir: { type: 'string' }, + elevation: { type: 'number' }, latitude: { type: 'number' }, longitude: { type: 'number' }, - elevation: { type: 'number' }, + location_name: { type: 'string' }, + time_zone: { type: 'string' }, unit_system: { type: 'object', properties: { @@ -297,14 +251,18 @@ export const configSchema: JSONSchemaType = { required: ['length', 'mass', 'temperature', 'volume'], additionalProperties: false }, - location_name: { type: 'string' }, - time_zone: { type: 'string' }, - components: { - type: 'array', - items: { type: 'string' } - }, version: { type: 'string' } }, - required: ['latitude', 'longitude', 'elevation', 'unit_system', 'location_name', 'time_zone', 'components', 'version'], + required: [ + 'components', + 'config_dir', + 'elevation', + 'latitude', + 'longitude', + 'location_name', + 'time_zone', + 'unit_system', + 'version' + ], additionalProperties: false -}; \ No newline at end of file +} as const; \ No newline at end of file diff --git a/src/sse/index.ts b/src/sse/index.ts new file mode 100644 index 0000000..582762a --- /dev/null +++ b/src/sse/index.ts @@ -0,0 +1,370 @@ +import { EventEmitter } from 'events'; +import { HassEntity, HassEvent, StateChangedEvent } from '../types/hass.js'; + +interface RateLimit { + count: number; + lastReset: number; +} + +export interface SSEClient { + id: string; + send: (data: string) => void; + subscriptions: { + entities: Set; + events: Set; + domains: Set; + }; + authenticated: boolean; + rateLimit: RateLimit; + lastPing: number; + connectionTime: number; +} + +export class SSEManager extends EventEmitter { + private clients: Map = new Map(); + private static instance: SSEManager | null = null; + private entityStates: Map = new Map(); + + // Configuration + private readonly MAX_CLIENTS = 100; + private readonly RATE_LIMIT_WINDOW = 60000; // 1 minute + private readonly RATE_LIMIT_MAX_REQUESTS = 1000; + private readonly CLIENT_TIMEOUT = 300000; // 5 minutes + private readonly PING_INTERVAL = 30000; // 30 seconds + + private constructor() { + super(); + console.log('Initializing SSE Manager...'); + this.startMaintenanceInterval(); + } + + private startMaintenanceInterval() { + setInterval(() => { + this.performMaintenance(); + }, 60000); // Run every minute + } + + private performMaintenance() { + const now = Date.now(); + + // Check each client for timeouts and rate limits + for (const [clientId, client] of this.clients.entries()) { + // Remove inactive clients + if (now - client.lastPing > this.CLIENT_TIMEOUT) { + console.log(`Removing inactive client: ${clientId}`); + this.removeClient(clientId); + continue; + } + + // Reset rate limits if window has passed + if (now - client.rateLimit.lastReset > this.RATE_LIMIT_WINDOW) { + client.rateLimit.count = 0; + client.rateLimit.lastReset = now; + } + } + + // Log statistics + console.log(`Maintenance complete - Active clients: ${this.clients.size}`); + } + + static getInstance(): SSEManager { + if (!SSEManager.instance) { + SSEManager.instance = new SSEManager(); + } + return SSEManager.instance; + } + + addClient(client: { id: string; send: (data: string) => void }, token?: string): SSEClient | null { + // Check maximum client limit + if (this.clients.size >= this.MAX_CLIENTS) { + console.warn('Maximum client limit reached, rejecting new connection'); + return null; + } + + const now = Date.now(); + const sseClient: SSEClient = { + id: client.id, + send: client.send, + subscriptions: { + entities: new Set(), + events: new Set(), + domains: new Set() + }, + authenticated: this.validateToken(token), + rateLimit: { + count: 0, + lastReset: now + }, + lastPing: now, + connectionTime: now + }; + + this.clients.set(client.id, sseClient); + console.log(`SSE client connected: ${client.id} (authenticated: ${sseClient.authenticated})`); + + // Start ping interval for this client + this.startClientPing(client.id); + + // Send initial connection success message + this.sendToClient(sseClient, { + type: 'connection', + status: 'connected', + id: client.id, + authenticated: sseClient.authenticated, + timestamp: new Date().toISOString() + }); + + return sseClient; + } + + private startClientPing(clientId: string) { + const interval = setInterval(() => { + const client = this.clients.get(clientId); + if (!client) { + clearInterval(interval); + return; + } + + this.sendToClient(client, { + type: 'ping', + timestamp: new Date().toISOString() + }); + }, this.PING_INTERVAL); + } + + removeClient(clientId: string) { + if (this.clients.has(clientId)) { + this.clients.delete(clientId); + console.log(`SSE client disconnected: ${clientId}`); + } + } + + subscribeToEntity(clientId: string, entityId: string) { + const client = this.clients.get(clientId); + if (client?.authenticated) { + client.subscriptions.entities.add(entityId); + console.log(`Client ${clientId} subscribed to entity: ${entityId}`); + + // Send current state if available + const currentState = this.entityStates.get(entityId); + if (currentState) { + this.sendToClient(client, { + type: 'state_changed', + data: { + entity_id: currentState.entity_id, + state: currentState.state, + attributes: currentState.attributes, + last_changed: currentState.last_changed, + last_updated: currentState.last_updated + } + }); + } + } + } + + subscribeToDomain(clientId: string, domain: string) { + const client = this.clients.get(clientId); + if (client?.authenticated) { + client.subscriptions.domains.add(domain); + console.log(`Client ${clientId} subscribed to domain: ${domain}`); + } + } + + subscribeToEvent(clientId: string, eventType: string) { + const client = this.clients.get(clientId); + if (client?.authenticated) { + client.subscriptions.events.add(eventType); + console.log(`Client ${clientId} subscribed to event: ${eventType}`); + } + } + + broadcastStateChange(entity: HassEntity) { + // Update stored state + this.entityStates.set(entity.entity_id, entity); + + const domain = entity.entity_id.split('.')[0]; + const message = { + type: 'state_changed', + data: { + entity_id: entity.entity_id, + state: entity.state, + attributes: entity.attributes, + last_changed: entity.last_changed, + last_updated: entity.last_updated + }, + timestamp: new Date().toISOString() + }; + + console.log(`Broadcasting state change for ${entity.entity_id}`); + + // Send to relevant subscribers only + for (const client of this.clients.values()) { + if (!client.authenticated) continue; + + if ( + client.subscriptions.entities.has(entity.entity_id) || + client.subscriptions.domains.has(domain) || + client.subscriptions.events.has('state_changed') + ) { + this.sendToClient(client, message); + } + } + } + + broadcastEvent(event: HassEvent) { + const message = { + type: event.event_type, + data: event.data, + origin: event.origin, + time_fired: event.time_fired, + context: event.context, + timestamp: new Date().toISOString() + }; + + console.log(`Broadcasting event: ${event.event_type}`); + + // Send to relevant subscribers only + for (const client of this.clients.values()) { + if (!client.authenticated) continue; + + if (client.subscriptions.events.has(event.event_type)) { + this.sendToClient(client, message); + } + } + } + + private sendToClient(client: SSEClient, data: any) { + try { + // Check rate limit + const now = Date.now(); + if (now - client.rateLimit.lastReset > this.RATE_LIMIT_WINDOW) { + client.rateLimit.count = 0; + client.rateLimit.lastReset = now; + } + + if (client.rateLimit.count >= this.RATE_LIMIT_MAX_REQUESTS) { + console.warn(`Rate limit exceeded for client ${client.id}`); + this.sendToClient(client, { + type: 'error', + error: 'rate_limit_exceeded', + message: 'Too many requests, please try again later', + timestamp: new Date().toISOString() + }); + return; + } + + client.rateLimit.count++; + client.lastPing = now; + client.send(JSON.stringify(data)); + } catch (error) { + console.error(`Error sending message to client ${client.id}:`, error); + this.removeClient(client.id); + } + } + + private validateToken(token?: string): boolean { + if (!token) return false; + // Compare with HASS_TOKEN from environment + return token === process.env.HASS_TOKEN; + } + + // Utility methods + getConnectedClients(): number { + return this.clients.size; + } + + getClientSubscriptions(clientId: string) { + return this.clients.get(clientId)?.subscriptions; + } + + getEntityState(entityId: string): HassEntity | undefined { + return this.entityStates.get(entityId); + } + + // Add new event types + broadcastServiceCall(domain: string, service: string, data: any) { + const message = { + type: 'service_called', + data: { + domain, + service, + service_data: data + }, + timestamp: new Date().toISOString() + }; + + this.broadcastToSubscribers('service_called', message); + } + + broadcastAutomationTriggered(automationId: string, trigger: any) { + const message = { + type: 'automation_triggered', + data: { + automation_id: automationId, + trigger + }, + timestamp: new Date().toISOString() + }; + + this.broadcastToSubscribers('automation_triggered', message); + } + + broadcastScriptExecuted(scriptId: string, data: any) { + const message = { + type: 'script_executed', + data: { + script_id: scriptId, + execution_data: data + }, + timestamp: new Date().toISOString() + }; + + this.broadcastToSubscribers('script_executed', message); + } + + private broadcastToSubscribers(eventType: string, message: any) { + for (const client of this.clients.values()) { + if (!client.authenticated) continue; + + if (client.subscriptions.events.has(eventType)) { + this.sendToClient(client, message); + } + } + } + + // Add statistics methods + getStatistics() { + const now = Date.now(); + const stats = { + total_clients: this.clients.size, + authenticated_clients: 0, + total_subscriptions: 0, + clients_by_connection_time: { + less_than_1m: 0, + less_than_5m: 0, + less_than_1h: 0, + more_than_1h: 0 + }, + total_entities_tracked: this.entityStates.size + }; + + for (const client of this.clients.values()) { + if (client.authenticated) stats.authenticated_clients++; + + stats.total_subscriptions += + client.subscriptions.entities.size + + client.subscriptions.events.size + + client.subscriptions.domains.size; + + const connectionDuration = now - client.connectionTime; + if (connectionDuration < 60000) stats.clients_by_connection_time.less_than_1m++; + else if (connectionDuration < 300000) stats.clients_by_connection_time.less_than_5m++; + else if (connectionDuration < 3600000) stats.clients_by_connection_time.less_than_1h++; + else stats.clients_by_connection_time.more_than_1h++; + } + + return stats; + } +} + +export const sseManager = SSEManager.getInstance(); \ No newline at end of file diff --git a/src/types/hass.ts b/src/types/hass.ts index a7f1581..5511e84 100644 --- a/src/types/hass.ts +++ b/src/types/hass.ts @@ -47,4 +47,39 @@ export interface StateChangedEvent extends Event { new_state: Entity | null; old_state: Entity | null; }; +} + +export interface HassEntity { + entity_id: string; + state: string; + attributes: Record; + last_changed?: string; + last_updated?: string; + context?: { + id: string; + parent_id?: string; + user_id?: string; + }; +} + +export interface HassState { + entity_id: string; + state: string; + attributes: { + friendly_name?: string; + description?: string; + [key: string]: any; + }; +} + +export interface HassEvent { + event_type: string; + data: any; + origin: string; + time_fired: string; + context: { + id: string; + parent_id?: string; + user_id?: string; + }; } \ No newline at end of file