434 lines
9.2 KiB
Go
434 lines
9.2 KiB
Go
package hamview
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
"github.com/golang-jwt/jwt/v5"
|
|
"go.yaml.in/yaml/v3"
|
|
|
|
"git.maze.io/go/ham/protocol"
|
|
"git.maze.io/go/ham/protocol/meshcore/crypto"
|
|
meshcorejwt "git.maze.io/go/ham/protocol/meshcore/crypto/jwt"
|
|
"git.maze.io/go/ham/radio"
|
|
)
|
|
|
|
// ErrBrokerNotStarted is returned by broker operations that need a client
// when no client has been created yet.
var ErrBrokerNotStarted = errors.New("broker not started")
|
|
|
|
// Broker for publishing and receiving raw packets.
|
|
type Broker interface {
|
|
Start() error
|
|
StartRadio(protocol string, info *radio.Info) error
|
|
|
|
Close() error
|
|
|
|
SubscribeRadios() (<-chan *Radio, error)
|
|
|
|
PublishPacket(topic string, packet *protocol.Packet) error
|
|
SubscribePackets(topic string) (<-chan *protocol.Packet, error)
|
|
}
|
|
|
|
// Receiver exposes a single Disconnected callback.
//
// NOTE(review): nothing in this part of the file calls or implements it;
// presumably it is invoked when a connection is lost - confirm against the
// callers.
type Receiver interface {
	// Disconnected takes no arguments and returns nothing.
	Disconnected()
}
|
|
|
|
type BrokerConfig struct {
|
|
Type string `yaml:"type"`
|
|
Config yaml.Node `yaml:"conf"`
|
|
}
|
|
|
|
func NewBroker(config *BrokerConfig) (Broker, error) {
|
|
if config.Config.Kind != yaml.MappingNode {
|
|
return nil, fmt.Errorf("broker: conf should be a mapping")
|
|
}
|
|
|
|
switch config.Type {
|
|
case "mqtt":
|
|
return newMQTTBroker(&config.Config)
|
|
//case "kafka":
|
|
// return newKafkaBroker(&config.Config)
|
|
default:
|
|
return nil, fmt.Errorf("broker: unknown type %q", config.Type)
|
|
}
|
|
}
|
|
|
|
// mqttBrokerConfig is the YAML shape of the "conf" mapping for the MQTT
// broker, as decoded by newMQTTBroker.
type mqttBrokerConfig struct {
	// Host is a single broker address; it is added to the client options
	// before any entry of Brokers.
	Host string `yaml:"host"`

	// Brokers lists further broker addresses.
	Brokers []string `yaml:"brokers"`

	// Auth, when set, is passed to configureAuth and takes precedence over
	// Password.
	Auth string `yaml:"auth"`

	// Username is applied to the client options when not empty.
	Username string `yaml:"username"`

	// Password is applied to the client options only when Auth is empty.
	Password string `yaml:"password"`

	// ClientID is the MQTT client identifier; when empty one is generated
	// with generateClientID.
	ClientID string `yaml:"client_id"`
}
|
|
|
|
type mqttBroker struct {
|
|
options *mqtt.ClientOptions
|
|
client mqtt.Client
|
|
}
|
|
|
|
func newMQTTBroker(node *yaml.Node) (*mqttBroker, error) {
|
|
Logger.Info("broker: setting up MQTT")
|
|
|
|
var config mqttBrokerConfig
|
|
if err := node.Decode(&config); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
Logger.Tracef("broker: config: %#+v", config)
|
|
if len(config.Host) == 0 && len(config.Brokers) == 0 {
|
|
return nil, errors.New("at least one host or broker must be configured")
|
|
}
|
|
|
|
options := mqtt.NewClientOptions()
|
|
if config.Host != "" {
|
|
Logger.Debugf("broker: adding %s", config.Host)
|
|
options.AddBroker(config.Host)
|
|
}
|
|
for _, broker := range config.Brokers {
|
|
Logger.Debugf("broker: adding %s", broker)
|
|
options.AddBroker(broker)
|
|
}
|
|
if config.Auth != "" {
|
|
if err := configureAuth(options, config.Auth); err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
if config.Password != "" {
|
|
options.SetPassword(config.Password)
|
|
}
|
|
}
|
|
if config.Username != "" {
|
|
options.SetUsername(config.Username)
|
|
}
|
|
if config.ClientID != "" {
|
|
options.SetClientID(config.ClientID)
|
|
} else {
|
|
clientID, err := generateClientID()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
options.SetClientID(clientID)
|
|
}
|
|
|
|
options.OnConnect = func(_ mqtt.Client) {
|
|
Logger.Info("broker: connected to MQTT broker")
|
|
}
|
|
options.OnConnectionLost = func(_ mqtt.Client, err error) {
|
|
Logger.Warnf("broker: connection to MQTT broker lost: %v", err)
|
|
}
|
|
|
|
return &mqttBroker{
|
|
options: options,
|
|
}, nil
|
|
}
|
|
|
|
func (broker *mqttBroker) Close() error {
|
|
Logger.Warn("broker: closing")
|
|
if broker.client != nil {
|
|
broker.client.Disconnect(100)
|
|
broker.client = nil
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (broker *mqttBroker) Start() error {
|
|
// Connect to the broker
|
|
broker.client = mqtt.NewClient(broker.options)
|
|
token := broker.client.Connect()
|
|
token.Wait()
|
|
if err := token.Error(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (broker *mqttBroker) StartRadio(protocol string, info *radio.Info) error {
|
|
if info.Name == "" {
|
|
return errors.New("broker: radio has no name")
|
|
}
|
|
|
|
// Setup last will
|
|
var radio = &Radio{
|
|
Info: info,
|
|
Protocol: protocol,
|
|
IsOnline: false,
|
|
}
|
|
// Configure last will
|
|
will, err := json.Marshal(&radio)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
topic := fmt.Sprintf("radio/%s/%s",
|
|
protocol,
|
|
base64.RawURLEncoding.EncodeToString([]byte(info.Name)))
|
|
|
|
Logger.Debugf("broker: configure last will %s", topic)
|
|
broker.options.SetWill(topic, string(will), 1, true)
|
|
|
|
// Connect to the broker
|
|
if err = broker.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Send status
|
|
radio.IsOnline = true
|
|
payload, err := json.Marshal(radio)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
Logger.Infof("broker: radio %s online at %s", info.Name, topic)
|
|
token := broker.client.Publish(topic, 1, true, string(payload))
|
|
token.Wait()
|
|
return token.Error()
|
|
}
|
|
|
|
func (broker *mqttBroker) SubscribeRadios() (<-chan *Radio, error) {
|
|
if broker.client == nil {
|
|
return nil, ErrBrokerNotStarted
|
|
}
|
|
|
|
radios := make(chan *Radio, 8)
|
|
token := broker.client.Subscribe("radio/#", 0, func(_ mqtt.Client, message mqtt.Message) {
|
|
var radio Radio
|
|
if err := json.Unmarshal(message.Payload(), &radio); err == nil {
|
|
select {
|
|
case radios <- &radio:
|
|
default:
|
|
}
|
|
}
|
|
})
|
|
|
|
if token.Wait() && token.Error() != nil {
|
|
close(radios)
|
|
return nil, token.Error()
|
|
}
|
|
|
|
return radios, nil
|
|
}
|
|
|
|
func (broker *mqttBroker) PublishPacket(topic string, packet *protocol.Packet) error {
|
|
if broker.client == nil {
|
|
return ErrBrokerNotStarted
|
|
}
|
|
|
|
b, err := json.Marshal(packet)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
token := broker.client.Publish(topic, 0, true, string(b))
|
|
if token.Wait() && token.Error() != nil {
|
|
return token.Error()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (broker *mqttBroker) SubscribePackets(topic string) (<-chan *protocol.Packet, error) {
|
|
if broker.client == nil {
|
|
return nil, ErrBrokerNotStarted
|
|
}
|
|
|
|
packets := make(chan *protocol.Packet, 16)
|
|
token := broker.client.Subscribe(topic, 0, func(_ mqtt.Client, message mqtt.Message) {
|
|
var packet protocol.Packet
|
|
if err := json.Unmarshal(message.Payload(), &packet); err == nil {
|
|
select {
|
|
case packets <- &packet:
|
|
default:
|
|
}
|
|
}
|
|
})
|
|
|
|
if token.Wait() && token.Error() != nil {
|
|
close(packets)
|
|
return nil, token.Error()
|
|
}
|
|
|
|
return packets, nil
|
|
}
|
|
|
|
/*
|
|
type kafkaBroker struct {
|
|
configMap kafka.ConfigMap
|
|
producer *kafka.Producer
|
|
}
|
|
|
|
func newKafkaBroker(node *yaml.Node) (*kafkaBroker, error) {
|
|
Logger.Info("broker: setting up Kafka")
|
|
|
|
var config = make(map[string]kafka.ConfigValue)
|
|
if err := node.Decode(config); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Ensure default values:
|
|
config["acks"] = "all"
|
|
if s, ok := config["client.id"]; !ok || s == "" {
|
|
var err error
|
|
if config["client.id"], err = generateClientID(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return &kafkaBroker{
|
|
configMap: config,
|
|
}, nil
|
|
}
|
|
|
|
func (broker *kafkaBroker) Close() error {
|
|
Logger.Warn("broker: closing")
|
|
if broker.producer != nil {
|
|
broker.producer.Close()
|
|
broker.producer = nil
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (broker *kafkaBroker) Start() (err error) {
|
|
if broker.producer != nil {
|
|
return nil
|
|
}
|
|
|
|
if broker.producer, err = kafka.NewProducer(&broker.configMap); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (broker *kafkaBroker) StartRadio(protocol string, info *radio.Info) error {
|
|
return broker.Start()
|
|
}
|
|
|
|
func (broker *kafkaBroker) PublishPacket(topic string, packet *protocol.Packet) error {
|
|
if broker.producer == nil {
|
|
return ErrBrokerNotStarted
|
|
}
|
|
|
|
data, err := json.Marshal(packet)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return broker.producer.Produce(&kafka.Message{
|
|
TopicPartition: kafka.TopicPartition{
|
|
Topic: &topic,
|
|
Partition: kafka.PartitionAny,
|
|
},
|
|
Value: data,
|
|
}, nil)
|
|
}
|
|
|
|
func (broker *kafkaBroker) ensureProducer() (err error) {
|
|
if broker.producer == nil {
|
|
broker.producer, err = kafka.NewProducer(&broker.configMap)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (broker *kafkaBroker) SubscribePackets(topic string) (<-chan *protocol.Packet, error) {
|
|
consumer, err := kafka.NewConsumer(&broker.configMap)
|
|
if err != nil {
|
|
return nil, err
|
|
} else if err = consumer.Subscribe(topic, nil); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
packets := make(chan *protocol.Packet, 16)
|
|
|
|
go func() {
|
|
defer close(packets)
|
|
for {
|
|
event := consumer.Poll(100)
|
|
if event == nil {
|
|
continue
|
|
}
|
|
|
|
switch event := event.(type) {
|
|
case kafka.Error:
|
|
// TODO
|
|
if event.IsFatal() {
|
|
return
|
|
}
|
|
|
|
case *kafka.Message:
|
|
var packet protocol.Packet
|
|
if err := json.Unmarshal(event.Value, &packet); err == nil {
|
|
select {
|
|
case packets <- &packet:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return packets, nil
|
|
}
|
|
*/
|
|
|
|
// generateClientID returns a client identifier of the form "<host>_<hex>",
// where host is the lower-cased host name up to its first dot and hex is
// eight hexadecimal digits taken from four random bytes.
//
// An error is returned when the host name can not be determined or when no
// random bytes can be read.
func generateClientID() (string, error) {
	name, err := os.Hostname()
	if err != nil {
		return "", err
	}
	// Keep only the first label of a fully qualified host name.
	host, _, _ := strings.Cut(name, ".")

	suffix := make([]byte, 4)
	if _, err := rand.Read(suffix); err != nil {
		return "", fmt.Errorf("broker: generating client id: %w", err)
	}

	return fmt.Sprintf("%s_%08x", strings.ToLower(host), suffix), nil
}
|
|
|
|
func configureAuth(options *mqtt.ClientOptions, value string) error {
|
|
part := strings.Split(value, ":")
|
|
if len(part) < 2 {
|
|
return errors.New("broker: mqtt.auth must be in `type:value` format")
|
|
}
|
|
|
|
switch part[0] {
|
|
case "jwt-ed25519":
|
|
var key *crypto.PrivateKey
|
|
f, err := os.ReadFile(part[1])
|
|
if err != nil {
|
|
if !os.IsNotExist(err) {
|
|
return err
|
|
}
|
|
if _, key, err = crypto.GenerateKey(); err != nil {
|
|
return err
|
|
}
|
|
if err = os.WriteFile(part[1], key.Bytes(), 0600); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
if key, err = crypto.NewPrivateKey(f); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
token := jwt.NewWithClaims(meshcorejwt.SigningMethod, jwt.MapClaims{
|
|
"publickey": hex.EncodeToString(key.PublicKey()),
|
|
"iat": time.Now().Unix(),
|
|
})
|
|
tokenString, err := token.SignedString(key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
options.SetPassword(tokenString)
|
|
return nil
|
|
|
|
default:
|
|
return fmt.Errorf("broker: unknown auth method %q", part[0])
|
|
}
|
|
}
|