package hamview

import (
	"database/sql"
	"fmt"

	_ "github.com/cridenour/go-postgis" // PostGIS support
	"github.com/lib/pq"                 // PostgreSQL support

	"git.maze.io/go/ham/protocol"
	"git.maze.io/go/ham/protocol/aprs"
	"git.maze.io/go/ham/protocol/meshcore"
)

// CollectorConfig holds the settings used by NewCollector.
type CollectorConfig struct {
	Database DatabaseConfig `yaml:"database"`
}

// DatabaseConfig selects the database the collector writes to.
type DatabaseConfig struct {
	// Type is passed to sql.Open as the driver name.
	Type string `yaml:"type"`

	// Conf is passed to sql.Open as the data source name.
	Conf string `yaml:"conf"`
}

// Collector stores radios and packets received from a Broker in a SQL
// database. It embeds the *sql.DB handle it writes to.
type Collector struct {
	*sql.DB

	// NOTE(review): not read or written by any method in this file;
	// presumably MeshCore groups indexed by a one byte key -- confirm.
	meshCoreGroup map[byte][]*meshcore.Group
}

// NewCollector opens the configured database and runs the schema statements
// (tables, indexes, geometry columns) in order.
//
// A statement failing with PostgreSQL error 42701 (column already exists) is
// ignored, so the setup can be repeated against an existing schema. Any other
// failure closes the database handle and returns an error wrapping the cause.
func NewCollector(config *CollectorConfig) (*Collector, error) {
	d, err := sql.Open(config.Database.Type, config.Database.Conf)
	if err != nil {
		return nil, err
	}

	for _, query := range []string{
		// radio.*
		sqlCreateRadio,
		sqlIndexRadioName,
		sqlIndexRadioProtocol,
		sqlGeometryRadioPosition,

		// meshcore_packet.*
		sqlCreateMeshCorePacket,
		sqlIndexMeshCorePacketHash,
		sqlIndexMeshCorePacketPayloadType,

		// meshcore_node.*
		sqlCreateMeshCoreNode,
		sqlIndexMeshCoreNodeName,
		sqlAlterMeshCoreNodePrefix,
		sqlGeometryMeshCoreNodePosition,

		// meshcore_node_position.*
		sqlCreateMeshCoreNodePosition,
		sqlGeometryMeshCoreNodePositionPosition,
		sqlIndexMeshCoreNodePositionPosition,
	} {
		if _, err := d.Exec(query); err != nil {
			var ignore bool
			if pqErr, ok := err.(*pq.Error); ok {
				switch pqErr.Code {
				case "42701": // column "x" of relation "y" already exists (42701)
					ignore = true
				}
			}
			Logger.Debugf("collector: sql error %T: %v", err, err)
			if !ignore {
				// Do not leak the handle (and its connection pool) when
				// setup fails; the query error is the one worth reporting.
				_ = d.Close()
				return nil, fmt.Errorf("error in query %s: %w", query, err)
			}
		}
	}

	return &Collector{
		DB:            d,
		meshCoreGroup: make(map[byte][]*meshcore.Group),
	}, nil
}

// Collect subscribes to radio updates and to packets on topic, and stores
// everything it receives until one of the two channels yields a nil value,
// which is treated as the channel having been closed.
//
// It returns the subscription error if either subscription fails, and nil
// once processing stops. Packets with a protocol other than APRS or MeshCore
// are dropped silently.
func (c *Collector) Collect(broker Broker, topic string) error {
	Logger.Debugf("collector: subscribing to radios")
	radios, err := broker.SubscribeRadios()
	if err != nil {
		Logger.Errorf("collector: error subscribing: %v", err)
		return err
	}

	Logger.Debugf("collector: subscribing to %s", topic)
	packets, err := broker.SubscribePackets(topic)
	if err != nil {
		Logger.Errorf("collector: error subscribing to %s: %v", topic, err)
		return err
	}

loop:
	for {
		select {
		case radio := <-radios:
			if radio == nil {
				break loop
			}
			c.processRadio(radio)

		case packet := <-packets:
			if packet == nil {
				break loop
			}
			switch packet.Protocol {
			case protocol.APRS:
				c.processAPRSPacket(packet)
			case protocol.MeshCore:
				c.processMeshCorePacket(packet)
			}
		}
	}

	Logger.Warnf("collector: done processing packets from %s: channel closed", topic)

	return nil
}

