import mqtt from "mqtt";
import type { StreamConnectionOptions, StreamState, TopicSubscription } from "../types/stream.types";

/**
 * Connection defaults merged underneath the caller-supplied options.
 *
 * In development builds (`import.meta.env.DEV`) a fixed `wss://` broker URL is
 * used. Otherwise the URL points at `/broker` on the host serving the page,
 * using `ws:` when the page was loaded over `http:` and `wss:` in every other
 * case.
 */
const defaultConnectionOptions: StreamConnectionOptions = {
  url: import.meta.env.DEV
    ? 'wss://pd0mz.hamnet.nl/broker'
    : ((window.location.protocol === 'http:') ? 'ws:' : 'wss:') + '//' + window.location.host + '/broker'
};

/**
 * Base class for MQTT-backed data streams.
 *
 * It owns a single MQTT client, keeps a registry of per-topic callbacks,
 * remembers the last decoded message per topic and publishes a `StreamState`
 * snapshot to state subscribers whenever that state changes. Subclasses supply
 * the payload decoding (and optionally validation).
 */
export abstract class BaseStream {
  /** Client currently owned by this stream, or `null` when detached. */
  protected client: mqtt.MqttClient | null = null;
  /** Defaults merged with the options passed to the constructor. */
  protected connectionOptions: StreamConnectionOptions;
  /** Callbacks keyed by the topic (or topic filter) they were registered with. */
  protected subscribers: Map<string, Set<(data: any, topic: string) => void>> = new Map();
  /** Callbacks notified on every state change. */
  protected stateSubscribers: Set<(state: StreamState) => void> = new Set();
  /** Handle of the pending reconnect timer, if one is scheduled. */
  protected reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  /** When true the stream connects on construction and reconnects after a close. */
  protected autoConnect: boolean;
  protected state: StreamState = {
    isConnected: false,
    isConnecting: false,
    error: null,
    subscriptions: new Map(),
    lastMessages: new Map(),
  };

  /**
   * @param connectionOptions Overrides applied on top of the default connection options.
   * @param autoConnect When true (the default) `connect()` is called immediately and
   *   a reconnect is scheduled whenever the connection closes.
   */
  constructor(connectionOptions: Partial<StreamConnectionOptions>, autoConnect: boolean = true) {
    this.connectionOptions = {
      ...defaultConnectionOptions,
      ...connectionOptions,
    };
    this.autoConnect = autoConnect;

    if (autoConnect) {
      this.connect();
    }
  }

  /** Turns a raw payload received on `topic` into the value handed to subscribers. */
  protected abstract decodeMessage(topic: string, payload: Uint8Array): any;

  /**
   * Optional hook: when implemented and it returns `false`, the decoded message
   * is logged as invalid and dropped before reaching state or subscribers.
   */
  protected validateMessage?(topic: string, data: any): boolean;

  /**
   * Opens a connection to the broker.
   *
   * Does nothing when the current client is already connected or a connection
   * attempt is in progress. Any previous, no longer connected client is
   * force-ended before a new one is created so it cannot linger with its own
   * socket and event handlers. Failures are recorded in the state's `error`.
   */
  public connect(): void {
    if (this.client?.connected || this.state.isConnecting) {
      return;
    }

    this.updateState({ isConnecting: true, error: null });

    try {
      // Release the previous client (if any). Detaching it first makes the
      // event handlers registered for it ignore whatever it still emits.
      const staleClient = this.client;
      if (staleClient) {
        this.client = null;
        staleClient.end(true);
      }

      const randomId = Math.random().toString(16).slice(2, 10);
      const prefix = import.meta.env.DEV ? 'dev_' : '';
      const defaultClientId = `${prefix}web_${randomId}`;

      console.log(`Connecting to MQTT broker at ${this.connectionOptions.url} with clientId ${defaultClientId}`);

      this.client = mqtt.connect(this.connectionOptions.url, {
        ...this.connectionOptions.options,
        // An explicitly configured client id wins over the generated one.
        clientId: this.connectionOptions.options?.clientId || defaultClientId,
      });

      this.setupEventHandlers();
    } catch (error) {
      this.handleError(error);
    }
  }

