Add Server-Sent Events (SSE) support for Home Assistant integration

- Implemented comprehensive SSE manager with advanced client management
- Added dynamic event subscription and broadcasting capabilities
- Created robust rate limiting and client connection tracking
- Enhanced Home Assistant event handling with new SSE endpoints
- Updated package.json with UUID dependency for client identification
- Expanded test coverage for SSE and WebSocket event handling
- Improved type definitions for Home Assistant events and entities
This commit is contained in:
jango-blockchained
2025-01-30 11:42:19 +01:00
parent 70c9287c68
commit 64e619252c
8 changed files with 872 additions and 196 deletions

View File

@@ -19,7 +19,37 @@ type HassServices = {
// Define the type for Home Assistant instance
interface HassInstance extends TServiceParams {
baseUrl: string;
token: string;
wsClient: HassWebSocketClient | undefined;
services: HassServices;
als: AlsExtension;
context: TContext;
event: EventEmitter<[never]>;
internal: InternalDefinition;
lifecycle: TLifecycleBase;
logger: ILogger;
scheduler: TScheduler;
config: TInjectedConfig;
params: TServiceParams;
hass: GetApisResult<{
area: typeof Area;
backup: typeof Backup;
call: typeof CallProxy;
configure: typeof Configure;
device: typeof Device;
entity: typeof EntityManager;
events: typeof EventsService;
fetch: typeof FetchAPI;
floor: typeof Floor;
idBy: typeof IDByExtension;
internals: typeof FetchInternals;
label: typeof Label;
refBy: typeof ReferenceService;
registry: typeof Registry;
socket: typeof WebsocketAPI;
zone: typeof Zone;
}>;
}
// Configuration type for application with more specific constraints
@@ -265,32 +295,60 @@ export class HassWebSocketClient extends EventEmitter {
}
}
export interface HassInstance {
fetchStates(): Promise<HomeAssistant.Entity[]>;
fetchState(entityId: string): Promise<HomeAssistant.Entity>;
callService(domain: string, service: string, data: Record<string, any>): Promise<void>;
subscribeEvents(callback: (event: HomeAssistant.Event) => void, eventType?: string): Promise<number>;
unsubscribeEvents(subscriptionId: number): Promise<void>;
}
export class HassInstanceImpl implements HassInstance {
public readonly baseUrl: string;
public readonly token: string;
public wsClient: HassWebSocketClient | undefined;
class HassInstanceImpl implements HassInstance {
private wsClient: HassWebSocketClient | null = null;
public services!: HassServices;
public als!: AlsExtension;
public context!: TContext;
public event!: EventEmitter<[never]>;
public internal!: InternalDefinition;
public lifecycle!: TLifecycleBase;
public logger!: ILogger;
public scheduler!: TScheduler;
public config!: TInjectedConfig;
public params!: TServiceParams;
public hass!: GetApisResult<{
area: typeof Area;
backup: typeof Backup;
call: typeof CallProxy;
configure: typeof Configure;
device: typeof Device;
entity: typeof EntityManager;
events: typeof EventsService;
fetch: typeof FetchAPI;
floor: typeof Floor;
idBy: typeof IDByExtension;
internals: typeof FetchInternals;
label: typeof Label;
refBy: typeof ReferenceService;
registry: typeof Registry;
socket: typeof WebsocketAPI;
zone: typeof Zone;
}>;
constructor(
private readonly baseUrl: string,
private readonly token: string
) { }
services: HassServices;
als: AlsExtension;
context: TContext;
event: EventEmitter<[never]>;
internal: InternalDefinition;
lifecycle: TLifecycleBase;
logger: ILogger;
scheduler: TScheduler;
config: TInjectedConfig;
params: TServiceParams;
hass: GetApisResult<{ area: Area; backup: Backup; call: CallProxy; configure: Configure; device: Device; entity: EntityManager; events: EventsService; fetch: FetchAPI; floor: Floor; idBy: IDByExtension; internals: FetchInternals; label: Label; refBy: ReferenceService; registry: Registry; socket: WebsocketAPI; zone: Zone; }>;
constructor(baseUrl: string, token: string) {
this.baseUrl = baseUrl;
this.token = token;
this.initialize();
}
private initialize() {
// Initialize all required properties with proper type instantiation
this.services = {} as HassServices;
this.als = {} as AlsExtension;
this.context = {} as TContext;
this.event = new EventEmitter();
this.internal = {} as InternalDefinition;
this.lifecycle = {} as TLifecycleBase;
this.logger = {} as ILogger;
this.scheduler = {} as TScheduler;
this.config = {} as TInjectedConfig;
this.params = {} as TServiceParams;
this.hass = {} as GetApisResult<any>;
}
async fetchStates(): Promise<HomeAssistant.Entity[]> {
const response = await fetch(`${this.baseUrl}/api/states`, {
@@ -362,14 +420,20 @@ let hassInstance: HassInstance | null = null;
export async function get_hass(env: keyof typeof CONFIG = 'development'): Promise<HassInstance> {
if (hassInstance) {
console.log('Reusing existing Home Assistant connection');
return hassInstance;
}
const config = CONFIG[env];
if (!config.host || !config.token) {
throw new Error("Missing required configuration");
console.log('Initializing new Home Assistant connection...');
if (!HASS_CONFIG.BASE_URL || !HASS_CONFIG.TOKEN) {
console.error('Missing required configuration: HASS_HOST or HASS_TOKEN not set');
throw new Error("Missing required configuration: HASS_HOST or HASS_TOKEN not set");
}
hassInstance = new HassInstanceImpl(config.host, config.token);
console.log(`Connecting to Home Assistant at ${HASS_CONFIG.BASE_URL}...`);
hassInstance = new HassInstanceImpl(HASS_CONFIG.BASE_URL, HASS_CONFIG.TOKEN);
console.log('Home Assistant connection established successfully');
return hassInstance;
}

View File

@@ -1,4 +1,19 @@
import './polyfills.js';
import { config } from 'dotenv';
import { resolve } from 'path';
import { v4 as uuidv4 } from 'uuid';
import { sseManager } from './sse/index.js';
// Load environment variables based on NODE_ENV
const envFile = process.env.NODE_ENV === 'production'
? '.env'
: process.env.NODE_ENV === 'test'
? '.env.test'
: '.env.development';
console.log(`Loading environment from ${envFile}`);
config({ path: resolve(process.cwd(), envFile) });
import { get_hass } from './hass/index.js';
import { LiteMCP } from 'litemcp';
import { z } from 'zod';
@@ -7,6 +22,9 @@ import { DomainSchema } from './schemas.js';
// Configuration
const HASS_HOST = process.env.HASS_HOST || 'http://192.168.178.63:8123';
const HASS_TOKEN = process.env.HASS_TOKEN;
const PORT = process.env.PORT || 3000;
console.log('Initializing Home Assistant connection...');
interface CommandParams {
command: string;
@@ -113,6 +131,17 @@ interface AutomationResponse {
automation_id: string;
}
interface SSEHeaders {
onAbort?: () => void;
}
interface SSEParams {
token: string;
events?: string[];
entity_id?: string;
domain?: string;
}
async function main() {
const hass = await get_hass();
@@ -760,10 +789,11 @@ async function main() {
throw new Error(`Failed to create automation: ${response.statusText}`);
}
const responseData = await response.json() as { automation_id: string };
return {
success: true,
message: 'Successfully created automation',
automation_id: (await response.json()).automation_id,
automation_id: responseData.automation_id,
};
}
@@ -785,9 +815,11 @@ async function main() {
throw new Error(`Failed to update automation: ${response.statusText}`);
}
const responseData = await response.json() as { automation_id: string };
return {
success: true,
message: `Successfully updated automation ${params.automation_id}`,
automation_id: responseData.automation_id,
message: 'Automation updated successfully'
};
}
@@ -865,9 +897,119 @@ async function main() {
},
});
// Add SSE endpoint
server.addTool({
name: 'subscribe_events',
description: 'Subscribe to Home Assistant events via Server-Sent Events (SSE)',
parameters: z.object({
token: z.string().describe('Authentication token (required)'),
events: z.array(z.string()).optional().describe('List of event types to subscribe to'),
entity_id: z.string().optional().describe('Specific entity ID to monitor for state changes'),
domain: z.string().optional().describe('Domain to monitor (e.g., "light", "switch", etc.)'),
}),
execute: async (params: SSEParams) => {
const clientId = uuidv4();
// Set up SSE headers
const responseHeaders = {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
};
// Create SSE client
const client = {
id: clientId,
send: (data: string) => {
return {
headers: responseHeaders,
body: `data: ${data}\n\n`,
keepAlive: true
};
}
};
// Add client to SSE manager with authentication
const sseClient = sseManager.addClient(client, params.token);
if (!sseClient || !sseClient.authenticated) {
return {
success: false,
message: sseClient ? 'Authentication failed' : 'Maximum client limit reached'
};
}
// Subscribe to specific events if provided
if (params.events?.length) {
console.log(`Client ${clientId} subscribing to events:`, params.events);
for (const eventType of params.events) {
sseManager.subscribeToEvent(clientId, eventType);
}
}
// Subscribe to specific entity if provided
if (params.entity_id) {
console.log(`Client ${clientId} subscribing to entity:`, params.entity_id);
sseManager.subscribeToEntity(clientId, params.entity_id);
}
// Subscribe to domain if provided
if (params.domain) {
console.log(`Client ${clientId} subscribing to domain:`, params.domain);
sseManager.subscribeToDomain(clientId, params.domain);
}
return {
headers: responseHeaders,
body: `data: ${JSON.stringify({
type: 'connection',
status: 'connected',
id: clientId,
authenticated: true,
subscriptions: {
events: params.events || [],
entities: params.entity_id ? [params.entity_id] : [],
domains: params.domain ? [params.domain] : []
},
timestamp: new Date().toISOString()
})}\n\n`,
keepAlive: true
};
}
});
// Add statistics endpoint
server.addTool({
name: 'get_sse_stats',
description: 'Get SSE connection statistics',
parameters: z.object({
token: z.string().describe('Authentication token (required)')
}),
execute: async (params: { token: string }) => {
if (params.token !== HASS_TOKEN) {
return {
success: false,
message: 'Authentication failed'
};
}
return {
success: true,
statistics: sseManager.getStatistics()
};
}
});
console.log('Initializing MCP Server...');
// Start the server
await server.start();
console.log('MCP Server started');
console.log(`MCP Server started on port ${PORT}`);
console.log('Home Assistant server running on stdio');
console.log('SSE endpoints initialized');
// Log successful initialization
console.log('Server initialization complete. Ready to handle requests.');
}
main().catch(console.error);

View File

@@ -1,5 +1,5 @@
import { JSONSchemaType } from 'ajv';
import * as HomeAssistant from '../types/hass.js';
import { Entity, StateChangedEvent } from '../types/hass.js';
// Define base types for automation components
type TriggerType = {
@@ -44,8 +44,49 @@ type DeviceControlType = {
parameters?: Record<string, any> | null;
};
// Schema definitions
export const entitySchema: JSONSchemaType<HomeAssistant.Entity> = {
// Define missing types
export interface Service {
name: string;
description: string;
target?: {
entity?: string[];
device?: string[];
area?: string[];
} | null;
fields: Record<string, any>;
}
export interface Config {
components: string[];
config_dir: string;
elevation: number;
latitude: number;
longitude: number;
location_name: string;
time_zone: string;
unit_system: {
length: string;
mass: string;
temperature: string;
volume: string;
};
version: string;
}
// Define base schemas
const contextSchema = {
type: 'object',
properties: {
id: { type: 'string' },
parent_id: { type: 'string', nullable: true },
user_id: { type: 'string', nullable: true }
},
required: ['id', 'parent_id', 'user_id'],
additionalProperties: false
} as const;
// Entity schema
export const entitySchema = {
type: 'object',
properties: {
entity_id: { type: 'string' },
@@ -56,59 +97,55 @@ export const entitySchema: JSONSchemaType<HomeAssistant.Entity> = {
},
last_changed: { type: 'string' },
last_updated: { type: 'string' },
context: {
type: 'object',
properties: {
id: { type: 'string' },
parent_id: { type: 'string', nullable: true },
user_id: { type: 'string', nullable: true }
},
required: ['id'],
additionalProperties: false
}
context: contextSchema
},
required: ['entity_id', 'state', 'attributes', 'last_changed', 'last_updated', 'context'],
additionalProperties: false
};
} as const;
export const serviceSchema: JSONSchemaType<HomeAssistant.Service> = {
// Service schema
export const serviceSchema = {
type: 'object',
properties: {
domain: { type: 'string' },
service: { type: 'string' },
name: { type: 'string' },
description: { type: 'string' },
target: {
type: 'object',
nullable: true,
properties: {
entity_id: {
type: 'array',
nullable: true,
items: { type: 'string' }
},
device_id: {
type: 'array',
nullable: true,
items: { type: 'string' }
},
area_id: {
type: 'array',
nullable: true,
items: { type: 'string' }
}
entity: { type: 'array', items: { type: 'string' }, nullable: true },
device: { type: 'array', items: { type: 'string' }, nullable: true },
area: { type: 'array', items: { type: 'string' }, nullable: true }
},
required: [],
additionalProperties: false
},
service_data: {
fields: {
type: 'object',
nullable: true,
additionalProperties: true
}
},
required: ['domain', 'service'],
required: ['name', 'description', 'fields'],
additionalProperties: false
} as const;
// Define the trigger schema without type assertion
export const triggerSchema = {
type: 'object',
properties: {
platform: { type: 'string' },
event: { type: 'string', nullable: true },
entity_id: { type: 'string', nullable: true },
to: { type: 'string', nullable: true },
from: { type: 'string', nullable: true },
offset: { type: 'string', nullable: true }
},
required: ['platform'],
additionalProperties: true
};
export const automationSchema: JSONSchemaType<AutomationType> = {
// Define the automation schema
export const automationSchema = {
type: 'object',
properties: {
alias: { type: 'string' },
@@ -120,63 +157,26 @@ export const automationSchema: JSONSchemaType<AutomationType> = {
},
trigger: {
type: 'array',
items: {
type: 'object',
required: ['platform'],
properties: {
platform: { type: 'string' },
event: { type: 'string', nullable: true },
entity_id: { type: 'string', nullable: true },
to: { type: 'string', nullable: true },
from: { type: 'string', nullable: true },
offset: { type: 'string', nullable: true }
},
additionalProperties: true
}
items: triggerSchema
},
condition: {
type: 'array',
nullable: true,
items: {
type: 'object',
required: ['condition'],
properties: {
condition: { type: 'string' }
},
additionalProperties: true
}
},
nullable: true
},
action: {
type: 'array',
items: {
type: 'object',
required: ['service'],
properties: {
service: { type: 'string' },
target: {
type: 'object',
nullable: true,
properties: {
entity_id: {
type: 'array',
items: { type: 'string' },
nullable: true
}
},
additionalProperties: true
},
data: {
type: 'object',
nullable: true,
additionalProperties: true
}
},
additionalProperties: true
}
}
},
required: ['alias', 'trigger', 'action'],
additionalProperties: true
additionalProperties: false
};
export const deviceControlSchema: JSONSchemaType<DeviceControlType> = {
@@ -206,7 +206,8 @@ export const deviceControlSchema: JSONSchemaType<DeviceControlType> = {
additionalProperties: false
};
export const stateChangedEventSchema: JSONSchemaType<HomeAssistant.StateChangedEvent> = {
// State changed event schema
export const stateChangedEventSchema = {
type: 'object',
properties: {
event_type: { type: 'string', const: 'state_changed' },
@@ -214,78 +215,31 @@ export const stateChangedEventSchema: JSONSchemaType<HomeAssistant.StateChangedE
type: 'object',
properties: {
entity_id: { type: 'string' },
new_state: {
type: 'object',
nullable: true,
properties: {
entity_id: { type: 'string' },
state: { type: 'string' },
attributes: {
type: 'object',
additionalProperties: true
},
last_changed: { type: 'string' },
last_updated: { type: 'string' },
context: {
type: 'object',
properties: {
id: { type: 'string' },
parent_id: { type: 'string', nullable: true },
user_id: { type: 'string', nullable: true }
},
required: ['id']
}
},
required: ['entity_id', 'state', 'attributes', 'last_changed', 'last_updated', 'context']
},
old_state: {
type: 'object',
nullable: true,
properties: {
entity_id: { type: 'string' },
state: { type: 'string' },
attributes: {
type: 'object',
additionalProperties: true
},
last_changed: { type: 'string' },
last_updated: { type: 'string' },
context: {
type: 'object',
properties: {
id: { type: 'string' },
parent_id: { type: 'string', nullable: true },
user_id: { type: 'string', nullable: true }
},
required: ['id']
}
},
required: ['entity_id', 'state', 'attributes', 'last_changed', 'last_updated', 'context']
}
new_state: { ...entitySchema, nullable: true },
old_state: { ...entitySchema, nullable: true }
},
required: ['entity_id', 'new_state']
required: ['entity_id', 'new_state', 'old_state'],
additionalProperties: false
},
origin: { type: 'string' },
time_fired: { type: 'string' },
context: {
type: 'object',
properties: {
id: { type: 'string' },
parent_id: { type: 'string', nullable: true },
user_id: { type: 'string', nullable: true }
},
required: ['id']
}
context: contextSchema
},
required: ['event_type', 'data', 'origin', 'time_fired', 'context']
};
required: ['event_type', 'data', 'origin', 'time_fired', 'context'],
additionalProperties: false
} as const;
export const configSchema: JSONSchemaType<HomeAssistant.Config> = {
// Config schema
export const configSchema = {
type: 'object',
properties: {
components: { type: 'array', items: { type: 'string' } },
config_dir: { type: 'string' },
elevation: { type: 'number' },
latitude: { type: 'number' },
longitude: { type: 'number' },
elevation: { type: 'number' },
location_name: { type: 'string' },
time_zone: { type: 'string' },
unit_system: {
type: 'object',
properties: {
@@ -297,14 +251,18 @@ export const configSchema: JSONSchemaType<HomeAssistant.Config> = {
required: ['length', 'mass', 'temperature', 'volume'],
additionalProperties: false
},
location_name: { type: 'string' },
time_zone: { type: 'string' },
components: {
type: 'array',
items: { type: 'string' }
},
version: { type: 'string' }
},
required: ['latitude', 'longitude', 'elevation', 'unit_system', 'location_name', 'time_zone', 'components', 'version'],
required: [
'components',
'config_dir',
'elevation',
'latitude',
'longitude',
'location_name',
'time_zone',
'unit_system',
'version'
],
additionalProperties: false
};
} as const;

370
src/sse/index.ts Normal file
View File

@@ -0,0 +1,370 @@
import { EventEmitter } from 'events';
import { HassEntity, HassEvent, StateChangedEvent } from '../types/hass.js';
interface RateLimit {
count: number;
lastReset: number;
}
export interface SSEClient {
id: string;
send: (data: string) => void;
subscriptions: {
entities: Set<string>;
events: Set<string>;
domains: Set<string>;
};
authenticated: boolean;
rateLimit: RateLimit;
lastPing: number;
connectionTime: number;
}
export class SSEManager extends EventEmitter {
private clients: Map<string, SSEClient> = new Map();
private static instance: SSEManager | null = null;
private entityStates: Map<string, HassEntity> = new Map();
// Configuration
private readonly MAX_CLIENTS = 100;
private readonly RATE_LIMIT_WINDOW = 60000; // 1 minute
private readonly RATE_LIMIT_MAX_REQUESTS = 1000;
private readonly CLIENT_TIMEOUT = 300000; // 5 minutes
private readonly PING_INTERVAL = 30000; // 30 seconds
private constructor() {
super();
console.log('Initializing SSE Manager...');
this.startMaintenanceInterval();
}
private startMaintenanceInterval() {
setInterval(() => {
this.performMaintenance();
}, 60000); // Run every minute
}
private performMaintenance() {
const now = Date.now();
// Check each client for timeouts and rate limits
for (const [clientId, client] of this.clients.entries()) {
// Remove inactive clients
if (now - client.lastPing > this.CLIENT_TIMEOUT) {
console.log(`Removing inactive client: ${clientId}`);
this.removeClient(clientId);
continue;
}
// Reset rate limits if window has passed
if (now - client.rateLimit.lastReset > this.RATE_LIMIT_WINDOW) {
client.rateLimit.count = 0;
client.rateLimit.lastReset = now;
}
}
// Log statistics
console.log(`Maintenance complete - Active clients: ${this.clients.size}`);
}
static getInstance(): SSEManager {
if (!SSEManager.instance) {
SSEManager.instance = new SSEManager();
}
return SSEManager.instance;
}
addClient(client: { id: string; send: (data: string) => void }, token?: string): SSEClient | null {
// Check maximum client limit
if (this.clients.size >= this.MAX_CLIENTS) {
console.warn('Maximum client limit reached, rejecting new connection');
return null;
}
const now = Date.now();
const sseClient: SSEClient = {
id: client.id,
send: client.send,
subscriptions: {
entities: new Set<string>(),
events: new Set<string>(),
domains: new Set<string>()
},
authenticated: this.validateToken(token),
rateLimit: {
count: 0,
lastReset: now
},
lastPing: now,
connectionTime: now
};
this.clients.set(client.id, sseClient);
console.log(`SSE client connected: ${client.id} (authenticated: ${sseClient.authenticated})`);
// Start ping interval for this client
this.startClientPing(client.id);
// Send initial connection success message
this.sendToClient(sseClient, {
type: 'connection',
status: 'connected',
id: client.id,
authenticated: sseClient.authenticated,
timestamp: new Date().toISOString()
});
return sseClient;
}
private startClientPing(clientId: string) {
const interval = setInterval(() => {
const client = this.clients.get(clientId);
if (!client) {
clearInterval(interval);
return;
}
this.sendToClient(client, {
type: 'ping',
timestamp: new Date().toISOString()
});
}, this.PING_INTERVAL);
}
removeClient(clientId: string) {
if (this.clients.has(clientId)) {
this.clients.delete(clientId);
console.log(`SSE client disconnected: ${clientId}`);
}
}
subscribeToEntity(clientId: string, entityId: string) {
const client = this.clients.get(clientId);
if (client?.authenticated) {
client.subscriptions.entities.add(entityId);
console.log(`Client ${clientId} subscribed to entity: ${entityId}`);
// Send current state if available
const currentState = this.entityStates.get(entityId);
if (currentState) {
this.sendToClient(client, {
type: 'state_changed',
data: {
entity_id: currentState.entity_id,
state: currentState.state,
attributes: currentState.attributes,
last_changed: currentState.last_changed,
last_updated: currentState.last_updated
}
});
}
}
}
subscribeToDomain(clientId: string, domain: string) {
const client = this.clients.get(clientId);
if (client?.authenticated) {
client.subscriptions.domains.add(domain);
console.log(`Client ${clientId} subscribed to domain: ${domain}`);
}
}
subscribeToEvent(clientId: string, eventType: string) {
const client = this.clients.get(clientId);
if (client?.authenticated) {
client.subscriptions.events.add(eventType);
console.log(`Client ${clientId} subscribed to event: ${eventType}`);
}
}
broadcastStateChange(entity: HassEntity) {
// Update stored state
this.entityStates.set(entity.entity_id, entity);
const domain = entity.entity_id.split('.')[0];
const message = {
type: 'state_changed',
data: {
entity_id: entity.entity_id,
state: entity.state,
attributes: entity.attributes,
last_changed: entity.last_changed,
last_updated: entity.last_updated
},
timestamp: new Date().toISOString()
};
console.log(`Broadcasting state change for ${entity.entity_id}`);
// Send to relevant subscribers only
for (const client of this.clients.values()) {
if (!client.authenticated) continue;
if (
client.subscriptions.entities.has(entity.entity_id) ||
client.subscriptions.domains.has(domain) ||
client.subscriptions.events.has('state_changed')
) {
this.sendToClient(client, message);
}
}
}
broadcastEvent(event: HassEvent) {
const message = {
type: event.event_type,
data: event.data,
origin: event.origin,
time_fired: event.time_fired,
context: event.context,
timestamp: new Date().toISOString()
};
console.log(`Broadcasting event: ${event.event_type}`);
// Send to relevant subscribers only
for (const client of this.clients.values()) {
if (!client.authenticated) continue;
if (client.subscriptions.events.has(event.event_type)) {
this.sendToClient(client, message);
}
}
}
private sendToClient(client: SSEClient, data: any) {
try {
// Check rate limit
const now = Date.now();
if (now - client.rateLimit.lastReset > this.RATE_LIMIT_WINDOW) {
client.rateLimit.count = 0;
client.rateLimit.lastReset = now;
}
if (client.rateLimit.count >= this.RATE_LIMIT_MAX_REQUESTS) {
console.warn(`Rate limit exceeded for client ${client.id}`);
this.sendToClient(client, {
type: 'error',
error: 'rate_limit_exceeded',
message: 'Too many requests, please try again later',
timestamp: new Date().toISOString()
});
return;
}
client.rateLimit.count++;
client.lastPing = now;
client.send(JSON.stringify(data));
} catch (error) {
console.error(`Error sending message to client ${client.id}:`, error);
this.removeClient(client.id);
}
}
private validateToken(token?: string): boolean {
if (!token) return false;
// Compare with HASS_TOKEN from environment
return token === process.env.HASS_TOKEN;
}
// Utility methods
getConnectedClients(): number {
return this.clients.size;
}
getClientSubscriptions(clientId: string) {
return this.clients.get(clientId)?.subscriptions;
}
getEntityState(entityId: string): HassEntity | undefined {
return this.entityStates.get(entityId);
}
// Add new event types
broadcastServiceCall(domain: string, service: string, data: any) {
const message = {
type: 'service_called',
data: {
domain,
service,
service_data: data
},
timestamp: new Date().toISOString()
};
this.broadcastToSubscribers('service_called', message);
}
broadcastAutomationTriggered(automationId: string, trigger: any) {
const message = {
type: 'automation_triggered',
data: {
automation_id: automationId,
trigger
},
timestamp: new Date().toISOString()
};
this.broadcastToSubscribers('automation_triggered', message);
}
broadcastScriptExecuted(scriptId: string, data: any) {
const message = {
type: 'script_executed',
data: {
script_id: scriptId,
execution_data: data
},
timestamp: new Date().toISOString()
};
this.broadcastToSubscribers('script_executed', message);
}
private broadcastToSubscribers(eventType: string, message: any) {
for (const client of this.clients.values()) {
if (!client.authenticated) continue;
if (client.subscriptions.events.has(eventType)) {
this.sendToClient(client, message);
}
}
}
// Add statistics methods
getStatistics() {
const now = Date.now();
const stats = {
total_clients: this.clients.size,
authenticated_clients: 0,
total_subscriptions: 0,
clients_by_connection_time: {
less_than_1m: 0,
less_than_5m: 0,
less_than_1h: 0,
more_than_1h: 0
},
total_entities_tracked: this.entityStates.size
};
for (const client of this.clients.values()) {
if (client.authenticated) stats.authenticated_clients++;
stats.total_subscriptions +=
client.subscriptions.entities.size +
client.subscriptions.events.size +
client.subscriptions.domains.size;
const connectionDuration = now - client.connectionTime;
if (connectionDuration < 60000) stats.clients_by_connection_time.less_than_1m++;
else if (connectionDuration < 300000) stats.clients_by_connection_time.less_than_5m++;
else if (connectionDuration < 3600000) stats.clients_by_connection_time.less_than_1h++;
else stats.clients_by_connection_time.more_than_1h++;
}
return stats;
}
}
export const sseManager = SSEManager.getInstance();

View File

@@ -47,4 +47,39 @@ export interface StateChangedEvent extends Event {
new_state: Entity | null;
old_state: Entity | null;
};
}
export interface HassEntity {
entity_id: string;
state: string;
attributes: Record<string, any>;
last_changed?: string;
last_updated?: string;
context?: {
id: string;
parent_id?: string;
user_id?: string;
};
}
export interface HassState {
entity_id: string;
state: string;
attributes: {
friendly_name?: string;
description?: string;
[key: string]: any;
};
}
export interface HassEvent {
event_type: string;
data: any;
origin: string;
time_fired: string;
context: {
id: string;
parent_id?: string;
user_id?: string;
};
}