Initial import
This commit is contained in:
433
broker.go
Normal file
433
broker.go
Normal file
@@ -0,0 +1,433 @@
|
||||
package hamview
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/golang-jwt/jwt/v5"
|
||||
"go.yaml.in/yaml/v3"
|
||||
|
||||
"git.maze.io/go/ham/protocol"
|
||||
"git.maze.io/go/ham/protocol/meshcore/crypto"
|
||||
meshcorejwt "git.maze.io/go/ham/protocol/meshcore/crypto/jwt"
|
||||
"git.maze.io/go/ham/radio"
|
||||
)
|
||||
|
||||
var ErrBrokerNotStarted = errors.New("broker not started")
|
||||
|
||||
// Broker for publishing and receiving raw packets.
|
||||
type Broker interface {
|
||||
Start() error
|
||||
StartRadio(protocol string, info *radio.Info) error
|
||||
|
||||
Close() error
|
||||
|
||||
SubscribeRadios() (<-chan *Radio, error)
|
||||
|
||||
PublishPacket(topic string, packet *protocol.Packet) error
|
||||
SubscribePackets(topic string) (<-chan *protocol.Packet, error)
|
||||
}
|
||||
|
||||
type Receiver interface {
|
||||
Disconnected()
|
||||
}
|
||||
|
||||
type BrokerConfig struct {
|
||||
Type string `yaml:"type"`
|
||||
Config yaml.Node `yaml:"conf"`
|
||||
}
|
||||
|
||||
func NewBroker(config *BrokerConfig) (Broker, error) {
|
||||
if config.Config.Kind != yaml.MappingNode {
|
||||
return nil, fmt.Errorf("broker: conf should be a mapping")
|
||||
}
|
||||
|
||||
switch config.Type {
|
||||
case "mqtt":
|
||||
return newMQTTBroker(&config.Config)
|
||||
//case "kafka":
|
||||
// return newKafkaBroker(&config.Config)
|
||||
default:
|
||||
return nil, fmt.Errorf("broker: unknown type %q", config.Type)
|
||||
}
|
||||
}
|
||||
|
||||
type mqttBrokerConfig struct {
|
||||
Host string `yaml:"host"`
|
||||
Brokers []string `yaml:"brokers"`
|
||||
Auth string `yaml:"auth"`
|
||||
Username string `yaml:"username"`
|
||||
Password string `yaml:"password"`
|
||||
ClientID string `yaml:"client_id"`
|
||||
}
|
||||
|
||||
type mqttBroker struct {
|
||||
options *mqtt.ClientOptions
|
||||
client mqtt.Client
|
||||
}
|
||||
|
||||
func newMQTTBroker(node *yaml.Node) (*mqttBroker, error) {
|
||||
Logger.Info("broker: setting up MQTT")
|
||||
|
||||
var config mqttBrokerConfig
|
||||
if err := node.Decode(&config); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Logger.Tracef("broker: config: %#+v", config)
|
||||
if len(config.Host) == 0 && len(config.Brokers) == 0 {
|
||||
return nil, errors.New("at least one host or broker must be configured")
|
||||
}
|
||||
|
||||
options := mqtt.NewClientOptions()
|
||||
if config.Host != "" {
|
||||
Logger.Debugf("broker: adding %s", config.Host)
|
||||
options.AddBroker(config.Host)
|
||||
}
|
||||
for _, broker := range config.Brokers {
|
||||
Logger.Debugf("broker: adding %s", broker)
|
||||
options.AddBroker(broker)
|
||||
}
|
||||
if config.Auth != "" {
|
||||
if err := configureAuth(options, config.Auth); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
if config.Password != "" {
|
||||
options.SetPassword(config.Password)
|
||||
}
|
||||
}
|
||||
if config.Username != "" {
|
||||
options.SetUsername(config.Username)
|
||||
}
|
||||
if config.ClientID != "" {
|
||||
options.SetClientID(config.ClientID)
|
||||
} else {
|
||||
clientID, err := generateClientID()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
options.SetClientID(clientID)
|
||||
}
|
||||
|
||||
options.OnConnect = func(_ mqtt.Client) {
|
||||
Logger.Info("broker: connected to MQTT broker")
|
||||
}
|
||||
options.OnConnectionLost = func(_ mqtt.Client, err error) {
|
||||
Logger.Warnf("broker: connection to MQTT broker lost: %v", err)
|
||||
}
|
||||
|
||||
return &mqttBroker{
|
||||
options: options,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (broker *mqttBroker) Close() error {
|
||||
Logger.Warn("broker: closing")
|
||||
if broker.client != nil {
|
||||
broker.client.Disconnect(100)
|
||||
broker.client = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (broker *mqttBroker) Start() error {
|
||||
// Connect to the broker
|
||||
broker.client = mqtt.NewClient(broker.options)
|
||||
token := broker.client.Connect()
|
||||
token.Wait()
|
||||
if err := token.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (broker *mqttBroker) StartRadio(protocol string, info *radio.Info) error {
|
||||
if info.Name == "" {
|
||||
return errors.New("broker: radio has no name")
|
||||
}
|
||||
|
||||
// Setup last will
|
||||
var radio = &Radio{
|
||||
Info: info,
|
||||
Protocol: protocol,
|
||||
IsOnline: false,
|
||||
}
|
||||
// Configure last will
|
||||
will, err := json.Marshal(&radio)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
topic := fmt.Sprintf("radio/%s/%s",
|
||||
protocol,
|
||||
base64.RawURLEncoding.EncodeToString([]byte(info.Name)))
|
||||
|
||||
Logger.Debugf("broker: configure last will %s", topic)
|
||||
broker.options.SetWill(topic, string(will), 1, true)
|
||||
|
||||
// Connect to the broker
|
||||
if err = broker.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Send status
|
||||
radio.IsOnline = true
|
||||
payload, err := json.Marshal(radio)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Logger.Infof("broker: radio %s online at %s", info.Name, topic)
|
||||
token := broker.client.Publish(topic, 1, true, string(payload))
|
||||
token.Wait()
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
func (broker *mqttBroker) SubscribeRadios() (<-chan *Radio, error) {
|
||||
if broker.client == nil {
|
||||
return nil, ErrBrokerNotStarted
|
||||
}
|
||||
|
||||
radios := make(chan *Radio, 8)
|
||||
token := broker.client.Subscribe("radio/#", 0, func(_ mqtt.Client, message mqtt.Message) {
|
||||
var radio Radio
|
||||
if err := json.Unmarshal(message.Payload(), &radio); err == nil {
|
||||
select {
|
||||
case radios <- &radio:
|
||||
default:
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if token.Wait() && token.Error() != nil {
|
||||
close(radios)
|
||||
return nil, token.Error()
|
||||
}
|
||||
|
||||
return radios, nil
|
||||
}
|
||||
|
||||
func (broker *mqttBroker) PublishPacket(topic string, packet *protocol.Packet) error {
|
||||
if broker.client == nil {
|
||||
return ErrBrokerNotStarted
|
||||
}
|
||||
|
||||
b, err := json.Marshal(packet)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
token := broker.client.Publish(topic, 0, true, string(b))
|
||||
if token.Wait() && token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (broker *mqttBroker) SubscribePackets(topic string) (<-chan *protocol.Packet, error) {
|
||||
if broker.client == nil {
|
||||
return nil, ErrBrokerNotStarted
|
||||
}
|
||||
|
||||
packets := make(chan *protocol.Packet, 16)
|
||||
token := broker.client.Subscribe(topic, 0, func(_ mqtt.Client, message mqtt.Message) {
|
||||
var packet protocol.Packet
|
||||
if err := json.Unmarshal(message.Payload(), &packet); err == nil {
|
||||
select {
|
||||
case packets <- &packet:
|
||||
default:
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if token.Wait() && token.Error() != nil {
|
||||
close(packets)
|
||||
return nil, token.Error()
|
||||
}
|
||||
|
||||
return packets, nil
|
||||
}
|
||||
|
||||
/*
|
||||
type kafkaBroker struct {
|
||||
configMap kafka.ConfigMap
|
||||
producer *kafka.Producer
|
||||
}
|
||||
|
||||
func newKafkaBroker(node *yaml.Node) (*kafkaBroker, error) {
|
||||
Logger.Info("broker: setting up Kafka")
|
||||
|
||||
var config = make(map[string]kafka.ConfigValue)
|
||||
if err := node.Decode(config); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Ensure default values:
|
||||
config["acks"] = "all"
|
||||
if s, ok := config["client.id"]; !ok || s == "" {
|
||||
var err error
|
||||
if config["client.id"], err = generateClientID(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return &kafkaBroker{
|
||||
configMap: config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (broker *kafkaBroker) Close() error {
|
||||
Logger.Warn("broker: closing")
|
||||
if broker.producer != nil {
|
||||
broker.producer.Close()
|
||||
broker.producer = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (broker *kafkaBroker) Start() (err error) {
|
||||
if broker.producer != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if broker.producer, err = kafka.NewProducer(&broker.configMap); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (broker *kafkaBroker) StartRadio(protocol string, info *radio.Info) error {
|
||||
return broker.Start()
|
||||
}
|
||||
|
||||
func (broker *kafkaBroker) PublishPacket(topic string, packet *protocol.Packet) error {
|
||||
if broker.producer == nil {
|
||||
return ErrBrokerNotStarted
|
||||
}
|
||||
|
||||
data, err := json.Marshal(packet)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return broker.producer.Produce(&kafka.Message{
|
||||
TopicPartition: kafka.TopicPartition{
|
||||
Topic: &topic,
|
||||
Partition: kafka.PartitionAny,
|
||||
},
|
||||
Value: data,
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func (broker *kafkaBroker) ensureProducer() (err error) {
|
||||
if broker.producer == nil {
|
||||
broker.producer, err = kafka.NewProducer(&broker.configMap)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (broker *kafkaBroker) SubscribePackets(topic string) (<-chan *protocol.Packet, error) {
|
||||
consumer, err := kafka.NewConsumer(&broker.configMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if err = consumer.Subscribe(topic, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
packets := make(chan *protocol.Packet, 16)
|
||||
|
||||
go func() {
|
||||
defer close(packets)
|
||||
for {
|
||||
event := consumer.Poll(100)
|
||||
if event == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
switch event := event.(type) {
|
||||
case kafka.Error:
|
||||
// TODO
|
||||
if event.IsFatal() {
|
||||
return
|
||||
}
|
||||
|
||||
case *kafka.Message:
|
||||
var packet protocol.Packet
|
||||
if err := json.Unmarshal(event.Value, &packet); err == nil {
|
||||
select {
|
||||
case packets <- &packet:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return packets, nil
|
||||
}
|
||||
*/
|
||||
|
||||
func generateClientID() (string, error) {
|
||||
name, err := os.Hostname()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
var (
|
||||
node = strings.ToLower(strings.SplitN(name, ".", 2)[0])
|
||||
random = make([]byte, 4)
|
||||
)
|
||||
rand.Read(random)
|
||||
return fmt.Sprintf("%s_%08x", node, random), nil
|
||||
}
|
||||
|
||||
func configureAuth(options *mqtt.ClientOptions, value string) error {
|
||||
part := strings.Split(value, ":")
|
||||
if len(part) < 2 {
|
||||
return errors.New("broker: mqtt.auth must be in `type:value` format")
|
||||
}
|
||||
|
||||
switch part[0] {
|
||||
case "jwt-ed25519":
|
||||
var key *crypto.PrivateKey
|
||||
f, err := os.ReadFile(part[1])
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
if _, key, err = crypto.GenerateKey(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = os.WriteFile(part[1], key.Bytes(), 0600); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if key, err = crypto.NewPrivateKey(f); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
token := jwt.NewWithClaims(meshcorejwt.SigningMethod, jwt.MapClaims{
|
||||
"publickey": hex.EncodeToString(key.PublicKey()),
|
||||
"iat": time.Now().Unix(),
|
||||
})
|
||||
tokenString, err := token.SignedString(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
options.SetPassword(tokenString)
|
||||
return nil
|
||||
|
||||
default:
|
||||
return fmt.Errorf("broker: unknown auth method %q", part[0])
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user