#!/usr/bin/env python3 """ KISS Modem Python Integration Test Client Communicates with LoRa KISS modem via KISS protocol over serial/stdio. Supports sending/receiving LoRa packets and configuring radio parameters. """ import struct import time from typing import Optional, Tuple, List import sys class KISSProtocol: """KISS (Keep It Simple, Stupid) protocol implementation.""" # Frame delimiters and escapes FEND = 0xC0 FESC = 0xDB TFEND = 0xDC TFESC = 0xDD # Port assignments PORT_DATA = 0 PORT_QUALITY = 1 PORT_CONFIG = 2 # Config command opcodes CMD_RES_OK = 0x01 CMD_RES_ERROR = 0x02 CMD_GET_RADIO = 0x10 CMD_SET_RADIO = 0x11 CMD_GET_FREQUENCY = 0x12 CMD_SET_FREQUENCY = 0x13 CMD_GET_BANDWIDTH = 0x14 CMD_SET_BANDWIDTH = 0x15 CMD_GET_SF = 0x16 CMD_SET_SF = 0x17 CMD_GET_CR = 0x18 CMD_SET_CR = 0x19 CMD_GET_POWER = 0x1A CMD_SET_POWER = 0x1B CMD_GET_SYNCWORD = 0x1C CMD_SET_SYNCWORD = 0x1D @staticmethod def encode(port: int, data: bytes) -> bytes: """Encode data into a KISS frame.""" frame = bytearray() frame.append(KISSProtocol.FEND) # Type byte: port in upper nibble, command in lower nibble (0 for data) type_byte = (port << 4) | 0x00 if type_byte == KISSProtocol.FEND: frame.append(KISSProtocol.FESC) frame.append(KISSProtocol.TFEND) elif type_byte == KISSProtocol.FESC: frame.append(KISSProtocol.FESC) frame.append(KISSProtocol.TFESC) else: frame.append(type_byte) # Payload with escaping for byte in data: if byte == KISSProtocol.FEND: frame.append(KISSProtocol.FESC) frame.append(KISSProtocol.TFEND) elif byte == KISSProtocol.FESC: frame.append(KISSProtocol.FESC) frame.append(KISSProtocol.TFESC) else: frame.append(byte) frame.append(KISSProtocol.FEND) return bytes(frame) @staticmethod def decode(data: bytes) -> Optional[Tuple[int, bytes]]: """Decode a KISS frame. Returns (port, payload) or None if incomplete.""" if len(data) < 2 or data[0] != KISSProtocol.FEND: return None state = "idle" frame_data = bytearray() for i in range(1, len(data)): byte = data[i] if byte == KISSProtocol.FEND: if len(frame_data) > 0: port = frame_data[0] >> 4 payload = bytes(frame_data[1:]) return (port, payload) else: return None if state == "idle": if byte == KISSProtocol.FESC: state = "escape" else: frame_data.append(byte) elif state == "escape": if byte == KISSProtocol.TFEND: frame_data.append(KISSProtocol.FEND) elif byte == KISSProtocol.TFESC: frame_data.append(KISSProtocol.FESC) state = "idle" return None class KISSModemClient: """Client for communicating with LoRa KISS modem.""" def __init__(self, port, baudrate=115200, timeout=1.0): """ Initialize the KISS modem client. Args: port: Serial port name (e.g. '/dev/ttyUSB0') or file object for stdio baudrate: Serial baud rate (ignored for file objects) timeout: Read timeout in seconds """ if isinstance(port, str): import serial self.serial = serial.Serial(port, baudrate, timeout=timeout) else: self.serial = port self.timeout = timeout self._buffer = bytearray() def close(self): """Close the serial connection.""" if hasattr(self.serial, "close"): self.serial.close() def write(self, data: bytes) -> int: """Write data to the modem.""" return self.serial.write(data) def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes: """Read data from the modem with timeout.""" if timeout is None: timeout = self.timeout start = time.time() data = bytearray() while time.time() - start < timeout: try: chunk = self.serial.read(size) if chunk: data.extend(chunk) # Try to decode a complete frame for i in range(len(data)): if data[i] == KISSProtocol.FEND: frame_end = data.find(KISSProtocol.FEND, i + 1) if frame_end != -1: return bytes(data[i : frame_end + 1]) else: time.sleep(0.01) except Exception: time.sleep(0.01) return bytes(data) def send_data(self, payload: bytes) -> bool: """Send raw LoRa packet data.""" frame = KISSProtocol.encode(KISSProtocol.PORT_DATA, payload) self.write(frame) return True def recv_data(self, timeout: Optional[float] = None) -> Optional[bytes]: """Receive raw LoRa packet data.""" while True: frame = self.read(timeout=timeout or self.timeout) if not frame: return None result = KISSProtocol.decode(frame) if result: port, payload = result if port == KISSProtocol.PORT_DATA: return payload # Skip non-data frames, keep reading def recv_quality(self, timeout: Optional[float] = None) -> Optional[Tuple[int, int]]: """Receive signal quality (SNR, RSSI).""" while True: frame = self.read(timeout=timeout or self.timeout) if not frame: return None result = KISSProtocol.decode(frame) if result: port, payload = result if port == KISSProtocol.PORT_QUALITY and len(payload) >= 3: snr = struct.unpack(">b", bytes([payload[0]]))[0] rssi = struct.unpack(">h", payload[1:3])[0] return (snr, rssi) # Skip non-quality frames, keep reading def _send_command(self, cmd: int, data: bytes = b"") -> bool: """Send a config command.""" payload = bytes([cmd]) + data frame = KISSProtocol.encode(KISSProtocol.PORT_CONFIG, payload) self.write(frame) return True def _recv_response( self, timeout: Optional[float] = None ) -> Optional[Tuple[int, bytes]]: """Receive a config response.""" while True: frame = self.read(timeout=timeout or self.timeout) if not frame: return None result = KISSProtocol.decode(frame) if result: port, payload = result if port == KISSProtocol.PORT_CONFIG and len(payload) > 0: return (payload[0], payload[1:]) def get_radio_config(self) -> Optional[dict]: """Get current radio configuration.""" if not self._send_command(KISSProtocol.CMD_GET_RADIO): return None resp = self._recv_response() if not resp or resp[0] != KISSProtocol.CMD_GET_RADIO: return None data = resp[1] if len(data) < 13: return None freq_khz, bw_hz, sf, cr, power_dbm = struct.unpack(">IIBBB", data[:10]) return { "freq_khz": freq_khz, "bw_hz": bw_hz, "sf": sf, "cr": cr, "power_dbm": power_dbm, } def set_frequency(self, freq_khz: int) -> bool: """Set LoRa frequency.""" data = struct.pack(">I", freq_khz) if not self._send_command(KISSProtocol.CMD_SET_FREQUENCY, data): return False resp = self._recv_response() return resp and resp[0] == KISSProtocol.CMD_RES_OK def get_frequency(self) -> Optional[int]: """Get current frequency.""" if not self._send_command(KISSProtocol.CMD_GET_FREQUENCY): return None resp = self._recv_response() if not resp or resp[0] != KISSProtocol.CMD_GET_FREQUENCY: return None if len(resp[1]) >= 4: return struct.unpack(">I", resp[1][:4])[0] return None def set_bandwidth(self, bw_hz: int) -> bool: """Set LoRa bandwidth.""" data = struct.pack(">I", bw_hz) if not self._send_command(KISSProtocol.CMD_SET_BANDWIDTH, data): return False resp = self._recv_response() return resp and resp[0] == KISSProtocol.CMD_RES_OK def get_bandwidth(self) -> Optional[int]: """Get current bandwidth.""" if not self._send_command(KISSProtocol.CMD_GET_BANDWIDTH): return None resp = self._recv_response() if not resp or resp[0] != KISSProtocol.CMD_GET_BANDWIDTH: return None if len(resp[1]) >= 4: return struct.unpack(">I", resp[1][:4])[0] return None def set_spreading_factor(self, sf: int) -> bool: """Set spreading factor (5-12).""" data = bytes([sf]) if not self._send_command(KISSProtocol.CMD_SET_SF, data): return False resp = self._recv_response() return resp and resp[0] == KISSProtocol.CMD_RES_OK def get_spreading_factor(self) -> Optional[int]: """Get current spreading factor.""" if not self._send_command(KISSProtocol.CMD_GET_SF): return None resp = self._recv_response() if not resp or resp[0] != KISSProtocol.CMD_GET_SF: return None if len(resp[1]) >= 1: return resp[1][0] return None def set_coding_rate(self, cr: int) -> bool: """Set coding rate (5-8, denominator of 4/CR).""" data = bytes([cr]) if not self._send_command(KISSProtocol.CMD_SET_CR, data): return False resp = self._recv_response() return resp and resp[0] == KISSProtocol.CMD_RES_OK def get_coding_rate(self) -> Optional[int]: """Get current coding rate.""" if not self._send_command(KISSProtocol.CMD_GET_CR): return None resp = self._recv_response() if not resp or resp[0] != KISSProtocol.CMD_GET_CR: return None if len(resp[1]) >= 1: return resp[1][0] return None def set_power(self, power_dbm: int) -> bool: """Set transmit power in dBm.""" data = struct.pack(">b", power_dbm) if not self._send_command(KISSProtocol.CMD_SET_POWER, data): return False resp = self._recv_response() return resp and resp[0] == KISSProtocol.CMD_RES_OK def get_power(self) -> Optional[int]: """Get current transmit power.""" if not self._send_command(KISSProtocol.CMD_GET_POWER): return None resp = self._recv_response() if not resp or resp[0] != KISSProtocol.CMD_GET_POWER: return None if len(resp[1]) >= 1: return struct.unpack(">b", bytes([resp[1][0]]))[0] return None def set_syncword(self, syncword: int) -> bool: """Set LoRa syncword.""" data = bytes([syncword]) if not self._send_command(KISSProtocol.CMD_SET_SYNCWORD, data): return False resp = self._recv_response() return resp and resp[0] == KISSProtocol.CMD_RES_OK def get_syncword(self) -> Optional[int]: """Get current syncword.""" if not self._send_command(KISSProtocol.CMD_GET_SYNCWORD): return None resp = self._recv_response() if not resp or resp[0] != KISSProtocol.CMD_GET_SYNCWORD: return None if len(resp[1]) >= 1: return resp[1][0] return None if __name__ == "__main__": # Example usage if len(sys.argv) > 1: port = sys.argv[1] else: port = "/dev/ttyUSB0" try: client = KISSModemClient(port, timeout=1.0) print(f"Connected to {port}") # Get current config config = client.get_radio_config() if config: print(f"Current config: {config}") else: print("Failed to get radio config") client.close() except Exception as e: print(f"Error: {e}")