#!/usr/bin/env python3 """ Integration tests for LoRa KISS modem. Tests the modem simulator via KISS protocol using the Python client. Run with: python3 test_integration.py """ import subprocess import time import sys import signal import os from pathlib import Path # Add parent directory to path for kiss_client import sys.path.insert(0, str(Path(__file__).parent)) from kiss_client import KISSModemClient, KISSProtocol class PipeInterface: """Wrapper for subprocess stdin/stdout as file-like object.""" def __init__(self, proc): self.proc = proc def write(self, data): self.proc.stdin.write(data) self.proc.stdin.flush() return len(data) def read(self, size=1): return self.proc.stdout.read(size) def close(self): self.proc.terminate() class IntegrationTestSuite: """Integration tests for KISS modem.""" def __init__(self, simulator_path): self.simulator_path = simulator_path self.proc = None self.client = None def start_simulator(self, mode="interactive"): """Start the modem simulator.""" try: self.proc = subprocess.Popen( [self.simulator_path, mode], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, ) time.sleep(0.5) # Let simulator initialize pipe = PipeInterface(self.proc) self.client = KISSModemClient(pipe, timeout=2.0) return True except Exception as e: print(f"Failed to start simulator: {e}") return False def stop_simulator(self): """Stop the modem simulator.""" if self.client: self.client.close() if self.proc: self.proc.terminate() self.proc.wait(timeout=2) def test_get_radio_config(self): """Test getting radio configuration.""" print("Test: get_radio_config...", end=" ") config = self.client.get_radio_config() if config and config["freq_khz"] == 868000 and config["sf"] == 7: print("PASS") return True else: print(f"FAIL: {config}") return False def test_set_frequency(self): """Test setting frequency.""" print("Test: set_frequency...", end=" ") if self.client.set_frequency(869525): freq = self.client.get_frequency() if freq == 869525: print("PASS") return True print("FAIL") return False def test_set_bandwidth(self): """Test setting bandwidth.""" print("Test: set_bandwidth...", end=" ") if self.client.set_bandwidth(62500): bw = self.client.get_bandwidth() if bw == 62500: print("PASS") return True print("FAIL") return False def test_set_spreading_factor(self): """Test setting spreading factor.""" print("Test: set_spreading_factor...", end=" ") if self.client.set_spreading_factor(10): sf = self.client.get_spreading_factor() if sf == 10: print("PASS") return True print("FAIL") return False def test_set_coding_rate(self): """Test setting coding rate.""" print("Test: set_coding_rate...", end=" ") if self.client.set_coding_rate(7): cr = self.client.get_coding_rate() if cr == 7: print("PASS") return True print("FAIL") return False def test_set_power(self): """Test setting transmit power.""" print("Test: set_power...", end=" ") if self.client.set_power(20): power = self.client.get_power() if power == 20: print("PASS") return True print("FAIL") return False def test_set_syncword(self): """Test setting syncword.""" print("Test: set_syncword...", end=" ") if self.client.set_syncword(0x34): syncword = self.client.get_syncword() if syncword == 0x34: print("PASS") return True print("FAIL") return False def test_send_data(self): """Test sending data.""" print("Test: send_data...", end=" ") data = b"Hello, LoRa!" if self.client.send_data(data): print("PASS") return True print("FAIL") return False def test_kiss_frame_encode(self): """Test KISS frame encoding.""" print("Test: kiss_frame_encode...", end=" ") data = b"test" frame = KISSProtocol.encode(0, data) # Frame should be: FEND type data FEND if (frame[0] == KISSProtocol.FEND and frame[-1] == KISSProtocol.FEND and len(frame) == 7): # FEND + type + 4 data + FEND print("PASS") return True print(f"FAIL: {frame.hex()}") return False def test_kiss_frame_encode_with_escape(self): """Test KISS frame encoding with escape sequences.""" print("Test: kiss_frame_encode_with_escape...", end=" ") data = bytes([KISSProtocol.FEND, KISSProtocol.FESC]) frame = KISSProtocol.encode(0, data) # Each special byte should be escaped if frame.count(KISSProtocol.FESC) >= 2: print("PASS") return True print(f"FAIL: {frame.hex()}") return False def test_kiss_frame_decode(self): """Test KISS frame decoding.""" print("Test: kiss_frame_decode...", end=" ") data = b"test" frame = KISSProtocol.encode(0, data) decoded = KISSProtocol.decode(frame) if decoded and decoded[1] == data: print("PASS") return True print(f"FAIL: {decoded}") return False def run_all_tests(self): """Run all tests.""" tests = [ self.test_kiss_frame_encode, self.test_kiss_frame_encode_with_escape, self.test_kiss_frame_decode, self.test_get_radio_config, self.test_set_frequency, self.test_set_bandwidth, self.test_set_spreading_factor, self.test_set_coding_rate, self.test_set_power, self.test_set_syncword, self.test_send_data, ] print("=" * 60) print("LoRa KISS Modem Integration Tests") print("=" * 60) print() passed = 0 failed = 0 for test in tests: try: if test(): passed += 1 else: failed += 1 except Exception as e: print(f"EXCEPTION: {e}") failed += 1 print() print("=" * 60) print(f"Results: {passed} passed, {failed} failed") print("=" * 60) return failed == 0 def main(): # Find simulator executable pio_build = Path(__file__).parent.parent / ".pio" / "build" / "native" / "program" if not pio_build.exists(): print(f"Error: Simulator not found at {pio_build}") print("Run: pio run -e native") sys.exit(1) suite = IntegrationTestSuite(str(pio_build)) try: if not suite.start_simulator(): sys.exit(1) success = suite.run_all_tests() sys.exit(0 if success else 1) finally: suite.stop_simulator() if __name__ == "__main__": main()