protocol/aprs: refactored
This commit is contained in:
@@ -4,11 +4,14 @@ import (
|
|||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/rand/v2"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.maze.io/go/ham/protocol"
|
"git.maze.io/go/ham/protocol"
|
||||||
|
"git.maze.io/go/ham/radio"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -17,12 +20,23 @@ const (
|
|||||||
DefaultServerAddr = "rotate.aprs2.net:14580"
|
DefaultServerAddr = "rotate.aprs2.net:14580"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// OnProxyClientFunc callback.
|
||||||
|
type OnProxyClientFunc func(callsign string, client *ProxyClient)
|
||||||
|
|
||||||
type Proxy struct {
|
type Proxy struct {
|
||||||
Logger *logrus.Logger
|
// Logger used for the proxy.
|
||||||
Filter string
|
Logger *logrus.Logger
|
||||||
server string
|
|
||||||
listen net.Listener
|
// Filter is an APRS range filter.
|
||||||
packets chan *protocol.Packet
|
Filter string
|
||||||
|
|
||||||
|
// OnClient gets called when a new client is logged in through the proxy.
|
||||||
|
OnClient OnProxyClientFunc
|
||||||
|
|
||||||
|
server string
|
||||||
|
listen net.Listener
|
||||||
|
clientsMu sync.RWMutex
|
||||||
|
clients map[uint64]*ProxyClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProxy(listen, server string) (*Proxy, error) {
|
func NewProxy(listen, server string) (*Proxy, error) {
|
||||||
@@ -41,9 +55,10 @@ func NewProxy(listen, server string) (*Proxy, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
proxy := &Proxy{
|
proxy := &Proxy{
|
||||||
Logger: logrus.New(),
|
Logger: logrus.New(),
|
||||||
server: server,
|
server: server,
|
||||||
listen: listener,
|
listen: listener,
|
||||||
|
clients: make(map[uint64]*ProxyClient),
|
||||||
}
|
}
|
||||||
go proxy.accept()
|
go proxy.accept()
|
||||||
|
|
||||||
@@ -51,34 +66,72 @@ func NewProxy(listen, server string) (*Proxy, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (proxy *Proxy) Close() error {
|
func (proxy *Proxy) Close() error {
|
||||||
if proxy.packets != nil {
|
|
||||||
close(proxy.packets)
|
|
||||||
proxy.packets = nil
|
|
||||||
}
|
|
||||||
return proxy.listen.Close()
|
return proxy.listen.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (proxy *Proxy) RawPackets() <-chan *protocol.Packet {
|
|
||||||
if proxy.packets == nil {
|
|
||||||
proxy.packets = make(chan *protocol.Packet, 16)
|
|
||||||
}
|
|
||||||
return proxy.packets
|
|
||||||
}
|
|
||||||
|
|
||||||
func (proxy *Proxy) accept() {
|
func (proxy *Proxy) accept() {
|
||||||
for {
|
for {
|
||||||
client, err := proxy.listen.Accept()
|
conn, err := proxy.listen.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
proxy.Logger.Errorf("aprs-is proxy: error accepting client: %v", err)
|
proxy.Logger.Errorf("aprs-is proxy: error accepting client: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
go proxy.handle(client)
|
client := NewProxyClient(conn)
|
||||||
|
client.filter = proxy.Filter
|
||||||
|
client.callback = func(callsign string, client *ProxyClient) {
|
||||||
|
if proxy.OnClient != nil {
|
||||||
|
proxy.OnClient(callsign, client)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go client.Run(proxy)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (proxy *Proxy) handle(client net.Conn) {
|
func (proxy *Proxy) addClient(client *ProxyClient) {
|
||||||
defer func() { _ = client.Close() }()
|
proxy.clientsMu.Lock()
|
||||||
|
proxy.clients[client.id] = client
|
||||||
|
proxy.clientsMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (proxy *Proxy) removeClient(client *ProxyClient) {
|
||||||
|
proxy.clientsMu.Lock()
|
||||||
|
delete(proxy.clients, client.id)
|
||||||
|
proxy.clientsMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
type ProxyClient struct {
|
||||||
|
net.Conn
|
||||||
|
logger *logrus.Logger
|
||||||
|
id uint64
|
||||||
|
filter string
|
||||||
|
myCall string
|
||||||
|
packets chan *protocol.Packet
|
||||||
|
callback OnProxyClientFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProxyClient(conn net.Conn) *ProxyClient {
|
||||||
|
return &ProxyClient{
|
||||||
|
Conn: conn,
|
||||||
|
id: uint64(rand.Int64()),
|
||||||
|
logger: logrus.New(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (client *ProxyClient) Close() error {
|
||||||
|
if client.packets != nil {
|
||||||
|
close(client.packets)
|
||||||
|
client.packets = nil
|
||||||
|
}
|
||||||
|
return client.Conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (client *ProxyClient) Run(proxy *Proxy) {
|
||||||
|
proxy.addClient(client)
|
||||||
|
defer func() {
|
||||||
|
proxy.removeClient(client)
|
||||||
|
_ = client.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
host, _, _ := net.SplitHostPort(client.RemoteAddr().String())
|
host, _, _ := net.SplitHostPort(client.RemoteAddr().String())
|
||||||
proxy.Logger.Infof("aprs-is proxy[%s]: new connection", host)
|
proxy.Logger.Infof("aprs-is proxy[%s]: new connection", host)
|
||||||
@@ -94,12 +147,12 @@ func (proxy *Proxy) handle(client net.Conn) {
|
|||||||
wait sync.WaitGroup
|
wait sync.WaitGroup
|
||||||
call string
|
call string
|
||||||
)
|
)
|
||||||
wait.Go(func() { proxy.proxy(client, server, host, "->", &call) })
|
wait.Go(func() { client.copy(client, server, host, "->", &call) })
|
||||||
wait.Go(func() { proxy.proxy(server, client, host, "<-", nil) })
|
wait.Go(func() { client.copy(server, client, host, "<-", nil) })
|
||||||
wait.Wait()
|
wait.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (proxy *Proxy) proxy(dst, src net.Conn, host, dir string, call *string) {
|
func (client *ProxyClient) copy(dst, src net.Conn, host, dir string, call *string) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if tcp, ok := dst.(*net.TCPConn); ok {
|
if tcp, ok := dst.(*net.TCPConn); ok {
|
||||||
_ = tcp.CloseWrite()
|
_ = tcp.CloseWrite()
|
||||||
@@ -112,14 +165,14 @@ func (proxy *Proxy) proxy(dst, src net.Conn, host, dir string, call *string) {
|
|||||||
for {
|
for {
|
||||||
line, err := reader.ReadBytes('\n')
|
line, err := reader.ReadBytes('\n')
|
||||||
if err != nil {
|
if err != nil {
|
||||||
proxy.Logger.Warnf("aprs-is proxy[%s]: %s read error: %v", host, src.RemoteAddr(), err)
|
client.logger.Warnf("aprs-is proxy[%s]: %s read error: %v", host, src.RemoteAddr(), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// proxy to remote unaltered
|
// proxy to remote unaltered
|
||||||
if len(line) > 0 {
|
if len(line) > 0 {
|
||||||
if _, err = dst.Write(line); err != nil {
|
if _, err = dst.Write(line); err != nil {
|
||||||
proxy.Logger.Warnf("aprs-is proxy[%s]: %s write error: %v", host, dst.RemoteAddr(), err)
|
client.logger.Warnf("aprs-is proxy[%s]: %s write error: %v", host, dst.RemoteAddr(), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -127,19 +180,27 @@ func (proxy *Proxy) proxy(dst, src net.Conn, host, dir string, call *string) {
|
|||||||
// parse line
|
// parse line
|
||||||
line = bytes.TrimRight(line, "\r\n")
|
line = bytes.TrimRight(line, "\r\n")
|
||||||
if len(line) > 0 {
|
if len(line) > 0 {
|
||||||
proxy.Logger.Tracef("aprs-is proxy[%s]: %s %s", host, dir, string(line))
|
client.logger.Tracef("aprs-is proxy[%s]: %s %s", host, dir, string(line))
|
||||||
|
|
||||||
if call != nil && strings.HasPrefix(string(line), "# logresp ") {
|
if strings.HasPrefix(string(line), "# logresp ") {
|
||||||
// server responds to client login
|
// server responds to client login
|
||||||
part := strings.SplitN(string(line), " ", 5)
|
part := strings.SplitN(string(line), " ", 5)
|
||||||
if len(part) > 4 && part[3] == "verified," {
|
if len(part) > 4 && part[3] == "verified," {
|
||||||
*call = part[2]
|
// Keep track of our call.
|
||||||
proxy.Logger.Infof("aprs-is proxy[%s]: logged in as %s", host, *call)
|
client.myCall = part[2]
|
||||||
|
if call != nil {
|
||||||
|
*call = part[2]
|
||||||
|
}
|
||||||
|
client.logger.Infof("aprs-is proxy[%s]: logged in as %s", host, *call)
|
||||||
|
|
||||||
if proxy.Filter != "" {
|
// Invoke OnClient callback.
|
||||||
proxy.Logger.Tracef("aprs-is proxy[%s]: %s filter %s", host, dir, proxy.Filter)
|
client.callback(client.myCall, client)
|
||||||
if _, err = fmt.Fprintf(src, "filter %s\r\n", proxy.Filter); err != nil {
|
|
||||||
proxy.Logger.Warnf("aprs-is proxy[%s]: %s write error: %v", host, src.RemoteAddr(), err)
|
// Send optional range filter to the APRS-IS server.
|
||||||
|
if client.filter != "" {
|
||||||
|
client.logger.Tracef("aprs-is proxy[%s]: %s filter %s", host, dir, client.filter)
|
||||||
|
if _, err = fmt.Fprintf(src, "filter %s\r\n", client.filter); err != nil {
|
||||||
|
client.logger.Warnf("aprs-is proxy[%s]: %s write error: %v", host, src.RemoteAddr(), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -147,24 +208,39 @@ func (proxy *Proxy) proxy(dst, src net.Conn, host, dir string, call *string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !isCommand(line) {
|
if !isCommand(line) {
|
||||||
proxy.handleRawPacket(line)
|
client.handleRawPacket(line)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (proxy *Proxy) handleRawPacket(data []byte) {
|
func (client *ProxyClient) Info() *radio.Info {
|
||||||
if proxy.packets == nil {
|
// We have very little information actually, but here we go:
|
||||||
|
return &radio.Info{
|
||||||
|
Name: client.myCall,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (client *ProxyClient) RawPackets() <-chan *protocol.Packet {
|
||||||
|
if client.packets == nil {
|
||||||
|
client.packets = make(chan *protocol.Packet, 16)
|
||||||
|
}
|
||||||
|
return client.packets
|
||||||
|
}
|
||||||
|
|
||||||
|
func (client *ProxyClient) handleRawPacket(data []byte) {
|
||||||
|
if client.packets == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case proxy.packets <- &protocol.Packet{
|
case client.packets <- &protocol.Packet{
|
||||||
Protocol: "aprs",
|
Time: time.Now().UTC(),
|
||||||
|
Protocol: protocol.APRS,
|
||||||
Raw: data,
|
Raw: data,
|
||||||
}:
|
}:
|
||||||
default:
|
default:
|
||||||
proxy.Logger.Warn("aprs-is proxy: raw packet channel full, dropping packet")
|
client.logger.Warn("aprs-is proxy: raw packet channel full, dropping packet")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user