// processRadio inserts the radio into the radio table, or updates the
// existing row with the same name. Empty strings and zero values are stored
// as NULL for the columns wrapped in NULLIF. Failures are logged and
// otherwise ignored.
func (c *Collector) processRadio(radio *Radio) {
	Logger.Tracef("collector: process %s radio %q online %t",
		radio.Protocol,
		radio.Name,
		radio.IsOnline)

	// Nil pointers are passed on when the radio reports no position.
	var latitude, longitude, altitude *float64
	if radio.Position != nil {
		latitude = &radio.Position.Latitude
		longitude = &radio.Position.Longitude
		altitude = &radio.Position.Altitude
	}

	var id int64
	if err := c.QueryRow(`
		INSERT INTO radio (
			name,
			is_online,
			device,
			manufacturer,
			firmware_date,
			firmware_version,
			antenna,
			modulation,
			protocol,
			latitude,
			longitude,
			altitude,
			frequency,
			rx_frequency,
			tx_frequency,
			bandwidth,
			power,
			gain,
			lora_sf,
			lora_cr,
			extra
		) VALUES (
			$1,
			$2,
			NULLIF($3, ''),   -- device
			NULLIF($4, ''),   -- manufacturer
			$5,
			NULLIF($6, ''),   -- firmware_version
			NULLIF($7, ''),   -- antenna
			NULLIF($8, ''),   -- modulation
			$9,               -- protocol
			NULLIF($10, 0.0), -- latitude
			NULLIF($11, 0.0), -- longitude
			$12,              -- altitude
			$13,              -- frequency
			NULLIF($14, 0.0), -- rx_frequency
			NULLIF($15, 0.0), -- tx_frequency
			$16,              -- bandwidth
			NULLIF($17, 0.0), -- power
			NULLIF($18, 0.0), -- gain
			NULLIF($19, 0),   -- lora_sf
			NULLIF($20, 0),   -- lora_cr
			$21
		)
		ON CONFLICT (name)
		DO UPDATE
		SET
			is_online = $2,
			device = NULLIF($3, ''),
			manufacturer = NULLIF($4, ''),
			firmware_date = $5,
			firmware_version = NULLIF($6, ''),
			antenna = NULLIF($7, ''),
			modulation = NULLIF($8, ''),
			protocol = $9,
			latitude = NULLIF($10, 0.0),
			longitude = NULLIF($11, 0.0),
			altitude = $12,
			frequency = $13,
			rx_frequency = NULLIF($14, 0.0),
			tx_frequency = NULLIF($15, 0.0),
			bandwidth = $16,
			power = NULLIF($17, 0),
			gain = NULLIF($18, 0),
			lora_sf = NULLIF($19, 0),
			lora_cr = NULLIF($20, 0),
			extra = $21
		RETURNING id
		`,
		radio.Name,
		radio.IsOnline,
		radio.Device,
		radio.Manufacturer,
		radio.FirmwareDate,
		radio.FirmwareVersion,
		radio.Antenna,
		radio.Modulation,
		radio.Protocol,
		latitude,
		longitude,
		altitude,
		radio.Frequency,
		radio.RXFrequency,
		radio.TXFrequency,
		radio.Bandwidth,
		radio.Power,
		radio.Gain,
		radio.LoRaSF,
		radio.LoRaCR,
		nil,
	).Scan(&id); err != nil {
		Logger.Warnf("collector: error storing radio: %v", err)
		return
	}
}

// processAPRSPacket parses the raw packet as APRS and stores its source
// address, destination address and comment in aprs_packet. Parse and storage
// failures are logged and the packet is dropped.
func (c *Collector) processAPRSPacket(packet *protocol.Packet) {
	decoded, err := aprs.ParsePacket(string(packet.Raw))
	if err != nil {
		Logger.Warnf("collector: invalid %s packet: %v", packet.Protocol, err)
		return
	}

	Logger.Tracef("collector: process %s packet (%d bytes)",
		packet.Protocol,
		len(packet.Raw))

	var id int64
	if err := c.QueryRow(`
		INSERT INTO aprs_packet (
			src_address,
			dst_address,
			comment
		) VALUES ($1, $2, $3)
		RETURNING id;
		`,
		decoded.Src.String(),
		decoded.Dst.String(),
		decoded.Comment,
	).Scan(&id); err != nil {
		Logger.Warnf("collector: error storing packet: %v", err)
		return
	}
}

