package collector import ( "encoding/json" "log" "os" "time" mqtt "github.com/eclipse/paho.mqtt.golang" ) type Options struct { Brokers []string `yaml:"brokers" env:"MQTT_BROKERS"` ClientID string `yaml:"client_id" env:"MQTT_CLIENT_ID"` Username string `yaml:"username"` Password string `yaml:"password"` Protocol string } type DeviceConfig struct { Callsign string `yaml:"callsign" env:"DEVICE_CALLSIGN"` Device string `yaml:"device" env:"DEVICE"` Manufacturer string `yaml:"manufacturer" env:"DEVICE_MANUFACTURER"` RXFrequency float64 `yaml:"rx_frequency" env:"DEVICE_RX_FREQ"` // in Hz RXGain float64 `yaml:"rx_gain" env:"DEVICE_RX_GAIN"` // in dBm TXFrequency float64 `yaml:"tx_frequency" env:"DEVICE_TX_FREQ"` // in Hz TXPower float64 `yaml:"tx_power" env:"DEVICE_TX_GAIN"` // in dBm } type Client struct { client mqtt.Client options *Options topicCollector string topicDevice string topicPacket string } func NewClient(options *Options) (*Client, error) { if options.ClientID == "" { options.ClientID, _ = os.Hostname() } opts := mqtt.NewClientOptions() opts.SetCredentialsProvider(func() (username, password string) { return options.Username, options.Password }) opts.SetClientID(options.ClientID) if len(options.Brokers) == 0 { log.Println("collector: no brokers configured, using localhost") opts.AddBroker("localhost:1883") } else { for _, broker := range options.Brokers { log.Printf("collector: adding broker at tcp://%s", broker) opts.AddBroker(broker) } } if options.Protocol == "" { options.Protocol = "unspec" } client := &Client{ options: options, topicCollector: "collector/" + options.ClientID, topicDevice: "device", topicPacket: "packet/" + options.Protocol, } // Last will and testament. opts.SetWill(client.topicCollector+"/status", client.collectorStatus("offline"), 1, true) // Logging opts.OnConnect = func(c mqtt.Client) { log.Println("collector: connected to MQTT broker") token := c.Publish(client.topicCollector+"/status", 1, false, client.collectorStatus("online")) if token.Wait() && token.Error() != nil { log.Printf("collector: failed to signal collector online: %v", token.Error()) c.Disconnect(0) return } } opts.OnConnectionLost = func(c mqtt.Client, err error) { log.Printf("collector: disconnected from MQTT broker: %v", err) } // Connect to broker(s). client.client = mqtt.NewClient(opts) token := client.client.Connect() if token.Wait() && token.Error() != nil { return nil, token.Error() } return client, nil } func (c *Client) collectorStatus(status string) string { b, _ := json.Marshal(map[string]any{ "status": status, "time": time.Now().UTC(), "id": c.options.ClientID, "protocol": c.options.Protocol, }) return string(b) } func (c *Client) deviceStatus(config *DeviceConfig, status string) string { b, _ := json.Marshal(map[string]any{ "status": status, "time": time.Now().UTC(), "protocol": c.options.Protocol, "callsign": config.Callsign, "device": config.Device, "manufacturer": config.Manufacturer, "rx_freq": config.RXFrequency, "rx_gain": config.RXGain, "tx_freq": config.TXFrequency, "tx_power": config.TXPower, }) return string(b) } func (c *Client) DeviceOnline(config *DeviceConfig) error { return c.setDeviceStatus(config, "online") } func (c *Client) DeviceOffline(config *DeviceConfig) error { return c.setDeviceStatus(config, "offline") } func (c *Client) setDeviceStatus(config *DeviceConfig, status string) error { log.Printf("collector: device %s/%s is %s", c.options.ClientID, config.Callsign, status) token := c.client.Publish(c.topicDevice+"/"+config.Callsign, 1, false, c.deviceStatus(config, status)) return token.Error() } func (c *Client) PublishPacket(id string, packet any) error { var payload string switch packet := packet.(type) { case string: payload = packet case []byte: payload = string(packet) default: b, err := json.Marshal(packet) if err != nil { return err } payload = string(b) } token := c.client.Publish(c.topicPacket, 0, false, payload) return token.Error() }