package hamview import ( "crypto/rand" "encoding/base64" "encoding/hex" "encoding/json" "errors" "fmt" "os" "strings" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/golang-jwt/jwt/v5" "go.yaml.in/yaml/v3" "git.maze.io/go/ham/protocol" "git.maze.io/go/ham/protocol/meshcore/crypto" meshcorejwt "git.maze.io/go/ham/protocol/meshcore/crypto/jwt" "git.maze.io/go/ham/radio" ) var ErrBrokerNotStarted = errors.New("broker not started") // Broker for publishing and receiving raw packets. type Broker interface { Start() error StartRadio(protocol string, info *radio.Info) error Close() error SubscribeRadios() (<-chan *Radio, error) PublishPacket(topic string, packet *protocol.Packet) error SubscribePackets(topic string) (<-chan *protocol.Packet, error) } type Receiver interface { Disconnected() } type BrokerConfig struct { Type string `yaml:"type"` Config yaml.Node `yaml:"conf"` } func NewBroker(config *BrokerConfig) (Broker, error) { if config.Config.Kind != yaml.MappingNode { return nil, fmt.Errorf("broker: conf should be a mapping") } switch config.Type { case "mqtt": return newMQTTBroker(&config.Config) //case "kafka": // return newKafkaBroker(&config.Config) default: return nil, fmt.Errorf("broker: unknown type %q", config.Type) } } type mqttBrokerConfig struct { Host string `yaml:"host"` Brokers []string `yaml:"brokers"` Auth string `yaml:"auth"` Username string `yaml:"username"` Password string `yaml:"password"` ClientID string `yaml:"client_id"` } type mqttBroker struct { options *mqtt.ClientOptions client mqtt.Client } func newMQTTBroker(node *yaml.Node) (*mqttBroker, error) { Logger.Info("broker: setting up MQTT") var config mqttBrokerConfig if err := node.Decode(&config); err != nil { return nil, err } Logger.Tracef("broker: config: %#+v", config) if len(config.Host) == 0 && len(config.Brokers) == 0 { return nil, errors.New("at least one host or broker must be configured") } options := mqtt.NewClientOptions() if config.Host != "" { Logger.Debugf("broker: adding %s", config.Host) options.AddBroker(config.Host) } for _, broker := range config.Brokers { Logger.Debugf("broker: adding %s", broker) options.AddBroker(broker) } if config.Auth != "" { if err := configureAuth(options, config.Auth); err != nil { return nil, err } } else { if config.Password != "" { options.SetPassword(config.Password) } } if config.Username != "" { options.SetUsername(config.Username) } if config.ClientID != "" { options.SetClientID(config.ClientID) } else { clientID, err := generateClientID() if err != nil { return nil, err } options.SetClientID(clientID) } options.OnConnect = func(_ mqtt.Client) { Logger.Info("broker: connected to MQTT broker") } options.OnConnectionLost = func(_ mqtt.Client, err error) { Logger.Warnf("broker: connection to MQTT broker lost: %v", err) } return &mqttBroker{ options: options, }, nil } func (broker *mqttBroker) Close() error { Logger.Warn("broker: closing") if broker.client != nil { broker.client.Disconnect(100) broker.client = nil } return nil } func (broker *mqttBroker) Start() error { // Connect to the broker broker.client = mqtt.NewClient(broker.options) token := broker.client.Connect() token.Wait() if err := token.Error(); err != nil { return err } return nil } func (broker *mqttBroker) StartRadio(protocol string, info *radio.Info) error { if info.Name == "" { return errors.New("broker: radio has no name") } // Setup last will var radio = &Radio{ Info: info, Protocol: protocol, IsOnline: false, } // Configure last will will, err := json.Marshal(&radio) if err != nil { return err } topic := fmt.Sprintf("radio/%s/%s", protocol, base64.RawURLEncoding.EncodeToString([]byte(info.Name))) Logger.Debugf("broker: configure last will %s", topic) broker.options.SetWill(topic, string(will), 1, true) // Connect to the broker if err = broker.Start(); err != nil { return err } // Send status radio.IsOnline = true payload, err := json.Marshal(radio) if err != nil { return err } Logger.Infof("broker: radio %s online at %s", info.Name, topic) token := broker.client.Publish(topic, 1, true, string(payload)) token.Wait() return token.Error() } func (broker *mqttBroker) SubscribeRadios() (<-chan *Radio, error) { if broker.client == nil { return nil, ErrBrokerNotStarted } radios := make(chan *Radio, 8) token := broker.client.Subscribe("radio/#", 0, func(_ mqtt.Client, message mqtt.Message) { var radio Radio if err := json.Unmarshal(message.Payload(), &radio); err == nil { select { case radios <- &radio: default: } } }) if token.Wait() && token.Error() != nil { close(radios) return nil, token.Error() } return radios, nil } func (broker *mqttBroker) PublishPacket(topic string, packet *protocol.Packet) error { if broker.client == nil { return ErrBrokerNotStarted } b, err := json.Marshal(packet) if err != nil { return err } token := broker.client.Publish(topic, 0, true, string(b)) if token.Wait() && token.Error() != nil { return token.Error() } return nil } func (broker *mqttBroker) SubscribePackets(topic string) (<-chan *protocol.Packet, error) { if broker.client == nil { return nil, ErrBrokerNotStarted } packets := make(chan *protocol.Packet, 16) token := broker.client.Subscribe(topic, 0, func(_ mqtt.Client, message mqtt.Message) { var packet protocol.Packet if err := json.Unmarshal(message.Payload(), &packet); err == nil { select { case packets <- &packet: default: } } }) if token.Wait() && token.Error() != nil { close(packets) return nil, token.Error() } return packets, nil } /* type kafkaBroker struct { configMap kafka.ConfigMap producer *kafka.Producer } func newKafkaBroker(node *yaml.Node) (*kafkaBroker, error) { Logger.Info("broker: setting up Kafka") var config = make(map[string]kafka.ConfigValue) if err := node.Decode(config); err != nil { return nil, err } // Ensure default values: config["acks"] = "all" if s, ok := config["client.id"]; !ok || s == "" { var err error if config["client.id"], err = generateClientID(); err != nil { return nil, err } } return &kafkaBroker{ configMap: config, }, nil } func (broker *kafkaBroker) Close() error { Logger.Warn("broker: closing") if broker.producer != nil { broker.producer.Close() broker.producer = nil } return nil } func (broker *kafkaBroker) Start() (err error) { if broker.producer != nil { return nil } if broker.producer, err = kafka.NewProducer(&broker.configMap); err != nil { return err } return nil } func (broker *kafkaBroker) StartRadio(protocol string, info *radio.Info) error { return broker.Start() } func (broker *kafkaBroker) PublishPacket(topic string, packet *protocol.Packet) error { if broker.producer == nil { return ErrBrokerNotStarted } data, err := json.Marshal(packet) if err != nil { return err } return broker.producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{ Topic: &topic, Partition: kafka.PartitionAny, }, Value: data, }, nil) } func (broker *kafkaBroker) ensureProducer() (err error) { if broker.producer == nil { broker.producer, err = kafka.NewProducer(&broker.configMap) } return } func (broker *kafkaBroker) SubscribePackets(topic string) (<-chan *protocol.Packet, error) { consumer, err := kafka.NewConsumer(&broker.configMap) if err != nil { return nil, err } else if err = consumer.Subscribe(topic, nil); err != nil { return nil, err } packets := make(chan *protocol.Packet, 16) go func() { defer close(packets) for { event := consumer.Poll(100) if event == nil { continue } switch event := event.(type) { case kafka.Error: // TODO if event.IsFatal() { return } case *kafka.Message: var packet protocol.Packet if err := json.Unmarshal(event.Value, &packet); err == nil { select { case packets <- &packet: default: } } } } }() return packets, nil } */ func generateClientID() (string, error) { name, err := os.Hostname() if err != nil { return "", err } var ( node = strings.ToLower(strings.SplitN(name, ".", 2)[0]) random = make([]byte, 4) ) rand.Read(random) return fmt.Sprintf("%s_%08x", node, random), nil } func configureAuth(options *mqtt.ClientOptions, value string) error { part := strings.Split(value, ":") if len(part) < 2 { return errors.New("broker: mqtt.auth must be in `type:value` format") } switch part[0] { case "jwt-ed25519": var key *crypto.PrivateKey f, err := os.ReadFile(part[1]) if err != nil { if !os.IsNotExist(err) { return err } if _, key, err = crypto.GenerateKey(); err != nil { return err } if err = os.WriteFile(part[1], key.Bytes(), 0600); err != nil { return err } } else { if key, err = crypto.NewPrivateKey(f); err != nil { return err } } token := jwt.NewWithClaims(meshcorejwt.SigningMethod, jwt.MapClaims{ "publickey": hex.EncodeToString(key.PublicKey()), "iat": time.Now().Unix(), }) tokenString, err := token.SignedString(key) if err != nil { return err } options.SetPassword(tokenString) return nil default: return fmt.Errorf("broker: unknown auth method %q", part[0]) } }