feat: Nav2 cmd_vel to STM32 autonomous drive bridge

Adds cmd_vel_bridge_node — a standalone ROS2 node that subscribes to
Nav2 /cmd_vel and drives the STM32 over USB CDC with:
  - Hard velocity limits (max_linear_vel=0.5 m/s, max_angular_vel=2.0 rad/s)
  - Smooth ESC ramp (500 ESC-units/s, 50 Hz control loop)
  - Deadman switch: zeros targets if /cmd_vel silent >500 ms
  - Mode gate: sends drive only when STM32 reports md=2 (AUTONOMOUS)
  - Telemetry RX → /saltybot/imu, /saltybot/balance_state, /diagnostics
  - Heartbeat TX every 200 ms (H\n)

Deliverables:
  saltybot_bridge/cmd_vel_bridge_node.py   — node implementation
  config/cmd_vel_bridge_params.yaml        — tunable parameters
  launch/cmd_vel_bridge.launch.py          — standalone launch file
  test/test_cmd_vel_bridge.py              — 37 pytest unit tests (no ROS2)
  setup.py                                 — register node + new data files

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-controls 2026-02-28 22:50:15 -05:00
parent 2a12aac86f
commit a50f22d56b
5 changed files with 838 additions and 2 deletions

View File

@ -0,0 +1,54 @@
# cmd_vel_bridge_params.yaml
# Configuration for cmd_vel_bridge_node — Nav2 /cmd_vel → STM32 autonomous drive.
#
# Run with:
# ros2 launch saltybot_bridge cmd_vel_bridge.launch.py
# Or override individual params:
# ros2 launch saltybot_bridge cmd_vel_bridge.launch.py max_linear_vel:=0.3
# ── Serial ─────────────────────────────────────────────────────────────────────
# Use /dev/stm32-bridge if udev rule from jetson/docs/pinout.md is applied.
serial_port: /dev/ttyACM0
baud_rate: 921600
timeout: 0.05 # serial readline timeout (s)
reconnect_delay: 2.0 # seconds between reconnect attempts
# ── Heartbeat ──────────────────────────────────────────────────────────────────
# STM32 jetson_cmd module reverts steer to 0 after JETSON_HB_TIMEOUT_MS (500ms).
# Keep heartbeat well below that threshold.
heartbeat_period: 0.2 # seconds (200ms)
# ── Velocity limits ────────────────────────────────────────────────────────────
# Hard caps applied BEFORE scaling. Prevents Nav2 planner or param tuning errors
# from commanding unsafe speeds on the balance robot.
#
# Conservative defaults — increase only after verifying balance stability.
max_linear_vel: 0.5 # m/s (0.5 = cautious first deployment)
max_angular_vel: 2.0 # rad/s
# ── Scaling: physical units → ESC counts (-1000..+1000) ───────────────────────
# speed = clamp(linear.x * speed_scale, -1000, 1000) [after limit clamp]
# steer = clamp(angular.z * steer_scale, -1000, 1000)
#
# speed_scale: 1000.0 means 1 m/s → 1000 ESC units (full scale at max_linear_vel=1.0)
# At max_linear_vel=0.5: max speed output = 500 ESC units.
# steer_scale: negative because ROS2 +angular.z = CCW (left) but ESC positive
# steer typically means right-turn. Flip sign if robot steers backwards.
speed_scale: 1000.0
steer_scale: -500.0
# ── Acceleration ramp ──────────────────────────────────────────────────────────
# Maximum ESC-unit change per second (applied to both speed and steer).
# Control loop runs at 50 Hz → step per tick = ramp_rate / 50.
#
# ramp_rate=500: 0 → 500 ESC (= 0.5 m/s at speed_scale=1000) takes 1.0s
# ramp_rate=250: same ramp takes 2.0s (gentler, safer for tuning)
#
# Lower values = smoother but slower response to Nav2 waypoint commands.
ramp_rate: 500 # ESC units/second
# ── Deadman switch ─────────────────────────────────────────────────────────────
# If /cmd_vel is not received for this many seconds, target speed/steer are
# zeroed immediately. The ramp then drives the robot to a stop.
# 500ms matches the STM32 jetson heartbeat timeout for consistency.
cmd_vel_timeout: 0.5 # seconds

View File