  /**
   * Closes the connection and cancels any pending reconnect.
   *
   * Once the client has ended, the state is reset to disconnected with empty
   * `subscriptions` and `lastMessages`. The client is detached before it is
   * ended so that its `close` event does not schedule an automatic reconnect.
   *
   * NOTE(review): registered subscriber callbacks are kept while
   * `state.subscriptions` is cleared, so a later `connect()` does not
   * re-subscribe them on the broker — confirm this is intended.
   */
  public disconnect(): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }

    const client = this.client;
    if (client) {
      this.client = null;
      client.end(true, () => {
        this.updateState({
          isConnected: false,
          isConnecting: false,
          subscriptions: new Map(),
          lastMessages: new Map(),
        });
      });
    }
  }

  /**
   * Registers `callback` for messages on `topic` and, when connected,
   * subscribes to the topic on the broker.
   *
   * @param topic Topic or topic filter to subscribe to.
   * @param callback Invoked with the decoded message and the topic it arrived on.
   * @param qos Quality of service requested from the broker (default 0).
   * @returns A function that removes the callback; when it was the last one for
   *   the topic, the topic is also dropped from the state and unsubscribed on
   *   the broker (if connected).
   */
  public subscribe<T = any>(
    topic: string,
    callback: (data: T, topic: string) => void,
    qos: 0 | 1 | 2 = 0
  ): () => void {
    // Add to subscribers map
    let topicSubscribers = this.subscribers.get(topic);
    if (!topicSubscribers) {
      topicSubscribers = new Set();
      this.subscribers.set(topic, topicSubscribers);
    }
    topicSubscribers.add(callback);

    // Record the subscription so it can be replayed on (re)connect.
    const subscription: TopicSubscription = { topic, qos, dataType: undefined as any };
    this.state.subscriptions.set(topic, subscription);

    // If connected, subscribe to the topic on the broker
    if (this.client?.connected) {
      this.client.subscribe(topic, { qos }, (error) => {
        if (error) {
          console.error(`Failed to subscribe to ${topic}:`, error);
        }
      });
    }

    // Return unsubscribe function
    return () => {
      const remaining = this.subscribers.get(topic);
      if (!remaining) return;

      remaining.delete(callback);

      // If no more subscribers for this topic, unsubscribe from broker
      if (remaining.size === 0) {
        this.subscribers.delete(topic);
        this.state.subscriptions.delete(topic);
        this.state.lastMessages.delete(topic);

        if (this.client?.connected) {
          this.client.unsubscribe(topic);
        }
      }
    };
  }

  /**
   * Subscribes one callback to several topics.
   *
   * @param subscriptions Topics to subscribe to; `qos` defaults to 0 per entry.
   * @param callback Invoked with the decoded message and the topic it arrived on.
   * @returns A function that undoes all of the subscriptions made by this call.
   */
  public subscribeMany(
    subscriptions: Array<{ topic: string; qos?: 0 | 1 | 2 }>,
    callback: (data: any, topic: string) => void
  ): () => void {
    const unsubscribers = subscriptions.map(({ topic, qos = 0 }) =>
      this.subscribe(topic, callback, qos)
    );

    return () => {
      unsubscribers.forEach(unsub => unsub());
    };
  }

  /**
   * Registers a state listener. The listener is called once immediately with
   * the current state and then on every state update.
   *
   * @returns A function that removes the listener.
   */
  public subscribeToState(callback: (state: StreamState) => void): () => void {
    this.stateSubscribers.add(callback);
    callback(this.state); // Immediately send current state

    return () => {
      this.stateSubscribers.delete(callback);
    };
  }

  /** Returns the last decoded message stored for `topic`, if any. */
  public getLastMessage<T = any>(topic: string): T | undefined {
    return this.state.lastMessages.get(topic) as T | undefined;
  }

  /** Returns a copy of the subscription map held in the state. */
  public getSubscriptions(): Map<string, TopicSubscription> {
    return new Map(this.state.subscriptions);
  }

  /** Returns a snapshot of the state with both maps copied. */
  public getState(): StreamState {
    return {
      ...this.state,
      subscriptions: new Map(this.state.subscriptions),
      lastMessages: new Map(this.state.lastMessages),
    };
  }

  /** True when the state holds a subscription registered under exactly `topic`. */
  public isSubscribed(topic: string): boolean {
    return this.state.subscriptions.has(topic);
  }

  /** Disconnects and drops every topic and state subscriber. */
  public destroy(): void {
    this.disconnect();
    this.subscribers.clear();
    this.stateSubscribers.clear();
  }

  /**
   * Wires the current client's `connect`, `message`, `error` and `close`
   * events to this stream. Every handler first checks that the emitting client
   * is still the one owned by the stream, so events from a client that was
   * disconnected or replaced are ignored.
   */
  private setupEventHandlers(): void {
    const client = this.client;
    if (!client) return;

    const isCurrent = (): boolean => this.client === client;

    client.on('connect', () => {
      if (!isCurrent()) return;

      this.updateState({ isConnected: true, isConnecting: false, error: null });

      // Resubscribe to all topics after reconnection
      const subscriptions = Array.from(this.state.subscriptions.entries());
      if (subscriptions.length > 0) {
        const subscribePromises = subscriptions.map(([topic, sub]) =>
          new Promise<void>((resolve, reject) => {
            client.subscribe(topic, { qos: sub.qos ?? 0 }, (error) => {
              if (error) reject(error);
              else resolve();
            });
          })
        );

        void Promise.all(subscribePromises).catch(error => {
          console.error('Failed to resubscribe to topics:', error);
        });
      }
    });

    client.on('message', (topic, payload) => {
      if (!isCurrent()) return;

      try {
        const uint8Array = new Uint8Array(payload);
        const data = this.decodeMessage(topic, uint8Array);

        // Validate if validator is provided
        if (this.validateMessage && !this.validateMessage(topic, data)) {
          console.warn('Invalid message received on topic:', topic, data);
          return;
        }

        // Update last message for this topic; a fresh Map is published so
        // state subscribers receive a new reference.
        this.state.lastMessages.set(topic, data);
        this.updateState({ lastMessages: new Map(this.state.lastMessages) });

        // Notify subscribers for this topic
        this.notifySubscribers(topic, data);
      } catch (error) {
        console.error(`Error processing message on topic ${topic}:`, error);
      }
    });

    client.on('error', (error) => {
      if (!isCurrent()) return;
      this.handleError(error);
    });

    client.on('close', () => {
      if (!isCurrent()) return;
      this.updateState({ isConnected: false });
      this.attemptReconnect();
    });
  }

  /**
   * Delivers `data` to the callbacks registered for exactly `topic`, then to
   * the callbacks of every other registered filter that matches `topic`.
   * An exception thrown by one callback is logged and does not stop the rest.
   */
  private notifySubscribers(topic: string, data: any): void {
    const topicSubscribers = this.subscribers.get(topic);
    if (topicSubscribers) {
      topicSubscribers.forEach(callback => {
        try {
          callback(data, topic);
        } catch (error) {
          console.error('Error in subscriber callback:', error);
        }
      });
    }

    // Also notify wildcard subscribers (those subscribed to patterns like 'sensors/+')
    this.subscribers.forEach((subscribers, subscribedTopic) => {
      if (subscribedTopic !== topic && this.topicMatches(subscribedTopic, topic)) {
        subscribers.forEach(callback => {
          try {
            callback(data, topic);
          } catch (error) {
            console.error('Error in wildcard subscriber callback:', error);
          }
        });
      }
    });
  }

  /**
   * Tests whether `topic` is covered by the filter `subscription`.
   *
   * `+` stands for exactly one level. `#` is only treated as a wildcard when it
   * is a whole level; everything from that level on is then accepted, while all
   * levels before it must be present in the topic and match.
   */
  private topicMatches(subscription: string, topic: string): boolean {
    const subParts = subscription.split('/');
    const topicParts = topic.split('/');

    // Handle multi-level wildcard
    const wildcardIndex = subParts.indexOf('#');
    if (wildcardIndex !== -1) {
      // The topic needs at least as many levels as precede the '#';
      // otherwise a '+' would match a level that does not exist.
      if (topicParts.length < wildcardIndex) return false;

      for (let i = 0; i < wildcardIndex; i++) {
        if (subParts[i] !== '+' && subParts[i] !== topicParts[i]) {
          return false;
        }
      }
      return true;
    }

    // Handle single-level wildcards and exact matches
    if (subParts.length !== topicParts.length) return false;

    for (let i = 0; i < subParts.length; i++) {
      if (subParts[i] !== '+' && subParts[i] !== topicParts[i]) {
        return false;
      }
    }

    return true;
  }

  /**
   * Replaces the state with a shallow merge of the current state and `updates`,
   * then notifies every state subscriber. Exceptions thrown by a subscriber are
   * logged and do not prevent the others from being called.
   */
  private updateState(updates: Partial<StreamState>) {
    this.state = { ...this.state, ...updates };
    this.stateSubscribers.forEach(callback => {
      try {
        callback(this.state);
      } catch (error) {
        console.error('Error in state subscriber callback:', error);
      }
    });
  }

  /** Normalises `error` to an `Error`, stores it in the state and logs it. */
  private handleError(error: unknown): void {
    const errorObj = error instanceof Error ? error : new Error(String(error));
    this.updateState({ error: errorObj, isConnecting: false });
    console.error('Stream error:', errorObj);
  }

  /**
   * Schedules a single `connect()` call after the configured reconnect period.
   * Nothing is scheduled when `autoConnect` is off or a timer is already
   * pending; when the timer fires, the call is skipped if the stream is
   * connected or connecting by then.
   */
  private attemptReconnect(): void {
    if (!this.autoConnect || this.reconnectTimer) return;

    // A missing or zero reconnectPeriod falls back to 5000 ms.
    const reconnectPeriod = this.connectionOptions.options?.reconnectPeriod || 5000;

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      if (!this.state.isConnected && !this.state.isConnecting) {
        this.connect();
      }
    }, reconnectPeriod);
  }
}