refactor: enhance middleware and security with advanced protection mechanisms
- Upgraded rate limiter configuration with more granular control and detailed headers - Improved authentication middleware with enhanced token validation and error responses - Implemented advanced input sanitization using sanitize-html with comprehensive XSS protection - Replaced manual security headers with helmet for robust web security configuration - Enhanced error handling middleware with more detailed logging and specific error type handling - Updated SSE rate limiting with burst and window-based restrictions - Improved token validation with more precise signature and claim verification
This commit is contained in:
294
src/sse/index.ts
294
src/sse/index.ts
@@ -7,10 +7,17 @@ const DEFAULT_MAX_CLIENTS = 1000;
|
||||
const DEFAULT_PING_INTERVAL = 30000; // 30 seconds
|
||||
const DEFAULT_CLEANUP_INTERVAL = 60000; // 1 minute
|
||||
const DEFAULT_MAX_CONNECTION_AGE = 24 * 60 * 60 * 1000; // 24 hours
|
||||
const DEFAULT_RATE_LIMIT = {
|
||||
MAX_MESSAGES: 100, // messages
|
||||
WINDOW_MS: 60000, // 1 minute
|
||||
BURST_LIMIT: 10 // max messages per second
|
||||
};
|
||||
|
||||
interface RateLimit {
|
||||
count: number;
|
||||
lastReset: number;
|
||||
burstCount: number;
|
||||
lastBurstReset: number;
|
||||
}
|
||||
|
||||
export interface SSEClient {
|
||||
@@ -32,6 +39,8 @@ interface ClientStats {
|
||||
lastPingAt?: Date;
|
||||
subscriptionCount: number;
|
||||
connectionDuration: number;
|
||||
messagesSent: number;
|
||||
lastActivity: Date;
|
||||
}
|
||||
|
||||
export class SSEManager extends EventEmitter {
|
||||
@@ -42,18 +51,21 @@ export class SSEManager extends EventEmitter {
|
||||
private readonly pingInterval: number;
|
||||
private readonly cleanupInterval: number;
|
||||
private readonly maxConnectionAge: number;
|
||||
private readonly rateLimit: typeof DEFAULT_RATE_LIMIT;
|
||||
|
||||
constructor(options: {
|
||||
maxClients?: number;
|
||||
pingInterval?: number;
|
||||
cleanupInterval?: number;
|
||||
maxConnectionAge?: number;
|
||||
rateLimit?: Partial<typeof DEFAULT_RATE_LIMIT>;
|
||||
} = {}) {
|
||||
super();
|
||||
this.maxClients = options.maxClients || DEFAULT_MAX_CLIENTS;
|
||||
this.pingInterval = options.pingInterval || DEFAULT_PING_INTERVAL;
|
||||
this.cleanupInterval = options.cleanupInterval || DEFAULT_CLEANUP_INTERVAL;
|
||||
this.maxConnectionAge = options.maxConnectionAge || DEFAULT_MAX_CONNECTION_AGE;
|
||||
this.rateLimit = { ...DEFAULT_RATE_LIMIT, ...options.rateLimit };
|
||||
|
||||
console.log('Initializing SSE Manager...');
|
||||
this.startMaintenanceTasks();
|
||||
@@ -63,15 +75,17 @@ export class SSEManager extends EventEmitter {
|
||||
// Send periodic pings to keep connections alive
|
||||
setInterval(() => {
|
||||
this.clients.forEach(client => {
|
||||
try {
|
||||
client.send(JSON.stringify({
|
||||
type: 'ping',
|
||||
timestamp: new Date().toISOString()
|
||||
}));
|
||||
client.lastPingAt = new Date();
|
||||
} catch (error) {
|
||||
console.error(`Failed to ping client ${client.id}:`, error);
|
||||
this.removeClient(client.id);
|
||||
if (!this.isRateLimited(client)) {
|
||||
try {
|
||||
client.send(JSON.stringify({
|
||||
type: 'ping',
|
||||
timestamp: new Date().toISOString()
|
||||
}));
|
||||
client.lastPingAt = new Date();
|
||||
} catch (error) {
|
||||
console.error(`Failed to ping client ${client.id}:`, error);
|
||||
this.removeClient(client.id);
|
||||
}
|
||||
}
|
||||
});
|
||||
}, this.pingInterval);
|
||||
@@ -98,7 +112,7 @@ export class SSEManager extends EventEmitter {
|
||||
return SSEManager.instance;
|
||||
}
|
||||
|
||||
addClient(client: Omit<SSEClient, 'authenticated' | 'subscriptions'>, token: string): SSEClient | null {
|
||||
addClient(client: Omit<SSEClient, 'authenticated' | 'subscriptions' | 'rateLimit'>, token: string): SSEClient | null {
|
||||
// Validate token
|
||||
const validationResult = TokenManager.validateToken(token, client.ip);
|
||||
if (!validationResult.valid) {
|
||||
@@ -117,7 +131,13 @@ export class SSEManager extends EventEmitter {
|
||||
...client,
|
||||
authenticated: true,
|
||||
subscriptions: new Set(),
|
||||
lastPingAt: new Date()
|
||||
lastPingAt: new Date(),
|
||||
rateLimit: {
|
||||
count: 0,
|
||||
lastReset: Date.now(),
|
||||
burstCount: 0,
|
||||
lastBurstReset: Date.now()
|
||||
}
|
||||
};
|
||||
|
||||
this.clients.set(client.id, newClient);
|
||||
@@ -126,22 +146,46 @@ export class SSEManager extends EventEmitter {
|
||||
return newClient;
|
||||
}
|
||||
|
||||
private startClientPing(clientId: string) {
|
||||
const interval = setInterval(() => {
|
||||
const client = this.clients.get(clientId);
|
||||
if (!client) {
|
||||
clearInterval(interval);
|
||||
return;
|
||||
}
|
||||
private isRateLimited(client: SSEClient): boolean {
|
||||
const now = Date.now();
|
||||
|
||||
this.sendToClient(client, {
|
||||
type: 'ping',
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
}, this.pingInterval);
|
||||
// Reset window counters if needed
|
||||
if (now - client.rateLimit.lastReset >= this.rateLimit.WINDOW_MS) {
|
||||
client.rateLimit.count = 0;
|
||||
client.rateLimit.lastReset = now;
|
||||
}
|
||||
|
||||
// Reset burst counters if needed (every second)
|
||||
if (now - client.rateLimit.lastBurstReset >= 1000) {
|
||||
client.rateLimit.burstCount = 0;
|
||||
client.rateLimit.lastBurstReset = now;
|
||||
}
|
||||
|
||||
// Check both window and burst limits
|
||||
return (
|
||||
client.rateLimit.count >= this.rateLimit.MAX_MESSAGES ||
|
||||
client.rateLimit.burstCount >= this.rateLimit.BURST_LIMIT
|
||||
);
|
||||
}
|
||||
|
||||
removeClient(clientId: string) {
|
||||
private updateRateLimit(client: SSEClient): void {
|
||||
const now = Date.now();
|
||||
client.rateLimit.count++;
|
||||
client.rateLimit.burstCount++;
|
||||
|
||||
// Update timestamps if needed
|
||||
if (now - client.rateLimit.lastReset >= this.rateLimit.WINDOW_MS) {
|
||||
client.rateLimit.lastReset = now;
|
||||
client.rateLimit.count = 1;
|
||||
}
|
||||
|
||||
if (now - client.rateLimit.lastBurstReset >= 1000) {
|
||||
client.rateLimit.lastBurstReset = now;
|
||||
client.rateLimit.burstCount = 1;
|
||||
}
|
||||
}
|
||||
|
||||
removeClient(clientId: string): void {
|
||||
if (this.clients.has(clientId)) {
|
||||
this.clients.delete(clientId);
|
||||
console.log(`SSE client disconnected: ${clientId}`);
|
||||
@@ -152,46 +196,55 @@ export class SSEManager extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
subscribeToEntity(clientId: string, entityId: string) {
|
||||
subscribeToEntity(clientId: string, entityId: string): void {
|
||||
const client = this.clients.get(clientId);
|
||||
if (client?.authenticated) {
|
||||
client.subscriptions.add(`entity:${entityId}`);
|
||||
console.log(`Client ${clientId} subscribed to entity: ${entityId}`);
|
||||
if (!client?.authenticated) {
|
||||
console.warn(`Unauthenticated client ${clientId} attempted to subscribe to entity: ${entityId}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Send current state if available
|
||||
const currentState = this.entityStates.get(entityId);
|
||||
if (currentState) {
|
||||
this.sendToClient(client, {
|
||||
type: 'state_changed',
|
||||
data: {
|
||||
entity_id: currentState.entity_id,
|
||||
state: currentState.state,
|
||||
attributes: currentState.attributes,
|
||||
last_changed: currentState.last_changed,
|
||||
last_updated: currentState.last_updated
|
||||
}
|
||||
});
|
||||
}
|
||||
client.subscriptions.add(`entity:${entityId}`);
|
||||
console.log(`Client ${clientId} subscribed to entity: ${entityId}`);
|
||||
|
||||
// Send current state if available
|
||||
const currentState = this.entityStates.get(entityId);
|
||||
if (currentState && !this.isRateLimited(client)) {
|
||||
this.sendToClient(client, {
|
||||
type: 'state_changed',
|
||||
data: {
|
||||
entity_id: currentState.entity_id,
|
||||
state: currentState.state,
|
||||
attributes: currentState.attributes,
|
||||
last_changed: currentState.last_changed,
|
||||
last_updated: currentState.last_updated
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
subscribeToDomain(clientId: string, domain: string) {
|
||||
subscribeToDomain(clientId: string, domain: string): void {
|
||||
const client = this.clients.get(clientId);
|
||||
if (client?.authenticated) {
|
||||
client.subscriptions.add(`domain:${domain}`);
|
||||
console.log(`Client ${clientId} subscribed to domain: ${domain}`);
|
||||
if (!client?.authenticated) {
|
||||
console.warn(`Unauthenticated client ${clientId} attempted to subscribe to domain: ${domain}`);
|
||||
return;
|
||||
}
|
||||
|
||||
client.subscriptions.add(`domain:${domain}`);
|
||||
console.log(`Client ${clientId} subscribed to domain: ${domain}`);
|
||||
}
|
||||
|
||||
subscribeToEvent(clientId: string, eventType: string) {
|
||||
subscribeToEvent(clientId: string, eventType: string): void {
|
||||
const client = this.clients.get(clientId);
|
||||
if (client?.authenticated) {
|
||||
client.subscriptions.add(`event:${eventType}`);
|
||||
console.log(`Client ${clientId} subscribed to event: ${eventType}`);
|
||||
if (!client?.authenticated) {
|
||||
console.warn(`Unauthenticated client ${clientId} attempted to subscribe to event: ${eventType}`);
|
||||
return;
|
||||
}
|
||||
|
||||
client.subscriptions.add(`event:${eventType}`);
|
||||
console.log(`Client ${clientId} subscribed to event: ${eventType}`);
|
||||
}
|
||||
|
||||
broadcastStateChange(entity: HassEntity) {
|
||||
broadcastStateChange(entity: HassEntity): void {
|
||||
// Update stored state
|
||||
this.entityStates.set(entity.entity_id, entity);
|
||||
|
||||
@@ -211,8 +264,8 @@ export class SSEManager extends EventEmitter {
|
||||
console.log(`Broadcasting state change for ${entity.entity_id}`);
|
||||
|
||||
// Send to relevant subscribers only
|
||||
for (const client of this.clients.values()) {
|
||||
if (!client.authenticated) continue;
|
||||
this.clients.forEach(client => {
|
||||
if (!client.authenticated || this.isRateLimited(client)) return;
|
||||
|
||||
if (
|
||||
client.subscriptions.has(`entity:${entity.entity_id}`) ||
|
||||
@@ -221,10 +274,10 @@ export class SSEManager extends EventEmitter {
|
||||
) {
|
||||
this.sendToClient(client, message);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
broadcastEvent(event: HassEvent) {
|
||||
broadcastEvent(event: HassEvent): void {
|
||||
const message = {
|
||||
type: event.event_type,
|
||||
data: event.data,
|
||||
@@ -237,117 +290,36 @@ export class SSEManager extends EventEmitter {
|
||||
console.log(`Broadcasting event: ${event.event_type}`);
|
||||
|
||||
// Send to relevant subscribers only
|
||||
for (const client of this.clients.values()) {
|
||||
if (!client.authenticated) continue;
|
||||
this.clients.forEach(client => {
|
||||
if (!client.authenticated || this.isRateLimited(client)) return;
|
||||
|
||||
if (client.subscriptions.has(`event:${event.event_type}`)) {
|
||||
this.sendToClient(client, message);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private sendToClient(client: SSEClient, data: any) {
|
||||
private sendToClient(client: SSEClient, data: unknown): void {
|
||||
try {
|
||||
// Check rate limit
|
||||
const now = Date.now();
|
||||
if (now - client.rateLimit.lastReset > this.cleanupInterval) {
|
||||
client.rateLimit.count = 0;
|
||||
client.rateLimit.lastReset = now;
|
||||
}
|
||||
|
||||
if (client.rateLimit.count >= 1000) {
|
||||
console.warn(`Rate limit exceeded for client ${client.id}`);
|
||||
this.sendToClient(client, {
|
||||
type: 'error',
|
||||
error: 'rate_limit_exceeded',
|
||||
message: 'Too many requests, please try again later',
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
if (!client.authenticated) {
|
||||
console.warn(`Attempted to send message to unauthenticated client ${client.id}`);
|
||||
return;
|
||||
}
|
||||
|
||||
client.rateLimit.count++;
|
||||
client.lastPingAt = new Date();
|
||||
client.send(JSON.stringify(data));
|
||||
if (this.isRateLimited(client)) {
|
||||
console.warn(`Rate limit exceeded for client ${client.id}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const message = typeof data === 'string' ? data : JSON.stringify(data);
|
||||
client.send(message);
|
||||
this.updateRateLimit(client);
|
||||
} catch (error) {
|
||||
console.error(`Error sending message to client ${client.id}:`, error);
|
||||
console.error(`Failed to send message to client ${client.id}:`, error);
|
||||
this.removeClient(client.id);
|
||||
}
|
||||
}
|
||||
|
||||
private validateToken(token?: string): boolean {
|
||||
if (!token) return false;
|
||||
const validationResult = TokenManager.validateToken(token);
|
||||
return validationResult.valid;
|
||||
}
|
||||
|
||||
// Utility methods
|
||||
getConnectedClients(): number {
|
||||
return this.clients.size;
|
||||
}
|
||||
|
||||
getClientSubscriptions(clientId: string) {
|
||||
return this.clients.get(clientId)?.subscriptions;
|
||||
}
|
||||
|
||||
getEntityState(entityId: string): HassEntity | undefined {
|
||||
return this.entityStates.get(entityId);
|
||||
}
|
||||
|
||||
// Add new event types
|
||||
broadcastServiceCall(domain: string, service: string, data: any) {
|
||||
const message = {
|
||||
type: 'service_called',
|
||||
data: {
|
||||
domain,
|
||||
service,
|
||||
service_data: data
|
||||
},
|
||||
timestamp: new Date().toISOString()
|
||||
};
|
||||
|
||||
this.broadcastToSubscribers('service_called', message);
|
||||
}
|
||||
|
||||
broadcastAutomationTriggered(automationId: string, trigger: any) {
|
||||
const message = {
|
||||
type: 'automation_triggered',
|
||||
data: {
|
||||
automation_id: automationId,
|
||||
trigger
|
||||
},
|
||||
timestamp: new Date().toISOString()
|
||||
};
|
||||
|
||||
this.broadcastToSubscribers('automation_triggered', message);
|
||||
}
|
||||
|
||||
broadcastScriptExecuted(scriptId: string, data: any) {
|
||||
const message = {
|
||||
type: 'script_executed',
|
||||
data: {
|
||||
script_id: scriptId,
|
||||
execution_data: data
|
||||
},
|
||||
timestamp: new Date().toISOString()
|
||||
};
|
||||
|
||||
this.broadcastToSubscribers('script_executed', message);
|
||||
}
|
||||
|
||||
private broadcastToSubscribers(eventType: string, message: any) {
|
||||
for (const client of this.clients.values()) {
|
||||
if (!client.authenticated) continue;
|
||||
|
||||
if (client.subscriptions.has(`event:${eventType}`) ||
|
||||
client.subscriptions.has(`entity:${eventType}`) ||
|
||||
client.subscriptions.has(`domain:${eventType.split('.')[0]}`)) {
|
||||
this.sendToClient(client, message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add statistics methods
|
||||
getStatistics(): {
|
||||
totalClients: number;
|
||||
authenticatedClients: number;
|
||||
@@ -356,31 +328,35 @@ export class SSEManager extends EventEmitter {
|
||||
} {
|
||||
const now = Date.now();
|
||||
const clientStats: ClientStats[] = [];
|
||||
const subscriptionCounts: { [key: string]: number } = {};
|
||||
const subscriptionStats: { [key: string]: number } = {};
|
||||
let authenticatedClients = 0;
|
||||
|
||||
this.clients.forEach(client => {
|
||||
// Collect client statistics
|
||||
if (client.authenticated) {
|
||||
authenticatedClients++;
|
||||
}
|
||||
|
||||
clientStats.push({
|
||||
id: client.id,
|
||||
ip: client.ip,
|
||||
connectedAt: client.connectedAt,
|
||||
lastPingAt: client.lastPingAt,
|
||||
subscriptionCount: client.subscriptions.size,
|
||||
connectionDuration: now - client.connectedAt.getTime()
|
||||
connectionDuration: now - client.connectedAt.getTime(),
|
||||
messagesSent: client.rateLimit.count,
|
||||
lastActivity: new Date(client.rateLimit.lastReset)
|
||||
});
|
||||
|
||||
// Count subscriptions by type
|
||||
client.subscriptions.forEach(sub => {
|
||||
const [type] = sub.split(':');
|
||||
subscriptionCounts[type] = (subscriptionCounts[type] || 0) + 1;
|
||||
subscriptionStats[sub] = (subscriptionStats[sub] || 0) + 1;
|
||||
});
|
||||
});
|
||||
|
||||
return {
|
||||
totalClients: this.clients.size,
|
||||
authenticatedClients: Array.from(this.clients.values()).filter(c => c.authenticated).length,
|
||||
authenticatedClients,
|
||||
clientStats,
|
||||
subscriptionStats: subscriptionCounts
|
||||
subscriptionStats
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user