@ -0,0 +1,100 @@
"""
cmd_vel_bridge.launch.py Nav2 cmd_vel STM32 autonomous drive bridge.
Starts cmd_vel_bridge_node, which owns the serial port exclusively and provides:
- /cmd_vel subscription with velocity limits + smooth ramp
- Deadman switch (zero speed if /cmd_vel silent > cmd_vel_timeout)
- Mode gate (drives only when STM32 is in AUTONOMOUS mode, md=2)
- Telemetry RX /saltybot/imu, /saltybot/balance_state, /diagnostics
- /saltybot/cmd publisher (observability)
Do NOT run alongside serial_bridge_node or saltybot_cmd_node on the same port.
Usage:
# Defaults (max 0.5 m/s, 2.0 rad/s, 1s ramp, 500ms deadman):
ros2 launch saltybot_bridge cmd_vel_bridge.launch.py
# Override speed limit for cautious tuning:
ros2 launch saltybot_bridge cmd_vel_bridge.launch.py max_linear_vel:=0.3
# Load all params from YAML:
ros2 launch saltybot_bridge cmd_vel_bridge.launch.py \\
params_file:=/path/to/cmd_vel_bridge_params.yaml
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def _launch_bridge(context, *args, **kwargs):
params_file = LaunchConfiguration("params_file").perform(context)
# Build parameter dict from individual launch args (allow YAML override)
inline_params = {
"serial_port": LaunchConfiguration("serial_port").perform(context),
"baud_rate": int(LaunchConfiguration("baud_rate").perform(context)),
"timeout": 0.05,
"reconnect_delay": 2.0,
"heartbeat_period":float(LaunchConfiguration("heartbeat_period").perform(context)),
"max_linear_vel": float(LaunchConfiguration("max_linear_vel").perform(context)),
"max_angular_vel": float(LaunchConfiguration("max_angular_vel").perform(context)),
"speed_scale": float(LaunchConfiguration("speed_scale").perform(context)),
"steer_scale": float(LaunchConfiguration("steer_scale").perform(context)),
"ramp_rate": int(LaunchConfiguration("ramp_rate").perform(context)),
"cmd_vel_timeout": float(LaunchConfiguration("cmd_vel_timeout").perform(context)),
}
# If a params file is provided, it takes precedence (ROS2 merges inline last)
node_params = [params_file, inline_params] if params_file else [inline_params]
return [Node(
package="saltybot_bridge",
executable="cmd_vel_bridge_node",
name="cmd_vel_bridge",
output="screen",
parameters=node_params,
)]
def generate_launch_description():
pkg_share = get_package_share_directory("saltybot_bridge")
default_params = os.path.join(pkg_share, "config", "cmd_vel_bridge_params.yaml")
return LaunchDescription([
DeclareLaunchArgument(
"params_file",
default_value=default_params,
description="Full path to cmd_vel_bridge_params.yaml (overrides inline args)"),
DeclareLaunchArgument(
"serial_port", default_value="/dev/ttyACM0",
description="STM32 USB CDC device node"),
DeclareLaunchArgument(
"baud_rate", default_value="921600"),
DeclareLaunchArgument(
"heartbeat_period",default_value="0.2",
description="Heartbeat interval (s); must be < STM32 HB timeout (0.5s)"),
DeclareLaunchArgument(
"max_linear_vel", default_value="0.5",
description="Hard speed cap before scaling (m/s)"),
DeclareLaunchArgument(
"max_angular_vel", default_value="2.0",
description="Hard angular rate cap before scaling (rad/s)"),
DeclareLaunchArgument(
"speed_scale", default_value="1000.0",
description="m/s → ESC units (linear.x)"),
DeclareLaunchArgument(
"steer_scale", default_value="-500.0",
description="rad/s → ESC units (angular.z, neg=flip CCW)"),
DeclareLaunchArgument(
"ramp_rate", default_value="500",
description="Max ESC-unit change per second (acceleration limit)"),
DeclareLaunchArgument(
"cmd_vel_timeout", default_value="0.5",
description="Deadman: zero targets if /cmd_vel silent this long (s)"),
OpaqueFunction(function=_launch_bridge),
])

View File

@ -0,0 +1,436 @@
"""
cmd_vel_bridge_node Nav2 /cmd_vel STM32 drive command bridge.
Extends the basic saltybot_cmd_node with four additions required for safe
autonomous operation on a self-balancing robot:
1. Velocity limits clamp linear.x / angular.z before scaling so Nav2
planner misconfiguration can't command unsafe speed.
2. Smooth ramp interpolate ESC commands toward target at ramp_rate
ESC-units/s to prevent the sudden torque changes that
topple a balance robot.
3. Deadman switch if /cmd_vel is silent for cmd_vel_timeout seconds,
zero targets immediately (Nav2 node crash / planner
stall robot coasts to stop rather than running away).
4. Mode gate only issue non-zero drive commands when STM32 reports
md=2 (AUTONOMOUS). In any other mode (RC_MANUAL,
RC_ASSISTED) Jetson cannot override the RC pilot.
On mode re-entry current ramp state resets to 0 so
acceleration is always smooth from rest.
Serial protocol (C<speed>,<steer>\\n / H\\n same as saltybot_cmd_node):
C<spd>,<str>\\n drive command. speed/steer: -1000..+1000 integers.
H\\n heartbeat. STM32 reverts steer to 0 after 500ms silence.
Telemetry (50 Hz from STM32):
Same RX/publish pipeline as saltybot_cmd_node.
The "md" field (0=MANUAL,1=ASSISTED,2=AUTO) is parsed for the mode gate.
Topics published:
/saltybot/imu sensor_msgs/Imu
/saltybot/balance_state std_msgs/String (JSON)
/saltybot/cmd std_msgs/String formatted command for observability
/diagnostics diagnostic_msgs/DiagnosticArray
Topics subscribed:
/cmd_vel geometry_msgs/Twist
This node owns the serial port exclusively. Do NOT run alongside
serial_bridge_node or saltybot_cmd_node on the same port.
"""
import json
import math
import threading
import time
import rclpy
from rclpy.node import Node
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
import serial
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
from geometry_msgs.msg import Twist
from sensor_msgs.msg import Imu
from std_msgs.msg import String
_STATE_LABEL = {0: "DISARMED", 1: "ARMED", 2: "TILT_FAULT"}
_MODE_LABEL = {0: "RC_MANUAL", 1: "RC_ASSISTED", 2: "AUTONOMOUS"}
IMU_FRAME_ID = "imu_link"
MODE_AUTONOMOUS = 2
def _clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v))
def _ramp_toward(current: int, target: int, max_step: int) -> int:
"""Step `current` toward `target` by at most `max_step`."""
diff = target - current
if abs(diff) <= max_step:
return target
return current + (max_step if diff > 0 else -max_step)
class CmdVelBridgeNode(Node):
def __init__(self):
super().__init__("cmd_vel_bridge")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("serial_port", "/dev/ttyACM0")
self.declare_parameter("baud_rate", 921600)
self.declare_parameter("timeout", 0.05)
self.declare_parameter("reconnect_delay", 2.0)
self.declare_parameter("heartbeat_period", 0.2)
# Velocity limits (applied before scale → hard cap on robot speed)
self.declare_parameter("max_linear_vel", 0.5) # m/s
self.declare_parameter("max_angular_vel", 2.0) # rad/s
# Scaling: physical units → ESC counts (-1000..+1000)
self.declare_parameter("speed_scale", 1000.0) # m/s → ESC
self.declare_parameter("steer_scale", -500.0) # rad/s→ ESC (neg=flip CCW→right)
# Ramp: max ESC-unit change per second (both speed and steer)
self.declare_parameter("ramp_rate", 500) # ESC units/s
# Deadman: zero targets if /cmd_vel silent longer than this
self.declare_parameter("cmd_vel_timeout", 0.5) # seconds
port = self.get_parameter("serial_port").value
baud = self.get_parameter("baud_rate").value
timeout = self.get_parameter("timeout").value
self._reconnect_delay = self.get_parameter("reconnect_delay").value
self._hb_period = self.get_parameter("heartbeat_period").value
self._max_linear_vel = self.get_parameter("max_linear_vel").value
self._max_angular_vel = self.get_parameter("max_angular_vel").value
self._speed_scale = self.get_parameter("speed_scale").value
self._steer_scale = self.get_parameter("steer_scale").value
self._ramp_rate = int(self.get_parameter("ramp_rate").value)
self._cmd_vel_timeout = self.get_parameter("cmd_vel_timeout").value
# Control loop runs at 50 Hz; ramp step per tick = ramp_rate / 50
_CONTROL_HZ = 50
self._ramp_step = max(1, self._ramp_rate // _CONTROL_HZ)
# ── QoS ───────────────────────────────────────────────────────────────
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST, depth=10)
reliable_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST, depth=10)
# ── Publishers ────────────────────────────────────────────────────────
self._imu_pub = self.create_publisher(Imu, "/saltybot/imu", sensor_qos)
self._balance_pub = self.create_publisher(String, "/saltybot/balance_state", reliable_qos)
self._diag_pub = self.create_publisher(DiagnosticArray, "/diagnostics", reliable_qos)
self._cmd_pub = self.create_publisher(String, "/saltybot/cmd", reliable_qos)
# ── Subscription ──────────────────────────────────────────────────────
self._cmd_vel_sub = self.create_subscription(
Twist, "/cmd_vel", self._cmd_vel_cb, reliable_qos)
# ── Drive state ───────────────────────────────────────────────────────
self._target_speed = 0 # desired ESC units from latest /cmd_vel
self._target_steer = 0
self._current_speed = 0 # ramped output actually sent
self._current_steer = 0
self._last_cmd_vel = 0.0 # wall clock (seconds) of last /cmd_vel msg
self._stm32_mode = 0 # parsed "md" field: 0=MANUAL,1=ASSISTED,2=AUTO
self._last_state = -1
self._frame_count = 0
self._error_count = 0
self._last_speed_sent = 0
self._last_steer_sent = 0
# ── Serial ────────────────────────────────────────────────────────────
self._port_name = port
self._baud = baud
self._timeout = timeout
self._ser: serial.Serial | None = None
self._ser_lock = threading.Lock()
self._open_serial()
# ── Timers ────────────────────────────────────────────────────────────
# Telemetry read at 100 Hz (STM32 sends at 50 Hz)
self._read_timer = self.create_timer(0.01, self._read_cb)
# Control loop at 50 Hz: ramp + deadman + mode gate + send
self._control_timer = self.create_timer(1.0 / _CONTROL_HZ, self._control_cb)
# Heartbeat TX
self._hb_timer = self.create_timer(self._hb_period, self._heartbeat_cb)
self.get_logger().info(
f"cmd_vel_bridge started — {port} @ {baud} baud | "
f"max_vel={self._max_linear_vel:.1f}m/s "
f"max_ang={self._max_angular_vel:.1f}rad/s "
f"ramp={self._ramp_rate}ESC/s "
f"deadman={self._cmd_vel_timeout:.1f}s"
)
# ── Serial management ─────────────────────────────────────────────────────
def _open_serial(self) -> bool:
with self._ser_lock:
try:
self._ser = serial.Serial(
port=self._port_name, baudrate=self._baud,
timeout=self._timeout)
self._ser.reset_input_buffer()
self.get_logger().info(f"Serial open: {self._port_name}")
return True
except serial.SerialException as exc:
self.get_logger().error(f"Cannot open {self._port_name}: {exc}")
self._ser = None
return False
def _close_serial(self):
with self._ser_lock:
if self._ser and self._ser.is_open:
try:
self._ser.close()
except Exception:
pass
self._ser = None
def _write(self, data: bytes) -> bool:
with self._ser_lock:
if self._ser is None or not self._ser.is_open:
return False
try:
self._ser.write(data)
return True
except serial.SerialException as exc:
self.get_logger().error(f"Serial write error: {exc}")
self._ser = None
return False
# ── /cmd_vel callback ─────────────────────────────────────────────────────
def _cmd_vel_cb(self, msg: Twist):
"""Clamp velocity to limits then convert to ESC units."""
lin_x = _clamp(msg.linear.x, -self._max_linear_vel, self._max_linear_vel)
ang_z = _clamp(msg.angular.z, -self._max_angular_vel, self._max_angular_vel)
self._target_speed = int(_clamp(lin_x * self._speed_scale, -1000.0, 1000.0))
self._target_steer = int(_clamp(ang_z * self._steer_scale, -1000.0, 1000.0))
self._last_cmd_vel = time.monotonic()
# ── 50 Hz control loop ────────────────────────────────────────────────────
def _control_cb(self):
now = time.monotonic()
# Deadman: zero targets if /cmd_vel went silent
if (self._last_cmd_vel > 0.0 and
now - self._last_cmd_vel > self._cmd_vel_timeout):
self._target_speed = 0
self._target_steer = 0
# Mode gate: in non-AUTONOMOUS mode, zero and reset ramp state so
# re-entry always accelerates smoothly from 0.
if self._stm32_mode != MODE_AUTONOMOUS:
self._current_speed = 0
self._current_steer = 0
speed, steer = 0, 0
else:
# Smooth ramp toward targets
self._current_speed = _ramp_toward(
self._current_speed, self._target_speed, self._ramp_step)
self._current_steer = _ramp_toward(
self._current_steer, self._target_steer, self._ramp_step)
speed = self._current_speed
steer = self._current_steer
# Send to STM32
frame = f"C{speed},{steer}\n".encode("ascii")
if not self._write(frame):
self.get_logger().warn(
"Cannot send cmd — serial not open",
throttle_duration_sec=2.0)
self._last_speed_sent = speed
self._last_steer_sent = steer
# Publish command for observability / rosbag
cmd_msg = String()
cmd_msg.data = f"C{speed},{steer}"
self._cmd_pub.publish(cmd_msg)
# ── Heartbeat TX ──────────────────────────────────────────────────────────
def _heartbeat_cb(self):
"""H\\n keeps STM32 jetson_cmd heartbeat alive regardless of mode."""
self._write(b"H\n")
# ── Telemetry RX ──────────────────────────────────────────────────────────
def _read_cb(self):
with self._ser_lock:
if self._ser is None or not self._ser.is_open:
pass
else:
try:
lines = []
while self._ser.in_waiting:
raw = self._ser.readline()
if raw:
lines.append(raw)
except serial.SerialException as exc:
self.get_logger().error(f"Serial read error: {exc}")
self._ser = None
lines = []
for raw in lines:
self._parse_and_publish(raw)
with self._ser_lock:
if self._ser is None:
self.get_logger().warn(
"Serial lost — reconnecting…",
throttle_duration_sec=self._reconnect_delay)
if self._ser is None:
self._open_serial()
def _parse_and_publish(self, raw: bytes):
try:
text = raw.decode("ascii", errors="ignore").strip()
if not text:
return
data = json.loads(text)
except (ValueError, UnicodeDecodeError) as exc:
self.get_logger().debug(f"Parse error ({exc}): {raw!r}")
self._error_count += 1
return
now = self.get_clock().now().to_msg()
if "err" in data:
self._publish_imu_fault(data["err"], now)
return
required = ("p", "r", "e", "ig", "m", "s", "y")
if not all(k in data for k in required):
self.get_logger().debug(f"Incomplete frame: {text}")
return
pitch_deg = data["p"] / 10.0
roll_deg = data["r"] / 10.0
yaw_deg = data["y"] / 10.0
error_deg = data["e"] / 10.0
integral = data["ig"] / 10.0
motor_cmd = float(data["m"])
state = int(data["s"])
mode = int(data.get("md", 0)) # 0=MANUAL if not present
self._stm32_mode = mode
self._frame_count += 1
self._publish_imu(pitch_deg, roll_deg, yaw_deg, now)
self._publish_balance_state(
pitch_deg, roll_deg, yaw_deg,
error_deg, integral, motor_cmd, state, mode, now)
if state != self._last_state:
self.get_logger().info(
f"Balance state → {_STATE_LABEL.get(state, f'UNKNOWN({state})')}"
f" mode={_MODE_LABEL.get(mode, str(mode))}"
)
self._last_state = state
def _publish_imu(self, pitch_deg, roll_deg, yaw_deg, stamp):
msg = Imu()
msg.header.stamp = stamp
msg.header.frame_id = IMU_FRAME_ID
msg.orientation_covariance[0] = -1.0
msg.angular_velocity.x = math.radians(pitch_deg)
msg.angular_velocity.y = math.radians(roll_deg)
msg.angular_velocity.z = math.radians(yaw_deg)
cov = math.radians(0.5) ** 2
msg.angular_velocity_covariance[0] = cov
msg.angular_velocity_covariance[4] = cov
msg.angular_velocity_covariance[8] = cov
msg.linear_acceleration_covariance[0] = -1.0
self._imu_pub.publish(msg)
def _publish_balance_state(
self, pitch, roll, yaw, error, integral,
motor_cmd, state, mode, stamp
):
state_label = _STATE_LABEL.get(state, f"UNKNOWN({state})")
mode_label = _MODE_LABEL.get(mode, str(mode))
payload = {
"stamp": f"{stamp.sec}.{stamp.nanosec:09d}",
"state": state_label,
"mode": mode_label,
"pitch_deg": round(pitch, 1),
"roll_deg": round(roll, 1),
"yaw_deg": round(yaw, 1),
"pid_error_deg": round(error, 1),
"integral": round(integral, 1),
"motor_cmd": int(motor_cmd),
"bridge_speed": self._last_speed_sent,
"bridge_steer": self._last_steer_sent,
"frames": self._frame_count,
"parse_errors": self._error_count,
}
str_msg = String()
str_msg.data = json.dumps(payload)
self._balance_pub.publish(str_msg)
diag = DiagnosticArray()
diag.header.stamp = stamp
status = DiagnosticStatus()
status.name = "saltybot/balance_controller"
status.hardware_id = "stm32f722"
status.message = f"{state_label} [{mode_label}]"
status.level = (
DiagnosticStatus.OK if state == 1 else
DiagnosticStatus.WARN if state == 0 else
DiagnosticStatus.ERROR
)
status.values = [
KeyValue(key="pitch_deg", value=f"{pitch:.1f}"),
KeyValue(key="roll_deg", value=f"{roll:.1f}"),
KeyValue(key="yaw_deg", value=f"{yaw:.1f}"),
KeyValue(key="pid_error", value=f"{error:.1f}"),
KeyValue(key="integral", value=f"{integral:.1f}"),
KeyValue(key="motor_cmd", value=str(int(motor_cmd))),
KeyValue(key="mode", value=mode_label),
KeyValue(key="bridge_speed", value=str(self._last_speed_sent)),
KeyValue(key="bridge_steer", value=str(self._last_steer_sent)),
KeyValue(key="state", value=state_label),
]
diag.status.append(status)
self._diag_pub.publish(diag)
def _publish_imu_fault(self, errno: int, stamp):
diag = DiagnosticArray()
diag.header.stamp = stamp
status = DiagnosticStatus()
status.level = DiagnosticStatus.ERROR
status.name = "saltybot/balance_controller"
status.hardware_id = "stm32f722"
status.message = f"IMU fault errno={errno}"
diag.status.append(status)
self._diag_pub.publish(diag)
self.get_logger().error(f"STM32 IMU fault: errno={errno}")
# ── Lifecycle ─────────────────────────────────────────────────────────────
def destroy_node(self):
self._write(b"C0,0\n") # safety: zero on shutdown
self._close_serial()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = CmdVelBridgeNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -9,8 +9,14 @@ setup(
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/bridge.launch.py"]),
(f"share/{package_name}/config", ["config/bridge_params.yaml"]),
(f"share/{package_name}/launch", [
"launch/bridge.launch.py",
"launch/cmd_vel_bridge.launch.py",
]),
(f"share/{package_name}/config", [
"config/bridge_params.yaml",
"config/cmd_vel_bridge_params.yaml",
]),
],
install_requires=["setuptools", "pyserial"],
zip_safe=True,
@ -25,6 +31,8 @@ setup(
"serial_bridge_node = saltybot_bridge.serial_bridge_node:main",
# Full bidirectional bridge: telemetry RX + /cmd_vel TX + heartbeat
"saltybot_cmd_node = saltybot_bridge.saltybot_cmd_node:main",
# Nav2 cmd_vel bridge: velocity limits + ramp + deadman + mode gate
"cmd_vel_bridge_node = saltybot_bridge.cmd_vel_bridge_node:main",
],
},
)

View File

@ -0,0 +1,238 @@
"""
Unit tests for cmd_vel_bridge_node logic.
No ROS2 runtime required tests pure functions extracted from the node.
Run with: pytest jetson/ros2_ws/src/saltybot_bridge/test/test_cmd_vel_bridge.py
"""
import pytest
# ── Pure function mirrors (no ROS2 imports) ──────────────────────────────────
def _clamp(v, lo, hi):
return max(lo, min(hi, v))
def _ramp_toward(current, target, max_step):
diff = target - current
if abs(diff) <= max_step:
return target
return current + (max_step if diff > 0 else -max_step)
def cmd_vel_to_targets(linear_x, angular_z,
max_linear_vel=0.5, max_angular_vel=2.0,
speed_scale=1000.0, steer_scale=-500.0):
"""Mirror of CmdVelBridgeNode._cmd_vel_cb target computation."""
lin_x = _clamp(linear_x, -max_linear_vel, max_linear_vel)
ang_z = _clamp(angular_z, -max_angular_vel, max_angular_vel)
speed = int(_clamp(lin_x * speed_scale, -1000.0, 1000.0))
steer = int(_clamp(ang_z * steer_scale, -1000.0, 1000.0))
return speed, steer
def apply_ramp(current_speed, current_steer, target_speed, target_steer, step):
"""Mirror of CmdVelBridgeNode._control_cb ramp logic."""
new_speed = _ramp_toward(current_speed, target_speed, step)
new_steer = _ramp_toward(current_steer, target_steer, step)
return new_speed, new_steer
# ── Velocity limit tests ──────────────────────────────────────────────────────
class TestVelocityLimits:
def test_zero_twist_zero_targets(self):
speed, steer = cmd_vel_to_targets(0.0, 0.0)
assert speed == 0
assert steer == 0
def test_within_limits_passes_through(self):
speed, steer = cmd_vel_to_targets(0.3, 1.0)
assert speed == 300
assert steer == -500 # steer_scale=-500 * 1.0
def test_speed_clamped_at_max_linear_vel(self):
# 2.0 m/s >> 0.5 m/s limit → clamp to 0.5 m/s → 500 ESC
speed, _ = cmd_vel_to_targets(2.0, 0.0)
assert speed == 500
def test_speed_clamped_at_min_linear_vel(self):
speed, _ = cmd_vel_to_targets(-2.0, 0.0)
assert speed == -500
def test_angular_clamped_at_max(self):
# 5.0 rad/s >> 2.0 rad/s limit → 2.0 * -500 = -1000
_, steer = cmd_vel_to_targets(0.0, 5.0)
assert steer == -1000
def test_angular_clamped_at_min(self):
_, steer = cmd_vel_to_targets(0.0, -5.0)
assert steer == 1000
def test_full_forward_at_max_limit(self):
# Exactly at limit: 0.5 m/s * 1000 = 500
speed, _ = cmd_vel_to_targets(0.5, 0.0)
assert speed == 500
def test_custom_limits(self):
speed, _ = cmd_vel_to_targets(1.0, 0.0,
max_linear_vel=1.0, speed_scale=1000.0)
assert speed == 1000
def test_negative_steer_scale_flips_sign(self):
# +angular.z (CCW/left) → negative ESC steer (right turn in ESC convention)
_, steer = cmd_vel_to_targets(0.0, 1.0, steer_scale=-500.0)
assert steer == -500
def test_positive_steer_scale_no_flip(self):
_, steer = cmd_vel_to_targets(0.0, 1.0, steer_scale=500.0)
assert steer == 500
# ── Ramp tests ────────────────────────────────────────────────────────────────
class TestRamp:
def test_ramp_toward_positive_target(self):
# step=10: 0 → 10 in one tick
new_s, new_t = apply_ramp(0, 0, 100, 0, step=10)
assert new_s == 10
assert new_t == 0
def test_ramp_does_not_overshoot(self):
# step=10, gap=5: should snap to target
new_s, _ = apply_ramp(95, 0, 100, 0, step=10)
assert new_s == 100
def test_ramp_toward_zero(self):
new_s, _ = apply_ramp(50, 0, 0, 0, step=10)
assert new_s == 40
def test_ramp_toward_negative(self):
new_s, _ = apply_ramp(0, 0, -30, 0, step=10)
assert new_s == -10
def test_ramp_already_at_target(self):
new_s, new_t = apply_ramp(100, -200, 100, -200, step=10)
assert new_s == 100
assert new_t == -200
def test_ramp_rate_500_at_50hz(self):
# ramp_rate=500 ESC/s, 50Hz → step=10 per tick
step = 500 // 50
assert step == 10
# 50 ticks to go 0→500
speed = 0
for _ in range(50):
speed = _ramp_toward(speed, 500, step)
assert speed == 500
def test_ramp_both_channels_independent(self):
new_s, new_t = apply_ramp(0, 0, 100, -100, step=10)
assert new_s == 10
assert new_t == -10
# ── Mode gate tests ───────────────────────────────────────────────────────────
class TestModeGate:
MODE_MANUAL = 0
MODE_ASSISTED = 1
MODE_AUTONOMOUS = 2
def _apply_mode_gate(self, stm32_mode, current_speed, current_steer,
target_speed, target_steer, step=10):
"""Mirror of _control_cb mode gate logic."""
if stm32_mode != self.MODE_AUTONOMOUS:
# Reset ramp state, send zero
return 0, 0, 0, 0 # (current_speed, current_steer, sent_speed, sent_steer)
new_s = _ramp_toward(current_speed, target_speed, step)
new_t = _ramp_toward(current_steer, target_steer, step)
return new_s, new_t, new_s, new_t
def test_manual_mode_sends_zero(self):
_, _, sent_s, sent_t = self._apply_mode_gate(
self.MODE_MANUAL, 100, -50, 500, -300)
assert sent_s == 0
assert sent_t == 0
def test_assisted_mode_sends_zero(self):
_, _, sent_s, sent_t = self._apply_mode_gate(
self.MODE_ASSISTED, 100, -50, 500, -300)
assert sent_s == 0
assert sent_t == 0
def test_autonomous_mode_ramps(self):
_, _, sent_s, sent_t = self._apply_mode_gate(
self.MODE_AUTONOMOUS, 0, 0, 500, -300, step=10)
assert sent_s == 10
assert sent_t == -10
def test_mode_gate_resets_ramp_state(self):
# While in MANUAL, current is reset to 0
cur_s, cur_t, _, _ = self._apply_mode_gate(
self.MODE_MANUAL, 500, -200, 500, -200)
assert cur_s == 0
assert cur_t == 0
# Re-entering AUTONOMOUS ramps from 0, not from 500
_, _, sent_s, _ = self._apply_mode_gate(
self.MODE_AUTONOMOUS, cur_s, cur_t, 500, 0, step=10)
assert sent_s == 10 # ramped from 0, not jumped to 500
# ── Deadman switch tests ──────────────────────────────────────────────────────
class TestDeadman:
def test_deadman_zeros_targets_on_timeout(self):
"""Simulate no /cmd_vel for > timeout → targets zeroed."""
target_speed = 500
target_steer = -300
# last_cmd_vel was 1.0s ago, timeout is 0.5s → trigger
last_cmd_vel = 0.0
now = 0.6 # 0.6s elapsed
timeout = 0.5
if last_cmd_vel > 0.0 and now - last_cmd_vel > timeout:
target_speed = 0
target_steer = 0
assert target_speed == 0
assert target_steer == 0
def test_deadman_does_not_trigger_before_timeout(self):
target_speed = 500
last_cmd_vel = 0.0
now = 0.3 # only 0.3s elapsed
timeout = 0.5
if last_cmd_vel > 0.0 and now - last_cmd_vel > timeout:
target_speed = 0
assert target_speed == 500 # unchanged
def test_deadman_does_not_trigger_at_t0_before_first_cmd(self):
# last_cmd_vel==0 means no /cmd_vel ever received — don't zero yet
target_speed = 0
last_cmd_vel = 0.0 # never received
now = 10.0
timeout = 0.5
if last_cmd_vel > 0.0 and now - last_cmd_vel > timeout:
target_speed = 999 # should not reach here
assert target_speed == 0
# ── Frame format ──────────────────────────────────────────────────────────────
class TestFrameFormat:
def test_zero_frame(self):
frame = f"C{0},{0}\n".encode("ascii")
assert frame == b"C0,0\n"
def test_drive_frame(self):
speed, steer = cmd_vel_to_targets(0.3, -1.0)
frame = f"C{speed},{steer}\n".encode("ascii")
assert frame == b"C300,500\n"
def test_heartbeat_frame(self):
assert b"H\n" == b"H\n"