#!/usr/bin/env python3 """ uart_test.py — FC↔Orin UART verification (Issue #362) Opens /dev/ttyTHS1 at 921600 baud, sends jlink test frames, logs responses. Jlink frame format (binary): [0xAA] [0x55] [len] [cmd] [payload...] [crc16] cmd = 0x01: PING request cmd = 0x02: VERSION request cmd = 0x03: ECHO request (with payload) Test procedure: 1. Open serial port with 921600 baud 2. Send PING frame → expect PONG response 3. Send VERSION frame → expect version string 4. Send ECHO frame with test data → expect echo back 5. Log all transactions with timestamps 6. Report results to MQTT (max topic) Usage: sudo python3 uart_test.py # Override port/baud: python3 uart_test.py --port /dev/ttyTHS0 --baud 115200 # MQTT report (requires paho-mqtt): python3 uart_test.py --mqtt-host localhost --mqtt-topic saltybot/uart_test Prerequisites: - pyserial: pip install pyserial - paho-mqtt (optional): pip install paho-mqtt - Orin UART device configured and Flight Controller connected to /dev/ttyTHS1 """ import argparse import json import logging import serial import sys import time from datetime import datetime from typing import Optional, Tuple # Configure logging logging.basicConfig( level=logging.INFO, format="[%(asctime)s] %(levelname)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger(__name__) class JlinkFrame: """Jlink binary frame codec.""" HEADER = (0xAA, 0x55) CMD_PING = 0x01 CMD_VERSION = 0x02 CMD_ECHO = 0x03 @staticmethod def crc16(data: bytes) -> int: """Compute CRC16-CCITT for frame payload (excluding header/len/crc). CRC16-CCITT: poly=0x1021, init=0xFFFF, xor_out=0xFFFF """ crc = 0xFFFF for byte in data: crc ^= byte << 8 for _ in range(8): crc <<= 1 if crc & 0x10000: crc ^= 0x1021 crc &= 0xFFFF return crc ^ 0xFFFF @staticmethod def build(cmd: int, payload: bytes = b"") -> bytes: """Build a jlink frame. Returns: bytes: [0xAA][0x55][len][cmd][payload...][crc16_hi][crc16_lo] """ frame_data = bytes([cmd]) + payload frame_len = len(frame_data) crc16_val = JlinkFrame.crc16(frame_data) frame = bytearray([JlinkFrame.HEADER[0], JlinkFrame.HEADER[1]]) frame.append(frame_len) frame.extend(frame_data) frame.append((crc16_val >> 8) & 0xFF) frame.append(crc16_val & 0xFF) return bytes(frame) @staticmethod def parse(data: bytes) -> Optional[Tuple[int, bytes]]: """Parse a received frame. Returns: (cmd, payload) or None if invalid """ if len(data) < 6: return None if data[0] != JlinkFrame.HEADER[0] or data[1] != JlinkFrame.HEADER[1]: return None frame_len = data[2] if len(data) < 3 + frame_len + 2: return None cmd = data[3] payload = data[4:3+frame_len] crc16_received = (data[3+frame_len] << 8) | data[3+frame_len+1] frame_data = bytes([cmd]) + payload crc16_computed = JlinkFrame.crc16(frame_data) if crc16_received != crc16_computed: logger.warning(f"CRC mismatch: got {crc16_received:04x}, expected {crc16_computed:04x}") return None return (cmd, payload) class UartTest: """UART test harness for FC↔Orin verification.""" def __init__(self, port: str, baud: int, timeout: float = 0.5): self.port_name = port self.baud_rate = baud self.timeout = timeout self.ser: Optional[serial.Serial] = None self.results = { "timestamp": datetime.now().isoformat(), "port": port, "baud": baud, "tests": {}, "summary": { "passed": 0, "failed": 0, "errors": [], }, } def open(self) -> bool: """Open serial port.""" try: self.ser = serial.Serial( port=self.port_name, baudrate=self.baud_rate, timeout=self.timeout, ) self.ser.reset_input_buffer() logger.info(f"Opened {self.port_name} @ {self.baud_rate} baud") return True except serial.SerialException as e: logger.error(f"Failed to open {self.port_name}: {e}") self.results["summary"]["errors"].append(str(e)) return False def close(self): """Close serial port.""" if self.ser and self.ser.is_open: self.ser.close() logger.info(f"Closed {self.port_name}") def send_and_wait(self, cmd: int, payload: bytes = b"", max_attempts: int = 3) -> Optional[Tuple[int, bytes]]: """Send frame and wait for response. Returns: (response_cmd, response_payload) or None on timeout """ if not self.ser or not self.ser.is_open: return None frame = JlinkFrame.build(cmd, payload) for attempt in range(max_attempts): try: self.ser.write(frame) logger.debug(f"Sent frame: {frame.hex()}") # Wait for response with timeout start_time = time.time() response = bytearray() while time.time() - start_time < self.timeout: if self.ser.in_waiting: data = self.ser.read(self.ser.in_waiting) response.extend(data) logger.debug(f"Received {len(data)} bytes: {data.hex()}") # Try to parse complete frame result = JlinkFrame.parse(bytes(response)) if result: return result time.sleep(0.01) logger.warning(f"No response on attempt {attempt+1}/{max_attempts}") except serial.SerialException as e: logger.error(f"Serial error: {e}") self.results["summary"]["errors"].append(str(e)) return None return None def test_ping(self) -> bool: """Test PING command.""" logger.info("Testing PING...") test_name = "ping" response = self.send_and_wait(JlinkFrame.CMD_PING) if response: resp_cmd, resp_payload = response self.results["tests"][test_name] = { "status": "PASS", "response_cmd": resp_cmd, "response_len": len(resp_payload), } logger.info(f" ✓ PING response received (cmd={resp_cmd})") self.results["summary"]["passed"] += 1 return True else: self.results["tests"][test_name] = {"status": "FAIL", "reason": "No response"} logger.error(" ✗ PING timeout") self.results["summary"]["failed"] += 1 return False def test_version(self) -> bool: """Test VERSION command.""" logger.info("Testing VERSION...") test_name = "version" response = self.send_and_wait(JlinkFrame.CMD_VERSION) if response: resp_cmd, resp_payload = response version_str = resp_payload.decode("utf-8", errors="replace").strip() self.results["tests"][test_name] = { "status": "PASS", "version": version_str, } logger.info(f" ✓ VERSION response: {version_str}") self.results["summary"]["passed"] += 1 return True else: self.results["tests"][test_name] = {"status": "FAIL", "reason": "No response"} logger.error(" ✗ VERSION timeout") self.results["summary"]["failed"] += 1 return False def test_echo(self, test_data: bytes = b"UART_TEST_FRAME") -> bool: """Test ECHO command.""" logger.info(f"Testing ECHO with payload: {test_data}") test_name = "echo" response = self.send_and_wait(JlinkFrame.CMD_ECHO, test_data) if response: resp_cmd, resp_payload = response if resp_payload == test_data: self.results["tests"][test_name] = { "status": "PASS", "payload_sent": test_data.hex(), "payload_received": resp_payload.hex(), } logger.info(f" ✓ ECHO verified (round-trip OK)") self.results["summary"]["passed"] += 1 return True else: self.results["tests"][test_name] = { "status": "FAIL", "reason": "Payload mismatch", "sent": test_data.hex(), "received": resp_payload.hex(), } logger.error(f" ✗ ECHO mismatch: sent {test_data.hex()}, got {resp_payload.hex()}") self.results["summary"]["failed"] += 1 return False else: self.results["tests"][test_name] = {"status": "FAIL", "reason": "No response"} logger.error(" ✗ ECHO timeout") self.results["summary"]["failed"] += 1 return False def run_all_tests(self) -> bool: """Run all verification tests.""" logger.info("=" * 60) logger.info("FC↔Orin UART Verification Test Suite (Issue #362)") logger.info("=" * 60) if not self.open(): return False try: time.sleep(0.5) # Wait for device to be ready self.test_ping() time.sleep(0.1) self.test_version() time.sleep(0.1) self.test_echo(b"UART_TEST") # Summary logger.info("=" * 60) passed = self.results["summary"]["passed"] failed = self.results["summary"]["failed"] total = passed + failed logger.info(f"Results: {passed}/{total} passed") if failed == 0: logger.info("✓ All tests PASSED") return True else: logger.warning(f"✗ {failed} test(s) FAILED") return False finally: self.close() def to_json(self) -> str: """Export results as JSON.""" return json.dumps(self.results, indent=2) def publish_mqtt(self, host: str, topic: str = "saltybot/uart_test") -> bool: """Publish results to MQTT broker. Requires: pip install paho-mqtt """ try: import paho.mqtt.client as mqtt except ImportError: logger.error("paho-mqtt not installed. Install with: pip install paho-mqtt") return False try: client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1) client.connect(host, 1883, keepalive=10) payload = self.to_json() client.publish(topic, payload, qos=1) client.disconnect() logger.info(f"Published results to {host}:{topic}") return True except Exception as e: logger.error(f"MQTT publish failed: {e}") return False def main(): parser = argparse.ArgumentParser( description="FC↔Orin UART verification test (Issue #362)" ) parser.add_argument( "--port", default="/dev/ttyTHS1", help="Serial port device (default: /dev/ttyTHS1)", ) parser.add_argument( "--baud", type=int, default=921600, help="Baud rate (default: 921600)", ) parser.add_argument( "--timeout", type=float, default=0.5, help="Response timeout in seconds (default: 0.5)", ) parser.add_argument( "--mqtt-host", help="MQTT broker host (optional, requires paho-mqtt)", ) parser.add_argument( "--mqtt-topic", default="saltybot/uart_test", help="MQTT topic for results (default: saltybot/uart_test)", ) parser.add_argument( "--json-output", help="Save results to JSON file", ) args = parser.parse_args() test = UartTest(args.port, args.baud, args.timeout) success = test.run_all_tests() # Save JSON results if args.json_output: try: with open(args.json_output, "w") as f: f.write(test.to_json()) logger.info(f"Results saved to {args.json_output}") except Exception as e: logger.error(f"Failed to save JSON: {e}") # Publish to MQTT if args.mqtt_host: test.publish_mqtt(args.mqtt_host, args.mqtt_topic) return 0 if success else 1 if __name__ == "__main__": sys.exit(main())