package hamview import ( "context" "encoding/hex" "fmt" "strings" "time" _ "github.com/cridenour/go-postgis" // PostGIS support _ "github.com/lib/pq" // PostgreSQL support "xorm.io/builder" "xorm.io/xorm" "git.maze.io/go/ham/protocol" "git.maze.io/go/ham/protocol/aprs" "git.maze.io/go/ham/protocol/meshcore" "git.maze.io/ham/hamview/schema" ) type CollectorConfig struct { Database DatabaseConfig `yaml:"database"` } type DatabaseConfig struct { Type string `yaml:"type"` Conf string `yaml:"conf"` } type Collector struct { radioByID map[string]*schema.Radio meshCoreGroup map[byte][]*meshcore.Group } func NewCollector(config *CollectorConfig) (*Collector, error) { Logger.Debugf("collector: opening %q database", config.Database.Type) schema.Logger = Logger if err := schema.Open(config.Database.Type, config.Database.Conf); err != nil { return nil, err } return &Collector{ meshCoreGroup: make(map[byte][]*meshcore.Group), radioByID: make(map[string]*schema.Radio), }, nil } func (c *Collector) Collect(broker Broker, topic string) error { Logger.Debugf("collector: subscribing to radios") radios, err := broker.SubscribeRadios() if err != nil { Logger.Errorf("collector: error subscribing: %v", err) return err } Logger.Debugf("collector: subscribing to %s", topic) packets, err := broker.SubscribePackets(topic) if err != nil { Logger.Errorf("collector: error subscribing to %s: %v", topic, err) return err } ctx := context.Background() loop: for { select { case radio := <-radios: if radio == nil { break loop } c.processRadio(ctx, radio) case packet := <-packets: if packet == nil { break loop } switch packet.Protocol { case protocol.APRS: c.processAPRSPacket(ctx, packet) case protocol.MeshCore: c.processMeshCorePacket(ctx, packet) } } } Logger.Warnf("collector: done processing packets from %s: channel closed", topic) return nil } func (c *Collector) getRadioByID(ctx context.Context, id string) (*schema.Radio, error) { id = strings.TrimRight(id, "=") if radio, ok := c.radioByID[id]; ok { return radio, nil } radio, err := schema.GetRadioByEncodedID(ctx, id) if err == nil { c.radioByID[id] = radio } return radio, err } func (c *Collector) processRadio(ctx context.Context, received *Radio) { Logger.Tracef("collector: process %s radio %q online %t", received.Protocol, received.Name, received.IsOnline) var ( now = time.Now() engine = schema.Query(ctx).(*xorm.Session) ) if err := engine.Begin(); err != nil { Logger.Warnf("collector: can't start session: %v", err) return } radio := new(schema.Radio) has, err := engine.Where(builder.Eq{ "name": received.Name, "protocol": received.Protocol, }).Get(radio) if err != nil { Logger.Warnf("collector: can't query radio: %v", err) return } if has { radio.IsOnline = received.IsOnline radio.UpdatedAt = now if _, err = engine.Cols("is_online", "updated_at").Update(radio); err != nil { _ = engine.Rollback() Logger.Warnf("collector: can't update radio: %v", err) return } } else { radio = &schema.Radio{ Name: received.Name, IsOnline: received.IsOnline, Manufacturer: received.Manufacturer, Device: schema.NULLString(received.Device), FirmwareVersion: schema.NULLString(received.FirmwareVersion), FirmwareDate: schema.NULLTime(received.FirmwareDate), Antenna: schema.NULLString(received.Antenna), Modulation: received.Modulation, Protocol: received.Protocol, Frequency: received.Frequency, Bandwidth: received.Bandwidth, Power: schema.NULLFloat64(received.Power), Gain: schema.NULLFloat64(received.Gain), CreatedAt: now, UpdatedAt: now, } if received.Position != nil { radio.Latitude = &received.Position.Latitude radio.Longitude = &received.Position.Longitude radio.Altitude = &received.Position.Altitude } if received.LoRaCR != 0 && received.LoRaSF != 0 { radio.LoRaSF = &received.LoRaSF radio.LoRaCR = &received.LoRaCR } if _, err = engine.Insert(radio); err != nil { Logger.Warnf("collector: can't insert radio %#+v: %v", radio, err) return } } if err = engine.Commit(); err != nil { Logger.Errorf("collector: can't commit radio session: %v", err) } } func (c *Collector) processAPRSPacket(ctx context.Context, packet *Packet) { radio, err := c.getRadioByID(ctx, packet.RadioID) if err != nil { Logger.Warnf("collector: process %s packet: can't find radio %q: %v", packet.Protocol, packet.RadioID, err) return } decoded, err := aprs.Parse(string(packet.Raw)) if err != nil { Logger.Warnf("collector: invalid %s packet: %v", packet.Protocol, err) return } Logger.Tracef("collector: process %s packet (%d bytes)", packet.Protocol, len(packet.Raw)) engine := schema.Query(ctx) station := new(schema.APRSStation) has, err := engine.Where(builder.Eq{"call": strings.ToUpper(decoded.Source.String())}).Get(station) if err != nil { Logger.Warnf("collector: can't query APRS station: %v", err) return } else if has { cols := []string{"last_heard_at"} station.LastHeardAt = packet.Time if decoded.Latitude != 0 { station.LastLatitude = &decoded.Latitude station.LastLongitude = &decoded.Longitude cols = append(cols, "last_latitude", "last_longitude") } if _, err = engine.ID(station.ID).Cols(cols...).Update(station); err != nil { Logger.Warnf("collector: can't update APRS station: %v", err) return } } else { station = &schema.APRSStation{ Call: strings.ToUpper(decoded.Source.String()), Symbol: decoded.Symbol, FirstHeardAt: packet.Time, LastHeardAt: packet.Time, } if decoded.Latitude != 0 { station.LastLatitude = &decoded.Latitude station.LastLongitude = &decoded.Longitude } if station.ID, err = engine.Insert(station); err != nil { Logger.Warnf("collector: can't insert APRS station: %v", err) return } } save := &schema.APRSPacket{ RadioID: radio.ID, StationID: station.ID, Source: station.Call, Destination: decoded.Destination.String(), Path: decoded.Path.String(), Comment: decoded.Comment, Symbol: string(decoded.Symbol[:]), Raw: string(packet.Raw), ReceivedAt: packet.Time, } if decoded.Latitude != 0 { save.Latitude = &decoded.Latitude save.Longitude = &decoded.Longitude } if _, err = engine.Insert(save); err != nil { Logger.Warnf("collector: can't insert APRS packet: %v", err) } } func (c *Collector) processMeshCorePacket(ctx context.Context, packet *Packet) { radio, err := c.getRadioByID(ctx, packet.RadioID) if err != nil { Logger.Warnf("collector: process %s packet: can't find radio %q: %v", packet.Protocol, packet.RadioID, err) return } var parsed meshcore.Packet if err = parsed.UnmarshalBytes(packet.Raw); err != nil { Logger.Warnf("collector: invalid %s packet: %v", packet.Protocol, err) return } Logger.Tracef("collector: process %s %s packet (%d bytes)", packet.Protocol, parsed.PayloadType.String(), len(packet.Raw)) if len(parsed.Path) == 0 { parsed.Path = nil // store NULL } var channelHash string switch parsed.PayloadType { case meshcore.TypeGroupText, meshcore.TypeGroupData: if len(parsed.Payload) > 0 { channelHash = fmt.Sprintf("%02x", parsed.Payload[0]) } } save := &schema.MeshCorePacket{ RadioID: radio.ID, SNR: packet.SNR, RSSI: packet.RSSI, RouteType: uint8(parsed.RouteType), PayloadType: uint8(parsed.PayloadType), Version: parsed.Version, Hash: hex.EncodeToString(parsed.Hash()), Path: parsed.Path, Payload: parsed.Payload, ChannelHash: channelHash, Raw: packet.Raw, ReceivedAt: packet.Time, } if _, err = schema.Query(ctx).Insert(save); err != nil { Logger.Warnf("collector: error storing packet: %v", err) return } switch parsed.PayloadType { case meshcore.TypeAdvert: c.processMeshCoreAdvert(ctx, save, &parsed) } } func (c *Collector) processMeshCoreAdvert(ctx context.Context, packet *schema.MeshCorePacket, parsed *meshcore.Packet) { payload, err := parsed.Decode() if err != nil { Logger.Warnf("collector: error decoding packet: %v", err) return } advert, ok := payload.(*meshcore.Advert) if !ok { Logger.Warnf("collector: expected Advert, got %T!?", payload) return } node := &schema.MeshCoreNode{ PacketHash: packet.Hash, Name: advert.Name, Type: uint8(advert.Type), Prefix: fmt.Sprintf("%02x", advert.PublicKey.Bytes()[0]), PublicKey: hex.EncodeToString(advert.PublicKey.Bytes()), FirstHeardAt: packet.ReceivedAt, LastHeardAt: packet.ReceivedAt, } if advert.Position != nil { node.LastLatitude = &advert.Position.Latitude node.LastLongitude = &advert.Position.Longitude } var ( engine = schema.Query(ctx) existing = new(schema.MeshCoreNode) ) if err = engine.Begin(); err != nil { Logger.Warnf("collector: can't start session: %v", err) return } var has bool if has, err = engine.Where(builder.Eq{"`public_key`": node.PublicKey}).Get(existing); err != nil { _ = engine.Rollback() Logger.Warnf("collector: can't query session: %v", err) return } if has { cols := []string{"last_heard_at"} existing.LastHeardAt = packet.ReceivedAt if advert.Position != nil { existing.LastLatitude = node.LastLatitude existing.LastLongitude = node.LastLongitude cols = append(cols, "last_latitude", "last_longitude") } existing.Name = node.Name _, err = engine.ID(existing.ID).Cols(cols...).Update(existing) } else { _, err = engine.Insert(node) } if err != nil { _ = engine.Rollback() Logger.Warnf("collector: can't save (update: %t): %v", has, err) return } if err = engine.Commit(); err != nil { Logger.Warnf("collector: can't commit session: %v", err) return } /* if advert.Position != nil { if _, err = c.DB.Exec(` INSERT INTO meshcore_node_position ( node_id, heard_at, latitude, longitude, position ) VALUES ( $1, $2, $3, $4, ST_SetSRID(ST_MakePoint($5, $6), 4326) ); `, nodeID, packet.Time, advert.Position.Latitude, advert.Position.Longitude, advert.Position.Latitude, advert.Position.Longitude, ); err != nil { Logger.Warnf("collector: error storing node position: %v", err) return } } } */ }