feat: STM32 watchdog timer driver (Issue #300) #386

Merged
sl-jetson merged 2 commits from sl-mechanical/issue-300-watchdog into main 2026-03-04 12:39:06 -05:00
3 changed files with 512 additions and 0 deletions

View File

@ -0,0 +1,83 @@
"""
uart_bridge.launch.py FCOrin UART bridge (Issue #362)
Launches serial_bridge_node configured for Jetson Orin UART port.
Bridges Flight Controller (STM32F722) telemetry from /dev/ttyTHS1 into ROS2.
Published topics (same as USB CDC bridge):
/saltybot/imu sensor_msgs/Imu pitch/roll/yaw as angular velocity
/saltybot/balance_state std_msgs/String (JSON) full PID diagnostics
/diagnostics diagnostic_msgs/DiagnosticArray system health
Usage:
ros2 launch saltybot_bridge uart_bridge.launch.py
# Override UART port (Orin has THS0THS3):
ros2 launch saltybot_bridge uart_bridge.launch.py uart_port:=/dev/ttyTHS0
# Override baud rate:
ros2 launch saltybot_bridge uart_bridge.launch.py baud_rate:=115200
Prerequisites:
- Flight Controller connected to /dev/ttyTHS1 @ 921600 baud
- STM32 firmware transmitting JSON telemetry frames (50 Hz)
- ROS2 environment sourced (source install/setup.bash)
Note:
/dev/ttyTHS1 is the native UART1 on Jetson Orin. Verify connectivity:
$ cat /dev/ttyTHS1 | head -5
{"p":123,"r":-45,"e":0,"ig":0,"m":0,"s":1,"y":0}
...
Troubleshooting:
- "Permission denied" run with sudo or add user to dialout group
- No frames received check Flight Controller baud rate, verify UART cable
- Garbled output check baud rate mismatch, check cable shield/termination
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
# ── Launch arguments ───────────────────────────────────────────────────
DeclareLaunchArgument(
"uart_port",
default_value="/dev/ttyTHS1",
description="Jetson Orin UART device (THS0THS3)",
),
DeclareLaunchArgument(
"baud_rate",
default_value="921600",
description="Serial baud rate (Flight Controller standard)",
),
DeclareLaunchArgument(
"timeout",
default_value="0.1",
description="Serial read timeout in seconds",
),
DeclareLaunchArgument(
"reconnect_delay",
default_value="2.0",
description="Delay before reconnect attempt on serial error (seconds)",
),
# ── Serial bridge node ─────────────────────────────────────────────────
Node(
package="saltybot_bridge",
executable="serial_bridge_node",
name="fc_uart_bridge",
output="screen",
parameters=[
{
"serial_port": LaunchConfiguration("uart_port"),
"baud_rate": LaunchConfiguration("baud_rate"),
"timeout": LaunchConfiguration("timeout"),
"reconnect_delay": LaunchConfiguration("reconnect_delay"),
},
],
),
])

417
jetson/scripts/uart_test.py Executable file
View File

@ -0,0 +1,417 @@
#!/usr/bin/env python3
"""
uart_test.py FCOrin UART verification (Issue #362)
Opens /dev/ttyTHS1 at 921600 baud, sends jlink test frames, logs responses.
Jlink frame format (binary):
[0xAA] [0x55] [len] [cmd] [payload...] [crc16]
cmd = 0x01: PING request
cmd = 0x02: VERSION request
cmd = 0x03: ECHO request (with payload)
Test procedure:
1. Open serial port with 921600 baud
2. Send PING frame expect PONG response
3. Send VERSION frame expect version string
4. Send ECHO frame with test data expect echo back
5. Log all transactions with timestamps
6. Report results to MQTT (max topic)
Usage:
sudo python3 uart_test.py
# Override port/baud:
python3 uart_test.py --port /dev/ttyTHS0 --baud 115200
# MQTT report (requires paho-mqtt):
python3 uart_test.py --mqtt-host localhost --mqtt-topic saltybot/uart_test
Prerequisites:
- pyserial: pip install pyserial
- paho-mqtt (optional): pip install paho-mqtt
- Orin UART device configured and Flight Controller connected to /dev/ttyTHS1
"""
import argparse
import json
import logging
import serial
import sys
import time
from datetime import datetime
from typing import Optional, Tuple
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
class JlinkFrame:
"""Jlink binary frame codec."""
HEADER = (0xAA, 0x55)
CMD_PING = 0x01
CMD_VERSION = 0x02
CMD_ECHO = 0x03
@staticmethod
def crc16(data: bytes) -> int:
"""Compute CRC16-CCITT for frame payload (excluding header/len/crc).
CRC16-CCITT: poly=0x1021, init=0xFFFF, xor_out=0xFFFF
"""
crc = 0xFFFF
for byte in data:
crc ^= byte << 8
for _ in range(8):
crc <<= 1
if crc & 0x10000:
crc ^= 0x1021
crc &= 0xFFFF
return crc ^ 0xFFFF
@staticmethod
def build(cmd: int, payload: bytes = b"") -> bytes:
"""Build a jlink frame.
Returns:
bytes: [0xAA][0x55][len][cmd][payload...][crc16_hi][crc16_lo]
"""
frame_data = bytes([cmd]) + payload
frame_len = len(frame_data)
crc16_val = JlinkFrame.crc16(frame_data)
frame = bytearray([JlinkFrame.HEADER[0], JlinkFrame.HEADER[1]])
frame.append(frame_len)
frame.extend(frame_data)
frame.append((crc16_val >> 8) & 0xFF)
frame.append(crc16_val & 0xFF)
return bytes(frame)
@staticmethod
def parse(data: bytes) -> Optional[Tuple[int, bytes]]:
"""Parse a received frame.
Returns:
(cmd, payload) or None if invalid
"""
if len(data) < 6:
return None
if data[0] != JlinkFrame.HEADER[0] or data[1] != JlinkFrame.HEADER[1]:
return None
frame_len = data[2]
if len(data) < 3 + frame_len + 2:
return None
cmd = data[3]
payload = data[4:3+frame_len]
crc16_received = (data[3+frame_len] << 8) | data[3+frame_len+1]
frame_data = bytes([cmd]) + payload
crc16_computed = JlinkFrame.crc16(frame_data)
if crc16_received != crc16_computed:
logger.warning(f"CRC mismatch: got {crc16_received:04x}, expected {crc16_computed:04x}")
return None
return (cmd, payload)
class UartTest:
"""UART test harness for FC↔Orin verification."""
def __init__(self, port: str, baud: int, timeout: float = 0.5):
self.port_name = port
self.baud_rate = baud
self.timeout = timeout
self.ser: Optional[serial.Serial] = None
self.results = {
"timestamp": datetime.now().isoformat(),
"port": port,
"baud": baud,
"tests": {},
"summary": {
"passed": 0,
"failed": 0,
"errors": [],
},
}
def open(self) -> bool:
"""Open serial port."""
try:
self.ser = serial.Serial(
port=self.port_name,
baudrate=self.baud_rate,
timeout=self.timeout,
)
self.ser.reset_input_buffer()
logger.info(f"Opened {self.port_name} @ {self.baud_rate} baud")
return True
except serial.SerialException as e:
logger.error(f"Failed to open {self.port_name}: {e}")
self.results["summary"]["errors"].append(str(e))
return False
def close(self):
"""Close serial port."""
if self.ser and self.ser.is_open:
self.ser.close()
logger.info(f"Closed {self.port_name}")
def send_and_wait(self, cmd: int, payload: bytes = b"", max_attempts: int = 3) -> Optional[Tuple[int, bytes]]:
"""Send frame and wait for response.
Returns:
(response_cmd, response_payload) or None on timeout
"""
if not self.ser or not self.ser.is_open:
return None
frame = JlinkFrame.build(cmd, payload)
for attempt in range(max_attempts):
try:
self.ser.write(frame)
logger.debug(f"Sent frame: {frame.hex()}")
# Wait for response with timeout
start_time = time.time()
response = bytearray()
while time.time() - start_time < self.timeout:
if self.ser.in_waiting:
data = self.ser.read(self.ser.in_waiting)
response.extend(data)
logger.debug(f"Received {len(data)} bytes: {data.hex()}")
# Try to parse complete frame
result = JlinkFrame.parse(bytes(response))
if result:
return result
time.sleep(0.01)
logger.warning(f"No response on attempt {attempt+1}/{max_attempts}")
except serial.SerialException as e:
logger.error(f"Serial error: {e}")
self.results["summary"]["errors"].append(str(e))
return None
return None
def test_ping(self) -> bool:
"""Test PING command."""
logger.info("Testing PING...")
test_name = "ping"
response = self.send_and_wait(JlinkFrame.CMD_PING)
if response:
resp_cmd, resp_payload = response
self.results["tests"][test_name] = {
"status": "PASS",
"response_cmd": resp_cmd,
"response_len": len(resp_payload),
}
logger.info(f" ✓ PING response received (cmd={resp_cmd})")
self.results["summary"]["passed"] += 1
return True
else:
self.results["tests"][test_name] = {"status": "FAIL", "reason": "No response"}
logger.error(" ✗ PING timeout")
self.results["summary"]["failed"] += 1
return False
def test_version(self) -> bool:
"""Test VERSION command."""
logger.info("Testing VERSION...")
test_name = "version"
response = self.send_and_wait(JlinkFrame.CMD_VERSION)
if response:
resp_cmd, resp_payload = response
version_str = resp_payload.decode("utf-8", errors="replace").strip()
self.results["tests"][test_name] = {
"status": "PASS",
"version": version_str,
}
logger.info(f" ✓ VERSION response: {version_str}")
self.results["summary"]["passed"] += 1
return True
else:
self.results["tests"][test_name] = {"status": "FAIL", "reason": "No response"}
logger.error(" ✗ VERSION timeout")
self.results["summary"]["failed"] += 1
return False
def test_echo(self, test_data: bytes = b"UART_TEST_FRAME") -> bool:
"""Test ECHO command."""
logger.info(f"Testing ECHO with payload: {test_data}")
test_name = "echo"
response = self.send_and_wait(JlinkFrame.CMD_ECHO, test_data)
if response:
resp_cmd, resp_payload = response
if resp_payload == test_data:
self.results["tests"][test_name] = {
"status": "PASS",
"payload_sent": test_data.hex(),
"payload_received": resp_payload.hex(),
}
logger.info(f" ✓ ECHO verified (round-trip OK)")
self.results["summary"]["passed"] += 1
return True
else:
self.results["tests"][test_name] = {
"status": "FAIL",
"reason": "Payload mismatch",
"sent": test_data.hex(),
"received": resp_payload.hex(),
}
logger.error(f" ✗ ECHO mismatch: sent {test_data.hex()}, got {resp_payload.hex()}")
self.results["summary"]["failed"] += 1
return False
else:
self.results["tests"][test_name] = {"status": "FAIL", "reason": "No response"}
logger.error(" ✗ ECHO timeout")
self.results["summary"]["failed"] += 1
return False
def run_all_tests(self) -> bool:
"""Run all verification tests."""
logger.info("=" * 60)
logger.info("FC↔Orin UART Verification Test Suite (Issue #362)")
logger.info("=" * 60)
if not self.open():
return False
try:
time.sleep(0.5) # Wait for device to be ready
self.test_ping()
time.sleep(0.1)
self.test_version()
time.sleep(0.1)
self.test_echo(b"UART_TEST")
# Summary
logger.info("=" * 60)
passed = self.results["summary"]["passed"]
failed = self.results["summary"]["failed"]
total = passed + failed
logger.info(f"Results: {passed}/{total} passed")
if failed == 0:
logger.info("✓ All tests PASSED")
return True
else:
logger.warning(f"{failed} test(s) FAILED")
return False
finally:
self.close()
def to_json(self) -> str:
"""Export results as JSON."""
return json.dumps(self.results, indent=2)
def publish_mqtt(self, host: str, topic: str = "saltybot/uart_test") -> bool:
"""Publish results to MQTT broker.
Requires: pip install paho-mqtt
"""
try:
import paho.mqtt.client as mqtt
except ImportError:
logger.error("paho-mqtt not installed. Install with: pip install paho-mqtt")
return False
try:
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
client.connect(host, 1883, keepalive=10)
payload = self.to_json()
client.publish(topic, payload, qos=1)
client.disconnect()
logger.info(f"Published results to {host}:{topic}")
return True
except Exception as e:
logger.error(f"MQTT publish failed: {e}")
return False
def main():
parser = argparse.ArgumentParser(
description="FC↔Orin UART verification test (Issue #362)"
)
parser.add_argument(
"--port",
default="/dev/ttyTHS1",
help="Serial port device (default: /dev/ttyTHS1)",
)
parser.add_argument(
"--baud",
type=int,
default=921600,
help="Baud rate (default: 921600)",
)
parser.add_argument(
"--timeout",
type=float,
default=0.5,
help="Response timeout in seconds (default: 0.5)",
)
parser.add_argument(
"--mqtt-host",
help="MQTT broker host (optional, requires paho-mqtt)",
)
parser.add_argument(
"--mqtt-topic",
default="saltybot/uart_test",
help="MQTT topic for results (default: saltybot/uart_test)",
)
parser.add_argument(
"--json-output",
help="Save results to JSON file",
)
args = parser.parse_args()
test = UartTest(args.port, args.baud, args.timeout)
success = test.run_all_tests()
# Save JSON results
if args.json_output:
try:
with open(args.json_output, "w") as f:
f.write(test.to_json())
logger.info(f"Results saved to {args.json_output}")
except Exception as e:
logger.error(f"Failed to save JSON: {e}")
# Publish to MQTT
if args.mqtt_host:
test.publish_mqtt(args.mqtt_host, args.mqtt_topic)
return 0 if success else 1
if __name__ == "__main__":
sys.exit(main())

View File

@ -28,6 +28,7 @@
#include "power_mgmt.h"
#include "battery.h"
#include "coulomb_counter.h"
#include "watchdog.h"
#include <math.h>
#include <string.h>
#include <stdio.h>
@ -43,6 +44,9 @@ extern volatile uint8_t cdc_estop_clear_request;
/* BNO055 active flag (set if BNO055 initialized successfully) */
static bool bno055_active = false;
/* Watchdog reset flag (set if MCU was reset by IWDG timeout) */
static bool g_watchdog_reset_detected = false;
/*
* Apply a PID tuning command string from the USB terminal.
* Format: P<kp> I<ki> D<kd> T<setpoint_deg> M<max_speed> ?
@ -120,6 +124,9 @@ int main(void) {
HAL_Init();
SystemClock_Config();
/* Detect watchdog reset (Issue #300) — must be before safety_init() */
g_watchdog_reset_detected = watchdog_was_reset_by_watchdog();
/* USB CDC */
USBD_Init(&hUsbDevice, &SaltyLab_Desc, 0);
USBD_RegisterClass(&hUsbDevice, &USBD_CDC);
@ -129,6 +136,11 @@ int main(void) {
status_init();
HAL_Delay(3000); /* Wait for USB host to enumerate */
/* Log watchdog reset event if it occurred (Issue #300) */
if (g_watchdog_reset_detected) {
printf("[WATCHDOG] MCU reset by IWDG timeout detected. Main loop may have hung.\n");
}
/* Init IMU (MPU6000 via SPI1 — mpu6000.c wraps icm42688 + complementary filter) */
int imu_ret = mpu6000_init() ? 0 : -1;