Checkpoint
This commit is contained in:
315
ui/src/services/Stream.ts
Normal file
315
ui/src/services/Stream.ts
Normal file
@@ -0,0 +1,315 @@
|
||||
import mqtt from "mqtt";
|
||||
import type { StreamConnectionOptions, StreamState, TopicSubscription } from "../types/stream.types";
|
||||
|
||||
const defaultConnectionOptions: StreamConnectionOptions = {
|
||||
url: import.meta.env.DEV
|
||||
? 'ws://10.42.23.73:8083'
|
||||
: ((window.location.protocol === 'http:') ? 'ws:' : 'wss:') + '//' + window.location.host + '/broker'
|
||||
}
|
||||
|
||||
export abstract class BaseStream {
|
||||
protected client: mqtt.MqttClient | null = null;
|
||||
protected connectionOptions: StreamConnectionOptions;
|
||||
protected subscribers: Map<string, Set<(data: any, topic: string) => void>> = new Map();
|
||||
protected stateSubscribers: Set<(state: StreamState) => void> = new Set();
|
||||
protected reconnectTimer: NodeJS.Timeout | null = null;
|
||||
protected autoConnect: boolean;
|
||||
|
||||
protected state: StreamState = {
|
||||
isConnected: false,
|
||||
isConnecting: false,
|
||||
error: null,
|
||||
subscriptions: new Map(),
|
||||
lastMessages: new Map(),
|
||||
};
|
||||
|
||||
constructor(connectionOptions: Partial<StreamConnectionOptions>, autoConnect: boolean = true) {
|
||||
this.connectionOptions = {
|
||||
...defaultConnectionOptions,
|
||||
...connectionOptions,
|
||||
}
|
||||
this.autoConnect = autoConnect;
|
||||
|
||||
if (autoConnect) {
|
||||
this.connect();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract decodeMessage(topic: string, payload: Uint8Array): any;
|
||||
protected validateMessage?(topic: string, data: any): boolean;
|
||||
|
||||
public connect(): void {
|
||||
if (this.client?.connected || this.state.isConnecting) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.updateState({ isConnecting: true, error: null });
|
||||
|
||||
try {
|
||||
const randomId = Math.random().toString(16).slice(2, 10);
|
||||
const prefix = import.meta.env.DEV ? 'dev_' : '';
|
||||
const defaultClientId = `${prefix}hamview_${randomId}`;
|
||||
|
||||
this.client = mqtt.connect(this.connectionOptions.url, {
|
||||
...this.connectionOptions.options,
|
||||
clientId: this.connectionOptions.options?.clientId || defaultClientId,
|
||||
});
|
||||
this.setupEventHandlers();
|
||||
} catch (error) {
|
||||
this.handleError(error);
|
||||
}
|
||||
}
|
||||
|
||||
public disconnect(): void {
|
||||
if (this.reconnectTimer) {
|
||||
clearTimeout(this.reconnectTimer);
|
||||
this.reconnectTimer = null;
|
||||
}
|
||||
|
||||
if (this.client) {
|
||||
this.client.end(true, () => {
|
||||
this.updateState({
|
||||
isConnected: false,
|
||||
isConnecting: false,
|
||||
subscriptions: new Map(),
|
||||
lastMessages: new Map(),
|
||||
});
|
||||
});
|
||||
this.client = null;
|
||||
}
|
||||
}
|
||||
|
||||
public subscribe<T = any>(
|
||||
topic: string,
|
||||
callback: (data: T, topic: string) => void,
|
||||
qos: 0 | 1 | 2 = 0
|
||||
): () => void {
|
||||
// Add to subscribers map
|
||||
if (!this.subscribers.has(topic)) {
|
||||
this.subscribers.set(topic, new Set());
|
||||
}
|
||||
|
||||
const topicSubscribers = this.subscribers.get(topic)!;
|
||||
topicSubscribers.add(callback);
|
||||
|
||||
// Update state subscriptions
|
||||
const subscription: TopicSubscription<T> = { topic, qos, dataType: undefined as any };
|
||||
this.state.subscriptions.set(topic, subscription);
|
||||
|
||||
// If connected, subscribe to the topic on the broker
|
||||
if (this.client?.connected) {
|
||||
this.client.subscribe(topic, { qos }, (error) => {
|
||||
if (error) {
|
||||
console.error(`Failed to subscribe to ${topic}:`, error);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Return unsubscribe function
|
||||
return () => {
|
||||
const topicSubscribers = this.subscribers.get(topic);
|
||||
if (topicSubscribers) {
|
||||
topicSubscribers.delete(callback);
|
||||
|
||||
// If no more subscribers for this topic, unsubscribe from broker
|
||||
if (topicSubscribers.size === 0) {
|
||||
this.subscribers.delete(topic);
|
||||
this.state.subscriptions.delete(topic);
|
||||
this.state.lastMessages.delete(topic);
|
||||
|
||||
if (this.client?.connected) {
|
||||
this.client.unsubscribe(topic);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public subscribeMany(
|
||||
subscriptions: Array<{ topic: string; qos?: 0 | 1 | 2 }>,
|
||||
callback: (data: any, topic: string) => void
|
||||
): () => void {
|
||||
const unsubscribers = subscriptions.map(({ topic, qos = 0 }) =>
|
||||
this.subscribe(topic, callback, qos)
|
||||
);
|
||||
|
||||
return () => {
|
||||
unsubscribers.forEach(unsub => unsub());
|
||||
};
|
||||
}
|
||||
|
||||
public subscribeToState(callback: (state: StreamState) => void): () => void {
|
||||
this.stateSubscribers.add(callback);
|
||||
callback(this.state); // Immediately send current state
|
||||
|
||||
return () => {
|
||||
this.stateSubscribers.delete(callback);
|
||||
};
|
||||
}
|
||||
|
||||
public getLastMessage<T = any>(topic: string): T | undefined {
|
||||
return this.state.lastMessages.get(topic) as T | undefined;
|
||||
}
|
||||
|
||||
public getSubscriptions(): Map<string, TopicSubscription> {
|
||||
return new Map(this.state.subscriptions);
|
||||
}
|
||||
|
||||
public getState(): StreamState {
|
||||
return {
|
||||
...this.state,
|
||||
subscriptions: new Map(this.state.subscriptions),
|
||||
lastMessages: new Map(this.state.lastMessages),
|
||||
};
|
||||
}
|
||||
|
||||
public isSubscribed(topic: string): boolean {
|
||||
return this.state.subscriptions.has(topic);
|
||||
}
|
||||
|
||||
public destroy(): void {
|
||||
this.disconnect();
|
||||
this.subscribers.clear();
|
||||
this.stateSubscribers.clear();
|
||||
}
|
||||
|
||||
private setupEventHandlers(): void {
|
||||
if (!this.client) return;
|
||||
|
||||
this.client.on('connect', () => {
|
||||
this.updateState({ isConnected: true, isConnecting: false, error: null });
|
||||
|
||||
// Resubscribe to all topics after reconnection
|
||||
const subscriptions = Array.from(this.state.subscriptions.entries());
|
||||
if (subscriptions.length > 0 && this.client) {
|
||||
const subscribePromises = subscriptions.map(([topic, sub]) => {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
this.client?.subscribe(topic, { qos: sub.qos || 0 }, (error) => {
|
||||
if (error) reject(error);
|
||||
else resolve();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Promise.all(subscribePromises).catch(error => {
|
||||
console.error('Failed to resubscribe to topics:', error);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
this.client.on('message', (topic, payload) => {
|
||||
try {
|
||||
const uint8Array = new Uint8Array(payload);
|
||||
const data = this.decodeMessage(topic, uint8Array);
|
||||
|
||||
// Validate if validator is provided
|
||||
if (this.validateMessage && !this.validateMessage(topic, data)) {
|
||||
console.warn('Invalid message received on topic:', topic, data);
|
||||
return;
|
||||
}
|
||||
|
||||
// Update last message for this topic
|
||||
this.state.lastMessages.set(topic, data);
|
||||
this.updateState({ lastMessages: new Map(this.state.lastMessages) });
|
||||
|
||||
// Notify subscribers for this topic
|
||||
this.notifySubscribers(topic, data);
|
||||
} catch (error) {
|
||||
console.error(`Error processing message on topic ${topic}:`, error);
|
||||
}
|
||||
});
|
||||
|
||||
this.client.on('error', (error) => {
|
||||
this.handleError(error);
|
||||
});
|
||||
|
||||
this.client.on('close', () => {
|
||||
this.updateState({ isConnected: false });
|
||||
this.attemptReconnect();
|
||||
});
|
||||
}
|
||||
|
||||
private notifySubscribers(topic: string, data: any): void {
|
||||
const topicSubscribers = this.subscribers.get(topic);
|
||||
if (topicSubscribers) {
|
||||
topicSubscribers.forEach(callback => {
|
||||
try {
|
||||
callback(data, topic);
|
||||
} catch (error) {
|
||||
console.error('Error in subscriber callback:', error);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Also notify wildcard subscribers (those subscribed to patterns like 'sensors/+')
|
||||
this.subscribers.forEach((subscribers, subscribedTopic) => {
|
||||
if (subscribedTopic !== topic && this.topicMatches(subscribedTopic, topic)) {
|
||||
subscribers.forEach(callback => {
|
||||
try {
|
||||
callback(data, topic);
|
||||
} catch (error) {
|
||||
console.error('Error in wildcard subscriber callback:', error);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private topicMatches(subscription: string, topic: string): boolean {
|
||||
const subParts = subscription.split('/');
|
||||
const topicParts = topic.split('/');
|
||||
|
||||
// Handle multi-level wildcard
|
||||
if (subscription.includes('#')) {
|
||||
const wildcardIndex = subParts.indexOf('#');
|
||||
// Check if all parts before the wildcard match
|
||||
for (let i = 0; i < wildcardIndex; i++) {
|
||||
if (subParts[i] !== '+' && subParts[i] !== topicParts[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Handle single-level wildcards and exact matches
|
||||
if (subParts.length !== topicParts.length) return false;
|
||||
|
||||
for (let i = 0; i < subParts.length; i++) {
|
||||
if (subParts[i] !== '+' && subParts[i] !== topicParts[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private updateState(updates: Partial<StreamState>) {
|
||||
this.state = { ...this.state, ...updates };
|
||||
this.stateSubscribers.forEach(callback => {
|
||||
try {
|
||||
callback(this.state);
|
||||
} catch (error) {
|
||||
console.error('Error in state subscriber callback:', error);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
private handleError(error: any): void {
|
||||
const errorObj = error instanceof Error ? error : new Error(String(error));
|
||||
this.updateState({ error: errorObj, isConnecting: false });
|
||||
console.error('Stream error:', errorObj);
|
||||
}
|
||||
|
||||
private attemptReconnect(): void {
|
||||
if (!this.autoConnect || this.reconnectTimer) return;
|
||||
|
||||
const reconnectPeriod = this.connectionOptions.options?.reconnectPeriod || 5000;
|
||||
|
||||
this.reconnectTimer = setTimeout(() => {
|
||||
this.reconnectTimer = null;
|
||||
if (!this.state.isConnected && !this.state.isConnecting) {
|
||||
this.connect();
|
||||
}
|
||||
}, reconnectPeriod);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user