feat: Orin motor control daemon (Issue #523)

Add saltybot_motor_daemon ROS2 package — Python daemon that subscribes
to /cmd_vel and drives the FC via W<speed>,<steer>\n over /dev/ttyTHS1
at 921600 baud.

- motor_daemon_node.py: 50 Hz fixed-rate TX, 200ms safety watchdog,
  Twist→ESC conversion (±1000 range), FC ack parsing (W:<s>,<st>),
  periodic ? status query, /diagnostics publisher, auto-reconnect
- config/motor_daemon_params.yaml: all tunable params with comments
- launch/motor_daemon.launch.py: parameterised launch file
- test/test_motor_daemon.py: 25 unit tests (all passing)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-jetson 2026-03-06 23:02:40 -05:00 committed by sl-controls
parent 3bb4b71cea
commit e28f1549cb
9 changed files with 708 additions and 0 deletions

View File

@ -0,0 +1,31 @@
# motor_daemon_params.yaml — Configuration for motor_daemon_node (Issue #523)
# W-protocol motor control daemon over /dev/ttyTHS1 (Orin UART to FC).
motor_daemon_node:
ros__parameters:
# ── Serial port ──────────────────────────────────────────────────────────
# Orin UART1 → FC USART6 (PC6/PC7) @ 921600 baud
serial_port: /dev/ttyTHS1
baud_rate: 921600
reconnect_delay: 2.0 # seconds between reconnect attempts
# ── TX rate ───────────────────────────────────────────────────────────────
# Fixed rate at which W<speed>,<steer> is sent regardless of /cmd_vel rate.
send_rate_hz: 50.0 # 50 Hz
# ── Safety watchdog ───────────────────────────────────────────────────────
# If no /cmd_vel received within watchdog_timeout seconds, send W0,0.
watchdog_timeout: 0.2 # 200ms
# ── FC status polling ─────────────────────────────────────────────────────
# How often to send '?' to the FC for a status response.
status_query_period: 1.0 # 1 Hz
# ── Twist → ESC scaling ───────────────────────────────────────────────────
# speed = clamp(linear.x * speed_scale, -1000, 1000)
# steer = clamp(angular.z * steer_scale, -1000, 1000)
#
# Default: 1.0 m/s maps to 1000 ESC units (top speed).
# Negative steer_scale flips ROS2 CCW+ convention to match ESC direction.
speed_scale: 1000.0
steer_scale: -500.0

View File

@ -0,0 +1,57 @@
"""motor_daemon.launch.py — Launch the Orin motor control daemon (Issue #523).
Usage:
# Default (ttyTHS1, 50 Hz, 200ms watchdog):
ros2 launch saltybot_motor_daemon motor_daemon.launch.py
# Override serial port:
ros2 launch saltybot_motor_daemon motor_daemon.launch.py serial_port:=/dev/ttyTHS0
# Slower send rate for testing:
ros2 launch saltybot_motor_daemon motor_daemon.launch.py send_rate_hz:=10.0
# Custom velocity scaling:
ros2 launch saltybot_motor_daemon motor_daemon.launch.py speed_scale:=500.0
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description() -> LaunchDescription:
pkg = get_package_share_directory("saltybot_motor_daemon")
params_file = os.path.join(pkg, "config", "motor_daemon_params.yaml")
return LaunchDescription([
DeclareLaunchArgument("serial_port", default_value="/dev/ttyTHS1"),
DeclareLaunchArgument("baud_rate", default_value="921600"),
DeclareLaunchArgument("send_rate_hz", default_value="50.0"),
DeclareLaunchArgument("watchdog_timeout", default_value="0.2"),
DeclareLaunchArgument("status_query_period", default_value="1.0"),
DeclareLaunchArgument("speed_scale", default_value="1000.0"),
DeclareLaunchArgument("steer_scale", default_value="-500.0"),
Node(
package="saltybot_motor_daemon",
executable="motor_daemon_node",
name="motor_daemon_node",
output="screen",
emulate_tty=True,
parameters=[
params_file,
{
"serial_port": LaunchConfiguration("serial_port"),
"baud_rate": LaunchConfiguration("baud_rate"),
"send_rate_hz": LaunchConfiguration("send_rate_hz"),
"watchdog_timeout": LaunchConfiguration("watchdog_timeout"),
"status_query_period": LaunchConfiguration("status_query_period"),
"speed_scale": LaunchConfiguration("speed_scale"),
"steer_scale": LaunchConfiguration("steer_scale"),
},
],
),
])

View File

@ -0,0 +1,28 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_motor_daemon</name>
<version>0.1.0</version>
<description>
Orin motor control daemon for SaltyBot (Issue #523).
Subscribes to /cmd_vel, converts Twist to W&lt;speed&gt;,&lt;steer&gt; commands,
and sends them at 50 Hz over /dev/ttyTHS1 at 921600 baud.
Includes 200ms safety watchdog and FC status monitor via ? query.
</description>
<maintainer email="sl-jetson@saltylab.local">sl-jetson</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<depend>diagnostic_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,371 @@
"""motor_daemon_node.py — Orin motor control daemon for SaltyBot (Issue #523).
Sends W<speed>,<steer>\\n commands to the FC over /dev/ttyTHS1 at 921600 baud.
Subscribes to /cmd_vel, converts Twist W command, and sends at a fixed 50 Hz.
TX command protocol (Jetson FC over ttyTHS1):
W<speed>,<steer>\\n drive: speed and steer in range [-1000, 1000]
?\\n status query (FC responds with ST line)
RX from FC:
W:<speed>,<steer>\\n ack for each W command
ST ...\\n status response to ? query
Velocity scaling (Twist ESC units):
speed = clamp(linear.x * speed_scale, -1000, 1000) [m/s int]
steer = clamp(angular.z * steer_scale, -1000, 1000) [rad/s int]
Safety: if no /cmd_vel received for watchdog_timeout (200ms), daemon sends
W0,0 to stop the robot and logs a warning.
Published topics:
/saltybot/motor_daemon/status std_msgs/String JSON FC status from ?
/diagnostics diagnostic_msgs/DiagnosticArray
Parameters (config/motor_daemon_params.yaml):
serial_port /dev/ttyTHS1
baud_rate 921600
send_rate_hz 50.0 (Hz fixed W command TX rate)
watchdog_timeout 0.2 (seconds no /cmd_vel send zero)
status_query_period 1.0 (seconds how often to send ? to FC)
reconnect_delay 2.0 (seconds between reconnect attempts)
speed_scale 1000.0 (linear.x m/s ESC units)
steer_scale -500.0 (angular.z rad/s ESC units, neg flips CCW+)
"""
from __future__ import annotations
import json
import threading
import time
import rclpy
from rclpy.node import Node
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
import serial
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
from geometry_msgs.msg import Twist
from std_msgs.msg import String
def _clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v))
class MotorDaemonNode(Node):
"""ROS2 node: subscribes /cmd_vel, sends W<speed>,<steer> to FC at 50 Hz."""
def __init__(self) -> None:
super().__init__("motor_daemon_node")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("serial_port", "/dev/ttyTHS1")
self.declare_parameter("baud_rate", 921600)
self.declare_parameter("send_rate_hz", 50.0)
self.declare_parameter("watchdog_timeout", 0.2)
self.declare_parameter("status_query_period", 1.0)
self.declare_parameter("reconnect_delay", 2.0)
self.declare_parameter("speed_scale", 1000.0)
self.declare_parameter("steer_scale", -500.0)
port = self.get_parameter("serial_port").value
baud = self.get_parameter("baud_rate").value
send_rate = self.get_parameter("send_rate_hz").value
self._wd_timeout = self.get_parameter("watchdog_timeout").value
status_period = self.get_parameter("status_query_period").value
self._reconnect_delay = self.get_parameter("reconnect_delay").value
self._speed_scale = self.get_parameter("speed_scale").value
self._steer_scale = self.get_parameter("steer_scale").value
# ── QoS ───────────────────────────────────────────────────────────────
rel_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
# ── Publishers ────────────────────────────────────────────────────────
self._status_pub = self.create_publisher(
String, "/saltybot/motor_daemon/status", rel_qos
)
self._diag_pub = self.create_publisher(
DiagnosticArray, "/diagnostics", rel_qos
)
# ── Subscriber ────────────────────────────────────────────────────────
self._cmd_vel_sub = self.create_subscription(
Twist, "/cmd_vel", self._on_cmd_vel, rel_qos
)
# ── Serial state ──────────────────────────────────────────────────────
self._port_name = port
self._baud = baud
self._ser: serial.Serial | None = None
self._ser_lock = threading.Lock()
# ── TX state ──────────────────────────────────────────────────────────
self._target_speed: int = 0
self._target_steer: int = 0
self._last_cmd_t: float = time.monotonic()
self._watchdog_active: bool = False # True while we are zeroing due to timeout
self._tx_count: int = 0
# ── RX state ──────────────────────────────────────────────────────────
self._last_ack: str = ""
self._last_status: str = ""
self._rx_buf: bytes = b""
self._ack_count: int = 0
self._rx_error_count: int = 0
# ── Open serial ───────────────────────────────────────────────────────
self._open_serial()
# ── Start RX reader thread ────────────────────────────────────────────
self._rx_thread = threading.Thread(
target=self._rx_thread_fn, daemon=True, name="motor_daemon_rx"
)
self._rx_thread.start()
# ── Timers ────────────────────────────────────────────────────────────
# Fixed-rate TX: send W command at send_rate_hz (50 Hz default)
self._tx_timer = self.create_timer(1.0 / send_rate, self._tx_cb)
# Periodic ? status query
self._status_timer = self.create_timer(status_period, self._status_query_cb)
# Diagnostics at 1 Hz
self._diag_timer = self.create_timer(1.0, self._publish_diagnostics)
self.get_logger().info(
f"motor_daemon_node started — {port} @ {baud} baud | "
f"TX {send_rate:.0f}Hz | WD {int(self._wd_timeout * 1000)}ms"
)
# ── Serial management ─────────────────────────────────────────────────────
def _open_serial(self) -> bool:
with self._ser_lock:
try:
self._ser = serial.Serial(
port=self._port_name,
baudrate=self._baud,
timeout=0.1,
write_timeout=0.05,
)
self._ser.reset_input_buffer()
self._rx_buf = b""
self.get_logger().info(f"Serial open: {self._port_name}")
return True
except serial.SerialException as exc:
self.get_logger().error(
f"Cannot open {self._port_name}: {exc}",
throttle_duration_sec=self._reconnect_delay,
)
self._ser = None
return False
def _close_serial(self) -> None:
with self._ser_lock:
if self._ser and self._ser.is_open:
try:
self._ser.close()
except Exception:
pass
self._ser = None
def _write_line(self, line: str) -> bool:
"""Write a newline-terminated ASCII command. Returns False if port not open."""
data = (line + "\n").encode("ascii")
with self._ser_lock:
if self._ser is None or not self._ser.is_open:
return False
try:
self._ser.write(data)
return True
except serial.SerialException as exc:
self.get_logger().error(f"Serial write error: {exc}")
self._ser = None
return False
# ── RX thread ─────────────────────────────────────────────────────────────
def _rx_thread_fn(self) -> None:
"""Background thread: read bytes from FC, dispatch complete lines."""
while rclpy.ok():
ser_ok = False
with self._ser_lock:
ser_ok = self._ser is not None and self._ser.is_open
if not ser_ok:
time.sleep(self._reconnect_delay)
self._open_serial()
continue
try:
with self._ser_lock:
n = self._ser.in_waiting if self._ser else 0
if n:
with self._ser_lock:
chunk = self._ser.read(n) if self._ser else b""
if chunk:
self._rx_buf += chunk
while b"\n" in self._rx_buf:
line_bytes, self._rx_buf = self._rx_buf.split(b"\n", 1)
self._dispatch_rx_line(
line_bytes.decode("ascii", errors="replace").strip()
)
else:
time.sleep(0.001)
except serial.SerialException as exc:
self.get_logger().error(f"Serial RX error: {exc}")
with self._ser_lock:
self._ser = None
def _dispatch_rx_line(self, line: str) -> None:
"""Handle a complete line received from the FC."""
if not line:
return
if line.startswith("W:"):
# Ack for W command: W:<speed>,<steer>
self._last_ack = line
self._ack_count += 1
self.get_logger().debug(f"FC ack: {line}")
elif line.startswith("ST "):
# Status response to ? query: ST cal=X estop=X pitch=? hb=XXXXX
self._last_status = line
try:
payload = self._parse_status_line(line)
msg = String()
msg.data = json.dumps(payload)
self._status_pub.publish(msg)
except Exception as exc:
self.get_logger().warn(f"Bad status line '{line}': {exc}")
self._rx_error_count += 1
elif line.startswith("SALTYLAB"):
self.get_logger().info(f"FC boot: {line}")
else:
self.get_logger().debug(f"FC rx: {line}")
@staticmethod
def _parse_status_line(line: str) -> dict:
"""Parse 'ST cal=X estop=X pitch=? hb=XXXXX' into a dict."""
result: dict = {"raw": line}
for token in line.split():
if "=" in token:
k, _, v = token.partition("=")
try:
result[k] = int(v)
except ValueError:
result[k] = v
return result
# ── /cmd_vel callback ─────────────────────────────────────────────────────
def _on_cmd_vel(self, msg: Twist) -> None:
"""Update target speed/steer from incoming Twist message."""
speed = int(_clamp(msg.linear.x * self._speed_scale, -1000.0, 1000.0))
steer = int(_clamp(msg.angular.z * self._steer_scale, -1000.0, 1000.0))
self._target_speed = speed
self._target_steer = steer
self._last_cmd_t = time.monotonic()
self._watchdog_active = False
# ── TX timer (50 Hz) ──────────────────────────────────────────────────────
def _tx_cb(self) -> None:
"""Send W<speed>,<steer> at fixed rate. Zero if watchdog expired."""
age = time.monotonic() - self._last_cmd_t
if age >= self._wd_timeout:
if not self._watchdog_active:
self.get_logger().warn(
f"No /cmd_vel for {age*1000:.0f}ms — zeroing motors"
)
self._watchdog_active = True
self._target_speed = 0
self._target_steer = 0
speed, steer = 0, 0
else:
speed = self._target_speed
steer = self._target_steer
cmd = f"W{speed},{steer}"
if self._write_line(cmd):
self._tx_count += 1
else:
self.get_logger().warn(
"W command dropped — serial not open",
throttle_duration_sec=2.0,
)
# ── Status query timer ────────────────────────────────────────────────────
def _status_query_cb(self) -> None:
"""Send ? to FC to request a status dump."""
self._write_line("?")
# ── Diagnostics ───────────────────────────────────────────────────────────
def _publish_diagnostics(self) -> None:
diag = DiagnosticArray()
diag.header.stamp = self.get_clock().now().to_msg()
status = DiagnosticStatus()
status.name = "saltybot/motor_daemon"
status.hardware_id = "ttyTHS1"
with self._ser_lock:
port_ok = self._ser is not None and self._ser.is_open
wd_age = time.monotonic() - self._last_cmd_t
if port_ok:
status.level = DiagnosticStatus.OK
status.message = "Serial OK"
else:
status.level = DiagnosticStatus.ERROR
status.message = f"Serial disconnected: {self._port_name}"
status.values = [
KeyValue(key="serial_port", value=self._port_name),
KeyValue(key="port_open", value=str(port_ok)),
KeyValue(key="tx_count", value=str(self._tx_count)),
KeyValue(key="ack_count", value=str(self._ack_count)),
KeyValue(key="rx_errors", value=str(self._rx_error_count)),
KeyValue(key="last_speed", value=str(self._target_speed)),
KeyValue(key="last_steer", value=str(self._target_steer)),
KeyValue(key="cmd_vel_age_ms", value=f"{wd_age * 1000:.1f}"),
KeyValue(key="watchdog_active", value=str(self._watchdog_active)),
KeyValue(key="last_ack", value=self._last_ack),
KeyValue(key="last_status", value=self._last_status),
]
diag.status.append(status)
self._diag_pub.publish(diag)
# ── Lifecycle ─────────────────────────────────────────────────────────────
def destroy_node(self) -> None:
self._write_line("W0,0")
self._close_serial()
super().destroy_node()
def main(args=None) -> None:
rclpy.init(args=args)
node = MotorDaemonNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_motor_daemon
[install]
install_scripts=$base/lib/saltybot_motor_daemon

View File

@ -0,0 +1,31 @@
from setuptools import setup
package_name = "saltybot_motor_daemon"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", [
"launch/motor_daemon.launch.py",
]),
(f"share/{package_name}/config", [
"config/motor_daemon_params.yaml",
]),
],
install_requires=["setuptools", "pyserial"],
zip_safe=True,
maintainer="sl-jetson",
maintainer_email="sl-jetson@saltylab.local",
description="Orin motor control daemon — W command protocol over ttyTHS1",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"motor_daemon_node = saltybot_motor_daemon.motor_daemon_node:main",
],
},
)

View File

@ -0,0 +1,186 @@
"""test_motor_daemon.py — Unit tests for saltybot_motor_daemon (Issue #523).
Tests cover:
- Twist W command conversion and clamping
- Watchdog zeroing logic
- FC status line parsing
- RX line dispatch
"""
import math
import time
import unittest
from unittest.mock import MagicMock, patch
# ── Helpers (standalone, no ROS2 needed) ──────────────────────────────────────
def _clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v))
def twist_to_w(linear_x: float, angular_z: float,
speed_scale: float = 1000.0,
steer_scale: float = -500.0) -> tuple[int, int]:
speed = int(_clamp(linear_x * speed_scale, -1000.0, 1000.0))
steer = int(_clamp(angular_z * steer_scale, -1000.0, 1000.0))
return speed, steer
def parse_status_line(line: str) -> dict:
result: dict = {"raw": line}
for token in line.split():
if "=" in token:
k, _, v = token.partition("=")
try:
result[k] = int(v)
except ValueError:
result[k] = v
return result
# ── Tests ──────────────────────────────────────────────────────────────────────
class TestTwistToW(unittest.TestCase):
def test_forward(self):
speed, steer = twist_to_w(1.0, 0.0)
self.assertEqual(speed, 1000)
self.assertEqual(steer, 0)
def test_reverse(self):
speed, steer = twist_to_w(-1.0, 0.0)
self.assertEqual(speed, -1000)
self.assertEqual(steer, 0)
def test_turn_left(self):
# ROS2 CCW+ (positive angular.z) → negative steer (flipped by negative steer_scale)
speed, steer = twist_to_w(0.0, 1.0, steer_scale=-500.0)
self.assertEqual(speed, 0)
self.assertEqual(steer, -500)
def test_turn_right(self):
speed, steer = twist_to_w(0.0, -1.0, steer_scale=-500.0)
self.assertEqual(speed, 0)
self.assertEqual(steer, 500)
def test_clamp_upper(self):
speed, steer = twist_to_w(5.0, 0.0)
self.assertEqual(speed, 1000)
def test_clamp_lower(self):
speed, steer = twist_to_w(-5.0, 0.0)
self.assertEqual(speed, -1000)
def test_steer_clamp(self):
speed, steer = twist_to_w(0.0, 10.0, steer_scale=-500.0)
self.assertEqual(steer, -1000)
def test_zero(self):
speed, steer = twist_to_w(0.0, 0.0)
self.assertEqual(speed, 0)
self.assertEqual(steer, 0)
def test_combined(self):
speed, steer = twist_to_w(0.5, 1.0, speed_scale=1000.0, steer_scale=-500.0)
self.assertEqual(speed, 500)
self.assertEqual(steer, -500)
def test_fractional_truncates(self):
# int() truncates toward zero
speed, steer = twist_to_w(0.7777, 0.0, speed_scale=1000.0)
self.assertEqual(speed, 777)
class TestWCommandFormat(unittest.TestCase):
def test_format_positive(self):
speed, steer = 500, -300
cmd = f"W{speed},{steer}"
self.assertEqual(cmd, "W500,-300")
def test_format_zero(self):
cmd = f"W{0},{0}"
self.assertEqual(cmd, "W0,0")
def test_format_negative(self):
cmd = f"W{-1000},{-1000}"
self.assertEqual(cmd, "W-1000,-1000")
def test_max_positive(self):
cmd = f"W{1000},{1000}"
self.assertEqual(cmd, "W1000,1000")
class TestStatusLineParsing(unittest.TestCase):
def test_typical_status(self):
line = "ST cal=1 estop=0 pitch=? hb=12345"
result = parse_status_line(line)
self.assertEqual(result["cal"], 1)
self.assertEqual(result["estop"], 0)
self.assertEqual(result["pitch"], "?")
self.assertEqual(result["hb"], 12345)
def test_estop_active(self):
line = "ST cal=1 estop=1 pitch=? hb=99999"
result = parse_status_line(line)
self.assertEqual(result["estop"], 1)
def test_uncalibrated(self):
line = "ST cal=0 estop=0 pitch=? hb=0"
result = parse_status_line(line)
self.assertEqual(result["cal"], 0)
def test_raw_preserved(self):
line = "ST cal=1 estop=0 pitch=? hb=1"
result = parse_status_line(line)
self.assertEqual(result["raw"], line)
def test_empty_line(self):
result = parse_status_line("")
self.assertEqual(result["raw"], "")
class TestWatchdogLogic(unittest.TestCase):
def test_watchdog_triggers_after_timeout(self):
last_cmd_t = time.monotonic() - 0.3 # 300ms ago
wd_timeout = 0.2
age = time.monotonic() - last_cmd_t
self.assertGreaterEqual(age, wd_timeout)
def test_watchdog_does_not_trigger_before_timeout(self):
last_cmd_t = time.monotonic() - 0.05 # 50ms ago
wd_timeout = 0.2
age = time.monotonic() - last_cmd_t
self.assertLess(age, wd_timeout)
def test_watchdog_speed_steer_zeroed(self):
# When watchdog fires, output must be exactly zero
wd_active = True
speed = 0 if wd_active else 500
steer = 0 if wd_active else 300
self.assertEqual(speed, 0)
self.assertEqual(steer, 0)
class TestAckParsing(unittest.TestCase):
def test_valid_ack(self):
line = "W:500,-300"
self.assertTrue(line.startswith("W:"))
parts = line[2:].split(",")
self.assertEqual(int(parts[0]), 500)
self.assertEqual(int(parts[1]), -300)
def test_zero_ack(self):
line = "W:0,0"
self.assertTrue(line.startswith("W:"))
def test_ack_not_confused_with_boot(self):
self.assertFalse("SALTYLAB USART6 OK".startswith("W:"))
if __name__ == "__main__":
unittest.main()