401 lines
8.1 KiB
Go
401 lines
8.1 KiB
Go
package hamview
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
|
|
_ "github.com/cridenour/go-postgis" // PostGIS support
|
|
"github.com/lib/pq" // PostgreSQL support
|
|
|
|
"git.maze.io/go/ham/protocol"
|
|
"git.maze.io/go/ham/protocol/aprs"
|
|
"git.maze.io/go/ham/protocol/meshcore"
|
|
)
|
|
|
|
type CollectorConfig struct {
|
|
Database DatabaseConfig `yaml:"database"`
|
|
}
|
|
|
|
type DatabaseConfig struct {
|
|
Type string `yaml:"type"`
|
|
Conf string `yaml:"conf"`
|
|
}
|
|
|
|
type Collector struct {
|
|
*sql.DB
|
|
|
|
meshCoreGroup map[byte][]*meshcore.Group
|
|
}
|
|
|
|
func NewCollector(config *CollectorConfig) (*Collector, error) {
|
|
d, err := sql.Open(config.Database.Type, config.Database.Conf)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, query := range []string{
|
|
// radio.*
|
|
sqlCreateRadio,
|
|
sqlIndexRadioName,
|
|
sqlIndexRadioProtocol,
|
|
sqlGeometryRadioPosition,
|
|
|
|
// meshcore_packet.*
|
|
sqlCreateMeshCorePacket,
|
|
sqlIndexMeshCorePacketHash,
|
|
sqlIndexMeshCorePacketPayloadType,
|
|
|
|
// meshcore_node.*
|
|
sqlCreateMeshCoreNode,
|
|
sqlIndexMeshCoreNodeName,
|
|
sqlAlterMeshCoreNodePrefix,
|
|
sqlGeometryMeshCoreNodePosition,
|
|
|
|
// meshcore_node_position.*
|
|
sqlCreateMeshCoreNodePosition,
|
|
sqlGeometryMeshCoreNodePositionPosition,
|
|
sqlIndexMeshCoreNodePositionPosition,
|
|
} {
|
|
if _, err := d.Exec(query); err != nil {
|
|
var ignore bool
|
|
if err, ok := err.(*pq.Error); ok {
|
|
switch err.Code {
|
|
case "42701": // column "x" of relation "y" already exists (42701)
|
|
ignore = true
|
|
}
|
|
}
|
|
Logger.Debugf("collector: sql error %T: %v", err, err)
|
|
if !ignore {
|
|
return nil, fmt.Errorf("error in query %s: %v", query, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return &Collector{
|
|
DB: d,
|
|
meshCoreGroup: make(map[byte][]*meshcore.Group),
|
|
}, nil
|
|
}
|
|
|
|
func (c *Collector) Collect(broker Broker, topic string) error {
|
|
Logger.Debugf("collector: subscribing to radios")
|
|
radios, err := broker.SubscribeRadios()
|
|
if err != nil {
|
|
Logger.Errorf("collector: error subscribing: %v", err)
|
|
return err
|
|
}
|
|
|
|
Logger.Debugf("collector: subscribing to %s", topic)
|
|
packets, err := broker.SubscribePackets(topic)
|
|
if err != nil {
|
|
Logger.Errorf("collector: error subscribing to %s: %v", topic, err)
|
|
return err
|
|
}
|
|
|
|
loop:
|
|
for {
|
|
select {
|
|
case radio := <-radios:
|
|
if radio == nil {
|
|
break loop
|
|
}
|
|
c.processRadio(radio)
|
|
|
|
case packet := <-packets:
|
|
if packet == nil {
|
|
break loop
|
|
}
|
|
switch packet.Protocol {
|
|
case protocol.APRS:
|
|
c.processAPRSPacket(packet)
|
|
case protocol.MeshCore:
|
|
c.processMeshCorePacket(packet)
|
|
}
|
|
}
|
|
}
|
|
|
|
Logger.Warnf("collector: done processing packets from %s: channel closed", topic)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Collector) processRadio(radio *Radio) {
|
|
Logger.Tracef("collector: process %s radio %q online %t",
|
|
radio.Protocol,
|
|
radio.Name,
|
|
radio.IsOnline)
|
|
|
|
var latitude, longitude, altitude *float64
|
|
if radio.Position != nil {
|
|
latitude = &radio.Position.Latitude
|
|
longitude = &radio.Position.Longitude
|
|
altitude = &radio.Position.Altitude
|
|
}
|
|
|
|
var id int64
|
|
if err := c.QueryRow(`
|
|
INSERT INTO radio (
|
|
name,
|
|
is_online,
|
|
device,
|
|
manufacturer,
|
|
firmware_date,
|
|
firmware_version,
|
|
antenna,
|
|
modulation,
|
|
protocol,
|
|
latitude,
|
|
longitude,
|
|
altitude,
|
|
frequency,
|
|
rx_frequency,
|
|
tx_frequency,
|
|
bandwidth,
|
|
power,
|
|
gain,
|
|
lora_sf,
|
|
lora_cr,
|
|
extra
|
|
) VALUES (
|
|
$1,
|
|
$2,
|
|
NULLIF($3, ''), -- device
|
|
NULLIF($4, ''), -- manufacturer
|
|
$5,
|
|
NULLIF($6, ''), -- firmware_version
|
|
NULLIF($7, ''), -- antenna
|
|
NULLIF($8, ''), -- modulation
|
|
$9, -- protocol
|
|
NULLIF($10, 0.0), -- latitude
|
|
NULLIF($11, 0.0), -- longitude
|
|
$12, -- altitude
|
|
$13, -- frequency
|
|
NULLIF($14, 0.0), -- rx_frequency
|
|
NULLIF($15, 0.0), -- tx_frequency
|
|
$16, -- bandwidth
|
|
NULLIF($17, 0.0), -- power
|
|
NULLIF($18, 0.0), -- gain
|
|
NULLIF($19, 0), -- lora_sf
|
|
NULLIF($20, 0), -- lora_cr
|
|
$21
|
|
)
|
|
ON CONFLICT (name)
|
|
DO UPDATE
|
|
SET
|
|
is_online = $2,
|
|
device = NULLIF($3, ''),
|
|
manufacturer = NULLIF($4, ''),
|
|
firmware_date = $5,
|
|
firmware_version = NULLIF($6, ''),
|
|
antenna = NULLIF($7, ''),
|
|
modulation = NULLIF($8, ''),
|
|
protocol = $9,
|
|
latitude = NULLIF($10, 0.0),
|
|
longitude = NULLIF($11, 0.0),
|
|
altitude = $12,
|
|
frequency = $13,
|
|
rx_frequency = NULLIF($14, 0.0),
|
|
tx_frequency = NULLIF($15, 0.0),
|
|
bandwidth = $16,
|
|
power = NULLIF($17, 0),
|
|
gain = NULLIF($18, 0),
|
|
lora_sf = NULLIF($19, 0),
|
|
lora_cr = NULLIF($20, 0),
|
|
extra = $21
|
|
RETURNING id
|
|
`,
|
|
radio.Name,
|
|
radio.IsOnline,
|
|
radio.Device,
|
|
radio.Manufacturer,
|
|
radio.FirmwareDate,
|
|
radio.FirmwareVersion,
|
|
radio.Antenna,
|
|
radio.Modulation,
|
|
radio.Protocol,
|
|
latitude,
|
|
longitude,
|
|
altitude,
|
|
radio.Frequency,
|
|
radio.RXFrequency,
|
|
radio.TXFrequency,
|
|
radio.Bandwidth,
|
|
radio.Power,
|
|
radio.Gain,
|
|
radio.LoRaSF,
|
|
radio.LoRaCR,
|
|
nil,
|
|
).Scan(&id); err != nil {
|
|
Logger.Warnf("collector: error storing radio: %v", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
func (c *Collector) processAPRSPacket(packet *protocol.Packet) {
|
|
decoded, err := aprs.ParsePacket(string(packet.Raw))
|
|
if err != nil {
|
|
Logger.Warnf("collector: invalid %s packet: %v", packet.Protocol, err)
|
|
return
|
|
}
|
|
|
|
Logger.Tracef("collector: process %s packet (%d bytes)",
|
|
packet.Protocol,
|
|
len(packet.Raw))
|
|
|
|
var id int64
|
|
if err := c.QueryRow(`
|
|
INSERT INTO aprs_packet (
|
|
src_address,
|
|
dst_address,
|
|
comment
|
|
) VALUES ($1, $2, $3)
|
|
RETURNING id;
|
|
`,
|
|
decoded.Src.String(),
|
|
decoded.Dst.String(),
|
|
decoded.Comment,
|
|
).Scan(&id); err != nil {
|
|
Logger.Warnf("collector: error storing packet: %v", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
func (c *Collector) processMeshCorePacket(packet *protocol.Packet) {
|
|
var parsed meshcore.Packet
|
|
if err := parsed.UnmarshalBytes(packet.Raw); err != nil {
|
|
Logger.Warnf("collector: invalid %s packet: %v", packet.Protocol, err)
|
|
return
|
|
}
|
|
|
|
Logger.Tracef("collector: process %s %s packet (%d bytes)",
|
|
packet.Protocol,
|
|
parsed.PayloadType.String(),
|
|
len(packet.Raw))
|
|
|
|
if len(parsed.Path) == 0 {
|
|
parsed.Path = nil // store NULL
|
|
}
|
|
|
|
var id int64
|
|
if err := c.QueryRow(`
|
|
INSERT INTO meshcore_packet (
|
|
snr,
|
|
rssi,
|
|
hash,
|
|
route_type,
|
|
payload_type,
|
|
path,
|
|
payload,
|
|
raw,
|
|
received_at
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
|
RETURNING id;`,
|
|
packet.SNR,
|
|
packet.RSSI,
|
|
parsed.Hash(),
|
|
parsed.RouteType,
|
|
parsed.PayloadType,
|
|
parsed.Path,
|
|
parsed.Payload,
|
|
packet.Raw,
|
|
packet.Time,
|
|
).Scan(&id); err != nil {
|
|
Logger.Warnf("collector: error storing packet: %v", err)
|
|
return
|
|
}
|
|
|
|
switch parsed.PayloadType {
|
|
case meshcore.TypeAdvert:
|
|
payload, err := parsed.Decode()
|
|
if err != nil {
|
|
Logger.Warnf("collector: error decoding packet: %v", err)
|
|
return
|
|
}
|
|
|
|
var (
|
|
advert = payload.(*meshcore.Advert)
|
|
nodeID int64
|
|
latitude *float64
|
|
longitude *float64
|
|
)
|
|
if advert.Position != nil {
|
|
latitude = &advert.Position.Latitude
|
|
longitude = &advert.Position.Longitude
|
|
}
|
|
if err = c.QueryRow(`
|
|
INSERT INTO meshcore_node (
|
|
node_type,
|
|
public_key,
|
|
name,
|
|
local_time,
|
|
first_heard,
|
|
last_heard,
|
|
last_latitude,
|
|
last_longitude,
|
|
last_advert_id
|
|
) VALUES (
|
|
$1,
|
|
$2,
|
|
$3,
|
|
$4,
|
|
$5,
|
|
$6,
|
|
$7,
|
|
$8,
|
|
$9
|
|
)
|
|
ON CONFLICT (public_key)
|
|
DO UPDATE
|
|
SET
|
|
name = $3,
|
|
local_time = $4,
|
|
last_heard = $6,
|
|
last_latitude = $7,
|
|
last_longitude = $8,
|
|
last_advert_id = $9
|
|
RETURNING id
|
|
`,
|
|
advert.Type,
|
|
advert.PublicKey.Bytes(),
|
|
advert.Name,
|
|
advert.Time,
|
|
packet.Time,
|
|
packet.Time,
|
|
latitude,
|
|
longitude,
|
|
id,
|
|
).Scan(&nodeID); err != nil {
|
|
Logger.Warnf("collector: error storing node: %v", err)
|
|
return
|
|
}
|
|
|
|
if advert.Position != nil {
|
|
if _, err = c.Exec(`
|
|
INSERT INTO meshcore_node_position (
|
|
node_id,
|
|
heard_at,
|
|
latitude,
|
|
longitude,
|
|
position
|
|
) VALUES (
|
|
$1,
|
|
$2,
|
|
$3,
|
|
$4,
|
|
ST_SetSRID(ST_MakePoint($5, $6), 4326)
|
|
);
|
|
`,
|
|
nodeID,
|
|
packet.Time,
|
|
advert.Position.Latitude,
|
|
advert.Position.Longitude,
|
|
advert.Position.Latitude,
|
|
advert.Position.Longitude,
|
|
); err != nil {
|
|
Logger.Warnf("collector: error storing node position: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|