Files
lorakiss/test/kiss_client.py
Maze X 20b3aae1e3 Add Python KISS client and integration test framework
- test/kiss_client.py: Full-featured KISS modem client library
  Supports all 14 config commands, KISS frame encoding/decoding,
  signal quality reporting, packet send/receive
  Can communicate over serial ports or file-like objects (stdin/stdout)

- test/test_integration.py: Integration test suite using kiss_client.py
  Tests radio configuration, parameter setting, KISS frame encoding/decoding
  Designed to run against modem or simulator
  Run with: python3 test/test_integration.py

- test/main_simulator.cpp: Hardware simulator scaffold for native platform
  Mocks SX126X radio with configurable state
  Communicates via stdin/stdout for easy testing
  Note: Full compilation requires Arduino.h mock (future work)

Verified: seeed_xiao_s3_wio_sx1262 and heltec_v3 still build without errors
2026-03-27 17:46:33 +01:00

403 lines
12 KiB
Python

#!/usr/bin/env python3
"""
KISS Modem Python Integration Test Client
Communicates with LoRa KISS modem via KISS protocol over serial/stdio.
Supports sending/receiving LoRa packets and configuring radio parameters.
"""
import struct
import time
from typing import Optional, Tuple, List
import sys
class KISSProtocol:
    """KISS (Keep It Simple, Stupid) protocol implementation.

    Holds the framing constants and two stateless helpers, ``encode`` and
    ``decode``.  A frame on the wire looks like::

        FEND  <type byte>  <payload>  FEND

    The type byte carries the port number in its upper nibble; the lower
    (command) nibble is always written as 0 by ``encode`` and ignored by
    ``decode``.  Any FEND or FESC value occurring between the two delimiters
    is replaced by the two-byte sequence FESC TFEND or FESC TFESC.
    """

    # Frame delimiters and escapes
    FEND = 0xC0
    FESC = 0xDB
    TFEND = 0xDC
    TFESC = 0xDD

    # Port assignments (upper nibble of the type byte)
    PORT_DATA = 0
    PORT_QUALITY = 1
    PORT_CONFIG = 2

    # Config command opcodes
    CMD_RES_OK = 0x01
    CMD_RES_ERROR = 0x02
    CMD_GET_RADIO = 0x10
    CMD_SET_RADIO = 0x11
    CMD_GET_FREQUENCY = 0x12
    CMD_SET_FREQUENCY = 0x13
    CMD_GET_BANDWIDTH = 0x14
    CMD_SET_BANDWIDTH = 0x15
    CMD_GET_SF = 0x16
    CMD_SET_SF = 0x17
    CMD_GET_CR = 0x18
    CMD_SET_CR = 0x19
    CMD_GET_POWER = 0x1A
    CMD_SET_POWER = 0x1B
    CMD_GET_SYNCWORD = 0x1C
    CMD_SET_SYNCWORD = 0x1D

    @staticmethod
    def _append_escaped(frame: bytearray, byte: int) -> None:
        """Append *byte* to *frame*, substituting its escape sequence if needed."""
        if byte == KISSProtocol.FEND:
            frame.extend((KISSProtocol.FESC, KISSProtocol.TFEND))
        elif byte == KISSProtocol.FESC:
            frame.extend((KISSProtocol.FESC, KISSProtocol.TFESC))
        else:
            frame.append(byte)

    @staticmethod
    def encode(port: int, data: bytes) -> bytes:
        """Encode data into a KISS frame.

        Args:
            port: Port number placed in the upper nibble of the type byte
                (0-15).
            data: Raw payload bytes.

        Returns:
            The complete frame, including the leading and trailing FEND.

        Raises:
            ValueError: If *port* does not fit in a nibble.
        """
        if not 0 <= port <= 0x0F:
            raise ValueError(f"port must be in range 0-15, got {port}")

        frame = bytearray([KISSProtocol.FEND])
        # Type byte: port in upper nibble, command in lower nibble (0 for data).
        # Port 12 yields 0xC0 (== FEND), so the type byte is escaped as well.
        KISSProtocol._append_escaped(frame, (port << 4) | 0x00)
        # Payload with escaping
        for byte in data:
            KISSProtocol._append_escaped(frame, byte)
        frame.append(KISSProtocol.FEND)
        return bytes(frame)

    @staticmethod
    def decode(data: bytes) -> Optional[Tuple[int, bytes]]:
        """Decode the first KISS frame found in *data*.

        *data* must start with FEND.  Runs of consecutive FEND bytes are
        treated as inter-frame fill rather than as empty frames, so input such
        as ``C0 C0 <type> <payload> C0`` decodes normally.  Bytes after the
        frame's closing FEND are ignored.

        Args:
            data: Raw bytes beginning at a frame delimiter.

        Returns:
            ``(port, payload)`` for the first non-empty frame, or ``None`` if
            *data* does not start with FEND or holds no complete frame.
        """
        if len(data) < 2 or data[0] != KISSProtocol.FEND:
            return None

        frame = bytearray()
        escaped = False
        for byte in data[1:]:
            if byte == KISSProtocol.FEND:
                if frame:
                    return (frame[0] >> 4, bytes(frame[1:]))
                # Nothing collected yet: this FEND is just another opening
                # delimiter, keep scanning for the real frame.
                escaped = False
                continue
            if escaped:
                if byte == KISSProtocol.TFEND:
                    frame.append(KISSProtocol.FEND)
                elif byte == KISSProtocol.TFESC:
                    frame.append(KISSProtocol.FESC)
                # Any other byte after FESC is invalid and silently dropped.
                escaped = False
            elif byte == KISSProtocol.FESC:
                escaped = True
            else:
                frame.append(byte)
        # Ran out of input before the closing FEND.
        return None
