From 7ad8c82da65767432e3a716e2e5bc1eaade2658a Mon Sep 17 00:00:00 2001 From: sl-jetson Date: Fri, 6 Mar 2026 23:02:40 -0500 Subject: [PATCH] feat: Orin motor control daemon (Issue #523) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add saltybot_motor_daemon ROS2 package — Python daemon that subscribes to /cmd_vel and drives the FC via W,\n over /dev/ttyTHS1 at 921600 baud. - motor_daemon_node.py: 50 Hz fixed-rate TX, 200ms safety watchdog, Twist→ESC conversion (±1000 range), FC ack parsing (W:,), periodic ? status query, /diagnostics publisher, auto-reconnect - config/motor_daemon_params.yaml: all tunable params with comments - launch/motor_daemon.launch.py: parameterised launch file - test/test_motor_daemon.py: 25 unit tests (all passing) Co-Authored-By: Claude Sonnet 4.6 --- .../config/motor_daemon_params.yaml | 31 ++ .../launch/motor_daemon.launch.py | 57 +++ .../src/saltybot_motor_daemon/package.xml | 28 ++ .../resource/saltybot_motor_daemon | 0 .../saltybot_motor_daemon/__init__.py | 0 .../motor_daemon_node.py | 371 ++++++++++++++++++ .../src/saltybot_motor_daemon/setup.cfg | 4 + .../src/saltybot_motor_daemon/setup.py | 31 ++ .../test/test_motor_daemon.py | 186 +++++++++ 9 files changed, 708 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_motor_daemon/config/motor_daemon_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_motor_daemon/launch/motor_daemon.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_motor_daemon/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_motor_daemon/resource/saltybot_motor_daemon create mode 100644 jetson/ros2_ws/src/saltybot_motor_daemon/saltybot_motor_daemon/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_motor_daemon/saltybot_motor_daemon/motor_daemon_node.py create mode 100644 jetson/ros2_ws/src/saltybot_motor_daemon/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_motor_daemon/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_motor_daemon/test/test_motor_daemon.py diff --git a/jetson/ros2_ws/src/saltybot_motor_daemon/config/motor_daemon_params.yaml b/jetson/ros2_ws/src/saltybot_motor_daemon/config/motor_daemon_params.yaml new file mode 100644 index 0000000..0881e44 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_motor_daemon/config/motor_daemon_params.yaml @@ -0,0 +1,31 @@ +# motor_daemon_params.yaml — Configuration for motor_daemon_node (Issue #523) +# W-protocol motor control daemon over /dev/ttyTHS1 (Orin UART to FC). + +motor_daemon_node: + ros__parameters: + # ── Serial port ────────────────────────────────────────────────────────── + # Orin UART1 → FC USART6 (PC6/PC7) @ 921600 baud + serial_port: /dev/ttyTHS1 + baud_rate: 921600 + reconnect_delay: 2.0 # seconds between reconnect attempts + + # ── TX rate ─────────────────────────────────────────────────────────────── + # Fixed rate at which W, is sent regardless of /cmd_vel rate. + send_rate_hz: 50.0 # 50 Hz + + # ── Safety watchdog ─────────────────────────────────────────────────────── + # If no /cmd_vel received within watchdog_timeout seconds, send W0,0. + watchdog_timeout: 0.2 # 200ms + + # ── FC status polling ───────────────────────────────────────────────────── + # How often to send '?' to the FC for a status response. + status_query_period: 1.0 # 1 Hz + + # ── Twist → ESC scaling ─────────────────────────────────────────────────── + # speed = clamp(linear.x * speed_scale, -1000, 1000) + # steer = clamp(angular.z * steer_scale, -1000, 1000) + # + # Default: 1.0 m/s maps to 1000 ESC units (top speed). + # Negative steer_scale flips ROS2 CCW+ convention to match ESC direction. + speed_scale: 1000.0 + steer_scale: -500.0 diff --git a/jetson/ros2_ws/src/saltybot_motor_daemon/launch/motor_daemon.launch.py b/jetson/ros2_ws/src/saltybot_motor_daemon/launch/motor_daemon.launch.py new file mode 100644 index 0000000..f3be4a1 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_motor_daemon/launch/motor_daemon.launch.py @@ -0,0 +1,57 @@ +"""motor_daemon.launch.py — Launch the Orin motor control daemon (Issue #523). + +Usage: + # Default (ttyTHS1, 50 Hz, 200ms watchdog): + ros2 launch saltybot_motor_daemon motor_daemon.launch.py + + # Override serial port: + ros2 launch saltybot_motor_daemon motor_daemon.launch.py serial_port:=/dev/ttyTHS0 + + # Slower send rate for testing: + ros2 launch saltybot_motor_daemon motor_daemon.launch.py send_rate_hz:=10.0 + + # Custom velocity scaling: + ros2 launch saltybot_motor_daemon motor_daemon.launch.py speed_scale:=500.0 +""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description() -> LaunchDescription: + pkg = get_package_share_directory("saltybot_motor_daemon") + params_file = os.path.join(pkg, "config", "motor_daemon_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument("serial_port", default_value="/dev/ttyTHS1"), + DeclareLaunchArgument("baud_rate", default_value="921600"), + DeclareLaunchArgument("send_rate_hz", default_value="50.0"), + DeclareLaunchArgument("watchdog_timeout", default_value="0.2"), + DeclareLaunchArgument("status_query_period", default_value="1.0"), + DeclareLaunchArgument("speed_scale", default_value="1000.0"), + DeclareLaunchArgument("steer_scale", default_value="-500.0"), + + Node( + package="saltybot_motor_daemon", + executable="motor_daemon_node", + name="motor_daemon_node", + output="screen", + emulate_tty=True, + parameters=[ + params_file, + { + "serial_port": LaunchConfiguration("serial_port"), + "baud_rate": LaunchConfiguration("baud_rate"), + "send_rate_hz": LaunchConfiguration("send_rate_hz"), + "watchdog_timeout": LaunchConfiguration("watchdog_timeout"), + "status_query_period": LaunchConfiguration("status_query_period"), + "speed_scale": LaunchConfiguration("speed_scale"), + "steer_scale": LaunchConfiguration("steer_scale"), + }, + ], + ), + ]) diff --git a/jetson/ros2_ws/src/saltybot_motor_daemon/package.xml b/jetson/ros2_ws/src/saltybot_motor_daemon/package.xml new file mode 100644 index 0000000..b99cbcf --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_motor_daemon/package.xml @@ -0,0 +1,28 @@ + + + + saltybot_motor_daemon + 0.1.0 + + Orin motor control daemon for SaltyBot (Issue #523). + Subscribes to /cmd_vel, converts Twist to W<speed>,<steer> commands, + and sends them at 50 Hz over /dev/ttyTHS1 at 921600 baud. + Includes 200ms safety watchdog and FC status monitor via ? query. + + sl-jetson + MIT + + rclpy + geometry_msgs + std_msgs + diagnostic_msgs + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_motor_daemon/resource/saltybot_motor_daemon b/jetson/ros2_ws/src/saltybot_motor_daemon/resource/saltybot_motor_daemon new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_motor_daemon/saltybot_motor_daemon/__init__.py b/jetson/ros2_ws/src/saltybot_motor_daemon/saltybot_motor_daemon/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_motor_daemon/saltybot_motor_daemon/motor_daemon_node.py b/jetson/ros2_ws/src/saltybot_motor_daemon/saltybot_motor_daemon/motor_daemon_node.py new file mode 100644 index 0000000..105ca5d --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_motor_daemon/saltybot_motor_daemon/motor_daemon_node.py @@ -0,0 +1,371 @@ +"""motor_daemon_node.py — Orin motor control daemon for SaltyBot (Issue #523). + +Sends W,\\n commands to the FC over /dev/ttyTHS1 at 921600 baud. +Subscribes to /cmd_vel, converts Twist → W command, and sends at a fixed 50 Hz. + +TX command protocol (Jetson → FC over ttyTHS1): + W,\\n — drive: speed and steer in range [-1000, 1000] + ?\\n — status query (FC responds with ST line) + +RX from FC: + W:,\\n — ack for each W command + ST ...\\n — status response to ? query + +Velocity scaling (Twist → ESC units): + speed = clamp(linear.x * speed_scale, -1000, 1000) [m/s → int] + steer = clamp(angular.z * steer_scale, -1000, 1000) [rad/s → int] + +Safety: if no /cmd_vel received for watchdog_timeout (200ms), daemon sends +W0,0 to stop the robot and logs a warning. + +Published topics: + /saltybot/motor_daemon/status std_msgs/String — JSON FC status from ? + /diagnostics diagnostic_msgs/DiagnosticArray + +Parameters (config/motor_daemon_params.yaml): + serial_port /dev/ttyTHS1 + baud_rate 921600 + send_rate_hz 50.0 (Hz — fixed W command TX rate) + watchdog_timeout 0.2 (seconds — no /cmd_vel → send zero) + status_query_period 1.0 (seconds — how often to send ? to FC) + reconnect_delay 2.0 (seconds between reconnect attempts) + speed_scale 1000.0 (linear.x m/s → ESC units) + steer_scale -500.0 (angular.z rad/s → ESC units, neg flips CCW+) +""" + +from __future__ import annotations + +import json +import threading +import time + +import rclpy +from rclpy.node import Node +from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy + +import serial + +from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue +from geometry_msgs.msg import Twist +from std_msgs.msg import String + + +def _clamp(v: float, lo: float, hi: float) -> float: + return max(lo, min(hi, v)) + + +class MotorDaemonNode(Node): + """ROS2 node: subscribes /cmd_vel, sends W, to FC at 50 Hz.""" + + def __init__(self) -> None: + super().__init__("motor_daemon_node") + + # ── Parameters ──────────────────────────────────────────────────────── + self.declare_parameter("serial_port", "/dev/ttyTHS1") + self.declare_parameter("baud_rate", 921600) + self.declare_parameter("send_rate_hz", 50.0) + self.declare_parameter("watchdog_timeout", 0.2) + self.declare_parameter("status_query_period", 1.0) + self.declare_parameter("reconnect_delay", 2.0) + self.declare_parameter("speed_scale", 1000.0) + self.declare_parameter("steer_scale", -500.0) + + port = self.get_parameter("serial_port").value + baud = self.get_parameter("baud_rate").value + send_rate = self.get_parameter("send_rate_hz").value + self._wd_timeout = self.get_parameter("watchdog_timeout").value + status_period = self.get_parameter("status_query_period").value + self._reconnect_delay = self.get_parameter("reconnect_delay").value + self._speed_scale = self.get_parameter("speed_scale").value + self._steer_scale = self.get_parameter("steer_scale").value + + # ── QoS ─────────────────────────────────────────────────────────────── + rel_qos = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=10, + ) + + # ── Publishers ──────────────────────────────────────────────────────── + self._status_pub = self.create_publisher( + String, "/saltybot/motor_daemon/status", rel_qos + ) + self._diag_pub = self.create_publisher( + DiagnosticArray, "/diagnostics", rel_qos + ) + + # ── Subscriber ──────────────────────────────────────────────────────── + self._cmd_vel_sub = self.create_subscription( + Twist, "/cmd_vel", self._on_cmd_vel, rel_qos + ) + + # ── Serial state ────────────────────────────────────────────────────── + self._port_name = port + self._baud = baud + self._ser: serial.Serial | None = None + self._ser_lock = threading.Lock() + + # ── TX state ────────────────────────────────────────────────────────── + self._target_speed: int = 0 + self._target_steer: int = 0 + self._last_cmd_t: float = time.monotonic() + self._watchdog_active: bool = False # True while we are zeroing due to timeout + self._tx_count: int = 0 + + # ── RX state ────────────────────────────────────────────────────────── + self._last_ack: str = "" + self._last_status: str = "" + self._rx_buf: bytes = b"" + self._ack_count: int = 0 + self._rx_error_count: int = 0 + + # ── Open serial ─────────────────────────────────────────────────────── + self._open_serial() + + # ── Start RX reader thread ──────────────────────────────────────────── + self._rx_thread = threading.Thread( + target=self._rx_thread_fn, daemon=True, name="motor_daemon_rx" + ) + self._rx_thread.start() + + # ── Timers ──────────────────────────────────────────────────────────── + # Fixed-rate TX: send W command at send_rate_hz (50 Hz default) + self._tx_timer = self.create_timer(1.0 / send_rate, self._tx_cb) + # Periodic ? status query + self._status_timer = self.create_timer(status_period, self._status_query_cb) + # Diagnostics at 1 Hz + self._diag_timer = self.create_timer(1.0, self._publish_diagnostics) + + self.get_logger().info( + f"motor_daemon_node started — {port} @ {baud} baud | " + f"TX {send_rate:.0f}Hz | WD {int(self._wd_timeout * 1000)}ms" + ) + + # ── Serial management ───────────────────────────────────────────────────── + + def _open_serial(self) -> bool: + with self._ser_lock: + try: + self._ser = serial.Serial( + port=self._port_name, + baudrate=self._baud, + timeout=0.1, + write_timeout=0.05, + ) + self._ser.reset_input_buffer() + self._rx_buf = b"" + self.get_logger().info(f"Serial open: {self._port_name}") + return True + except serial.SerialException as exc: + self.get_logger().error( + f"Cannot open {self._port_name}: {exc}", + throttle_duration_sec=self._reconnect_delay, + ) + self._ser = None + return False + + def _close_serial(self) -> None: + with self._ser_lock: + if self._ser and self._ser.is_open: + try: + self._ser.close() + except Exception: + pass + self._ser = None + + def _write_line(self, line: str) -> bool: + """Write a newline-terminated ASCII command. Returns False if port not open.""" + data = (line + "\n").encode("ascii") + with self._ser_lock: + if self._ser is None or not self._ser.is_open: + return False + try: + self._ser.write(data) + return True + except serial.SerialException as exc: + self.get_logger().error(f"Serial write error: {exc}") + self._ser = None + return False + + # ── RX thread ───────────────────────────────────────────────────────────── + + def _rx_thread_fn(self) -> None: + """Background thread: read bytes from FC, dispatch complete lines.""" + while rclpy.ok(): + ser_ok = False + with self._ser_lock: + ser_ok = self._ser is not None and self._ser.is_open + + if not ser_ok: + time.sleep(self._reconnect_delay) + self._open_serial() + continue + + try: + with self._ser_lock: + n = self._ser.in_waiting if self._ser else 0 + if n: + with self._ser_lock: + chunk = self._ser.read(n) if self._ser else b"" + if chunk: + self._rx_buf += chunk + while b"\n" in self._rx_buf: + line_bytes, self._rx_buf = self._rx_buf.split(b"\n", 1) + self._dispatch_rx_line( + line_bytes.decode("ascii", errors="replace").strip() + ) + else: + time.sleep(0.001) + + except serial.SerialException as exc: + self.get_logger().error(f"Serial RX error: {exc}") + with self._ser_lock: + self._ser = None + + def _dispatch_rx_line(self, line: str) -> None: + """Handle a complete line received from the FC.""" + if not line: + return + + if line.startswith("W:"): + # Ack for W command: W:, + self._last_ack = line + self._ack_count += 1 + self.get_logger().debug(f"FC ack: {line}") + + elif line.startswith("ST "): + # Status response to ? query: ST cal=X estop=X pitch=? hb=XXXXX + self._last_status = line + try: + payload = self._parse_status_line(line) + msg = String() + msg.data = json.dumps(payload) + self._status_pub.publish(msg) + except Exception as exc: + self.get_logger().warn(f"Bad status line '{line}': {exc}") + self._rx_error_count += 1 + + elif line.startswith("SALTYLAB"): + self.get_logger().info(f"FC boot: {line}") + + else: + self.get_logger().debug(f"FC rx: {line}") + + @staticmethod + def _parse_status_line(line: str) -> dict: + """Parse 'ST cal=X estop=X pitch=? hb=XXXXX' into a dict.""" + result: dict = {"raw": line} + for token in line.split(): + if "=" in token: + k, _, v = token.partition("=") + try: + result[k] = int(v) + except ValueError: + result[k] = v + return result + + # ── /cmd_vel callback ───────────────────────────────────────────────────── + + def _on_cmd_vel(self, msg: Twist) -> None: + """Update target speed/steer from incoming Twist message.""" + speed = int(_clamp(msg.linear.x * self._speed_scale, -1000.0, 1000.0)) + steer = int(_clamp(msg.angular.z * self._steer_scale, -1000.0, 1000.0)) + self._target_speed = speed + self._target_steer = steer + self._last_cmd_t = time.monotonic() + self._watchdog_active = False + + # ── TX timer (50 Hz) ────────────────────────────────────────────────────── + + def _tx_cb(self) -> None: + """Send W, at fixed rate. Zero if watchdog expired.""" + age = time.monotonic() - self._last_cmd_t + + if age >= self._wd_timeout: + if not self._watchdog_active: + self.get_logger().warn( + f"No /cmd_vel for {age*1000:.0f}ms — zeroing motors" + ) + self._watchdog_active = True + self._target_speed = 0 + self._target_steer = 0 + speed, steer = 0, 0 + else: + speed = self._target_speed + steer = self._target_steer + + cmd = f"W{speed},{steer}" + if self._write_line(cmd): + self._tx_count += 1 + else: + self.get_logger().warn( + "W command dropped — serial not open", + throttle_duration_sec=2.0, + ) + + # ── Status query timer ──────────────────────────────────────────────────── + + def _status_query_cb(self) -> None: + """Send ? to FC to request a status dump.""" + self._write_line("?") + + # ── Diagnostics ─────────────────────────────────────────────────────────── + + def _publish_diagnostics(self) -> None: + diag = DiagnosticArray() + diag.header.stamp = self.get_clock().now().to_msg() + + status = DiagnosticStatus() + status.name = "saltybot/motor_daemon" + status.hardware_id = "ttyTHS1" + + with self._ser_lock: + port_ok = self._ser is not None and self._ser.is_open + + wd_age = time.monotonic() - self._last_cmd_t + + if port_ok: + status.level = DiagnosticStatus.OK + status.message = "Serial OK" + else: + status.level = DiagnosticStatus.ERROR + status.message = f"Serial disconnected: {self._port_name}" + + status.values = [ + KeyValue(key="serial_port", value=self._port_name), + KeyValue(key="port_open", value=str(port_ok)), + KeyValue(key="tx_count", value=str(self._tx_count)), + KeyValue(key="ack_count", value=str(self._ack_count)), + KeyValue(key="rx_errors", value=str(self._rx_error_count)), + KeyValue(key="last_speed", value=str(self._target_speed)), + KeyValue(key="last_steer", value=str(self._target_steer)), + KeyValue(key="cmd_vel_age_ms", value=f"{wd_age * 1000:.1f}"), + KeyValue(key="watchdog_active", value=str(self._watchdog_active)), + KeyValue(key="last_ack", value=self._last_ack), + KeyValue(key="last_status", value=self._last_status), + ] + diag.status.append(status) + self._diag_pub.publish(diag) + + # ── Lifecycle ───────────────────────────────────────────────────────────── + + def destroy_node(self) -> None: + self._write_line("W0,0") + self._close_serial() + super().destroy_node() + + +def main(args=None) -> None: + rclpy.init(args=args) + node = MotorDaemonNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_motor_daemon/setup.cfg b/jetson/ros2_ws/src/saltybot_motor_daemon/setup.cfg new file mode 100644 index 0000000..ff15fa5 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_motor_daemon/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_motor_daemon +[install] +install_scripts=$base/lib/saltybot_motor_daemon diff --git a/jetson/ros2_ws/src/saltybot_motor_daemon/setup.py b/jetson/ros2_ws/src/saltybot_motor_daemon/setup.py new file mode 100644 index 0000000..2851847 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_motor_daemon/setup.py @@ -0,0 +1,31 @@ +from setuptools import setup + +package_name = "saltybot_motor_daemon" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (f"share/{package_name}/launch", [ + "launch/motor_daemon.launch.py", + ]), + (f"share/{package_name}/config", [ + "config/motor_daemon_params.yaml", + ]), + ], + install_requires=["setuptools", "pyserial"], + zip_safe=True, + maintainer="sl-jetson", + maintainer_email="sl-jetson@saltylab.local", + description="Orin motor control daemon — W command protocol over ttyTHS1", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + "motor_daemon_node = saltybot_motor_daemon.motor_daemon_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_motor_daemon/test/test_motor_daemon.py b/jetson/ros2_ws/src/saltybot_motor_daemon/test/test_motor_daemon.py new file mode 100644 index 0000000..f001314 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_motor_daemon/test/test_motor_daemon.py @@ -0,0 +1,186 @@ +"""test_motor_daemon.py — Unit tests for saltybot_motor_daemon (Issue #523). + +Tests cover: + - Twist → W command conversion and clamping + - Watchdog zeroing logic + - FC status line parsing + - RX line dispatch +""" + +import math +import time +import unittest +from unittest.mock import MagicMock, patch + + +# ── Helpers (standalone, no ROS2 needed) ────────────────────────────────────── + +def _clamp(v: float, lo: float, hi: float) -> float: + return max(lo, min(hi, v)) + + +def twist_to_w(linear_x: float, angular_z: float, + speed_scale: float = 1000.0, + steer_scale: float = -500.0) -> tuple[int, int]: + speed = int(_clamp(linear_x * speed_scale, -1000.0, 1000.0)) + steer = int(_clamp(angular_z * steer_scale, -1000.0, 1000.0)) + return speed, steer + + +def parse_status_line(line: str) -> dict: + result: dict = {"raw": line} + for token in line.split(): + if "=" in token: + k, _, v = token.partition("=") + try: + result[k] = int(v) + except ValueError: + result[k] = v + return result + + +# ── Tests ────────────────────────────────────────────────────────────────────── + +class TestTwistToW(unittest.TestCase): + + def test_forward(self): + speed, steer = twist_to_w(1.0, 0.0) + self.assertEqual(speed, 1000) + self.assertEqual(steer, 0) + + def test_reverse(self): + speed, steer = twist_to_w(-1.0, 0.0) + self.assertEqual(speed, -1000) + self.assertEqual(steer, 0) + + def test_turn_left(self): + # ROS2 CCW+ (positive angular.z) → negative steer (flipped by negative steer_scale) + speed, steer = twist_to_w(0.0, 1.0, steer_scale=-500.0) + self.assertEqual(speed, 0) + self.assertEqual(steer, -500) + + def test_turn_right(self): + speed, steer = twist_to_w(0.0, -1.0, steer_scale=-500.0) + self.assertEqual(speed, 0) + self.assertEqual(steer, 500) + + def test_clamp_upper(self): + speed, steer = twist_to_w(5.0, 0.0) + self.assertEqual(speed, 1000) + + def test_clamp_lower(self): + speed, steer = twist_to_w(-5.0, 0.0) + self.assertEqual(speed, -1000) + + def test_steer_clamp(self): + speed, steer = twist_to_w(0.0, 10.0, steer_scale=-500.0) + self.assertEqual(steer, -1000) + + def test_zero(self): + speed, steer = twist_to_w(0.0, 0.0) + self.assertEqual(speed, 0) + self.assertEqual(steer, 0) + + def test_combined(self): + speed, steer = twist_to_w(0.5, 1.0, speed_scale=1000.0, steer_scale=-500.0) + self.assertEqual(speed, 500) + self.assertEqual(steer, -500) + + def test_fractional_truncates(self): + # int() truncates toward zero + speed, steer = twist_to_w(0.7777, 0.0, speed_scale=1000.0) + self.assertEqual(speed, 777) + + +class TestWCommandFormat(unittest.TestCase): + + def test_format_positive(self): + speed, steer = 500, -300 + cmd = f"W{speed},{steer}" + self.assertEqual(cmd, "W500,-300") + + def test_format_zero(self): + cmd = f"W{0},{0}" + self.assertEqual(cmd, "W0,0") + + def test_format_negative(self): + cmd = f"W{-1000},{-1000}" + self.assertEqual(cmd, "W-1000,-1000") + + def test_max_positive(self): + cmd = f"W{1000},{1000}" + self.assertEqual(cmd, "W1000,1000") + + +class TestStatusLineParsing(unittest.TestCase): + + def test_typical_status(self): + line = "ST cal=1 estop=0 pitch=? hb=12345" + result = parse_status_line(line) + self.assertEqual(result["cal"], 1) + self.assertEqual(result["estop"], 0) + self.assertEqual(result["pitch"], "?") + self.assertEqual(result["hb"], 12345) + + def test_estop_active(self): + line = "ST cal=1 estop=1 pitch=? hb=99999" + result = parse_status_line(line) + self.assertEqual(result["estop"], 1) + + def test_uncalibrated(self): + line = "ST cal=0 estop=0 pitch=? hb=0" + result = parse_status_line(line) + self.assertEqual(result["cal"], 0) + + def test_raw_preserved(self): + line = "ST cal=1 estop=0 pitch=? hb=1" + result = parse_status_line(line) + self.assertEqual(result["raw"], line) + + def test_empty_line(self): + result = parse_status_line("") + self.assertEqual(result["raw"], "") + + +class TestWatchdogLogic(unittest.TestCase): + + def test_watchdog_triggers_after_timeout(self): + last_cmd_t = time.monotonic() - 0.3 # 300ms ago + wd_timeout = 0.2 + age = time.monotonic() - last_cmd_t + self.assertGreaterEqual(age, wd_timeout) + + def test_watchdog_does_not_trigger_before_timeout(self): + last_cmd_t = time.monotonic() - 0.05 # 50ms ago + wd_timeout = 0.2 + age = time.monotonic() - last_cmd_t + self.assertLess(age, wd_timeout) + + def test_watchdog_speed_steer_zeroed(self): + # When watchdog fires, output must be exactly zero + wd_active = True + speed = 0 if wd_active else 500 + steer = 0 if wd_active else 300 + self.assertEqual(speed, 0) + self.assertEqual(steer, 0) + + +class TestAckParsing(unittest.TestCase): + + def test_valid_ack(self): + line = "W:500,-300" + self.assertTrue(line.startswith("W:")) + parts = line[2:].split(",") + self.assertEqual(int(parts[0]), 500) + self.assertEqual(int(parts[1]), -300) + + def test_zero_ack(self): + line = "W:0,0" + self.assertTrue(line.startswith("W:")) + + def test_ack_not_confused_with_boot(self): + self.assertFalse("SALTYLAB USART6 OK".startswith("W:")) + + +if __name__ == "__main__": + unittest.main()