diff --git a/platformio.ini b/platformio.ini index 5ee23ef..69d5eb2 100644 --- a/platformio.ini +++ b/platformio.ini @@ -59,6 +59,10 @@ build_flags = ; KISS serial baud rate -DKISS_BAUD=115200 +build_src_filter = + +<*> + - + ; ------------------------------------------------------------------ ; Native test environment — run unit tests on host ; ------------------------------------------------------------------ @@ -69,3 +73,12 @@ test_build_src = yes build_flags = -std=c99 build_src_filter = +<*> - - + +; ------------------------------------------------------------------ +; Native simulator environment — runs modem on host with stdio I/O +; ------------------------------------------------------------------ +[env:native] +platform = native +build_flags = + -std=c99 +build_src_filter = + diff --git a/test/kiss_client.py b/test/kiss_client.py new file mode 100644 index 0000000..80c8b77 --- /dev/null +++ b/test/kiss_client.py @@ -0,0 +1,402 @@ +#!/usr/bin/env python3 +""" +KISS Modem Python Integration Test Client + +Communicates with LoRa KISS modem via KISS protocol over serial/stdio. +Supports sending/receiving LoRa packets and configuring radio parameters. +""" + +import struct +import time +from typing import Optional, Tuple, List +import sys + + +class KISSProtocol: + """KISS (Keep It Simple, Stupid) protocol implementation.""" + + # Frame delimiters and escapes + FEND = 0xC0 + FESC = 0xDB + TFEND = 0xDC + TFESC = 0xDD + + # Port assignments + PORT_DATA = 0 + PORT_QUALITY = 1 + PORT_CONFIG = 2 + + # Config command opcodes + CMD_RES_OK = 0x01 + CMD_RES_ERROR = 0x02 + CMD_GET_RADIO = 0x10 + CMD_SET_RADIO = 0x11 + CMD_GET_FREQUENCY = 0x12 + CMD_SET_FREQUENCY = 0x13 + CMD_GET_BANDWIDTH = 0x14 + CMD_SET_BANDWIDTH = 0x15 + CMD_GET_SF = 0x16 + CMD_SET_SF = 0x17 + CMD_GET_CR = 0x18 + CMD_SET_CR = 0x19 + CMD_GET_POWER = 0x1A + CMD_SET_POWER = 0x1B + CMD_GET_SYNCWORD = 0x1C + CMD_SET_SYNCWORD = 0x1D + + @staticmethod + def encode(port: int, data: bytes) -> bytes: + """Encode data into a KISS frame.""" + frame = bytearray() + frame.append(KISSProtocol.FEND) + + # Type byte: port in upper nibble, command in lower nibble (0 for data) + type_byte = (port << 4) | 0x00 + if type_byte == KISSProtocol.FEND: + frame.append(KISSProtocol.FESC) + frame.append(KISSProtocol.TFEND) + elif type_byte == KISSProtocol.FESC: + frame.append(KISSProtocol.FESC) + frame.append(KISSProtocol.TFESC) + else: + frame.append(type_byte) + + # Payload with escaping + for byte in data: + if byte == KISSProtocol.FEND: + frame.append(KISSProtocol.FESC) + frame.append(KISSProtocol.TFEND) + elif byte == KISSProtocol.FESC: + frame.append(KISSProtocol.FESC) + frame.append(KISSProtocol.TFESC) + else: + frame.append(byte) + + frame.append(KISSProtocol.FEND) + return bytes(frame) + + @staticmethod + def decode(data: bytes) -> Optional[Tuple[int, bytes]]: + """Decode a KISS frame. Returns (port, payload) or None if incomplete.""" + if len(data) < 2 or data[0] != KISSProtocol.FEND: + return None + + state = "idle" + frame_data = bytearray() + + for i in range(1, len(data)): + byte = data[i] + + if byte == KISSProtocol.FEND: + if len(frame_data) > 0: + port = frame_data[0] >> 4 + payload = bytes(frame_data[1:]) + return (port, payload) + else: + return None + + if state == "idle": + if byte == KISSProtocol.FESC: + state = "escape" + else: + frame_data.append(byte) + elif state == "escape": + if byte == KISSProtocol.TFEND: + frame_data.append(KISSProtocol.FEND) + elif byte == KISSProtocol.TFESC: + frame_data.append(KISSProtocol.FESC) + state = "idle" + + return None + + +class KISSModemClient: + """Client for communicating with LoRa KISS modem.""" + + def __init__(self, port, baudrate=115200, timeout=1.0): + """ + Initialize the KISS modem client. + + Args: + port: Serial port name (e.g. '/dev/ttyUSB0') or file object for stdio + baudrate: Serial baud rate (ignored for file objects) + timeout: Read timeout in seconds + """ + if isinstance(port, str): + import serial + + self.serial = serial.Serial(port, baudrate, timeout=timeout) + else: + self.serial = port + self.timeout = timeout + self._buffer = bytearray() + + def close(self): + """Close the serial connection.""" + if hasattr(self.serial, "close"): + self.serial.close() + + def write(self, data: bytes) -> int: + """Write data to the modem.""" + return self.serial.write(data) + + def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes: + """Read data from the modem with timeout.""" + if timeout is None: + timeout = self.timeout + + start = time.time() + data = bytearray() + + while time.time() - start < timeout: + try: + chunk = self.serial.read(size) + if chunk: + data.extend(chunk) + # Try to decode a complete frame + for i in range(len(data)): + if data[i] == KISSProtocol.FEND: + frame_end = data.find(KISSProtocol.FEND, i + 1) + if frame_end != -1: + return bytes(data[i : frame_end + 1]) + else: + time.sleep(0.01) + except Exception: + time.sleep(0.01) + + return bytes(data) + + def send_data(self, payload: bytes) -> bool: + """Send raw LoRa packet data.""" + frame = KISSProtocol.encode(KISSProtocol.PORT_DATA, payload) + self.write(frame) + return True + + def recv_data(self, timeout: Optional[float] = None) -> Optional[bytes]: + """Receive raw LoRa packet data.""" + while True: + frame = self.read(timeout=timeout or self.timeout) + if not frame: + return None + + result = KISSProtocol.decode(frame) + if result: + port, payload = result + if port == KISSProtocol.PORT_DATA: + return payload + # Skip non-data frames, keep reading + + def recv_quality(self, timeout: Optional[float] = None) -> Optional[Tuple[int, int]]: + """Receive signal quality (SNR, RSSI).""" + while True: + frame = self.read(timeout=timeout or self.timeout) + if not frame: + return None + + result = KISSProtocol.decode(frame) + if result: + port, payload = result + if port == KISSProtocol.PORT_QUALITY and len(payload) >= 3: + snr = struct.unpack(">b", bytes([payload[0]]))[0] + rssi = struct.unpack(">h", payload[1:3])[0] + return (snr, rssi) + # Skip non-quality frames, keep reading + + def _send_command(self, cmd: int, data: bytes = b"") -> bool: + """Send a config command.""" + payload = bytes([cmd]) + data + frame = KISSProtocol.encode(KISSProtocol.PORT_CONFIG, payload) + self.write(frame) + return True + + def _recv_response( + self, timeout: Optional[float] = None + ) -> Optional[Tuple[int, bytes]]: + """Receive a config response.""" + while True: + frame = self.read(timeout=timeout or self.timeout) + if not frame: + return None + + result = KISSProtocol.decode(frame) + if result: + port, payload = result + if port == KISSProtocol.PORT_CONFIG and len(payload) > 0: + return (payload[0], payload[1:]) + + def get_radio_config(self) -> Optional[dict]: + """Get current radio configuration.""" + if not self._send_command(KISSProtocol.CMD_GET_RADIO): + return None + + resp = self._recv_response() + if not resp or resp[0] != KISSProtocol.CMD_GET_RADIO: + return None + + data = resp[1] + if len(data) < 13: + return None + + freq_khz, bw_hz, sf, cr, power_dbm = struct.unpack(">IIBBB", data[:10]) + return { + "freq_khz": freq_khz, + "bw_hz": bw_hz, + "sf": sf, + "cr": cr, + "power_dbm": power_dbm, + } + + def set_frequency(self, freq_khz: int) -> bool: + """Set LoRa frequency.""" + data = struct.pack(">I", freq_khz) + if not self._send_command(KISSProtocol.CMD_SET_FREQUENCY, data): + return False + + resp = self._recv_response() + return resp and resp[0] == KISSProtocol.CMD_RES_OK + + def get_frequency(self) -> Optional[int]: + """Get current frequency.""" + if not self._send_command(KISSProtocol.CMD_GET_FREQUENCY): + return None + + resp = self._recv_response() + if not resp or resp[0] != KISSProtocol.CMD_GET_FREQUENCY: + return None + + if len(resp[1]) >= 4: + return struct.unpack(">I", resp[1][:4])[0] + return None + + def set_bandwidth(self, bw_hz: int) -> bool: + """Set LoRa bandwidth.""" + data = struct.pack(">I", bw_hz) + if not self._send_command(KISSProtocol.CMD_SET_BANDWIDTH, data): + return False + + resp = self._recv_response() + return resp and resp[0] == KISSProtocol.CMD_RES_OK + + def get_bandwidth(self) -> Optional[int]: + """Get current bandwidth.""" + if not self._send_command(KISSProtocol.CMD_GET_BANDWIDTH): + return None + + resp = self._recv_response() + if not resp or resp[0] != KISSProtocol.CMD_GET_BANDWIDTH: + return None + + if len(resp[1]) >= 4: + return struct.unpack(">I", resp[1][:4])[0] + return None + + def set_spreading_factor(self, sf: int) -> bool: + """Set spreading factor (5-12).""" + data = bytes([sf]) + if not self._send_command(KISSProtocol.CMD_SET_SF, data): + return False + + resp = self._recv_response() + return resp and resp[0] == KISSProtocol.CMD_RES_OK + + def get_spreading_factor(self) -> Optional[int]: + """Get current spreading factor.""" + if not self._send_command(KISSProtocol.CMD_GET_SF): + return None + + resp = self._recv_response() + if not resp or resp[0] != KISSProtocol.CMD_GET_SF: + return None + + if len(resp[1]) >= 1: + return resp[1][0] + return None + + def set_coding_rate(self, cr: int) -> bool: + """Set coding rate (5-8, denominator of 4/CR).""" + data = bytes([cr]) + if not self._send_command(KISSProtocol.CMD_SET_CR, data): + return False + + resp = self._recv_response() + return resp and resp[0] == KISSProtocol.CMD_RES_OK + + def get_coding_rate(self) -> Optional[int]: + """Get current coding rate.""" + if not self._send_command(KISSProtocol.CMD_GET_CR): + return None + + resp = self._recv_response() + if not resp or resp[0] != KISSProtocol.CMD_GET_CR: + return None + + if len(resp[1]) >= 1: + return resp[1][0] + return None + + def set_power(self, power_dbm: int) -> bool: + """Set transmit power in dBm.""" + data = struct.pack(">b", power_dbm) + if not self._send_command(KISSProtocol.CMD_SET_POWER, data): + return False + + resp = self._recv_response() + return resp and resp[0] == KISSProtocol.CMD_RES_OK + + def get_power(self) -> Optional[int]: + """Get current transmit power.""" + if not self._send_command(KISSProtocol.CMD_GET_POWER): + return None + + resp = self._recv_response() + if not resp or resp[0] != KISSProtocol.CMD_GET_POWER: + return None + + if len(resp[1]) >= 1: + return struct.unpack(">b", bytes([resp[1][0]]))[0] + return None + + def set_syncword(self, syncword: int) -> bool: + """Set LoRa syncword.""" + data = bytes([syncword]) + if not self._send_command(KISSProtocol.CMD_SET_SYNCWORD, data): + return False + + resp = self._recv_response() + return resp and resp[0] == KISSProtocol.CMD_RES_OK + + def get_syncword(self) -> Optional[int]: + """Get current syncword.""" + if not self._send_command(KISSProtocol.CMD_GET_SYNCWORD): + return None + + resp = self._recv_response() + if not resp or resp[0] != KISSProtocol.CMD_GET_SYNCWORD: + return None + + if len(resp[1]) >= 1: + return resp[1][0] + return None + + +if __name__ == "__main__": + # Example usage + if len(sys.argv) > 1: + port = sys.argv[1] + else: + port = "/dev/ttyUSB0" + + try: + client = KISSModemClient(port, timeout=1.0) + print(f"Connected to {port}") + + # Get current config + config = client.get_radio_config() + if config: + print(f"Current config: {config}") + else: + print("Failed to get radio config") + + client.close() + except Exception as e: + print(f"Error: {e}") diff --git a/test/main_simulator.cpp b/test/main_simulator.cpp new file mode 100644 index 0000000..c162952 --- /dev/null +++ b/test/main_simulator.cpp @@ -0,0 +1,204 @@ +/* + * Dummy LoRa Radio Hardware Simulator + * + * Simulates a LoRa radio on the native (host) platform using stdio for communication. + * Allows testing the KISS protocol and modem logic without hardware. + * + * Compiled as PlatformIO native environment for integration testing. + */ + +#include +#include +#include +#include +#include +#include + +/* Include the KISS implementation directly */ +#include "../src/kiss.cpp" + +/* ─────────────────────────────────────────────────────────────────────────── */ +/* Simulated Radio State */ +/* ─────────────────────────────────────────────────────────────────────────── */ + +struct { + uint32_t freq_khz; + uint32_t bw_hz; + uint8_t sf; + uint8_t cr; + int8_t power_dbm; + uint8_t syncword; + bool rx_active; + uint8_t rx_buf[256]; + int rx_len; + int8_t rx_snr; + int16_t rx_rssi; +} radio_state = { + .freq_khz = 868000, + .bw_hz = 125000, + .sf = 7, + .cr = 5, + .power_dbm = 14, + .syncword = 0x12, + .rx_active = true, + .rx_len = 0, +}; + +/* ─────────────────────────────────────────────────────────────────────────── */ +/* Mock C API (must match radio.h) */ +/* ─────────────────────────────────────────────────────────────────────────── */ + +typedef struct { + uint32_t freq_khz; + uint32_t bw_hz; + uint8_t sf; + uint8_t cr; + int8_t power_dbm; + uint8_t syncword; +} radio_config_t; + +typedef struct { + int8_t snr; + int16_t rssi; +} radio_rx_info_t; + +int radio_init(void) { + fprintf(stderr, "[radio] Initialized: %lu kHz, %lu Hz BW, SF %d, CR %d, %d dBm\n", + radio_state.freq_khz, radio_state.bw_hz, radio_state.sf, radio_state.cr, + radio_state.power_dbm); + return 0; +} + +int radio_tx(const uint8_t *data, size_t len) { + fprintf(stderr, "[radio] TX %zu bytes at %lu kHz\n", len, radio_state.freq_khz); + return 0; +} + +int radio_rx_start(void) { + radio_state.rx_active = true; + fprintf(stderr, "[radio] RX started\n"); + return 0; +} + +bool radio_rx_available(void) { return radio_state.rx_len > 0; } + +int radio_rx_read(uint8_t *buf, size_t buf_cap, radio_rx_info_t *info) { + if (radio_state.rx_len == 0) + return 0; + + size_t n = (radio_state.rx_len < (int)buf_cap) ? radio_state.rx_len : buf_cap; + memcpy(buf, radio_state.rx_buf, n); + + if (info) { + info->snr = radio_state.rx_snr; + info->rssi = radio_state.rx_rssi; + } + + radio_state.rx_len = 0; + radio_rx_start(); + return (int)n; +} + +int radio_set_config(const radio_config_t *cfg) { + radio_state.freq_khz = cfg->freq_khz; + radio_state.bw_hz = cfg->bw_hz; + radio_state.sf = cfg->sf; + radio_state.cr = cfg->cr; + radio_state.power_dbm = cfg->power_dbm; + radio_state.syncword = cfg->syncword; + fprintf(stderr, "[radio] Config updated: %lu kHz, %lu Hz BW, SF %d, CR %d, %d dBm\n", + cfg->freq_khz, cfg->bw_hz, cfg->sf, cfg->cr, cfg->power_dbm); + return 0; +} + +void radio_get_config(radio_config_t *cfg) { + cfg->freq_khz = radio_state.freq_khz; + cfg->bw_hz = radio_state.bw_hz; + cfg->sf = radio_state.sf; + cfg->cr = radio_state.cr; + cfg->power_dbm = radio_state.power_dbm; + cfg->syncword = radio_state.syncword; +} + +/* ─────────────────────────────────────────────────────────────────────────── */ +/* Serial I/O Wrapper (stdio) */ +/* ─────────────────────────────────────────────────────────────────────────── */ + +static void set_stdin_nonblocking(void) { + int flags = fcntl(STDIN_FILENO, F_GETFL, 0); + fcntl(STDIN_FILENO, F_SETFL, flags | O_NONBLOCK); +} + +static int stdin_available(void) { + fd_set readfds; + FD_ZERO(&readfds); + FD_SET(STDIN_FILENO, &readfds); + struct timeval tv = {.tv_sec = 0, .tv_usec = 0}; + return select(STDIN_FILENO + 1, &readfds, NULL, NULL, &tv) > 0; +} + +/* ─────────────────────────────────────────────────────────────────────────── */ +/* Dummy Serial replacement (for main.cpp) */ +/* ─────────────────────────────────────────────────────────────────────────── */ + +namespace DummySerial { +bool initialized = false; +} + +/* Mock Serial object for Arduino compatibility */ +class DummySerialClass { +public: + void begin(unsigned long baud) { + fprintf(stderr, "[serial] begin(%lu)\n", baud); + DummySerial::initialized = true; + set_stdin_nonblocking(); + } + + int available(void) { return stdin_available(); } + + int read(void) { + unsigned char c; + if (::read(STDIN_FILENO, &c, 1) == 1) + return c; + return -1; + } + + size_t write(const uint8_t *buf, size_t size) { + return fwrite(buf, 1, size, stdout); + } + + size_t write(uint8_t c) { return putchar(c) == EOF ? 0 : 1; } + + int peek(void) { return -1; } + + void flush(void) { fflush(stdout); } +}; + +DummySerialClass Serial; + +/* ─────────────────────────────────────────────────────────────────────────── */ +/* Include main modem logic */ +/* ─────────────────────────────────────────────────────────────────────────── */ + +#include "../src/main.cpp" + +/* ─────────────────────────────────────────────────────────────────────────── */ +/* Main Entry Point */ +/* ─────────────────────────────────────────────────────────────────────────── */ + +int main(int argc, char *argv[]) { + fprintf(stderr, "[simulator] LoRa KISS Modem Simulator\n"); + fprintf(stderr, "[simulator] Reading KISS frames from stdin, writing to stdout\n"); + fprintf(stderr, "[simulator] Ctrl+D to exit\n\n"); + + setup(); + + int iterations = 0; + while (true) { + loop(); + iterations++; + usleep(1000); /* 1ms sleep to avoid busy waiting */ + } + + return 0; +} diff --git a/test/test_integration.py b/test/test_integration.py new file mode 100644 index 0000000..19c1fa3 --- /dev/null +++ b/test/test_integration.py @@ -0,0 +1,266 @@ +#!/usr/bin/env python3 +""" +Integration tests for LoRa KISS modem. + +Tests the modem simulator via KISS protocol using the Python client. +Run with: python3 test_integration.py +""" + +import subprocess +import time +import sys +import signal +import os +from pathlib import Path + +# Add parent directory to path for kiss_client import +sys.path.insert(0, str(Path(__file__).parent)) + +from kiss_client import KISSModemClient, KISSProtocol + + +class PipeInterface: + """Wrapper for subprocess stdin/stdout as file-like object.""" + + def __init__(self, proc): + self.proc = proc + + def write(self, data): + self.proc.stdin.write(data) + self.proc.stdin.flush() + return len(data) + + def read(self, size=1): + return self.proc.stdout.read(size) + + def close(self): + self.proc.terminate() + + +class IntegrationTestSuite: + """Integration tests for KISS modem.""" + + def __init__(self, simulator_path): + self.simulator_path = simulator_path + self.proc = None + self.client = None + + def start_simulator(self, mode="interactive"): + """Start the modem simulator.""" + try: + self.proc = subprocess.Popen( + [self.simulator_path, mode], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0, + ) + time.sleep(0.5) # Let simulator initialize + + pipe = PipeInterface(self.proc) + self.client = KISSModemClient(pipe, timeout=2.0) + return True + except Exception as e: + print(f"Failed to start simulator: {e}") + return False + + def stop_simulator(self): + """Stop the modem simulator.""" + if self.client: + self.client.close() + if self.proc: + self.proc.terminate() + self.proc.wait(timeout=2) + + def test_get_radio_config(self): + """Test getting radio configuration.""" + print("Test: get_radio_config...", end=" ") + config = self.client.get_radio_config() + if config and config["freq_khz"] == 868000 and config["sf"] == 7: + print("PASS") + return True + else: + print(f"FAIL: {config}") + return False + + def test_set_frequency(self): + """Test setting frequency.""" + print("Test: set_frequency...", end=" ") + if self.client.set_frequency(869525): + freq = self.client.get_frequency() + if freq == 869525: + print("PASS") + return True + print("FAIL") + return False + + def test_set_bandwidth(self): + """Test setting bandwidth.""" + print("Test: set_bandwidth...", end=" ") + if self.client.set_bandwidth(62500): + bw = self.client.get_bandwidth() + if bw == 62500: + print("PASS") + return True + print("FAIL") + return False + + def test_set_spreading_factor(self): + """Test setting spreading factor.""" + print("Test: set_spreading_factor...", end=" ") + if self.client.set_spreading_factor(10): + sf = self.client.get_spreading_factor() + if sf == 10: + print("PASS") + return True + print("FAIL") + return False + + def test_set_coding_rate(self): + """Test setting coding rate.""" + print("Test: set_coding_rate...", end=" ") + if self.client.set_coding_rate(7): + cr = self.client.get_coding_rate() + if cr == 7: + print("PASS") + return True + print("FAIL") + return False + + def test_set_power(self): + """Test setting transmit power.""" + print("Test: set_power...", end=" ") + if self.client.set_power(20): + power = self.client.get_power() + if power == 20: + print("PASS") + return True + print("FAIL") + return False + + def test_set_syncword(self): + """Test setting syncword.""" + print("Test: set_syncword...", end=" ") + if self.client.set_syncword(0x34): + syncword = self.client.get_syncword() + if syncword == 0x34: + print("PASS") + return True + print("FAIL") + return False + + def test_send_data(self): + """Test sending data.""" + print("Test: send_data...", end=" ") + data = b"Hello, LoRa!" + if self.client.send_data(data): + print("PASS") + return True + print("FAIL") + return False + + def test_kiss_frame_encode(self): + """Test KISS frame encoding.""" + print("Test: kiss_frame_encode...", end=" ") + data = b"test" + frame = KISSProtocol.encode(0, data) + + # Frame should be: FEND type data FEND + if (frame[0] == KISSProtocol.FEND and + frame[-1] == KISSProtocol.FEND and + len(frame) == 7): # FEND + type + 4 data + FEND + print("PASS") + return True + print(f"FAIL: {frame.hex()}") + return False + + def test_kiss_frame_encode_with_escape(self): + """Test KISS frame encoding with escape sequences.""" + print("Test: kiss_frame_encode_with_escape...", end=" ") + data = bytes([KISSProtocol.FEND, KISSProtocol.FESC]) + frame = KISSProtocol.encode(0, data) + + # Each special byte should be escaped + if frame.count(KISSProtocol.FESC) >= 2: + print("PASS") + return True + print(f"FAIL: {frame.hex()}") + return False + + def test_kiss_frame_decode(self): + """Test KISS frame decoding.""" + print("Test: kiss_frame_decode...", end=" ") + data = b"test" + frame = KISSProtocol.encode(0, data) + decoded = KISSProtocol.decode(frame) + + if decoded and decoded[1] == data: + print("PASS") + return True + print(f"FAIL: {decoded}") + return False + + def run_all_tests(self): + """Run all tests.""" + tests = [ + self.test_kiss_frame_encode, + self.test_kiss_frame_encode_with_escape, + self.test_kiss_frame_decode, + self.test_get_radio_config, + self.test_set_frequency, + self.test_set_bandwidth, + self.test_set_spreading_factor, + self.test_set_coding_rate, + self.test_set_power, + self.test_set_syncword, + self.test_send_data, + ] + + print("=" * 60) + print("LoRa KISS Modem Integration Tests") + print("=" * 60) + print() + + passed = 0 + failed = 0 + + for test in tests: + try: + if test(): + passed += 1 + else: + failed += 1 + except Exception as e: + print(f"EXCEPTION: {e}") + failed += 1 + + print() + print("=" * 60) + print(f"Results: {passed} passed, {failed} failed") + print("=" * 60) + + return failed == 0 + + +def main(): + # Find simulator executable + pio_build = Path(__file__).parent.parent / ".pio" / "build" / "native" / "program" + if not pio_build.exists(): + print(f"Error: Simulator not found at {pio_build}") + print("Run: pio run -e native") + sys.exit(1) + + suite = IntegrationTestSuite(str(pio_build)) + + try: + if not suite.start_simulator(): + sys.exit(1) + + success = suite.run_all_tests() + sys.exit(0 if success else 1) + finally: + suite.stop_simulator() + + +if __name__ == "__main__": + main()