// processMeshCorePacket decodes the raw packet as MeshCore and stores it in
// meshcore_packet. For advert packets it additionally inserts or updates the
// advertising node in meshcore_node and, when the advert carries a position,
// appends a row to meshcore_node_position. Each failure is logged and stops
// processing of the packet at that step.
func (c *Collector) processMeshCorePacket(packet *protocol.Packet) {
	var parsed meshcore.Packet
	if err := parsed.UnmarshalBytes(packet.Raw); err != nil {
		Logger.Warnf("collector: invalid %s packet: %v", packet.Protocol, err)
		return
	}

	Logger.Tracef("collector: process %s %s packet (%d bytes)",
		packet.Protocol,
		parsed.PayloadType.String(),
		len(packet.Raw))

	if len(parsed.Path) == 0 {
		parsed.Path = nil // store NULL
	}

	var id int64
	if err := c.QueryRow(`
		INSERT INTO meshcore_packet (
			snr,
			rssi,
			hash,
			route_type,
			payload_type,
			path,
			payload,
			raw,
			received_at
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
		RETURNING id;`,
		packet.SNR,
		packet.RSSI,
		parsed.Hash(),
		parsed.RouteType,
		parsed.PayloadType,
		parsed.Path,
		parsed.Payload,
		packet.Raw,
		packet.Time,
	).Scan(&id); err != nil {
		Logger.Warnf("collector: error storing packet: %v", err)
		return
	}

	switch parsed.PayloadType {
	case meshcore.TypeAdvert:
		payload, err := parsed.Decode()
		if err != nil {
			Logger.Warnf("collector: error decoding packet: %v", err)
			return
		}

		// Checked assertion: a decoder returning any other type must not
		// take the whole collector down with a panic.
		advert, ok := payload.(*meshcore.Advert)
		if !ok {
			Logger.Warnf("collector: unexpected %s payload %T",
				parsed.PayloadType.String(),
				payload)
			return
		}

		// Nil pointers are passed on when the advert has no position.
		var (
			nodeID    int64
			latitude  *float64
			longitude *float64
		)
		if advert.Position != nil {
			latitude = &advert.Position.Latitude
			longitude = &advert.Position.Longitude
		}

		// first_heard ($5) is only written on insert; on conflict the
		// existing value is kept and last_heard ($6) is refreshed.
		if err = c.QueryRow(`
			INSERT INTO meshcore_node (
				node_type,
				public_key,
				name,
				local_time,
				first_heard,
				last_heard,
				last_latitude,
				last_longitude,
				last_advert_id
			) VALUES (
				$1,
				$2,
				$3,
				$4,
				$5,
				$6,
				$7,
				$8,
				$9
			)
			ON CONFLICT (public_key)
			DO UPDATE
			SET
				name = $3,
				local_time = $4,
				last_heard = $6,
				last_latitude = $7,
				last_longitude = $8,
				last_advert_id = $9
			RETURNING id
			`,
			advert.Type,
			advert.PublicKey.Bytes(),
			advert.Name,
			advert.Time,
			packet.Time,
			packet.Time,
			latitude,
			longitude,
			id,
		).Scan(&nodeID); err != nil {
			Logger.Warnf("collector: error storing node: %v", err)
			return
		}

		if advert.Position != nil {
			// ST_MakePoint takes (x, y), which for SRID 4326 means
			// (longitude, latitude): $5 is the longitude, $6 the latitude.
			if _, err = c.Exec(`
				INSERT INTO meshcore_node_position (
					node_id,
					heard_at,
					latitude,
					longitude,
					position
				) VALUES (
					$1,
					$2,
					$3,
					$4,
					ST_SetSRID(ST_MakePoint($5, $6), 4326)
				);
				`,
				nodeID,
				packet.Time,
				advert.Position.Latitude,
				advert.Position.Longitude,
				advert.Position.Longitude,
				advert.Position.Latitude,
			); err != nil {
				Logger.Warnf("collector: error storing node position: %v", err)
				return
			}
		}
	}
}