From f73d44067200f1cf5771f4e0c81bc0eba2066fa9 Mon Sep 17 00:00:00 2001 From: maze Date: Wed, 18 Feb 2026 13:32:22 +0100 Subject: [PATCH] protocol/aprs: refactored --- protocol/aprs/aprsis/proxy.go | 160 +++++++++++++++++++++++++--------- 1 file changed, 118 insertions(+), 42 deletions(-) diff --git a/protocol/aprs/aprsis/proxy.go b/protocol/aprs/aprsis/proxy.go index 3e61579..e15fc4d 100644 --- a/protocol/aprs/aprsis/proxy.go +++ b/protocol/aprs/aprsis/proxy.go @@ -4,11 +4,14 @@ import ( "bufio" "bytes" "fmt" + "math/rand/v2" "net" "strings" "sync" + "time" "git.maze.io/go/ham/protocol" + "git.maze.io/go/ham/radio" "github.com/sirupsen/logrus" ) @@ -17,12 +20,23 @@ const ( DefaultServerAddr = "rotate.aprs2.net:14580" ) +// OnProxyClientFunc callback. +type OnProxyClientFunc func(callsign string, client *ProxyClient) + type Proxy struct { - Logger *logrus.Logger - Filter string - server string - listen net.Listener - packets chan *protocol.Packet + // Logger used for the proxy. + Logger *logrus.Logger + + // Filter is an APRS range filter. + Filter string + + // OnClient gets called when a new client is logged in through the proxy. + OnClient OnProxyClientFunc + + server string + listen net.Listener + clientsMu sync.RWMutex + clients map[uint64]*ProxyClient } func NewProxy(listen, server string) (*Proxy, error) { @@ -41,9 +55,10 @@ func NewProxy(listen, server string) (*Proxy, error) { } proxy := &Proxy{ - Logger: logrus.New(), - server: server, - listen: listener, + Logger: logrus.New(), + server: server, + listen: listener, + clients: make(map[uint64]*ProxyClient), } go proxy.accept() @@ -51,34 +66,72 @@ func NewProxy(listen, server string) (*Proxy, error) { } func (proxy *Proxy) Close() error { - if proxy.packets != nil { - close(proxy.packets) - proxy.packets = nil - } return proxy.listen.Close() } -func (proxy *Proxy) RawPackets() <-chan *protocol.Packet { - if proxy.packets == nil { - proxy.packets = make(chan *protocol.Packet, 16) - } - return proxy.packets -} - func (proxy *Proxy) accept() { for { - client, err := proxy.listen.Accept() + conn, err := proxy.listen.Accept() if err != nil { proxy.Logger.Errorf("aprs-is proxy: error accepting client: %v", err) continue } - go proxy.handle(client) + client := NewProxyClient(conn) + client.filter = proxy.Filter + client.callback = func(callsign string, client *ProxyClient) { + if proxy.OnClient != nil { + proxy.OnClient(callsign, client) + } + } + go client.Run(proxy) } } -func (proxy *Proxy) handle(client net.Conn) { - defer func() { _ = client.Close() }() +func (proxy *Proxy) addClient(client *ProxyClient) { + proxy.clientsMu.Lock() + proxy.clients[client.id] = client + proxy.clientsMu.Unlock() +} + +func (proxy *Proxy) removeClient(client *ProxyClient) { + proxy.clientsMu.Lock() + delete(proxy.clients, client.id) + proxy.clientsMu.Unlock() +} + +type ProxyClient struct { + net.Conn + logger *logrus.Logger + id uint64 + filter string + myCall string + packets chan *protocol.Packet + callback OnProxyClientFunc +} + +func NewProxyClient(conn net.Conn) *ProxyClient { + return &ProxyClient{ + Conn: conn, + id: uint64(rand.Int64()), + logger: logrus.New(), + } +} + +func (client *ProxyClient) Close() error { + if client.packets != nil { + close(client.packets) + client.packets = nil + } + return client.Conn.Close() +} + +func (client *ProxyClient) Run(proxy *Proxy) { + proxy.addClient(client) + defer func() { + proxy.removeClient(client) + _ = client.Close() + }() host, _, _ := net.SplitHostPort(client.RemoteAddr().String()) proxy.Logger.Infof("aprs-is proxy[%s]: new connection", host) @@ -94,12 +147,12 @@ func (proxy *Proxy) handle(client net.Conn) { wait sync.WaitGroup call string ) - wait.Go(func() { proxy.proxy(client, server, host, "->", &call) }) - wait.Go(func() { proxy.proxy(server, client, host, "<-", nil) }) + wait.Go(func() { client.copy(client, server, host, "->", &call) }) + wait.Go(func() { client.copy(server, client, host, "<-", nil) }) wait.Wait() } -func (proxy *Proxy) proxy(dst, src net.Conn, host, dir string, call *string) { +func (client *ProxyClient) copy(dst, src net.Conn, host, dir string, call *string) { defer func() { if tcp, ok := dst.(*net.TCPConn); ok { _ = tcp.CloseWrite() @@ -112,14 +165,14 @@ func (proxy *Proxy) proxy(dst, src net.Conn, host, dir string, call *string) { for { line, err := reader.ReadBytes('\n') if err != nil { - proxy.Logger.Warnf("aprs-is proxy[%s]: %s read error: %v", host, src.RemoteAddr(), err) + client.logger.Warnf("aprs-is proxy[%s]: %s read error: %v", host, src.RemoteAddr(), err) return } // proxy to remote unaltered if len(line) > 0 { if _, err = dst.Write(line); err != nil { - proxy.Logger.Warnf("aprs-is proxy[%s]: %s write error: %v", host, dst.RemoteAddr(), err) + client.logger.Warnf("aprs-is proxy[%s]: %s write error: %v", host, dst.RemoteAddr(), err) return } } @@ -127,19 +180,27 @@ func (proxy *Proxy) proxy(dst, src net.Conn, host, dir string, call *string) { // parse line line = bytes.TrimRight(line, "\r\n") if len(line) > 0 { - proxy.Logger.Tracef("aprs-is proxy[%s]: %s %s", host, dir, string(line)) + client.logger.Tracef("aprs-is proxy[%s]: %s %s", host, dir, string(line)) - if call != nil && strings.HasPrefix(string(line), "# logresp ") { + if strings.HasPrefix(string(line), "# logresp ") { // server responds to client login part := strings.SplitN(string(line), " ", 5) if len(part) > 4 && part[3] == "verified," { - *call = part[2] - proxy.Logger.Infof("aprs-is proxy[%s]: logged in as %s", host, *call) + // Keep track of our call. + client.myCall = part[2] + if call != nil { + *call = part[2] + } + client.logger.Infof("aprs-is proxy[%s]: logged in as %s", host, *call) - if proxy.Filter != "" { - proxy.Logger.Tracef("aprs-is proxy[%s]: %s filter %s", host, dir, proxy.Filter) - if _, err = fmt.Fprintf(src, "filter %s\r\n", proxy.Filter); err != nil { - proxy.Logger.Warnf("aprs-is proxy[%s]: %s write error: %v", host, src.RemoteAddr(), err) + // Invoke OnClient callback. + client.callback(client.myCall, client) + + // Send optional range filter to the APRS-IS server. + if client.filter != "" { + client.logger.Tracef("aprs-is proxy[%s]: %s filter %s", host, dir, client.filter) + if _, err = fmt.Fprintf(src, "filter %s\r\n", client.filter); err != nil { + client.logger.Warnf("aprs-is proxy[%s]: %s write error: %v", host, src.RemoteAddr(), err) return } } @@ -147,24 +208,39 @@ func (proxy *Proxy) proxy(dst, src net.Conn, host, dir string, call *string) { } if !isCommand(line) { - proxy.handleRawPacket(line) + client.handleRawPacket(line) } } } } -func (proxy *Proxy) handleRawPacket(data []byte) { - if proxy.packets == nil { +func (client *ProxyClient) Info() *radio.Info { + // We have very little information actually, but here we go: + return &radio.Info{ + Name: client.myCall, + } +} + +func (client *ProxyClient) RawPackets() <-chan *protocol.Packet { + if client.packets == nil { + client.packets = make(chan *protocol.Packet, 16) + } + return client.packets +} + +func (client *ProxyClient) handleRawPacket(data []byte) { + if client.packets == nil { return } select { - case proxy.packets <- &protocol.Packet{ - Protocol: "aprs", + case client.packets <- &protocol.Packet{ + Time: time.Now().UTC(), + Protocol: protocol.APRS, Raw: data, }: default: - proxy.Logger.Warn("aprs-is proxy: raw packet channel full, dropping packet") + client.logger.Warn("aprs-is proxy: raw packet channel full, dropping packet") } }