Files
hamview/collector.go
maze fb898bb058
Some checks failed
Run tests / test (1.25) (push) Has been cancelled
Run tests / test (stable) (push) Has been cancelled
Initial import
2026-02-22 20:27:07 +01:00

401 lines
8.1 KiB
Go

package hamview
import (
"database/sql"
"fmt"
_ "github.com/cridenour/go-postgis" // PostGIS support
"github.com/lib/pq" // PostgreSQL support
"git.maze.io/go/ham/protocol"
"git.maze.io/go/ham/protocol/aprs"
"git.maze.io/go/ham/protocol/meshcore"
)
type CollectorConfig struct {
Database DatabaseConfig `yaml:"database"`
}
type DatabaseConfig struct {
Type string `yaml:"type"`
Conf string `yaml:"conf"`
}
type Collector struct {
*sql.DB
meshCoreGroup map[byte][]*meshcore.Group
}
func NewCollector(config *CollectorConfig) (*Collector, error) {
d, err := sql.Open(config.Database.Type, config.Database.Conf)
if err != nil {
return nil, err
}
for _, query := range []string{
// radio.*
sqlCreateRadio,
sqlIndexRadioName,
sqlIndexRadioProtocol,
sqlGeometryRadioPosition,
// meshcore_packet.*
sqlCreateMeshCorePacket,
sqlIndexMeshCorePacketHash,
sqlIndexMeshCorePacketPayloadType,
// meshcore_node.*
sqlCreateMeshCoreNode,
sqlIndexMeshCoreNodeName,
sqlAlterMeshCoreNodePrefix,
sqlGeometryMeshCoreNodePosition,
// meshcore_node_position.*
sqlCreateMeshCoreNodePosition,
sqlGeometryMeshCoreNodePositionPosition,
sqlIndexMeshCoreNodePositionPosition,
} {
if _, err := d.Exec(query); err != nil {
var ignore bool
if err, ok := err.(*pq.Error); ok {
switch err.Code {
case "42701": // column "x" of relation "y" already exists (42701)
ignore = true
}
}
Logger.Debugf("collector: sql error %T: %v", err, err)
if !ignore {
return nil, fmt.Errorf("error in query %s: %v", query, err)
}
}
}
return &Collector{
DB: d,
meshCoreGroup: make(map[byte][]*meshcore.Group),
}, nil
}
func (c *Collector) Collect(broker Broker, topic string) error {
Logger.Debugf("collector: subscribing to radios")
radios, err := broker.SubscribeRadios()
if err != nil {
Logger.Errorf("collector: error subscribing: %v", err)
return err
}
Logger.Debugf("collector: subscribing to %s", topic)
packets, err := broker.SubscribePackets(topic)
if err != nil {
Logger.Errorf("collector: error subscribing to %s: %v", topic, err)
return err
}
loop:
for {
select {
case radio := <-radios:
if radio == nil {
break loop
}
c.processRadio(radio)
case packet := <-packets:
if packet == nil {
break loop
}
switch packet.Protocol {
case protocol.APRS:
c.processAPRSPacket(packet)
case protocol.MeshCore:
c.processMeshCorePacket(packet)
}
}
}
Logger.Warnf("collector: done processing packets from %s: channel closed", topic)
return nil
}
func (c *Collector) processRadio(radio *Radio) {
Logger.Tracef("collector: process %s radio %q online %t",
radio.Protocol,
radio.Name,
radio.IsOnline)
var latitude, longitude, altitude *float64
if radio.Position != nil {
latitude = &radio.Position.Latitude
longitude = &radio.Position.Longitude
altitude = &radio.Position.Altitude
}
var id int64
if err := c.QueryRow(`
INSERT INTO radio (
name,
is_online,
device,
manufacturer,
firmware_date,
firmware_version,
antenna,
modulation,
protocol,
latitude,
longitude,
altitude,
frequency,
rx_frequency,
tx_frequency,
bandwidth,
power,
gain,
lora_sf,
lora_cr,
extra
) VALUES (
$1,
$2,
NULLIF($3, ''), -- device
NULLIF($4, ''), -- manufacturer
$5,
NULLIF($6, ''), -- firmware_version
NULLIF($7, ''), -- antenna
NULLIF($8, ''), -- modulation
$9, -- protocol
NULLIF($10, 0.0), -- latitude
NULLIF($11, 0.0), -- longitude
$12, -- altitude
$13, -- frequency
NULLIF($14, 0.0), -- rx_frequency
NULLIF($15, 0.0), -- tx_frequency
$16, -- bandwidth
NULLIF($17, 0.0), -- power
NULLIF($18, 0.0), -- gain
NULLIF($19, 0), -- lora_sf
NULLIF($20, 0), -- lora_cr
$21
)
ON CONFLICT (name)
DO UPDATE
SET
is_online = $2,
device = NULLIF($3, ''),
manufacturer = NULLIF($4, ''),
firmware_date = $5,
firmware_version = NULLIF($6, ''),
antenna = NULLIF($7, ''),
modulation = NULLIF($8, ''),
protocol = $9,
latitude = NULLIF($10, 0.0),
longitude = NULLIF($11, 0.0),
altitude = $12,
frequency = $13,
rx_frequency = NULLIF($14, 0.0),
tx_frequency = NULLIF($15, 0.0),
bandwidth = $16,
power = NULLIF($17, 0),
gain = NULLIF($18, 0),
lora_sf = NULLIF($19, 0),
lora_cr = NULLIF($20, 0),
extra = $21
RETURNING id
`,
radio.Name,
radio.IsOnline,
radio.Device,
radio.Manufacturer,
radio.FirmwareDate,
radio.FirmwareVersion,
radio.Antenna,
radio.Modulation,
radio.Protocol,
latitude,
longitude,
altitude,
radio.Frequency,
radio.RXFrequency,
radio.TXFrequency,
radio.Bandwidth,
radio.Power,
radio.Gain,
radio.LoRaSF,
radio.LoRaCR,
nil,
).Scan(&id); err != nil {
Logger.Warnf("collector: error storing radio: %v", err)
return
}
}
func (c *Collector) processAPRSPacket(packet *protocol.Packet) {
decoded, err := aprs.ParsePacket(string(packet.Raw))
if err != nil {
Logger.Warnf("collector: invalid %s packet: %v", packet.Protocol, err)
return
}
Logger.Tracef("collector: process %s packet (%d bytes)",
packet.Protocol,
len(packet.Raw))
var id int64
if err := c.QueryRow(`
INSERT INTO aprs_packet (
src_address,
dst_address,
comment
) VALUES ($1, $2, $3)
RETURNING id;
`,
decoded.Src.String(),
decoded.Dst.String(),
decoded.Comment,
).Scan(&id); err != nil {
Logger.Warnf("collector: error storing packet: %v", err)
return
}
}
func (c *Collector) processMeshCorePacket(packet *protocol.Packet) {
var parsed meshcore.Packet
if err := parsed.UnmarshalBytes(packet.Raw); err != nil {
Logger.Warnf("collector: invalid %s packet: %v", packet.Protocol, err)
return
}
Logger.Tracef("collector: process %s %s packet (%d bytes)",
packet.Protocol,
parsed.PayloadType.String(),
len(packet.Raw))
if len(parsed.Path) == 0 {
parsed.Path = nil // store NULL
}
var id int64
if err := c.QueryRow(`
INSERT INTO meshcore_packet (
snr,
rssi,
hash,
route_type,
payload_type,
path,
payload,
raw,
received_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING id;`,
packet.SNR,
packet.RSSI,
parsed.Hash(),
parsed.RouteType,
parsed.PayloadType,
parsed.Path,
parsed.Payload,
packet.Raw,
packet.Time,
).Scan(&id); err != nil {
Logger.Warnf("collector: error storing packet: %v", err)
return
}
switch parsed.PayloadType {
case meshcore.TypeAdvert:
payload, err := parsed.Decode()
if err != nil {
Logger.Warnf("collector: error decoding packet: %v", err)
return
}
var (
advert = payload.(*meshcore.Advert)
nodeID int64
latitude *float64
longitude *float64
)
if advert.Position != nil {
latitude = &advert.Position.Latitude
longitude = &advert.Position.Longitude
}
if err = c.QueryRow(`
INSERT INTO meshcore_node (
node_type,
public_key,
name,
local_time,
first_heard,
last_heard,
last_latitude,
last_longitude,
last_advert_id
) VALUES (
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9
)
ON CONFLICT (public_key)
DO UPDATE
SET
name = $3,
local_time = $4,
last_heard = $6,
last_latitude = $7,
last_longitude = $8,
last_advert_id = $9
RETURNING id
`,
advert.Type,
advert.PublicKey.Bytes(),
advert.Name,
advert.Time,
packet.Time,
packet.Time,
latitude,
longitude,
id,
).Scan(&nodeID); err != nil {
Logger.Warnf("collector: error storing node: %v", err)
return
}
if advert.Position != nil {
if _, err = c.Exec(`
INSERT INTO meshcore_node_position (
node_id,
heard_at,
latitude,
longitude,
position
) VALUES (
$1,
$2,
$3,
$4,
ST_SetSRID(ST_MakePoint($5, $6), 4326)
);
`,
nodeID,
packet.Time,
advert.Position.Latitude,
advert.Position.Longitude,
advert.Position.Latitude,
advert.Position.Longitude,
); err != nil {
Logger.Warnf("collector: error storing node position: %v", err)
return
}
}
}
}