Files
ham/internal/collector/client.go
2026-02-14 15:59:31 +01:00

154 lines
4.2 KiB
Go

package collector
import (
"encoding/json"
"log"
"os"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
type Options struct {
Brokers []string `yaml:"brokers" env:"MQTT_BROKERS"`
ClientID string `yaml:"client_id" env:"MQTT_CLIENT_ID"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Protocol string
}
type DeviceConfig struct {
Callsign string `yaml:"callsign" env:"DEVICE_CALLSIGN"`
Device string `yaml:"device" env:"DEVICE"`
Manufacturer string `yaml:"manufacturer" env:"DEVICE_MANUFACTURER"`
RXFrequency float64 `yaml:"rx_frequency" env:"DEVICE_RX_FREQ"` // in Hz
RXGain float64 `yaml:"rx_gain" env:"DEVICE_RX_GAIN"` // in dBm
TXFrequency float64 `yaml:"tx_frequency" env:"DEVICE_TX_FREQ"` // in Hz
TXPower float64 `yaml:"tx_power" env:"DEVICE_TX_GAIN"` // in dBm
}
type Client struct {
client mqtt.Client
options *Options
topicCollector string
topicDevice string
topicPacket string
}
func NewClient(options *Options) (*Client, error) {
if options.ClientID == "" {
options.ClientID, _ = os.Hostname()
}
opts := mqtt.NewClientOptions()
opts.SetCredentialsProvider(func() (username, password string) {
return options.Username, options.Password
})
opts.SetClientID(options.ClientID)
if len(options.Brokers) == 0 {
log.Println("collector: no brokers configured, using localhost")
opts.AddBroker("localhost:1883")
} else {
for _, broker := range options.Brokers {
log.Printf("collector: adding broker at tcp://%s", broker)
opts.AddBroker(broker)
}
}
if options.Protocol == "" {
options.Protocol = "unspec"
}
client := &Client{
options: options,
topicCollector: "collector/" + options.ClientID,
topicDevice: "device",
topicPacket: "packet/" + options.Protocol,
}
// Last will and testament.
opts.SetWill(client.topicCollector+"/status", client.collectorStatus("offline"), 1, true)
// Logging
opts.OnConnect = func(c mqtt.Client) {
log.Println("collector: connected to MQTT broker")
token := c.Publish(client.topicCollector+"/status", 1, false, client.collectorStatus("online"))
if token.Wait() && token.Error() != nil {
log.Printf("collector: failed to signal collector online: %v", token.Error())
c.Disconnect(0)
return
}
}
opts.OnConnectionLost = func(c mqtt.Client, err error) {
log.Printf("collector: disconnected from MQTT broker: %v", err)
}
// Connect to broker(s).
client.client = mqtt.NewClient(opts)
token := client.client.Connect()
if token.Wait() && token.Error() != nil {
return nil, token.Error()
}
return client, nil
}
func (c *Client) collectorStatus(status string) string {
b, _ := json.Marshal(map[string]any{
"status": status,
"time": time.Now().UTC(),
"id": c.options.ClientID,
"protocol": c.options.Protocol,
})
return string(b)
}
func (c *Client) deviceStatus(config *DeviceConfig, status string) string {
b, _ := json.Marshal(map[string]any{
"status": status,
"time": time.Now().UTC(),
"protocol": c.options.Protocol,
"callsign": config.Callsign,
"device": config.Device,
"manufacturer": config.Manufacturer,
"rx_freq": config.RXFrequency,
"rx_gain": config.RXGain,
"tx_freq": config.TXFrequency,
"tx_power": config.TXPower,
})
return string(b)
}
func (c *Client) DeviceOnline(config *DeviceConfig) error {
return c.setDeviceStatus(config, "online")
}
func (c *Client) DeviceOffline(config *DeviceConfig) error {
return c.setDeviceStatus(config, "offline")
}
func (c *Client) setDeviceStatus(config *DeviceConfig, status string) error {
log.Printf("collector: device %s/%s is %s", c.options.ClientID, config.Callsign, status)
token := c.client.Publish(c.topicDevice+"/"+config.Callsign, 1, false, c.deviceStatus(config, status))
return token.Error()
}
func (c *Client) PublishPacket(id string, packet any) error {
var payload string
switch packet := packet.(type) {
case string:
payload = packet
case []byte:
payload = string(packet)
default:
b, err := json.Marshal(packet)
if err != nil {
return err
}
payload = string(b)
}
token := c.client.Publish(c.topicPacket, 0, false, payload)
return token.Error()
}