sl-jetson 9c2f830c8e feat: Add FC↔Orin UART verification (Issue #362)
Implements UART bridge verification between Flight Controller (STM32F722)
and Jetson Orin.

Changes:
1. jetson/scripts/uart_test.py (12.7 KB)
   - Opens /dev/ttyTHS1 at 921600 baud
   - Sends jlink binary test frames (PING, VERSION, ECHO)
   - Verifies CRC16-CCITT frame integrity
   - Logs transactions with timestamps
   - JSON result export and optional MQTT publishing

2. jetson/ros2_ws/src/saltybot_bridge/launch/uart_bridge.launch.py
   - ROS2 launch file for serial_bridge_node on UART port
   - Configurable port (default /dev/ttyTHS1), baud rate (921600)
   - Bridges FC telemetry to /saltybot/imu, /saltybot/balance_state
   - Publishes diagnostics to /diagnostics

Usage:
  Test: sudo python3 jetson/scripts/uart_test.py
  Launch: ros2 launch saltybot_bridge uart_bridge.launch.py

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-03-03 17:30:20 -05:00

418 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
uart_test.py — FC↔Orin UART verification (Issue #362)
Opens /dev/ttyTHS1 at 921600 baud, sends jlink test frames, logs responses.
Jlink frame format (binary):
[0xAA] [0x55] [len] [cmd] [payload...] [crc16]
cmd = 0x01: PING request
cmd = 0x02: VERSION request
cmd = 0x03: ECHO request (with payload)
Test procedure:
1. Open serial port with 921600 baud
2. Send PING frame → expect PONG response
3. Send VERSION frame → expect version string
4. Send ECHO frame with test data → expect echo back
5. Log all transactions with timestamps
6. Report results to MQTT (max topic)
Usage:
sudo python3 uart_test.py
# Override port/baud:
python3 uart_test.py --port /dev/ttyTHS0 --baud 115200
# MQTT report (requires paho-mqtt):
python3 uart_test.py --mqtt-host localhost --mqtt-topic saltybot/uart_test
Prerequisites:
- pyserial: pip install pyserial
- paho-mqtt (optional): pip install paho-mqtt
- Orin UART device configured and Flight Controller connected to /dev/ttyTHS1
"""
import argparse
import json
import logging
import serial
import sys
import time
from datetime import datetime
from typing import Optional, Tuple
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
class JlinkFrame:
"""Jlink binary frame codec."""
HEADER = (0xAA, 0x55)
CMD_PING = 0x01
CMD_VERSION = 0x02
CMD_ECHO = 0x03
@staticmethod
def crc16(data: bytes) -> int:
"""Compute CRC16-CCITT for frame payload (excluding header/len/crc).
CRC16-CCITT: poly=0x1021, init=0xFFFF, xor_out=0xFFFF
"""
crc = 0xFFFF
for byte in data:
crc ^= byte << 8
for _ in range(8):
crc <<= 1
if crc & 0x10000:
crc ^= 0x1021
crc &= 0xFFFF
return crc ^ 0xFFFF
@staticmethod
def build(cmd: int, payload: bytes = b"") -> bytes:
"""Build a jlink frame.
Returns:
bytes: [0xAA][0x55][len][cmd][payload...][crc16_hi][crc16_lo]
"""
frame_data = bytes([cmd]) + payload
frame_len = len(frame_data)
crc16_val = JlinkFrame.crc16(frame_data)
frame = bytearray([JlinkFrame.HEADER[0], JlinkFrame.HEADER[1]])
frame.append(frame_len)
frame.extend(frame_data)
frame.append((crc16_val >> 8) & 0xFF)
frame.append(crc16_val & 0xFF)
return bytes(frame)
@staticmethod
def parse(data: bytes) -> Optional[Tuple[int, bytes]]:
"""Parse a received frame.
Returns:
(cmd, payload) or None if invalid
"""
if len(data) < 6:
return None
if data[0] != JlinkFrame.HEADER[0] or data[1] != JlinkFrame.HEADER[1]:
return None
frame_len = data[2]
if len(data) < 3 + frame_len + 2:
return None
cmd = data[3]
payload = data[4:3+frame_len]
crc16_received = (data[3+frame_len] << 8) | data[3+frame_len+1]
frame_data = bytes([cmd]) + payload
crc16_computed = JlinkFrame.crc16(frame_data)
if crc16_received != crc16_computed:
logger.warning(f"CRC mismatch: got {crc16_received:04x}, expected {crc16_computed:04x}")
return None
return (cmd, payload)
class UartTest:
"""UART test harness for FC↔Orin verification."""
def __init__(self, port: str, baud: int, timeout: float = 0.5):
self.port_name = port
self.baud_rate = baud
self.timeout = timeout
self.ser: Optional[serial.Serial] = None
self.results = {
"timestamp": datetime.now().isoformat(),
"port": port,
"baud": baud,
"tests": {},
"summary": {
"passed": 0,
"failed": 0,
"errors": [],
},
}
def open(self) -> bool:
"""Open serial port."""
try:
self.ser = serial.Serial(
port=self.port_name,
baudrate=self.baud_rate,
timeout=self.timeout,
)
self.ser.reset_input_buffer()
logger.info(f"Opened {self.port_name} @ {self.baud_rate} baud")
return True
except serial.SerialException as e:
logger.error(f"Failed to open {self.port_name}: {e}")
self.results["summary"]["errors"].append(str(e))
return False
def close(self):
"""Close serial port."""
if self.ser and self.ser.is_open:
self.ser.close()
logger.info(f"Closed {self.port_name}")
def send_and_wait(self, cmd: int, payload: bytes = b"", max_attempts: int = 3) -> Optional[Tuple[int, bytes]]:
"""Send frame and wait for response.
Returns:
(response_cmd, response_payload) or None on timeout
"""
if not self.ser or not self.ser.is_open:
return None
frame = JlinkFrame.build(cmd, payload)
for attempt in range(max_attempts):
try:
self.ser.write(frame)
logger.debug(f"Sent frame: {frame.hex()}")
# Wait for response with timeout
start_time = time.time()
response = bytearray()
while time.time() - start_time < self.timeout:
if self.ser.in_waiting:
data = self.ser.read(self.ser.in_waiting)
response.extend(data)
logger.debug(f"Received {len(data)} bytes: {data.hex()}")
# Try to parse complete frame
result = JlinkFrame.parse(bytes(response))
if result:
return result
time.sleep(0.01)
logger.warning(f"No response on attempt {attempt+1}/{max_attempts}")
except serial.SerialException as e:
logger.error(f"Serial error: {e}")
self.results["summary"]["errors"].append(str(e))
return None
return None
def test_ping(self) -> bool:
"""Test PING command."""
logger.info("Testing PING...")
test_name = "ping"
response = self.send_and_wait(JlinkFrame.CMD_PING)
if response:
resp_cmd, resp_payload = response
self.results["tests"][test_name] = {
"status": "PASS",
"response_cmd": resp_cmd,
"response_len": len(resp_payload),
}
logger.info(f" ✓ PING response received (cmd={resp_cmd})")
self.results["summary"]["passed"] += 1
return True
else:
self.results["tests"][test_name] = {"status": "FAIL", "reason": "No response"}
logger.error(" ✗ PING timeout")
self.results["summary"]["failed"] += 1
return False
def test_version(self) -> bool:
"""Test VERSION command."""
logger.info("Testing VERSION...")
test_name = "version"
response = self.send_and_wait(JlinkFrame.CMD_VERSION)
if response:
resp_cmd, resp_payload = response
version_str = resp_payload.decode("utf-8", errors="replace").strip()
self.results["tests"][test_name] = {
"status": "PASS",
"version": version_str,
}
logger.info(f" ✓ VERSION response: {version_str}")
self.results["summary"]["passed"] += 1
return True
else:
self.results["tests"][test_name] = {"status": "FAIL", "reason": "No response"}
logger.error(" ✗ VERSION timeout")
self.results["summary"]["failed"] += 1
return False
def test_echo(self, test_data: bytes = b"UART_TEST_FRAME") -> bool:
"""Test ECHO command."""
logger.info(f"Testing ECHO with payload: {test_data}")
test_name = "echo"
response = self.send_and_wait(JlinkFrame.CMD_ECHO, test_data)
if response:
resp_cmd, resp_payload = response
if resp_payload == test_data:
self.results["tests"][test_name] = {
"status": "PASS",
"payload_sent": test_data.hex(),
"payload_received": resp_payload.hex(),
}
logger.info(f" ✓ ECHO verified (round-trip OK)")
self.results["summary"]["passed"] += 1
return True
else:
self.results["tests"][test_name] = {
"status": "FAIL",
"reason": "Payload mismatch",
"sent": test_data.hex(),
"received": resp_payload.hex(),
}
logger.error(f" ✗ ECHO mismatch: sent {test_data.hex()}, got {resp_payload.hex()}")
self.results["summary"]["failed"] += 1
return False
else:
self.results["tests"][test_name] = {"status": "FAIL", "reason": "No response"}
logger.error(" ✗ ECHO timeout")
self.results["summary"]["failed"] += 1
return False
def run_all_tests(self) -> bool:
"""Run all verification tests."""
logger.info("=" * 60)
logger.info("FC↔Orin UART Verification Test Suite (Issue #362)")
logger.info("=" * 60)
if not self.open():
return False
try:
time.sleep(0.5) # Wait for device to be ready
self.test_ping()
time.sleep(0.1)
self.test_version()
time.sleep(0.1)
self.test_echo(b"UART_TEST")
# Summary
logger.info("=" * 60)
passed = self.results["summary"]["passed"]
failed = self.results["summary"]["failed"]
total = passed + failed
logger.info(f"Results: {passed}/{total} passed")
if failed == 0:
logger.info("✓ All tests PASSED")
return True
else:
logger.warning(f"{failed} test(s) FAILED")
return False
finally:
self.close()
def to_json(self) -> str:
"""Export results as JSON."""
return json.dumps(self.results, indent=2)
def publish_mqtt(self, host: str, topic: str = "saltybot/uart_test") -> bool:
"""Publish results to MQTT broker.
Requires: pip install paho-mqtt
"""
try:
import paho.mqtt.client as mqtt
except ImportError:
logger.error("paho-mqtt not installed. Install with: pip install paho-mqtt")
return False
try:
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
client.connect(host, 1883, keepalive=10)
payload = self.to_json()
client.publish(topic, payload, qos=1)
client.disconnect()
logger.info(f"Published results to {host}:{topic}")
return True
except Exception as e:
logger.error(f"MQTT publish failed: {e}")
return False
def main():
parser = argparse.ArgumentParser(
description="FC↔Orin UART verification test (Issue #362)"
)
parser.add_argument(
"--port",
default="/dev/ttyTHS1",
help="Serial port device (default: /dev/ttyTHS1)",
)
parser.add_argument(
"--baud",
type=int,
default=921600,
help="Baud rate (default: 921600)",
)
parser.add_argument(
"--timeout",
type=float,
default=0.5,
help="Response timeout in seconds (default: 0.5)",
)
parser.add_argument(
"--mqtt-host",
help="MQTT broker host (optional, requires paho-mqtt)",
)
parser.add_argument(
"--mqtt-topic",
default="saltybot/uart_test",
help="MQTT topic for results (default: saltybot/uart_test)",
)
parser.add_argument(
"--json-output",
help="Save results to JSON file",
)
args = parser.parse_args()
test = UartTest(args.port, args.baud, args.timeout)
success = test.run_all_tests()
# Save JSON results
if args.json_output:
try:
with open(args.json_output, "w") as f:
f.write(test.to_json())
logger.info(f"Results saved to {args.json_output}")
except Exception as e:
logger.error(f"Failed to save JSON: {e}")
# Publish to MQTT
if args.mqtt_host:
test.publish_mqtt(args.mqtt_host, args.mqtt_topic)
return 0 if success else 1
if __name__ == "__main__":
sys.exit(main())