class KISSModemClient:
    """Client for communicating with LoRa KISS modem.

    Wraps a byte transport (a pyserial port opened by name, or any object
    with ``read``/``write``) and exchanges KISS frames with the modem: raw
    packets on PORT_DATA, signal-quality reports on PORT_QUALITY and
    configuration commands/responses on PORT_CONFIG.

    Bytes received after a complete frame are kept in an internal buffer and
    served by the next ``read`` call, so frames arriving back-to-back in one
    transport read are not lost.  The client can be used as a context manager;
    leaving the ``with`` block calls ``close``.
    """

    # Minimum CMD_GET_RADIO response length accepted by get_radio_config().
    _RADIO_CONFIG_MIN_LEN = 13
    # Seconds to sleep between polls while the transport has no data.
    _POLL_INTERVAL = 0.01

    def __init__(self, port, baudrate=115200, timeout=1.0):
        """
        Initialize the KISS modem client.

        Args:
            port: Serial port name (e.g. '/dev/ttyUSB0') or file object for stdio
            baudrate: Serial baud rate (ignored for file objects)
            timeout: Read timeout in seconds
        """
        if isinstance(port, str):
            # Imported lazily so pyserial is only needed for real serial ports.
            import serial

            self.serial = serial.Serial(port, baudrate, timeout=timeout)
        else:
            self.serial = port
        self.timeout = timeout
        # Received bytes not yet handed out as a frame.
        self._buffer = bytearray()

    def __enter__(self) -> "KISSModemClient":
        """Return the client itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the transport on leaving the ``with`` block."""
        self.close()

    def close(self):
        """Close the serial connection."""
        if hasattr(self.serial, "close"):
            self.serial.close()

    def write(self, data: bytes) -> int:
        """Write data to the modem.

        The transport is flushed afterwards when it offers ``flush`` so that
        buffered file objects (e.g. a pipe to a simulator) actually deliver
        the bytes.

        Returns:
            Whatever the transport's ``write`` returned.
        """
        written = self.serial.write(data)
        flush = getattr(self.serial, "flush", None)
        if callable(flush):
            flush()
        return written

    def _pop_frame(self) -> Optional[bytes]:
        """Remove and return the first complete frame held in the buffer.

        A frame is the span from one FEND to the next FEND, inclusive.  Bytes
        before the opening FEND are discarded together with the frame.
        Back-to-back FENDs are skipped instead of being returned as an empty
        frame.  Returns ``None`` when no complete frame is buffered.
        """
        buf = self._buffer
        fend = KISSProtocol.FEND
        while True:
            start = buf.find(fend)
            if start == -1:
                return None
            end = buf.find(fend, start + 1)
            if end == -1:
                return None
            if end == start + 1:
                # Empty span between two delimiters: drop the first FEND and
                # let the second one open the next candidate frame.
                del buf[:end]
                continue
            frame = bytes(buf[start:end + 1])
            del buf[:end + 1]
            return frame

    def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes:
        """Read one KISS frame from the modem with timeout.

        Args:
            size: Maximum number of bytes requested per transport read.
            timeout: Seconds to wait; ``None`` uses the client default.

        Returns:
            One complete frame (FEND ... FEND) as soon as one is available.
            If the timeout expires first, whatever unframed bytes were
            received (possibly ``b""``) are returned and dropped from the
            buffer.
        """
        if timeout is None:
            timeout = self.timeout

        # A previous call may have received more than one frame.
        frame = self._pop_frame()
        if frame is not None:
            return frame

        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                chunk = self.serial.read(size)
            except Exception:
                # Best effort: treat a failing transport read like "no data"
                # and retry until the timeout expires.
                chunk = b""
            if not chunk:
                time.sleep(self._POLL_INTERVAL)
                continue
            self._buffer.extend(chunk)
            frame = self._pop_frame()
            if frame is not None:
                return frame

        leftover = bytes(self._buffer)
        self._buffer.clear()
        return leftover

    def _recv_payload(
        self, port: int, min_len: int, timeout: Optional[float]
    ) -> Optional[bytes]:
        """Wait for a frame on *port* whose payload has at least *min_len* bytes.

        Frames on other ports, too-short payloads and undecodable data are
        skipped.  *timeout* bounds the whole wait (``None`` uses the client
        default); frames that are already buffered are still examined after
        the deadline.  Returns the payload, or ``None`` when nothing suitable
        arrived.
        """
        if timeout is None:
            timeout = self.timeout
        deadline = time.monotonic() + timeout
        while True:
            remaining = max(0.0, deadline - time.monotonic())
            raw = self.read(timeout=remaining)
            if not raw:
                return None
            decoded = KISSProtocol.decode(raw)
            if decoded is None:
                continue
            frame_port, payload = decoded
            if frame_port == port and len(payload) >= min_len:
                return payload

    def send_data(self, payload: bytes) -> bool:
        """Send raw LoRa packet data. Always returns True once written."""
        self.write(KISSProtocol.encode(KISSProtocol.PORT_DATA, payload))
        return True

    def recv_data(self, timeout: Optional[float] = None) -> Optional[bytes]:
        """Receive raw LoRa packet data.

        Non-data frames received while waiting are discarded.

        Returns:
            The payload of the next PORT_DATA frame, or ``None`` on timeout.
        """
        return self._recv_payload(KISSProtocol.PORT_DATA, 0, timeout)

    def recv_quality(self, timeout: Optional[float] = None) -> Optional[Tuple[int, int]]:
        """Receive signal quality (SNR, RSSI).

        Non-quality frames received while waiting are discarded.

        Returns:
            ``(snr, rssi)`` decoded from the first three payload bytes (signed
            8-bit, then big-endian signed 16-bit), or ``None`` on timeout.
        """
        payload = self._recv_payload(KISSProtocol.PORT_QUALITY, 3, timeout)
        if payload is None:
            return None
        snr, rssi = struct.unpack_from(">bh", payload)
        return (snr, rssi)

    def _send_command(self, cmd: int, data: bytes = b"") -> bool:
        """Send a config command: opcode byte followed by *data* on PORT_CONFIG."""
        self.write(KISSProtocol.encode(KISSProtocol.PORT_CONFIG, bytes([cmd]) + data))
        return True

    def _recv_response(
        self, timeout: Optional[float] = None
    ) -> Optional[Tuple[int, bytes]]:
        """Receive a config response.

        Returns:
            ``(opcode, body)`` from the next non-empty PORT_CONFIG frame, or
            ``None`` on timeout.
        """
        payload = self._recv_payload(KISSProtocol.PORT_CONFIG, 1, timeout)
        if payload is None:
            return None
        return (payload[0], payload[1:])

    def _set_param(self, cmd: int, data: bytes) -> bool:
        """Send a SET command and report whether the modem answered CMD_RES_OK."""
        if not self._send_command(cmd, data):
            return False
        resp = self._recv_response()
        # Explicit None check so a missing response yields False, not None.
        return resp is not None and resp[0] == KISSProtocol.CMD_RES_OK

    def _query(self, cmd: int, min_len: int) -> Optional[bytes]:
        """Send a GET command and return the response body.

        Returns ``None`` if no response arrives, the response opcode differs
        from *cmd*, or the body is shorter than *min_len* bytes.
        """
        if not self._send_command(cmd):
            return None
        resp = self._recv_response()
        if resp is None or resp[0] != cmd:
            return None
        body = resp[1]
        return body if len(body) >= min_len else None

    def get_radio_config(self) -> Optional[dict]:
        """Get current radio configuration.

        Returns:
            Dict with ``freq_khz``, ``bw_hz``, ``sf``, ``cr`` and ``power_dbm``
            decoded from the first 11 response bytes (two big-endian uint32,
            two uint8, one signed int8 -- power is signed to match
            ``get_power``), or ``None`` on failure.
        """
        # NOTE(review): the response must be at least 13 bytes but only the
        # first 11 are decoded; the trailing bytes are presumably further
        # fields (e.g. the sync word) -- confirm against the firmware.
        body = self._query(KISSProtocol.CMD_GET_RADIO, self._RADIO_CONFIG_MIN_LEN)
        if body is None:
            return None
        freq_khz, bw_hz, sf, cr, power_dbm = struct.unpack_from(">IIBBb", body)
        return {
            "freq_khz": freq_khz,
            "bw_hz": bw_hz,
            "sf": sf,
            "cr": cr,
            "power_dbm": power_dbm,
        }

    def set_frequency(self, freq_khz: int) -> bool:
        """Set LoRa frequency (sent as big-endian uint32). True if acknowledged."""
        return self._set_param(
            KISSProtocol.CMD_SET_FREQUENCY, struct.pack(">I", freq_khz)
        )

    def get_frequency(self) -> Optional[int]:
        """Get current frequency, or None if the modem did not answer properly."""
        body = self._query(KISSProtocol.CMD_GET_FREQUENCY, 4)
        return None if body is None else struct.unpack_from(">I", body)[0]

    def set_bandwidth(self, bw_hz: int) -> bool:
        """Set LoRa bandwidth (sent as big-endian uint32). True if acknowledged."""
        return self._set_param(
            KISSProtocol.CMD_SET_BANDWIDTH, struct.pack(">I", bw_hz)
        )

    def get_bandwidth(self) -> Optional[int]:
        """Get current bandwidth, or None if the modem did not answer properly."""
        body = self._query(KISSProtocol.CMD_GET_BANDWIDTH, 4)
        return None if body is None else struct.unpack_from(">I", body)[0]

    def set_spreading_factor(self, sf: int) -> bool:
        """Set spreading factor (5-12). The range is not checked client-side."""
        return self._set_param(KISSProtocol.CMD_SET_SF, bytes([sf]))

    def get_spreading_factor(self) -> Optional[int]:
        """Get current spreading factor, or None on failure."""
        body = self._query(KISSProtocol.CMD_GET_SF, 1)
        return None if body is None else body[0]

    def set_coding_rate(self, cr: int) -> bool:
        """Set coding rate (5-8, denominator of 4/CR). Not checked client-side."""
        return self._set_param(KISSProtocol.CMD_SET_CR, bytes([cr]))

    def get_coding_rate(self) -> Optional[int]:
        """Get current coding rate, or None on failure."""
        body = self._query(KISSProtocol.CMD_GET_CR, 1)
        return None if body is None else body[0]

    def set_power(self, power_dbm: int) -> bool:
        """Set transmit power in dBm (sent as one signed byte)."""
        return self._set_param(
            KISSProtocol.CMD_SET_POWER, struct.pack(">b", power_dbm)
        )

    def get_power(self) -> Optional[int]:
        """Get current transmit power (one signed byte), or None on failure."""
        body = self._query(KISSProtocol.CMD_GET_POWER, 1)
        return None if body is None else struct.unpack_from(">b", body)[0]

    def set_syncword(self, syncword: int) -> bool:
        """Set LoRa syncword (sent as a single byte)."""
        return self._set_param(KISSProtocol.CMD_SET_SYNCWORD, bytes([syncword]))

    def get_syncword(self) -> Optional[int]:
        """Get current syncword, or None on failure."""
        body = self._query(KISSProtocol.CMD_GET_SYNCWORD, 1)
        return None if body is None else body[0]
if __name__ == "__main__":
    # Example usage: connect to the port given as the first command-line
    # argument (default /dev/ttyUSB0) and print the modem's radio config.
    port = sys.argv[1] if len(sys.argv) > 1 else "/dev/ttyUSB0"
    try:
        client = KISSModemClient(port, timeout=1.0)
        print(f"Connected to {port}")
        try:
            # Get current config
            config = client.get_radio_config()
            if config:
                print(f"Current config: {config}")
            else:
                print("Failed to get radio config")
        finally:
            # Release the port even if the query raised.
            client.close()
    except Exception as e:
        print(f"Error: {e}")