From a50f22d56bc6003261d3c882db694744b1558cf3 Mon Sep 17 00:00:00 2001 From: sl-controls Date: Sat, 28 Feb 2026 22:50:15 -0500 Subject: [PATCH] feat: Nav2 cmd_vel to STM32 autonomous drive bridge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds cmd_vel_bridge_node — a standalone ROS2 node that subscribes to Nav2 /cmd_vel and drives the STM32 over USB CDC with: - Hard velocity limits (max_linear_vel=0.5 m/s, max_angular_vel=2.0 rad/s) - Smooth ESC ramp (500 ESC-units/s, 50 Hz control loop) - Deadman switch: zeros targets if /cmd_vel silent >500 ms - Mode gate: sends drive only when STM32 reports md=2 (AUTONOMOUS) - Telemetry RX → /saltybot/imu, /saltybot/balance_state, /diagnostics - Heartbeat TX every 200 ms (H\n) Deliverables: saltybot_bridge/cmd_vel_bridge_node.py — node implementation config/cmd_vel_bridge_params.yaml — tunable parameters launch/cmd_vel_bridge.launch.py — standalone launch file test/test_cmd_vel_bridge.py — 37 pytest unit tests (no ROS2) setup.py — register node + new data files Co-Authored-By: Claude Sonnet 4.6 --- .../config/cmd_vel_bridge_params.yaml | 54 +++ .../launch/cmd_vel_bridge.launch.py | 100 ++++ .../saltybot_bridge/cmd_vel_bridge_node.py | 436 ++++++++++++++++++ jetson/ros2_ws/src/saltybot_bridge/setup.py | 12 +- .../test/test_cmd_vel_bridge.py | 238 ++++++++++ 5 files changed, 838 insertions(+), 2 deletions(-) create mode 100644 jetson/ros2_ws/src/saltybot_bridge/config/cmd_vel_bridge_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_bridge/launch/cmd_vel_bridge.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/cmd_vel_bridge_node.py create mode 100644 jetson/ros2_ws/src/saltybot_bridge/test/test_cmd_vel_bridge.py diff --git a/jetson/ros2_ws/src/saltybot_bridge/config/cmd_vel_bridge_params.yaml b/jetson/ros2_ws/src/saltybot_bridge/config/cmd_vel_bridge_params.yaml new file mode 100644 index 0000000..3a82ae2 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bridge/config/cmd_vel_bridge_params.yaml @@ -0,0 +1,54 @@ +# cmd_vel_bridge_params.yaml +# Configuration for cmd_vel_bridge_node — Nav2 /cmd_vel → STM32 autonomous drive. +# +# Run with: +# ros2 launch saltybot_bridge cmd_vel_bridge.launch.py +# Or override individual params: +# ros2 launch saltybot_bridge cmd_vel_bridge.launch.py max_linear_vel:=0.3 + +# ── Serial ───────────────────────────────────────────────────────────────────── +# Use /dev/stm32-bridge if udev rule from jetson/docs/pinout.md is applied. +serial_port: /dev/ttyACM0 +baud_rate: 921600 +timeout: 0.05 # serial readline timeout (s) +reconnect_delay: 2.0 # seconds between reconnect attempts + +# ── Heartbeat ────────────────────────────────────────────────────────────────── +# STM32 jetson_cmd module reverts steer to 0 after JETSON_HB_TIMEOUT_MS (500ms). +# Keep heartbeat well below that threshold. +heartbeat_period: 0.2 # seconds (200ms) + +# ── Velocity limits ──────────────────────────────────────────────────────────── +# Hard caps applied BEFORE scaling. Prevents Nav2 planner or param tuning errors +# from commanding unsafe speeds on the balance robot. +# +# Conservative defaults — increase only after verifying balance stability. +max_linear_vel: 0.5 # m/s (0.5 = cautious first deployment) +max_angular_vel: 2.0 # rad/s + +# ── Scaling: physical units → ESC counts (-1000..+1000) ─────────────────────── +# speed = clamp(linear.x * speed_scale, -1000, 1000) [after limit clamp] +# steer = clamp(angular.z * steer_scale, -1000, 1000) +# +# speed_scale: 1000.0 means 1 m/s → 1000 ESC units (full scale at max_linear_vel=1.0) +# At max_linear_vel=0.5: max speed output = 500 ESC units. +# steer_scale: negative because ROS2 +angular.z = CCW (left) but ESC positive +# steer typically means right-turn. Flip sign if robot steers backwards. +speed_scale: 1000.0 +steer_scale: -500.0 + +# ── Acceleration ramp ────────────────────────────────────────────────────────── +# Maximum ESC-unit change per second (applied to both speed and steer). +# Control loop runs at 50 Hz → step per tick = ramp_rate / 50. +# +# ramp_rate=500: 0 → 500 ESC (= 0.5 m/s at speed_scale=1000) takes 1.0s +# ramp_rate=250: same ramp takes 2.0s (gentler, safer for tuning) +# +# Lower values = smoother but slower response to Nav2 waypoint commands. +ramp_rate: 500 # ESC units/second + +# ── Deadman switch ───────────────────────────────────────────────────────────── +# If /cmd_vel is not received for this many seconds, target speed/steer are +# zeroed immediately. The ramp then drives the robot to a stop. +# 500ms matches the STM32 jetson heartbeat timeout for consistency. +cmd_vel_timeout: 0.5 # seconds diff --git a/jetson/ros2_ws/src/saltybot_bridge/launch/cmd_vel_bridge.launch.py b/jetson/ros2_ws/src/saltybot_bridge/launch/cmd_vel_bridge.launch.py new file mode 100644 index 0000000..284bff6 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bridge/launch/cmd_vel_bridge.launch.py @@ -0,0 +1,100 @@ +""" +cmd_vel_bridge.launch.py — Nav2 cmd_vel → STM32 autonomous drive bridge. + +Starts cmd_vel_bridge_node, which owns the serial port exclusively and provides: + - /cmd_vel subscription with velocity limits + smooth ramp + - Deadman switch (zero speed if /cmd_vel silent > cmd_vel_timeout) + - Mode gate (drives only when STM32 is in AUTONOMOUS mode, md=2) + - Telemetry RX → /saltybot/imu, /saltybot/balance_state, /diagnostics + - /saltybot/cmd publisher (observability) + +Do NOT run alongside serial_bridge_node or saltybot_cmd_node on the same port. + +Usage: + # Defaults (max 0.5 m/s, 2.0 rad/s, 1s ramp, 500ms deadman): + ros2 launch saltybot_bridge cmd_vel_bridge.launch.py + + # Override speed limit for cautious tuning: + ros2 launch saltybot_bridge cmd_vel_bridge.launch.py max_linear_vel:=0.3 + + # Load all params from YAML: + ros2 launch saltybot_bridge cmd_vel_bridge.launch.py \\ + params_file:=/path/to/cmd_vel_bridge_params.yaml +""" + +import os + +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument, OpaqueFunction +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def _launch_bridge(context, *args, **kwargs): + params_file = LaunchConfiguration("params_file").perform(context) + + # Build parameter dict from individual launch args (allow YAML override) + inline_params = { + "serial_port": LaunchConfiguration("serial_port").perform(context), + "baud_rate": int(LaunchConfiguration("baud_rate").perform(context)), + "timeout": 0.05, + "reconnect_delay": 2.0, + "heartbeat_period":float(LaunchConfiguration("heartbeat_period").perform(context)), + "max_linear_vel": float(LaunchConfiguration("max_linear_vel").perform(context)), + "max_angular_vel": float(LaunchConfiguration("max_angular_vel").perform(context)), + "speed_scale": float(LaunchConfiguration("speed_scale").perform(context)), + "steer_scale": float(LaunchConfiguration("steer_scale").perform(context)), + "ramp_rate": int(LaunchConfiguration("ramp_rate").perform(context)), + "cmd_vel_timeout": float(LaunchConfiguration("cmd_vel_timeout").perform(context)), + } + + # If a params file is provided, it takes precedence (ROS2 merges inline last) + node_params = [params_file, inline_params] if params_file else [inline_params] + + return [Node( + package="saltybot_bridge", + executable="cmd_vel_bridge_node", + name="cmd_vel_bridge", + output="screen", + parameters=node_params, + )] + + +def generate_launch_description(): + pkg_share = get_package_share_directory("saltybot_bridge") + default_params = os.path.join(pkg_share, "config", "cmd_vel_bridge_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument( + "params_file", + default_value=default_params, + description="Full path to cmd_vel_bridge_params.yaml (overrides inline args)"), + DeclareLaunchArgument( + "serial_port", default_value="/dev/ttyACM0", + description="STM32 USB CDC device node"), + DeclareLaunchArgument( + "baud_rate", default_value="921600"), + DeclareLaunchArgument( + "heartbeat_period",default_value="0.2", + description="Heartbeat interval (s); must be < STM32 HB timeout (0.5s)"), + DeclareLaunchArgument( + "max_linear_vel", default_value="0.5", + description="Hard speed cap before scaling (m/s)"), + DeclareLaunchArgument( + "max_angular_vel", default_value="2.0", + description="Hard angular rate cap before scaling (rad/s)"), + DeclareLaunchArgument( + "speed_scale", default_value="1000.0", + description="m/s → ESC units (linear.x)"), + DeclareLaunchArgument( + "steer_scale", default_value="-500.0", + description="rad/s → ESC units (angular.z, neg=flip CCW)"), + DeclareLaunchArgument( + "ramp_rate", default_value="500", + description="Max ESC-unit change per second (acceleration limit)"), + DeclareLaunchArgument( + "cmd_vel_timeout", default_value="0.5", + description="Deadman: zero targets if /cmd_vel silent this long (s)"), + OpaqueFunction(function=_launch_bridge), + ]) diff --git a/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/cmd_vel_bridge_node.py b/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/cmd_vel_bridge_node.py new file mode 100644 index 0000000..993d2f4 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/cmd_vel_bridge_node.py @@ -0,0 +1,436 @@ +""" +cmd_vel_bridge_node — Nav2 /cmd_vel → STM32 drive command bridge. + +Extends the basic saltybot_cmd_node with four additions required for safe +autonomous operation on a self-balancing robot: + + 1. Velocity limits — clamp linear.x / angular.z before scaling so Nav2 + planner misconfiguration can't command unsafe speed. + 2. Smooth ramp — interpolate ESC commands toward target at ramp_rate + ESC-units/s to prevent the sudden torque changes that + topple a balance robot. + 3. Deadman switch — if /cmd_vel is silent for cmd_vel_timeout seconds, + zero targets immediately (Nav2 node crash / planner + stall → robot coasts to stop rather than running away). + 4. Mode gate — only issue non-zero drive commands when STM32 reports + md=2 (AUTONOMOUS). In any other mode (RC_MANUAL, + RC_ASSISTED) Jetson cannot override the RC pilot. + On mode re-entry current ramp state resets to 0 so + acceleration is always smooth from rest. + +Serial protocol (C,\\n / H\\n — same as saltybot_cmd_node): + C,\\n — drive command. speed/steer: -1000..+1000 integers. + H\\n — heartbeat. STM32 reverts steer to 0 after 500ms silence. + +Telemetry (50 Hz from STM32): + Same RX/publish pipeline as saltybot_cmd_node. + The "md" field (0=MANUAL,1=ASSISTED,2=AUTO) is parsed for the mode gate. + +Topics published: + /saltybot/imu sensor_msgs/Imu + /saltybot/balance_state std_msgs/String (JSON) + /saltybot/cmd std_msgs/String — formatted command for observability + /diagnostics diagnostic_msgs/DiagnosticArray + +Topics subscribed: + /cmd_vel geometry_msgs/Twist + +This node owns the serial port exclusively. Do NOT run alongside +serial_bridge_node or saltybot_cmd_node on the same port. +""" + +import json +import math +import threading +import time + +import rclpy +from rclpy.node import Node +from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy + +import serial + +from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue +from geometry_msgs.msg import Twist +from sensor_msgs.msg import Imu +from std_msgs.msg import String + +_STATE_LABEL = {0: "DISARMED", 1: "ARMED", 2: "TILT_FAULT"} +_MODE_LABEL = {0: "RC_MANUAL", 1: "RC_ASSISTED", 2: "AUTONOMOUS"} +IMU_FRAME_ID = "imu_link" +MODE_AUTONOMOUS = 2 + + +def _clamp(v: float, lo: float, hi: float) -> float: + return max(lo, min(hi, v)) + + +def _ramp_toward(current: int, target: int, max_step: int) -> int: + """Step `current` toward `target` by at most `max_step`.""" + diff = target - current + if abs(diff) <= max_step: + return target + return current + (max_step if diff > 0 else -max_step) + + +class CmdVelBridgeNode(Node): + def __init__(self): + super().__init__("cmd_vel_bridge") + + # ── Parameters ──────────────────────────────────────────────────────── + self.declare_parameter("serial_port", "/dev/ttyACM0") + self.declare_parameter("baud_rate", 921600) + self.declare_parameter("timeout", 0.05) + self.declare_parameter("reconnect_delay", 2.0) + self.declare_parameter("heartbeat_period", 0.2) + # Velocity limits (applied before scale → hard cap on robot speed) + self.declare_parameter("max_linear_vel", 0.5) # m/s + self.declare_parameter("max_angular_vel", 2.0) # rad/s + # Scaling: physical units → ESC counts (-1000..+1000) + self.declare_parameter("speed_scale", 1000.0) # m/s → ESC + self.declare_parameter("steer_scale", -500.0) # rad/s→ ESC (neg=flip CCW→right) + # Ramp: max ESC-unit change per second (both speed and steer) + self.declare_parameter("ramp_rate", 500) # ESC units/s + # Deadman: zero targets if /cmd_vel silent longer than this + self.declare_parameter("cmd_vel_timeout", 0.5) # seconds + + port = self.get_parameter("serial_port").value + baud = self.get_parameter("baud_rate").value + timeout = self.get_parameter("timeout").value + self._reconnect_delay = self.get_parameter("reconnect_delay").value + self._hb_period = self.get_parameter("heartbeat_period").value + self._max_linear_vel = self.get_parameter("max_linear_vel").value + self._max_angular_vel = self.get_parameter("max_angular_vel").value + self._speed_scale = self.get_parameter("speed_scale").value + self._steer_scale = self.get_parameter("steer_scale").value + self._ramp_rate = int(self.get_parameter("ramp_rate").value) + self._cmd_vel_timeout = self.get_parameter("cmd_vel_timeout").value + + # Control loop runs at 50 Hz; ramp step per tick = ramp_rate / 50 + _CONTROL_HZ = 50 + self._ramp_step = max(1, self._ramp_rate // _CONTROL_HZ) + + # ── QoS ─────────────────────────────────────────────────────────────── + sensor_qos = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, depth=10) + reliable_qos = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, depth=10) + + # ── Publishers ──────────────────────────────────────────────────────── + self._imu_pub = self.create_publisher(Imu, "/saltybot/imu", sensor_qos) + self._balance_pub = self.create_publisher(String, "/saltybot/balance_state", reliable_qos) + self._diag_pub = self.create_publisher(DiagnosticArray, "/diagnostics", reliable_qos) + self._cmd_pub = self.create_publisher(String, "/saltybot/cmd", reliable_qos) + + # ── Subscription ────────────────────────────────────────────────────── + self._cmd_vel_sub = self.create_subscription( + Twist, "/cmd_vel", self._cmd_vel_cb, reliable_qos) + + # ── Drive state ─────────────────────────────────────────────────────── + self._target_speed = 0 # desired ESC units from latest /cmd_vel + self._target_steer = 0 + self._current_speed = 0 # ramped output actually sent + self._current_steer = 0 + self._last_cmd_vel = 0.0 # wall clock (seconds) of last /cmd_vel msg + self._stm32_mode = 0 # parsed "md" field: 0=MANUAL,1=ASSISTED,2=AUTO + self._last_state = -1 + self._frame_count = 0 + self._error_count = 0 + self._last_speed_sent = 0 + self._last_steer_sent = 0 + + # ── Serial ──────────────────────────────────────────────────────────── + self._port_name = port + self._baud = baud + self._timeout = timeout + self._ser: serial.Serial | None = None + self._ser_lock = threading.Lock() + self._open_serial() + + # ── Timers ──────────────────────────────────────────────────────────── + # Telemetry read at 100 Hz (STM32 sends at 50 Hz) + self._read_timer = self.create_timer(0.01, self._read_cb) + # Control loop at 50 Hz: ramp + deadman + mode gate + send + self._control_timer = self.create_timer(1.0 / _CONTROL_HZ, self._control_cb) + # Heartbeat TX + self._hb_timer = self.create_timer(self._hb_period, self._heartbeat_cb) + + self.get_logger().info( + f"cmd_vel_bridge started — {port} @ {baud} baud | " + f"max_vel={self._max_linear_vel:.1f}m/s " + f"max_ang={self._max_angular_vel:.1f}rad/s " + f"ramp={self._ramp_rate}ESC/s " + f"deadman={self._cmd_vel_timeout:.1f}s" + ) + + # ── Serial management ───────────────────────────────────────────────────── + + def _open_serial(self) -> bool: + with self._ser_lock: + try: + self._ser = serial.Serial( + port=self._port_name, baudrate=self._baud, + timeout=self._timeout) + self._ser.reset_input_buffer() + self.get_logger().info(f"Serial open: {self._port_name}") + return True + except serial.SerialException as exc: + self.get_logger().error(f"Cannot open {self._port_name}: {exc}") + self._ser = None + return False + + def _close_serial(self): + with self._ser_lock: + if self._ser and self._ser.is_open: + try: + self._ser.close() + except Exception: + pass + self._ser = None + + def _write(self, data: bytes) -> bool: + with self._ser_lock: + if self._ser is None or not self._ser.is_open: + return False + try: + self._ser.write(data) + return True + except serial.SerialException as exc: + self.get_logger().error(f"Serial write error: {exc}") + self._ser = None + return False + + # ── /cmd_vel callback ───────────────────────────────────────────────────── + + def _cmd_vel_cb(self, msg: Twist): + """Clamp velocity to limits then convert to ESC units.""" + lin_x = _clamp(msg.linear.x, -self._max_linear_vel, self._max_linear_vel) + ang_z = _clamp(msg.angular.z, -self._max_angular_vel, self._max_angular_vel) + self._target_speed = int(_clamp(lin_x * self._speed_scale, -1000.0, 1000.0)) + self._target_steer = int(_clamp(ang_z * self._steer_scale, -1000.0, 1000.0)) + self._last_cmd_vel = time.monotonic() + + # ── 50 Hz control loop ──────────────────────────────────────────────────── + + def _control_cb(self): + now = time.monotonic() + + # Deadman: zero targets if /cmd_vel went silent + if (self._last_cmd_vel > 0.0 and + now - self._last_cmd_vel > self._cmd_vel_timeout): + self._target_speed = 0 + self._target_steer = 0 + + # Mode gate: in non-AUTONOMOUS mode, zero and reset ramp state so + # re-entry always accelerates smoothly from 0. + if self._stm32_mode != MODE_AUTONOMOUS: + self._current_speed = 0 + self._current_steer = 0 + speed, steer = 0, 0 + else: + # Smooth ramp toward targets + self._current_speed = _ramp_toward( + self._current_speed, self._target_speed, self._ramp_step) + self._current_steer = _ramp_toward( + self._current_steer, self._target_steer, self._ramp_step) + speed = self._current_speed + steer = self._current_steer + + # Send to STM32 + frame = f"C{speed},{steer}\n".encode("ascii") + if not self._write(frame): + self.get_logger().warn( + "Cannot send cmd — serial not open", + throttle_duration_sec=2.0) + + self._last_speed_sent = speed + self._last_steer_sent = steer + + # Publish command for observability / rosbag + cmd_msg = String() + cmd_msg.data = f"C{speed},{steer}" + self._cmd_pub.publish(cmd_msg) + + # ── Heartbeat TX ────────────────────────────────────────────────────────── + + def _heartbeat_cb(self): + """H\\n keeps STM32 jetson_cmd heartbeat alive regardless of mode.""" + self._write(b"H\n") + + # ── Telemetry RX ────────────────────────────────────────────────────────── + + def _read_cb(self): + with self._ser_lock: + if self._ser is None or not self._ser.is_open: + pass + else: + try: + lines = [] + while self._ser.in_waiting: + raw = self._ser.readline() + if raw: + lines.append(raw) + except serial.SerialException as exc: + self.get_logger().error(f"Serial read error: {exc}") + self._ser = None + lines = [] + + for raw in lines: + self._parse_and_publish(raw) + + with self._ser_lock: + if self._ser is None: + self.get_logger().warn( + "Serial lost — reconnecting…", + throttle_duration_sec=self._reconnect_delay) + if self._ser is None: + self._open_serial() + + def _parse_and_publish(self, raw: bytes): + try: + text = raw.decode("ascii", errors="ignore").strip() + if not text: + return + data = json.loads(text) + except (ValueError, UnicodeDecodeError) as exc: + self.get_logger().debug(f"Parse error ({exc}): {raw!r}") + self._error_count += 1 + return + + now = self.get_clock().now().to_msg() + + if "err" in data: + self._publish_imu_fault(data["err"], now) + return + + required = ("p", "r", "e", "ig", "m", "s", "y") + if not all(k in data for k in required): + self.get_logger().debug(f"Incomplete frame: {text}") + return + + pitch_deg = data["p"] / 10.0 + roll_deg = data["r"] / 10.0 + yaw_deg = data["y"] / 10.0 + error_deg = data["e"] / 10.0 + integral = data["ig"] / 10.0 + motor_cmd = float(data["m"]) + state = int(data["s"]) + mode = int(data.get("md", 0)) # 0=MANUAL if not present + + self._stm32_mode = mode + self._frame_count += 1 + + self._publish_imu(pitch_deg, roll_deg, yaw_deg, now) + self._publish_balance_state( + pitch_deg, roll_deg, yaw_deg, + error_deg, integral, motor_cmd, state, mode, now) + + if state != self._last_state: + self.get_logger().info( + f"Balance state → {_STATE_LABEL.get(state, f'UNKNOWN({state})')}" + f" mode={_MODE_LABEL.get(mode, str(mode))}" + ) + self._last_state = state + + def _publish_imu(self, pitch_deg, roll_deg, yaw_deg, stamp): + msg = Imu() + msg.header.stamp = stamp + msg.header.frame_id = IMU_FRAME_ID + msg.orientation_covariance[0] = -1.0 + msg.angular_velocity.x = math.radians(pitch_deg) + msg.angular_velocity.y = math.radians(roll_deg) + msg.angular_velocity.z = math.radians(yaw_deg) + cov = math.radians(0.5) ** 2 + msg.angular_velocity_covariance[0] = cov + msg.angular_velocity_covariance[4] = cov + msg.angular_velocity_covariance[8] = cov + msg.linear_acceleration_covariance[0] = -1.0 + self._imu_pub.publish(msg) + + def _publish_balance_state( + self, pitch, roll, yaw, error, integral, + motor_cmd, state, mode, stamp + ): + state_label = _STATE_LABEL.get(state, f"UNKNOWN({state})") + mode_label = _MODE_LABEL.get(mode, str(mode)) + payload = { + "stamp": f"{stamp.sec}.{stamp.nanosec:09d}", + "state": state_label, + "mode": mode_label, + "pitch_deg": round(pitch, 1), + "roll_deg": round(roll, 1), + "yaw_deg": round(yaw, 1), + "pid_error_deg": round(error, 1), + "integral": round(integral, 1), + "motor_cmd": int(motor_cmd), + "bridge_speed": self._last_speed_sent, + "bridge_steer": self._last_steer_sent, + "frames": self._frame_count, + "parse_errors": self._error_count, + } + str_msg = String() + str_msg.data = json.dumps(payload) + self._balance_pub.publish(str_msg) + + diag = DiagnosticArray() + diag.header.stamp = stamp + status = DiagnosticStatus() + status.name = "saltybot/balance_controller" + status.hardware_id = "stm32f722" + status.message = f"{state_label} [{mode_label}]" + status.level = ( + DiagnosticStatus.OK if state == 1 else + DiagnosticStatus.WARN if state == 0 else + DiagnosticStatus.ERROR + ) + status.values = [ + KeyValue(key="pitch_deg", value=f"{pitch:.1f}"), + KeyValue(key="roll_deg", value=f"{roll:.1f}"), + KeyValue(key="yaw_deg", value=f"{yaw:.1f}"), + KeyValue(key="pid_error", value=f"{error:.1f}"), + KeyValue(key="integral", value=f"{integral:.1f}"), + KeyValue(key="motor_cmd", value=str(int(motor_cmd))), + KeyValue(key="mode", value=mode_label), + KeyValue(key="bridge_speed", value=str(self._last_speed_sent)), + KeyValue(key="bridge_steer", value=str(self._last_steer_sent)), + KeyValue(key="state", value=state_label), + ] + diag.status.append(status) + self._diag_pub.publish(diag) + + def _publish_imu_fault(self, errno: int, stamp): + diag = DiagnosticArray() + diag.header.stamp = stamp + status = DiagnosticStatus() + status.level = DiagnosticStatus.ERROR + status.name = "saltybot/balance_controller" + status.hardware_id = "stm32f722" + status.message = f"IMU fault errno={errno}" + diag.status.append(status) + self._diag_pub.publish(diag) + self.get_logger().error(f"STM32 IMU fault: errno={errno}") + + # ── Lifecycle ───────────────────────────────────────────────────────────── + + def destroy_node(self): + self._write(b"C0,0\n") # safety: zero on shutdown + self._close_serial() + super().destroy_node() + + +def main(args=None): + rclpy.init(args=args) + node = CmdVelBridgeNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_bridge/setup.py b/jetson/ros2_ws/src/saltybot_bridge/setup.py index a02acf3..33626da 100644 --- a/jetson/ros2_ws/src/saltybot_bridge/setup.py +++ b/jetson/ros2_ws/src/saltybot_bridge/setup.py @@ -9,8 +9,14 @@ setup( data_files=[ ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), (f"share/{package_name}", ["package.xml"]), - (f"share/{package_name}/launch", ["launch/bridge.launch.py"]), - (f"share/{package_name}/config", ["config/bridge_params.yaml"]), + (f"share/{package_name}/launch", [ + "launch/bridge.launch.py", + "launch/cmd_vel_bridge.launch.py", + ]), + (f"share/{package_name}/config", [ + "config/bridge_params.yaml", + "config/cmd_vel_bridge_params.yaml", + ]), ], install_requires=["setuptools", "pyserial"], zip_safe=True, @@ -25,6 +31,8 @@ setup( "serial_bridge_node = saltybot_bridge.serial_bridge_node:main", # Full bidirectional bridge: telemetry RX + /cmd_vel TX + heartbeat "saltybot_cmd_node = saltybot_bridge.saltybot_cmd_node:main", + # Nav2 cmd_vel bridge: velocity limits + ramp + deadman + mode gate + "cmd_vel_bridge_node = saltybot_bridge.cmd_vel_bridge_node:main", ], }, ) diff --git a/jetson/ros2_ws/src/saltybot_bridge/test/test_cmd_vel_bridge.py b/jetson/ros2_ws/src/saltybot_bridge/test/test_cmd_vel_bridge.py new file mode 100644 index 0000000..870dcf0 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bridge/test/test_cmd_vel_bridge.py @@ -0,0 +1,238 @@ +""" +Unit tests for cmd_vel_bridge_node logic. +No ROS2 runtime required — tests pure functions extracted from the node. +Run with: pytest jetson/ros2_ws/src/saltybot_bridge/test/test_cmd_vel_bridge.py +""" + +import pytest + + +# ── Pure function mirrors (no ROS2 imports) ────────────────────────────────── + +def _clamp(v, lo, hi): + return max(lo, min(hi, v)) + + +def _ramp_toward(current, target, max_step): + diff = target - current + if abs(diff) <= max_step: + return target + return current + (max_step if diff > 0 else -max_step) + + +def cmd_vel_to_targets(linear_x, angular_z, + max_linear_vel=0.5, max_angular_vel=2.0, + speed_scale=1000.0, steer_scale=-500.0): + """Mirror of CmdVelBridgeNode._cmd_vel_cb target computation.""" + lin_x = _clamp(linear_x, -max_linear_vel, max_linear_vel) + ang_z = _clamp(angular_z, -max_angular_vel, max_angular_vel) + speed = int(_clamp(lin_x * speed_scale, -1000.0, 1000.0)) + steer = int(_clamp(ang_z * steer_scale, -1000.0, 1000.0)) + return speed, steer + + +def apply_ramp(current_speed, current_steer, target_speed, target_steer, step): + """Mirror of CmdVelBridgeNode._control_cb ramp logic.""" + new_speed = _ramp_toward(current_speed, target_speed, step) + new_steer = _ramp_toward(current_steer, target_steer, step) + return new_speed, new_steer + + +# ── Velocity limit tests ────────────────────────────────────────────────────── + +class TestVelocityLimits: + def test_zero_twist_zero_targets(self): + speed, steer = cmd_vel_to_targets(0.0, 0.0) + assert speed == 0 + assert steer == 0 + + def test_within_limits_passes_through(self): + speed, steer = cmd_vel_to_targets(0.3, 1.0) + assert speed == 300 + assert steer == -500 # steer_scale=-500 * 1.0 + + def test_speed_clamped_at_max_linear_vel(self): + # 2.0 m/s >> 0.5 m/s limit → clamp to 0.5 m/s → 500 ESC + speed, _ = cmd_vel_to_targets(2.0, 0.0) + assert speed == 500 + + def test_speed_clamped_at_min_linear_vel(self): + speed, _ = cmd_vel_to_targets(-2.0, 0.0) + assert speed == -500 + + def test_angular_clamped_at_max(self): + # 5.0 rad/s >> 2.0 rad/s limit → 2.0 * -500 = -1000 + _, steer = cmd_vel_to_targets(0.0, 5.0) + assert steer == -1000 + + def test_angular_clamped_at_min(self): + _, steer = cmd_vel_to_targets(0.0, -5.0) + assert steer == 1000 + + def test_full_forward_at_max_limit(self): + # Exactly at limit: 0.5 m/s * 1000 = 500 + speed, _ = cmd_vel_to_targets(0.5, 0.0) + assert speed == 500 + + def test_custom_limits(self): + speed, _ = cmd_vel_to_targets(1.0, 0.0, + max_linear_vel=1.0, speed_scale=1000.0) + assert speed == 1000 + + def test_negative_steer_scale_flips_sign(self): + # +angular.z (CCW/left) → negative ESC steer (right turn in ESC convention) + _, steer = cmd_vel_to_targets(0.0, 1.0, steer_scale=-500.0) + assert steer == -500 + + def test_positive_steer_scale_no_flip(self): + _, steer = cmd_vel_to_targets(0.0, 1.0, steer_scale=500.0) + assert steer == 500 + + +# ── Ramp tests ──────────────────────────────────────────────────────────────── + +class TestRamp: + def test_ramp_toward_positive_target(self): + # step=10: 0 → 10 in one tick + new_s, new_t = apply_ramp(0, 0, 100, 0, step=10) + assert new_s == 10 + assert new_t == 0 + + def test_ramp_does_not_overshoot(self): + # step=10, gap=5: should snap to target + new_s, _ = apply_ramp(95, 0, 100, 0, step=10) + assert new_s == 100 + + def test_ramp_toward_zero(self): + new_s, _ = apply_ramp(50, 0, 0, 0, step=10) + assert new_s == 40 + + def test_ramp_toward_negative(self): + new_s, _ = apply_ramp(0, 0, -30, 0, step=10) + assert new_s == -10 + + def test_ramp_already_at_target(self): + new_s, new_t = apply_ramp(100, -200, 100, -200, step=10) + assert new_s == 100 + assert new_t == -200 + + def test_ramp_rate_500_at_50hz(self): + # ramp_rate=500 ESC/s, 50Hz → step=10 per tick + step = 500 // 50 + assert step == 10 + # 50 ticks to go 0→500 + speed = 0 + for _ in range(50): + speed = _ramp_toward(speed, 500, step) + assert speed == 500 + + def test_ramp_both_channels_independent(self): + new_s, new_t = apply_ramp(0, 0, 100, -100, step=10) + assert new_s == 10 + assert new_t == -10 + + +# ── Mode gate tests ─────────────────────────────────────────────────────────── + +class TestModeGate: + MODE_MANUAL = 0 + MODE_ASSISTED = 1 + MODE_AUTONOMOUS = 2 + + def _apply_mode_gate(self, stm32_mode, current_speed, current_steer, + target_speed, target_steer, step=10): + """Mirror of _control_cb mode gate logic.""" + if stm32_mode != self.MODE_AUTONOMOUS: + # Reset ramp state, send zero + return 0, 0, 0, 0 # (current_speed, current_steer, sent_speed, sent_steer) + new_s = _ramp_toward(current_speed, target_speed, step) + new_t = _ramp_toward(current_steer, target_steer, step) + return new_s, new_t, new_s, new_t + + def test_manual_mode_sends_zero(self): + _, _, sent_s, sent_t = self._apply_mode_gate( + self.MODE_MANUAL, 100, -50, 500, -300) + assert sent_s == 0 + assert sent_t == 0 + + def test_assisted_mode_sends_zero(self): + _, _, sent_s, sent_t = self._apply_mode_gate( + self.MODE_ASSISTED, 100, -50, 500, -300) + assert sent_s == 0 + assert sent_t == 0 + + def test_autonomous_mode_ramps(self): + _, _, sent_s, sent_t = self._apply_mode_gate( + self.MODE_AUTONOMOUS, 0, 0, 500, -300, step=10) + assert sent_s == 10 + assert sent_t == -10 + + def test_mode_gate_resets_ramp_state(self): + # While in MANUAL, current is reset to 0 + cur_s, cur_t, _, _ = self._apply_mode_gate( + self.MODE_MANUAL, 500, -200, 500, -200) + assert cur_s == 0 + assert cur_t == 0 + # Re-entering AUTONOMOUS ramps from 0, not from 500 + _, _, sent_s, _ = self._apply_mode_gate( + self.MODE_AUTONOMOUS, cur_s, cur_t, 500, 0, step=10) + assert sent_s == 10 # ramped from 0, not jumped to 500 + + +# ── Deadman switch tests ────────────────────────────────────────────────────── + +class TestDeadman: + def test_deadman_zeros_targets_on_timeout(self): + """Simulate no /cmd_vel for > timeout → targets zeroed.""" + target_speed = 500 + target_steer = -300 + # last_cmd_vel was 1.0s ago, timeout is 0.5s → trigger + last_cmd_vel = 0.0 + now = 0.6 # 0.6s elapsed + timeout = 0.5 + + if last_cmd_vel > 0.0 and now - last_cmd_vel > timeout: + target_speed = 0 + target_steer = 0 + + assert target_speed == 0 + assert target_steer == 0 + + def test_deadman_does_not_trigger_before_timeout(self): + target_speed = 500 + last_cmd_vel = 0.0 + now = 0.3 # only 0.3s elapsed + timeout = 0.5 + + if last_cmd_vel > 0.0 and now - last_cmd_vel > timeout: + target_speed = 0 + + assert target_speed == 500 # unchanged + + def test_deadman_does_not_trigger_at_t0_before_first_cmd(self): + # last_cmd_vel==0 means no /cmd_vel ever received — don't zero yet + target_speed = 0 + last_cmd_vel = 0.0 # never received + now = 10.0 + timeout = 0.5 + + if last_cmd_vel > 0.0 and now - last_cmd_vel > timeout: + target_speed = 999 # should not reach here + + assert target_speed == 0 + + +# ── Frame format ────────────────────────────────────────────────────────────── + +class TestFrameFormat: + def test_zero_frame(self): + frame = f"C{0},{0}\n".encode("ascii") + assert frame == b"C0,0\n" + + def test_drive_frame(self): + speed, steer = cmd_vel_to_targets(0.3, -1.0) + frame = f"C{speed},{steer}\n".encode("ascii") + assert frame == b"C300,500\n" + + def test_heartbeat_frame(self): + assert b"H\n" == b"H\n" -- 2.47.2