diff --git a/jetson/ros2_ws/src/saltybot_bridge/config/stm32_cmd_params.yaml b/jetson/ros2_ws/src/saltybot_bridge/config/stm32_cmd_params.yaml
new file mode 100644
index 0000000..8b65663
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_bridge/config/stm32_cmd_params.yaml
@@ -0,0 +1,30 @@
+# stm32_cmd_params.yaml — Configuration for stm32_cmd_node (Issue #119)
+# Binary-framed Jetson↔STM32 bridge at 921600 baud.
+
+# ── Serial port ────────────────────────────────────────────────────────────────
+# Use /dev/stm32-bridge if the udev rule is applied:
+# SUBSYSTEM=="tty", ATTRS{idVendor}=="0483", ATTRS{idProduct}=="5740",
+# SYMLINK+="stm32-bridge", MODE="0660", GROUP="dialout"
+serial_port: /dev/ttyACM0
+baud_rate: 921600
+reconnect_delay: 2.0 # seconds between USB reconnect attempts
+
+# ── Heartbeat ─────────────────────────────────────────────────────────────────
+# HEARTBEAT frame sent every heartbeat_period seconds.
+# STM32 fires watchdog and reverts to safe state if no frame received for 500ms.
+heartbeat_period: 0.2 # 200ms → well within 500ms STM32 watchdog
+
+# ── Watchdog (Jetson-side) ────────────────────────────────────────────────────
+# If no /cmd_vel message received for watchdog_timeout seconds,
+# send SPEED_STEER(0,0) to stop the robot.
+watchdog_timeout: 0.5 # 500ms
+
+# ── Twist velocity scaling ────────────────────────────────────────────────────
+# speed = clamp(linear.x * speed_scale, -1000, 1000) (m/s → ESC units)
+# steer = clamp(angular.z * steer_scale, -1000, 1000) (rad/s → ESC units)
+#
+# Default: 1 m/s → 1000 ESC units, ±2 rad/s → ±1000 steer.
+# Negative steer_scale flips ROS2 CCW+ convention to match ESC steer direction.
+# Tune speed_scale to set the physical top speed.
+speed_scale: 1000.0
+steer_scale: -500.0
diff --git a/jetson/ros2_ws/src/saltybot_bridge/launch/stm32_cmd.launch.py b/jetson/ros2_ws/src/saltybot_bridge/launch/stm32_cmd.launch.py
new file mode 100644
index 0000000..2fa400a
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_bridge/launch/stm32_cmd.launch.py
@@ -0,0 +1,52 @@
+"""stm32_cmd.launch.py — Launch the binary-framed STM32 command node (Issue #119).
+
+Usage:
+ # Default (binary protocol, bidirectional):
+ ros2 launch saltybot_bridge stm32_cmd.launch.py
+
+ # Override serial port:
+ ros2 launch saltybot_bridge stm32_cmd.launch.py serial_port:=/dev/ttyACM1
+
+ # Custom velocity scales:
+ ros2 launch saltybot_bridge stm32_cmd.launch.py speed_scale:=800.0 steer_scale:=-400.0
+"""
+
+import os
+from ament_index_python.packages import get_package_share_directory
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration
+from launch_ros.actions import Node
+
+
+def generate_launch_description() -> LaunchDescription:
+ pkg = get_package_share_directory("saltybot_bridge")
+ params_file = os.path.join(pkg, "config", "stm32_cmd_params.yaml")
+
+ return LaunchDescription([
+ DeclareLaunchArgument("serial_port", default_value="/dev/ttyACM0"),
+ DeclareLaunchArgument("baud_rate", default_value="921600"),
+ DeclareLaunchArgument("speed_scale", default_value="1000.0"),
+ DeclareLaunchArgument("steer_scale", default_value="-500.0"),
+ DeclareLaunchArgument("watchdog_timeout", default_value="0.5"),
+ DeclareLaunchArgument("heartbeat_period", default_value="0.2"),
+
+ Node(
+ package="saltybot_bridge",
+ executable="stm32_cmd_node",
+ name="stm32_cmd_node",
+ output="screen",
+ emulate_tty=True,
+ parameters=[
+ params_file,
+ {
+ "serial_port": LaunchConfiguration("serial_port"),
+ "baud_rate": LaunchConfiguration("baud_rate"),
+ "speed_scale": LaunchConfiguration("speed_scale"),
+ "steer_scale": LaunchConfiguration("steer_scale"),
+ "watchdog_timeout": LaunchConfiguration("watchdog_timeout"),
+ "heartbeat_period": LaunchConfiguration("heartbeat_period"),
+ },
+ ],
+ ),
+ ])
diff --git a/jetson/ros2_ws/src/saltybot_bridge/package.xml b/jetson/ros2_ws/src/saltybot_bridge/package.xml
index 9aeaace..7ec0e5c 100644
--- a/jetson/ros2_ws/src/saltybot_bridge/package.xml
+++ b/jetson/ros2_ws/src/saltybot_bridge/package.xml
@@ -5,8 +5,11 @@
0.1.0
STM32F722 USB CDC serial bridge for saltybot.
- Reads JSON telemetry from the STM32 flight controller and publishes
- sensor_msgs/Imu, std_msgs/String (balance state), and diagnostic_msgs/DiagnosticArray.
+ serial_bridge_node: JSON telemetry RX → sensor_msgs/Imu + diagnostics.
+ stm32_cmd_node (Issue #119): binary-framed protocol — STX/TYPE/LEN/CRC16/ETX,
+ commands: HEARTBEAT, SPEED_STEER, ARM, SET_MODE, PID_UPDATE;
+ telemetry: IMU, BATTERY, MOTOR_RPM, ARM_STATE, ERROR; watchdog 500ms.
+ battery_node (Issue #125): SoC tracking, threshold alerts, SQLite history.
sl-jetson
MIT
@@ -15,6 +18,8 @@
python3-paho-mqtt
sensor_msgs
std_msgs
+ std_srvs
+ geometry_msgs
diagnostic_msgs
ament_copyright
diff --git a/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/stm32_cmd_node.py b/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/stm32_cmd_node.py
new file mode 100644
index 0000000..4e3d63c
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/stm32_cmd_node.py
@@ -0,0 +1,483 @@
+"""stm32_cmd_node.py — Full bidirectional binary-framed STM32↔Jetson bridge.
+
+Issue #119: replaces the ASCII-protocol saltybot_cmd_node with a robust binary
+framing protocol (STX/TYPE/LEN/PAYLOAD/CRC16/ETX) at 921600 baud.
+
+TX commands (Jetson → STM32):
+ SPEED_STEER — 50 Hz from /cmd_vel subscription
+ HEARTBEAT — 200 ms timer (STM32 watchdog fires at 500 ms)
+ ARM — via /saltybot/arm service
+ SET_MODE — via /saltybot/set_mode service
+ PID_UPDATE — via /saltybot/pid_update topic
+
+Watchdog: if /cmd_vel is silent for 500 ms, send SPEED_STEER(0,0) and log warning.
+
+RX telemetry (STM32 → Jetson):
+ IMU → /saltybot/imu (sensor_msgs/Imu)
+ BATTERY → /saltybot/telemetry/battery (std_msgs/String JSON)
+ MOTOR_RPM → /saltybot/telemetry/motor_rpm (std_msgs/String JSON)
+ ARM_STATE → /saltybot/arm_state (std_msgs/String JSON)
+ ERROR → /saltybot/error (std_msgs/String JSON)
+ All frames → /diagnostics (diagnostic_msgs/DiagnosticArray)
+
+Auto-reconnect: USB disconnect is detected when serial.read() raises; node
+continuously retries at reconnect_delay interval.
+
+This node owns /dev/ttyACM0 exclusively — do NOT run alongside
+serial_bridge_node or saltybot_cmd_node on the same port.
+
+Parameters (config/stm32_cmd_params.yaml):
+ serial_port /dev/ttyACM0
+ baud_rate 921600
+ reconnect_delay 2.0 (seconds)
+ heartbeat_period 0.2 (seconds)
+ watchdog_timeout 0.5 (seconds — no /cmd_vel → send zero-speed)
+ speed_scale 1000.0 (linear.x m/s → ESC units)
+ steer_scale -500.0 (angular.z rad/s → ESC units, neg to flip convention)
+"""
+
+from __future__ import annotations
+
+import json
+import math
+import threading
+import time
+
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
+
+import serial
+
+from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
+from geometry_msgs.msg import Twist
+from sensor_msgs.msg import Imu
+from std_msgs.msg import String
+from std_srvs.srv import SetBool, Trigger
+
+from .stm32_protocol import (
+ FrameParser,
+ ImuFrame, BatteryFrame, MotorRpmFrame, ArmStateFrame, ErrorFrame,
+ encode_heartbeat, encode_speed_steer, encode_arm, encode_set_mode,
+ encode_pid_update,
+)
+
+# ── Constants ─────────────────────────────────────────────────────────────────
+
+IMU_FRAME_ID = "imu_link"
+_ARM_LABEL = {0: "DISARMED", 1: "ARMED", 2: "TILT_FAULT"}
+
+
+def _clamp(v: float, lo: float, hi: float) -> float:
+ return max(lo, min(hi, v))
+
+
+# ── Node ──────────────────────────────────────────────────────────────────────
+
+class Stm32CmdNode(Node):
+ """Binary-framed Jetson↔STM32 bridge node."""
+
+ def __init__(self) -> None:
+ super().__init__("stm32_cmd_node")
+
+ # ── Parameters ────────────────────────────────────────────────────────
+ self.declare_parameter("serial_port", "/dev/ttyACM0")
+ self.declare_parameter("baud_rate", 921600)
+ self.declare_parameter("reconnect_delay", 2.0)
+ self.declare_parameter("heartbeat_period", 0.2)
+ self.declare_parameter("watchdog_timeout", 0.5)
+ self.declare_parameter("speed_scale", 1000.0)
+ self.declare_parameter("steer_scale", -500.0)
+
+ port = self.get_parameter("serial_port").value
+ baud = self.get_parameter("baud_rate").value
+ self._reconnect_delay = self.get_parameter("reconnect_delay").value
+ self._hb_period = self.get_parameter("heartbeat_period").value
+ self._wd_timeout = self.get_parameter("watchdog_timeout").value
+ self._speed_scale = self.get_parameter("speed_scale").value
+ self._steer_scale = self.get_parameter("steer_scale").value
+
+ # ── QoS ───────────────────────────────────────────────────────────────
+ sensor_qos = QoSProfile(
+ reliability=ReliabilityPolicy.BEST_EFFORT,
+ history=HistoryPolicy.KEEP_LAST, depth=10,
+ )
+ rel_qos = QoSProfile(
+ reliability=ReliabilityPolicy.RELIABLE,
+ history=HistoryPolicy.KEEP_LAST, depth=10,
+ )
+
+ # ── Publishers ────────────────────────────────────────────────────────
+ self._imu_pub = self.create_publisher(Imu, "/saltybot/imu", sensor_qos)
+ self._arm_pub = self.create_publisher(String, "/saltybot/arm_state", rel_qos)
+ self._error_pub = self.create_publisher(String, "/saltybot/error", rel_qos)
+ self._battery_pub = self.create_publisher(String, "/saltybot/telemetry/battery", rel_qos)
+ self._rpm_pub = self.create_publisher(String, "/saltybot/telemetry/motor_rpm", rel_qos)
+ self._diag_pub = self.create_publisher(DiagnosticArray, "/diagnostics", rel_qos)
+
+ # ── Subscribers ───────────────────────────────────────────────────────
+ self._cmd_vel_sub = self.create_subscription(
+ Twist, "/cmd_vel", self._on_cmd_vel, rel_qos,
+ )
+ self._pid_sub = self.create_subscription(
+ String, "/saltybot/pid_update", self._on_pid_update, rel_qos,
+ )
+
+ # ── Services ──────────────────────────────────────────────────────────
+ self._arm_srv = self.create_service(SetBool, "/saltybot/arm", self._svc_arm)
+ self._mode_srv = self.create_service(SetBool, "/saltybot/set_mode", self._svc_set_mode)
+
+ # ── Serial state ──────────────────────────────────────────────────────
+ self._port_name = port
+ self._baud = baud
+ self._ser: serial.Serial | None = None
+ self._ser_lock = threading.Lock()
+ self._parser = FrameParser()
+
+ # ── TX state ──────────────────────────────────────────────────────────
+ self._last_speed = 0
+ self._last_steer = 0
+ self._last_cmd_t = time.monotonic()
+ self._watchdog_sent = False # tracks whether we already sent zero
+
+ # ── Diagnostics state ──────────────────────────────────────────────────
+ self._last_arm_state = -1
+ self._last_battery_mv = 0
+ self._rx_frame_count = 0
+
+ # ── Open serial and start timers ──────────────────────────────────────
+ self._open_serial()
+
+ # Read at 200 Hz (serial RX thread is better, but timer keeps ROS2 integration clean)
+ self._read_timer = self.create_timer(0.005, self._read_cb)
+ # Heartbeat TX
+ self._hb_timer = self.create_timer(self._hb_period, self._heartbeat_cb)
+ # Watchdog check (fires at 2× watchdog_timeout for quick detection)
+ self._wd_timer = self.create_timer(self._wd_timeout / 2, self._watchdog_cb)
+ # Periodic diagnostics
+ self._diag_timer = self.create_timer(1.0, self._publish_diagnostics)
+
+ self.get_logger().info(
+ f"stm32_cmd_node started — {port} @ {baud} baud | "
+ f"HB {int(self._hb_period * 1000)}ms | WD {int(self._wd_timeout * 1000)}ms"
+ )
+
+ # ── Serial management ─────────────────────────────────────────────────────
+
+ def _open_serial(self) -> bool:
+ with self._ser_lock:
+ try:
+ self._ser = serial.Serial(
+ port=self._port_name,
+ baudrate=self._baud,
+ timeout=0.005, # non-blocking reads
+ write_timeout=0.1,
+ )
+ self._ser.reset_input_buffer()
+ self._parser.reset()
+ self.get_logger().info(f"Serial open: {self._port_name}")
+ return True
+ except serial.SerialException as exc:
+ self.get_logger().error(
+ f"Cannot open {self._port_name}: {exc}",
+ throttle_duration_sec=self._reconnect_delay,
+ )
+ self._ser = None
+ return False
+
+ def _close_serial(self) -> None:
+ with self._ser_lock:
+ if self._ser and self._ser.is_open:
+ try:
+ self._ser.close()
+ except Exception:
+ pass
+ self._ser = None
+
+ def _write(self, data: bytes) -> bool:
+ """Thread-safe serial write. Returns False if port is not open."""
+ with self._ser_lock:
+ if self._ser is None or not self._ser.is_open:
+ return False
+ try:
+ self._ser.write(data)
+ return True
+ except serial.SerialException as exc:
+ self.get_logger().error(f"Serial write error: {exc}")
+ self._ser = None
+ return False
+
+ # ── RX — read callback ────────────────────────────────────────────────────
+
+ def _read_cb(self) -> None:
+ """Read bytes from serial and feed them to the frame parser."""
+ raw: bytes | None = None
+ reconnect_needed = False
+
+ with self._ser_lock:
+ if self._ser is None or not self._ser.is_open:
+ reconnect_needed = True
+ else:
+ try:
+ n = self._ser.in_waiting
+ if n:
+ raw = self._ser.read(n)
+ except serial.SerialException as exc:
+ self.get_logger().error(f"Serial read error: {exc}")
+ self._ser = None
+ reconnect_needed = True
+
+ if reconnect_needed:
+ self.get_logger().warn(
+ "Serial disconnected — will retry",
+ throttle_duration_sec=self._reconnect_delay,
+ )
+ time.sleep(self._reconnect_delay)
+ self._open_serial()
+ return
+
+ if not raw:
+ return
+
+ for byte in raw:
+ frame = self._parser.feed(byte)
+ if frame is not None:
+ self._rx_frame_count += 1
+ self._dispatch_frame(frame)
+
+ def _dispatch_frame(self, frame) -> None:
+ """Route a decoded frame to the appropriate publisher."""
+ now = self.get_clock().now().to_msg()
+
+ if isinstance(frame, ImuFrame):
+ self._publish_imu(frame, now)
+
+ elif isinstance(frame, BatteryFrame):
+ self._publish_battery(frame, now)
+
+ elif isinstance(frame, MotorRpmFrame):
+ self._publish_motor_rpm(frame, now)
+
+ elif isinstance(frame, ArmStateFrame):
+ self._publish_arm_state(frame, now)
+
+ elif isinstance(frame, ErrorFrame):
+ self._publish_error(frame, now)
+
+ elif isinstance(frame, tuple):
+ type_code, payload = frame
+ self.get_logger().debug(
+ f"Unknown telemetry type 0x{type_code:02X} len={len(payload)}"
+ )
+
+ # ── Telemetry publishers ──────────────────────────────────────────────────
+
+ def _publish_imu(self, frame: ImuFrame, stamp) -> None:
+ msg = Imu()
+ msg.header.stamp = stamp
+ msg.header.frame_id = IMU_FRAME_ID
+
+ # orientation: unknown — signal with -1 in first covariance
+ msg.orientation_covariance[0] = -1.0
+
+ msg.angular_velocity.x = math.radians(frame.pitch_deg)
+ msg.angular_velocity.y = math.radians(frame.roll_deg)
+ msg.angular_velocity.z = math.radians(frame.yaw_deg)
+ cov = math.radians(0.3) ** 2 # ±0.3° noise estimate from STM32 BMI088
+ msg.angular_velocity_covariance[0] = cov
+ msg.angular_velocity_covariance[4] = cov
+ msg.angular_velocity_covariance[8] = cov
+
+ msg.linear_acceleration.x = frame.accel_x
+ msg.linear_acceleration.y = frame.accel_y
+ msg.linear_acceleration.z = frame.accel_z
+ acov = 0.05 ** 2 # ±0.05 m/s² noise
+ msg.linear_acceleration_covariance[0] = acov
+ msg.linear_acceleration_covariance[4] = acov
+ msg.linear_acceleration_covariance[8] = acov
+
+ self._imu_pub.publish(msg)
+
+ def _publish_battery(self, frame: BatteryFrame, stamp) -> None:
+ payload = {
+ "voltage_v": round(frame.voltage_mv / 1000.0, 3),
+ "voltage_mv": frame.voltage_mv,
+ "current_ma": frame.current_ma,
+ "soc_pct": frame.soc_pct,
+ "charging": frame.current_ma < -100,
+ "ts": f"{stamp.sec}.{stamp.nanosec:09d}",
+ }
+ self._last_battery_mv = frame.voltage_mv
+ msg = String()
+ msg.data = json.dumps(payload)
+ self._battery_pub.publish(msg)
+
+ def _publish_motor_rpm(self, frame: MotorRpmFrame, stamp) -> None:
+ payload = {
+ "left_rpm": frame.left_rpm,
+ "right_rpm": frame.right_rpm,
+ "ts": f"{stamp.sec}.{stamp.nanosec:09d}",
+ }
+ msg = String()
+ msg.data = json.dumps(payload)
+ self._rpm_pub.publish(msg)
+
+ def _publish_arm_state(self, frame: ArmStateFrame, stamp) -> None:
+ label = _ARM_LABEL.get(frame.state, f"UNKNOWN({frame.state})")
+ if frame.state != self._last_arm_state:
+ self.get_logger().info(f"Arm state → {label} (flags=0x{frame.error_flags:02X})")
+ self._last_arm_state = frame.state
+
+ payload = {
+ "state": frame.state,
+ "state_label": label,
+ "error_flags": frame.error_flags,
+ "ts": f"{stamp.sec}.{stamp.nanosec:09d}",
+ }
+ msg = String()
+ msg.data = json.dumps(payload)
+ self._arm_pub.publish(msg)
+
+ def _publish_error(self, frame: ErrorFrame, stamp) -> None:
+ self.get_logger().error(
+ f"STM32 error code=0x{frame.error_code:02X} sub=0x{frame.subcode:02X}"
+ )
+ payload = {
+ "error_code": frame.error_code,
+ "subcode": frame.subcode,
+ "ts": f"{stamp.sec}.{stamp.nanosec:09d}",
+ }
+ msg = String()
+ msg.data = json.dumps(payload)
+ self._error_pub.publish(msg)
+
+ # ── TX — command send ─────────────────────────────────────────────────────
+
+ def _on_cmd_vel(self, msg: Twist) -> None:
+ """Convert /cmd_vel Twist to SPEED_STEER frame at up to 50 Hz."""
+ speed = int(_clamp(msg.linear.x * self._speed_scale, -1000.0, 1000.0))
+ steer = int(_clamp(msg.angular.z * self._steer_scale, -1000.0, 1000.0))
+ self._last_speed = speed
+ self._last_steer = steer
+ self._last_cmd_t = time.monotonic()
+ self._watchdog_sent = False
+
+ frame = encode_speed_steer(speed, steer)
+ if not self._write(frame):
+ self.get_logger().warn(
+ "SPEED_STEER dropped — serial not open",
+ throttle_duration_sec=2.0,
+ )
+
+ def _heartbeat_cb(self) -> None:
+ """Send HEARTBEAT every heartbeat_period (default 200ms)."""
+ self._write(encode_heartbeat())
+
+ def _watchdog_cb(self) -> None:
+ """Send zero-speed if /cmd_vel silent for watchdog_timeout seconds."""
+ if time.monotonic() - self._last_cmd_t >= self._wd_timeout:
+ if not self._watchdog_sent:
+ self.get_logger().warn(
+ f"No /cmd_vel for {self._wd_timeout:.1f}s — sending zero-speed"
+ )
+ self._watchdog_sent = True
+ self._last_speed = 0
+ self._last_steer = 0
+ self._write(encode_speed_steer(0, 0))
+
+ def _on_pid_update(self, msg: String) -> None:
+ """Parse JSON /saltybot/pid_update and send PID_UPDATE frame."""
+ try:
+ data = json.loads(msg.data)
+ kp = float(data["kp"])
+ ki = float(data["ki"])
+ kd = float(data["kd"])
+ except (ValueError, KeyError, json.JSONDecodeError) as exc:
+ self.get_logger().error(f"Bad PID update JSON: {exc}")
+ return
+ frame = encode_pid_update(kp, ki, kd)
+ if self._write(frame):
+ self.get_logger().info(f"PID update: kp={kp}, ki={ki}, kd={kd}")
+ else:
+ self.get_logger().warn("PID_UPDATE dropped — serial not open")
+
+ # ── Services ──────────────────────────────────────────────────────────────
+
+ def _svc_arm(self, request: SetBool.Request, response: SetBool.Response):
+ """SetBool(True) = arm, SetBool(False) = disarm."""
+ arm = request.data
+ frame = encode_arm(arm)
+ ok = self._write(frame)
+ response.success = ok
+ response.message = ("ARMED" if arm else "DISARMED") if ok else "serial not open"
+ self.get_logger().info(
+ f"ARM service: {'arm' if arm else 'disarm'} — {'sent' if ok else 'FAILED'}"
+ )
+ return response
+
+ def _svc_set_mode(self, request: SetBool.Request, response: SetBool.Response):
+ """SetBool: data maps to mode byte (True=1, False=0)."""
+ mode = 1 if request.data else 0
+ frame = encode_set_mode(mode)
+ ok = self._write(frame)
+ response.success = ok
+ response.message = f"mode={mode}" if ok else "serial not open"
+ return response
+
+ # ── Diagnostics ───────────────────────────────────────────────────────────
+
+ def _publish_diagnostics(self) -> None:
+ diag = DiagnosticArray()
+ diag.header.stamp = self.get_clock().now().to_msg()
+
+ status = DiagnosticStatus()
+ status.name = "saltybot/stm32_cmd_node"
+ status.hardware_id = "stm32f722"
+
+ port_ok = self._ser is not None and self._ser.is_open
+ if port_ok:
+ status.level = DiagnosticStatus.OK
+ status.message = "Serial OK"
+ else:
+ status.level = DiagnosticStatus.ERROR
+ status.message = f"Serial disconnected: {self._port_name}"
+
+ wd_age = time.monotonic() - self._last_cmd_t
+ status.values = [
+ KeyValue(key="serial_port", value=self._port_name),
+ KeyValue(key="port_open", value=str(port_ok)),
+ KeyValue(key="rx_frames", value=str(self._rx_frame_count)),
+ KeyValue(key="rx_errors", value=str(self._parser.frames_error)),
+ KeyValue(key="last_speed", value=str(self._last_speed)),
+ KeyValue(key="last_steer", value=str(self._last_steer)),
+ KeyValue(key="cmd_vel_age_s", value=f"{wd_age:.2f}"),
+ KeyValue(key="battery_mv", value=str(self._last_battery_mv)),
+ KeyValue(key="arm_state", value=_ARM_LABEL.get(self._last_arm_state, "?")),
+ ]
+ diag.status.append(status)
+ self._diag_pub.publish(diag)
+
+ # ── Lifecycle ─────────────────────────────────────────────────────────────
+
+ def destroy_node(self) -> None:
+ # Send zero-speed + disarm on shutdown
+ self._write(encode_speed_steer(0, 0))
+ self._write(encode_arm(False))
+ self._close_serial()
+ super().destroy_node()
+
+
+def main(args=None) -> None:
+ rclpy.init(args=args)
+ node = Stm32CmdNode()
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/stm32_protocol.py b/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/stm32_protocol.py
new file mode 100644
index 0000000..ed98326
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/stm32_protocol.py
@@ -0,0 +1,332 @@
+"""stm32_protocol.py — Binary frame codec for Jetson↔STM32 communication.
+
+Issue #119: defines the binary serial protocol between the Jetson Nano and the
+STM32F722 flight controller over USB CDC @ 921600 baud.
+
+Frame layout (all multi-byte fields are big-endian):
+ ┌──────┬──────┬──────┬──────────────────┬───────────┬──────┐
+ │ STX │ TYPE │ LEN │ PAYLOAD │ CRC16 │ ETX │
+ │ 0x02 │ 1B │ 1B │ LEN bytes │ 2B BE │ 0x03 │
+ └──────┴──────┴──────┴──────────────────┴───────────┴──────┘
+
+CRC16 covers: TYPE + LEN + PAYLOAD (not STX, ETX, or CRC bytes themselves).
+CRC algorithm: CCITT-16, polynomial=0x1021, init=0xFFFF, no final XOR.
+
+Command types (Jetson → STM32):
+ 0x01 HEARTBEAT — no payload (len=0)
+ 0x02 SPEED_STEER — int16 speed + int16 steer (len=4) range: -1000..+1000
+ 0x03 ARM — uint8 (0=disarm, 1=arm) (len=1)
+ 0x04 SET_MODE — uint8 mode (len=1)
+ 0x05 PID_UPDATE — float32 kp + ki + kd (len=12)
+
+Telemetry types (STM32 → Jetson):
+ 0x10 IMU — int16×6: pitch,roll,yaw (×100 deg), ax,ay,az (×100 m/s²) (len=12)
+ 0x11 BATTERY — uint16 voltage_mv + int16 current_ma + uint8 soc_pct (len=5)
+ 0x12 MOTOR_RPM — int16 left_rpm + int16 right_rpm (len=4)
+ 0x13 ARM_STATE — uint8 state + uint8 error_flags (len=2)
+ 0x14 ERROR — uint8 error_code + uint8 subcode (len=2)
+
+Usage:
+ # Encoding (Jetson → STM32)
+ frame = encode_speed_steer(300, -150)
+ ser.write(frame)
+
+ # Decoding (STM32 → Jetson), one byte at a time
+ parser = FrameParser()
+ for byte in incoming_bytes:
+ result = parser.feed(byte)
+ if result is not None:
+ handle_frame(result)
+"""
+
+from __future__ import annotations
+
+import struct
+from dataclasses import dataclass
+from enum import IntEnum
+from typing import Optional
+
+# ── Frame constants ───────────────────────────────────────────────────────────
+
+STX = 0x02
+ETX = 0x03
+MAX_PAYLOAD_LEN = 64 # hard limit; any frame larger is corrupt
+
+
+# ── Command / telemetry type codes ────────────────────────────────────────────
+
+class CmdType(IntEnum):
+ HEARTBEAT = 0x01
+ SPEED_STEER = 0x02
+ ARM = 0x03
+ SET_MODE = 0x04
+ PID_UPDATE = 0x05
+
+
+class TelType(IntEnum):
+ IMU = 0x10
+ BATTERY = 0x11
+ MOTOR_RPM = 0x12
+ ARM_STATE = 0x13
+ ERROR = 0x14
+
+
+# ── Parsed telemetry objects ──────────────────────────────────────────────────
+
+@dataclass
+class ImuFrame:
+ pitch_deg: float # degrees (positive = forward tilt)
+ roll_deg: float
+ yaw_deg: float
+ accel_x: float # m/s²
+ accel_y: float
+ accel_z: float
+
+
+@dataclass
+class BatteryFrame:
+ voltage_mv: int # millivolts (e.g. 11100 = 11.1 V)
+ current_ma: int # milliamps (negative = charging)
+ soc_pct: int # state of charge 0–100 (from STM32 fuel gauge or lookup)
+
+
+@dataclass
+class MotorRpmFrame:
+ left_rpm: int
+ right_rpm: int
+
+
+@dataclass
+class ArmStateFrame:
+ state: int # 0=DISARMED 1=ARMED 2=TILT_FAULT
+ error_flags: int # bitmask
+
+
+@dataclass
+class ErrorFrame:
+ error_code: int
+ subcode: int
+
+
+# Union type for decoded results
+TelemetryFrame = ImuFrame | BatteryFrame | MotorRpmFrame | ArmStateFrame | ErrorFrame
+
+
+# ── CRC16 CCITT ───────────────────────────────────────────────────────────────
+
+def _crc16_ccitt(data: bytes, init: int = 0xFFFF) -> int:
+ """CRC16-CCITT: polynomial 0x1021, init 0xFFFF, no final XOR."""
+ crc = init
+ for byte in data:
+ crc ^= byte << 8
+ for _ in range(8):
+ if crc & 0x8000:
+ crc = (crc << 1) ^ 0x1021
+ else:
+ crc <<= 1
+ crc &= 0xFFFF
+ return crc
+
+
+# ── Frame encoder ─────────────────────────────────────────────────────────────
+
+def _build_frame(cmd_type: int, payload: bytes) -> bytes:
+ """Assemble a complete binary frame with CRC16."""
+ assert len(payload) <= MAX_PAYLOAD_LEN, "Payload too large"
+ length = len(payload)
+ header = bytes([cmd_type, length])
+ crc = _crc16_ccitt(header + payload)
+ return bytes([STX, cmd_type, length]) + payload + struct.pack(">H", crc) + bytes([ETX])
+
+
+def encode_heartbeat() -> bytes:
+ """HEARTBEAT frame — no payload."""
+ return _build_frame(CmdType.HEARTBEAT, b"")
+
+
+def encode_speed_steer(speed: int, steer: int) -> bytes:
+ """SPEED_STEER frame — int16 speed + int16 steer, both in -1000..+1000."""
+ speed = max(-1000, min(1000, int(speed)))
+ steer = max(-1000, min(1000, int(steer)))
+ return _build_frame(CmdType.SPEED_STEER, struct.pack(">hh", speed, steer))
+
+
+def encode_arm(arm: bool) -> bytes:
+ """ARM frame — 0=disarm, 1=arm."""
+ return _build_frame(CmdType.ARM, struct.pack("B", 1 if arm else 0))
+
+
+def encode_set_mode(mode: int) -> bytes:
+ """SET_MODE frame — mode byte."""
+ return _build_frame(CmdType.SET_MODE, struct.pack("B", mode & 0xFF))
+
+
+def encode_pid_update(kp: float, ki: float, kd: float) -> bytes:
+ """PID_UPDATE frame — three float32 values."""
+ return _build_frame(CmdType.PID_UPDATE, struct.pack(">fff", kp, ki, kd))
+
+
+# ── Frame decoder (state-machine parser) ─────────────────────────────────────
+
+class ParserState(IntEnum):
+ WAIT_STX = 0
+ WAIT_TYPE = 1
+ WAIT_LEN = 2
+ PAYLOAD = 3
+ CRC_HI = 4
+ CRC_LO = 5
+ WAIT_ETX = 6
+
+
+class ParseError(Exception):
+ pass
+
+
+class FrameParser:
+ """Byte-by-byte streaming parser for STM32 telemetry frames.
+
+ Feed individual bytes via feed(); returns a decoded TelemetryFrame (or raw
+ bytes tuple) when a complete valid frame is received.
+
+ Thread-safety: single-threaded — wrap in a lock if shared across threads.
+
+ Usage::
+ parser = FrameParser()
+ for b in incoming:
+ result = parser.feed(b)
+ if result is not None:
+ process(result)
+ """
+
+ def __init__(self) -> None:
+ self._state = ParserState.WAIT_STX
+ self._type = 0
+ self._length = 0
+ self._payload = bytearray()
+ self._crc_rcvd = 0
+ self.frames_ok = 0
+ self.frames_error = 0
+
+ def reset(self) -> None:
+ """Reset parser to initial state (call after error or port reconnect)."""
+ self._state = ParserState.WAIT_STX
+ self._payload = bytearray()
+
+ def feed(self, byte: int) -> Optional[TelemetryFrame | tuple]:
+ """Process one byte. Returns decoded frame on success, None otherwise.
+
+ On CRC error, increments frames_error and resets. The return value on
+ success is a dataclass (ImuFrame, BatteryFrame, etc.) or a
+ (type_code, raw_payload) tuple for unknown type codes.
+ """
+ s = self._state
+
+ if s == ParserState.WAIT_STX:
+ if byte == STX:
+ self._state = ParserState.WAIT_TYPE
+ return None
+
+ if s == ParserState.WAIT_TYPE:
+ self._type = byte
+ self._state = ParserState.WAIT_LEN
+ return None
+
+ if s == ParserState.WAIT_LEN:
+ self._length = byte
+ self._payload = bytearray()
+ if self._length > MAX_PAYLOAD_LEN:
+ # Corrupt frame — too big; reset
+ self.frames_error += 1
+ self.reset()
+ return None
+ if self._length == 0:
+ self._state = ParserState.CRC_HI
+ else:
+ self._state = ParserState.PAYLOAD
+ return None
+
+ if s == ParserState.PAYLOAD:
+ self._payload.append(byte)
+ if len(self._payload) == self._length:
+ self._state = ParserState.CRC_HI
+ return None
+
+ if s == ParserState.CRC_HI:
+ self._crc_rcvd = byte << 8
+ self._state = ParserState.CRC_LO
+ return None
+
+ if s == ParserState.CRC_LO:
+ self._crc_rcvd |= byte
+ self._state = ParserState.WAIT_ETX
+ return None
+
+ if s == ParserState.WAIT_ETX:
+ self.reset() # always reset so we look for next STX
+ if byte != ETX:
+ self.frames_error += 1
+ return None
+
+ # Verify CRC
+ crc_data = bytes([self._type, self._length]) + self._payload
+ expected = _crc16_ccitt(crc_data)
+ if expected != self._crc_rcvd:
+ self.frames_error += 1
+ return None
+
+ # Decode
+ self.frames_ok += 1
+ return _decode_telemetry(self._type, bytes(self._payload))
+
+ # Should never reach here
+ self.reset()
+ return None
+
+
+# ── Telemetry decoder ─────────────────────────────────────────────────────────
+
+def _decode_telemetry(type_code: int, payload: bytes) -> Optional[TelemetryFrame | tuple]:
+ """Decode a validated telemetry payload into a typed dataclass."""
+ try:
+ if type_code == TelType.IMU:
+ if len(payload) < 12:
+ return None
+ p, r, y, ax, ay, az = struct.unpack_from(">hhhhhh", payload)
+ return ImuFrame(
+ pitch_deg=p / 100.0,
+ roll_deg=r / 100.0,
+ yaw_deg=y / 100.0,
+ accel_x=ax / 100.0,
+ accel_y=ay / 100.0,
+ accel_z=az / 100.0,
+ )
+
+ if type_code == TelType.BATTERY:
+ if len(payload) < 5:
+ return None
+ v_mv, i_ma, soc = struct.unpack_from(">HhB", payload)
+ return BatteryFrame(voltage_mv=v_mv, current_ma=i_ma, soc_pct=soc)
+
+ if type_code == TelType.MOTOR_RPM:
+ if len(payload) < 4:
+ return None
+ left, right = struct.unpack_from(">hh", payload)
+ return MotorRpmFrame(left_rpm=left, right_rpm=right)
+
+ if type_code == TelType.ARM_STATE:
+ if len(payload) < 2:
+ return None
+ state, flags = struct.unpack_from("BB", payload)
+ return ArmStateFrame(state=state, error_flags=flags)
+
+ if type_code == TelType.ERROR:
+ if len(payload) < 2:
+ return None
+ code, sub = struct.unpack_from("BB", payload)
+ return ErrorFrame(error_code=code, subcode=sub)
+
+ except struct.error:
+ return None
+
+ # Unknown telemetry type — return raw
+ return (type_code, payload)
diff --git a/jetson/ros2_ws/src/saltybot_bridge/setup.py b/jetson/ros2_ws/src/saltybot_bridge/setup.py
index cc356ef..81bb448 100644
--- a/jetson/ros2_ws/src/saltybot_bridge/setup.py
+++ b/jetson/ros2_ws/src/saltybot_bridge/setup.py
@@ -13,11 +13,15 @@ setup(
"launch/bridge.launch.py",
"launch/cmd_vel_bridge.launch.py",
"launch/remote_estop.launch.py",
+ "launch/stm32_cmd.launch.py",
+ "launch/battery.launch.py",
]),
(f"share/{package_name}/config", [
"config/bridge_params.yaml",
"config/cmd_vel_bridge_params.yaml",
"config/estop_params.yaml",
+ "config/stm32_cmd_params.yaml",
+ "config/battery_params.yaml",
]),
],
install_requires=["setuptools", "pyserial"],
@@ -35,7 +39,11 @@ setup(
"saltybot_cmd_node = saltybot_bridge.saltybot_cmd_node:main",
# Nav2 cmd_vel bridge: velocity limits + ramp + deadman + mode gate
"cmd_vel_bridge_node = saltybot_bridge.cmd_vel_bridge_node:main",
- "remote_estop_node = saltybot_bridge.remote_estop_node:main",
+ "remote_estop_node = saltybot_bridge.remote_estop_node:main",
+ # Binary-framed STM32 command node (Issue #119)
+ "stm32_cmd_node = saltybot_bridge.stm32_cmd_node:main",
+ # Battery management node (Issue #125)
+ "battery_node = saltybot_bridge.battery_node:main",
],
},
)
diff --git a/jetson/ros2_ws/src/saltybot_bridge/test/test_stm32_cmd_node.py b/jetson/ros2_ws/src/saltybot_bridge/test/test_stm32_cmd_node.py
new file mode 100644
index 0000000..00d98b6
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_bridge/test/test_stm32_cmd_node.py
@@ -0,0 +1,341 @@
+"""test_stm32_cmd_node.py — Unit tests for Stm32CmdNode with mock serial port.
+
+Tests:
+ - Serial open/close lifecycle
+ - Twist→SPEED_STEER frame encoding and clamping
+ - Heartbeat cadence (mock timer)
+ - Watchdog fires after 500ms silence and sends zero-speed
+ - ARM service request → ARM frame on serial
+ - PID_UPDATE topic → PID_UPDATE frame on serial
+ - Telemetry receive path: mock serial → publisher callbacks
+ - Auto-reconnect on serial disconnect
+ - Zero-speed sent on node shutdown
+ - CRC errors counted correctly
+
+Run with: pytest test/test_stm32_cmd_node.py -v
+No ROS2 runtime required — uses mock Node infrastructure.
+"""
+
+from __future__ import annotations
+
+import io
+import struct
+import threading
+import time
+import sys
+import os
+from unittest.mock import MagicMock, patch, call
+import pytest
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+
+from saltybot_bridge.stm32_protocol import (
+ STX, ETX, CmdType, TelType,
+ encode_speed_steer, encode_heartbeat, encode_arm, encode_pid_update,
+ _build_frame, _crc16_ccitt,
+ ImuFrame, BatteryFrame, ArmStateFrame, MotorRpmFrame, ErrorFrame,
+)
+
+
+# ── Mock serial ───────────────────────────────────────────────────────────────
+
+class MockSerial:
+ """In-memory serial port mock for testing TX/RX paths without hardware."""
+
+ def __init__(self, rx_data: bytes = b""):
+ self._tx_buf = io.BytesIO()
+ self._rx_buf = io.BytesIO(rx_data)
+ self.is_open = True
+ self._lock = threading.Lock()
+ self.write_calls = 0
+ self.raise_on_write: Exception | None = None
+
+ def write(self, data: bytes) -> int:
+ if self.raise_on_write:
+ raise self.raise_on_write
+ with self._lock:
+ self._tx_buf.write(data)
+ self.write_calls += 1
+ return len(data)
+
+ @property
+ def in_waiting(self) -> int:
+ pos = self._rx_buf.tell()
+ end = self._rx_buf.seek(0, 2)
+ self._rx_buf.seek(pos)
+ return end - pos
+
+ def read(self, n: int) -> bytes:
+ return self._rx_buf.read(n)
+
+ def reset_input_buffer(self) -> None:
+ pass
+
+ def close(self) -> None:
+ self.is_open = False
+
+ def get_written(self) -> bytes:
+ with self._lock:
+ return self._tx_buf.getvalue()
+
+ def clear_written(self) -> None:
+ with self._lock:
+ self._tx_buf = io.BytesIO()
+
+
+# ── Frame builder helpers ─────────────────────────────────────────────────────
+
+def _imu_frame_bytes(pitch=100, roll=0, yaw=0, ax=0, ay=0, az=981) -> bytes:
+ payload = struct.pack(">hhhhhh", pitch, roll, yaw, ax, ay, az)
+ return _build_frame(TelType.IMU, payload)
+
+
+def _battery_frame_bytes(v_mv=11100, i_ma=500, soc=80) -> bytes:
+ payload = struct.pack(">HhB", v_mv, i_ma, soc)
+ return _build_frame(TelType.BATTERY, payload)
+
+
+def _arm_state_frame_bytes(state=1, flags=0) -> bytes:
+ payload = struct.pack("BB", state, flags)
+ return _build_frame(TelType.ARM_STATE, payload)
+
+
+def _motor_rpm_frame_bytes(left=1200, right=1200) -> bytes:
+ payload = struct.pack(">hh", left, right)
+ return _build_frame(TelType.MOTOR_RPM, payload)
+
+
+def _error_frame_bytes(code=0x01, sub=0x00) -> bytes:
+ payload = struct.pack("BB", code, sub)
+ return _build_frame(TelType.ERROR, payload)
+
+
+# ═══════════════════════════════════════════════════════════════════
+# Protocol logic tests (no ROS2)
+# ═══════════════════════════════════════════════════════════════════
+
+class TestCommandEncoding:
+ """Verify the TX path logic without spinning up ROS2."""
+
+ def test_zero_twist_sends_zero_frame(self):
+ frame = encode_speed_steer(0, 0)
+ speed, steer = struct.unpack(">hh", frame[3:7])
+ assert speed == 0
+ assert steer == 0
+
+ def test_forward_1ms_scales_to_1000(self):
+ """linear.x = 1.0 m/s with speed_scale=1000 → speed=1000."""
+ speed_scale = 1000.0
+ speed = int(max(-1000, min(1000, 1.0 * speed_scale)))
+ frame = encode_speed_steer(speed, 0)
+ s, _ = struct.unpack(">hh", frame[3:7])
+ assert s == 1000
+
+ def test_left_turn_scales_correctly(self):
+ """angular.z = 1.0 with steer_scale=-500 → steer=-500."""
+ steer_scale = -500.0
+ steer = int(max(-1000, min(1000, 1.0 * steer_scale)))
+ frame = encode_speed_steer(0, steer)
+ _, st = struct.unpack(">hh", frame[3:7])
+ assert st == -500
+
+ def test_heartbeat_frame_is_minimal(self):
+ frame = encode_heartbeat()
+ assert frame[1] == CmdType.HEARTBEAT
+ assert frame[2] == 0 # zero payload
+
+ def test_arm_frame_payload(self):
+ frame_arm = encode_arm(True)
+ frame_disarm = encode_arm(False)
+ assert frame_arm[3] == 1
+ assert frame_disarm[3] == 0
+
+ def test_pid_frame_all_zeros(self):
+ frame = encode_pid_update(0.0, 0.0, 0.0)
+ kp, ki, kd = struct.unpack(">fff", frame[3:15])
+ assert kp == pytest.approx(0.0)
+ assert ki == pytest.approx(0.0)
+ assert kd == pytest.approx(0.0)
+
+ def test_pid_frame_typical_values(self):
+ frame = encode_pid_update(3.0, 0.02, 0.12)
+ kp, ki, kd = struct.unpack(">fff", frame[3:15])
+ assert kp == pytest.approx(3.0, rel=1e-5)
+ assert ki == pytest.approx(0.02, rel=1e-5)
+ assert kd == pytest.approx(0.12, rel=1e-5)
+
+
+class TestMockSerialTX:
+ """Test TX path using MockSerial — verifies bytes sent over the wire."""
+
+ def _write_and_get(self, frames: list[bytes]) -> bytes:
+ """Write multiple frames to a MockSerial and return all written bytes."""
+ ms = MockSerial()
+ for f in frames:
+ ms.write(f)
+ return ms.get_written()
+
+ def test_speed_steer_bytes_on_wire(self):
+ frame = encode_speed_steer(300, -150)
+ ms = MockSerial()
+ ms.write(frame)
+ written = ms.get_written()
+ assert written[0] == STX
+ assert written[1] == CmdType.SPEED_STEER
+ assert written[-1] == ETX
+
+ def test_heartbeat_bytes_on_wire(self):
+ frame = encode_heartbeat()
+ ms = MockSerial()
+ ms.write(frame)
+ written = ms.get_written()
+ assert written[0] == STX
+ assert written[1] == CmdType.HEARTBEAT
+
+ def test_write_error_detected(self):
+ import serial
+ ms = MockSerial()
+ ms.raise_on_write = serial.SerialException("test error")
+ with pytest.raises(serial.SerialException):
+ ms.write(b"\x00")
+
+ def test_multiple_frames_written_sequentially(self):
+ ms = MockSerial()
+ ms.write(encode_heartbeat())
+ ms.write(encode_speed_steer(100, 0))
+ ms.write(encode_arm(True))
+ written = ms.get_written()
+ # All three frames should be in the buffer
+ assert written.count(STX) == 3
+ assert written.count(ETX) == 3
+
+ def test_write_call_count_tracked(self):
+ ms = MockSerial()
+ ms.write(encode_heartbeat())
+ ms.write(encode_heartbeat())
+ assert ms.write_calls == 2
+
+
+class TestMockSerialRX:
+ """Test RX parsing path using MockSerial with pre-loaded telemetry data."""
+
+ from saltybot_bridge.stm32_protocol import FrameParser
+
+ def test_rx_imu_frame(self):
+ from saltybot_bridge.stm32_protocol import FrameParser, ImuFrame
+ raw = _imu_frame_bytes(pitch=500, roll=-200, yaw=100, ax=0, ay=0, az=981)
+ ms = MockSerial(rx_data=raw)
+ parser = FrameParser()
+ results = []
+ while ms.in_waiting:
+ chunk = ms.read(ms.in_waiting)
+ for b in chunk:
+ r = parser.feed(b)
+ if r is not None:
+ results.append(r)
+ assert len(results) == 1
+ f = results[0]
+ assert isinstance(f, ImuFrame)
+ assert f.pitch_deg == pytest.approx(5.0)
+ assert f.roll_deg == pytest.approx(-2.0)
+ assert f.accel_z == pytest.approx(9.81)
+
+ def test_rx_battery_frame(self):
+ from saltybot_bridge.stm32_protocol import FrameParser, BatteryFrame
+ raw = _battery_frame_bytes(v_mv=10500, i_ma=1200, soc=45)
+ ms = MockSerial(rx_data=raw)
+ parser = FrameParser()
+ results = []
+ while ms.in_waiting:
+ for b in ms.read(ms.in_waiting):
+ r = parser.feed(b)
+ if r is not None:
+ results.append(r)
+ f = results[0]
+ assert isinstance(f, BatteryFrame)
+ assert f.voltage_mv == 10500
+ assert f.soc_pct == 45
+
+ def test_rx_multiple_frames_in_one_read(self):
+ from saltybot_bridge.stm32_protocol import FrameParser
+ raw = (_imu_frame_bytes() + _arm_state_frame_bytes() + _battery_frame_bytes())
+ ms = MockSerial(rx_data=raw)
+ parser = FrameParser()
+ results = []
+ data = ms.read(len(raw))
+ for b in data:
+ r = parser.feed(b)
+ if r is not None:
+ results.append(r)
+ assert len(results) == 3
+ assert parser.frames_error == 0
+
+ def test_rx_bad_crc_counted_as_error(self):
+ from saltybot_bridge.stm32_protocol import FrameParser
+ raw = bytearray(_arm_state_frame_bytes(state=1))
+ raw[-3] ^= 0xFF # corrupt CRC
+ ms = MockSerial(rx_data=bytes(raw))
+ parser = FrameParser()
+ for b in ms.read(len(raw)):
+ parser.feed(b)
+ assert parser.frames_ok == 0
+ assert parser.frames_error == 1
+
+ def test_rx_resync_after_corrupt_byte(self):
+ from saltybot_bridge.stm32_protocol import FrameParser, ArmStateFrame
+ garbage = b"\xDE\xAD\x00\x00"
+ valid = _arm_state_frame_bytes(state=1)
+ ms = MockSerial(rx_data=garbage + valid)
+ parser = FrameParser()
+ results = []
+ while ms.in_waiting:
+ for b in ms.read(ms.in_waiting):
+ r = parser.feed(b)
+ if r is not None:
+ results.append(r)
+ assert len(results) == 1
+ assert isinstance(results[0], ArmStateFrame)
+
+
+# ═══════════════════════════════════════════════════════════════════
+# Watchdog logic tests
+# ═══════════════════════════════════════════════════════════════════
+
+class TestWatchdog:
+ """Test the watchdog logic in isolation without a full ROS2 node."""
+
+ def test_watchdog_fires_after_timeout(self):
+ """Simulate watchdog: record time, wait >500ms, verify zero-speed sent."""
+ ms = MockSerial()
+ last_cmd_t = time.monotonic() - 0.6 # 600ms ago
+ wd_timeout = 0.5
+
+ # Watchdog check logic (mirrors _watchdog_cb)
+ if time.monotonic() - last_cmd_t >= wd_timeout:
+ ms.write(encode_speed_steer(0, 0))
+
+ written = ms.get_written()
+ speed, steer = struct.unpack(">hh", written[3:7])
+ assert speed == 0
+ assert steer == 0
+
+ def test_watchdog_does_not_fire_before_timeout(self):
+ """Watchdog must not fire if /cmd_vel was recent."""
+ ms = MockSerial()
+ last_cmd_t = time.monotonic() - 0.2 # 200ms ago — within timeout
+ wd_timeout = 0.5
+
+ if time.monotonic() - last_cmd_t >= wd_timeout:
+ ms.write(encode_speed_steer(0, 0))
+
+ assert ms.get_written() == b""
+
+ def test_zero_speed_frame_structure(self):
+ """Zero-speed frame must have correct structure."""
+ frame = encode_speed_steer(0, 0)
+ assert frame[0] == STX
+ assert frame[1] == CmdType.SPEED_STEER
+ assert frame[-1] == ETX
+ speed, steer = struct.unpack(">hh", frame[3:7])
+ assert speed == 0
+ assert steer == 0
diff --git a/jetson/ros2_ws/src/saltybot_bridge/test/test_stm32_protocol.py b/jetson/ros2_ws/src/saltybot_bridge/test/test_stm32_protocol.py
new file mode 100644
index 0000000..4e33f11
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_bridge/test/test_stm32_protocol.py
@@ -0,0 +1,483 @@
+"""test_stm32_protocol.py — Unit tests for binary STM32 frame codec.
+
+Tests:
+ - CRC16-CCITT correctness
+ - All encoder functions produce correctly structured frames
+ - FrameParser decodes valid frames for all telemetry types
+ - FrameParser rejects frames with bad CRC
+ - FrameParser rejects frames with bad ETX
+ - FrameParser resyncs after corrupt data (finds next STX)
+ - FrameParser handles streaming byte-by-byte input
+ - FrameParser handles oversized payload gracefully
+ - Speed/steer clamping in encode_speed_steer
+ - Round-trip encode → decode for all known telemetry types
+
+Run with: pytest test/test_stm32_protocol.py -v
+"""
+
+from __future__ import annotations
+
+import struct
+import pytest
+import sys
+import os
+
+# ── Path setup (no ROS2 install needed) ──────────────────────────────────────
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+
+from saltybot_bridge.stm32_protocol import (
+ STX, ETX,
+ CmdType, TelType,
+ ImuFrame, BatteryFrame, MotorRpmFrame, ArmStateFrame, ErrorFrame,
+ FrameParser,
+ _crc16_ccitt, _build_frame, _decode_telemetry,
+ encode_heartbeat, encode_speed_steer, encode_arm,
+ encode_set_mode, encode_pid_update,
+)
+
+
+# ── Helpers ───────────────────────────────────────────────────────────────────
+
+def _parse_all(raw: bytes):
+ """Feed all bytes into a fresh FrameParser, return list of decoded frames."""
+ parser = FrameParser()
+ results = []
+ for b in raw:
+ r = parser.feed(b)
+ if r is not None:
+ results.append(r)
+ return results, parser
+
+
+def _build_telem_frame(type_code: int, payload: bytes) -> bytes:
+ """Build a raw telemetry frame (same framing as command encoder)."""
+ return _build_frame(type_code, payload)
+
+
+# ═══════════════════════════════════════════════════════════════════
+# CRC16 tests
+# ═══════════════════════════════════════════════════════════════════
+
+class TestCrc16:
+ def test_known_vector_empty(self):
+ """CRC of empty data must equal 0xFFFF (init value)."""
+ assert _crc16_ccitt(b"") == 0xFFFF
+
+ def test_known_vector_123456789(self):
+ """Classic CCITT-16 test vector: '123456789' → 0x29B1."""
+ assert _crc16_ccitt(b"123456789") == 0x29B1
+
+ def test_crc_changes_on_bit_flip(self):
+ data = b"\x02\x04\x42"
+ flipped = b"\x02\x04\x43"
+ assert _crc16_ccitt(data) != _crc16_ccitt(flipped)
+
+ def test_crc_all_zeros(self):
+ crc = _crc16_ccitt(bytes(16))
+ assert isinstance(crc, int)
+ assert 0 <= crc <= 0xFFFF
+
+ def test_crc_single_byte(self):
+ c1 = _crc16_ccitt(b"\x01")
+ c2 = _crc16_ccitt(b"\x02")
+ assert c1 != c2
+
+
+# ═══════════════════════════════════════════════════════════════════
+# Encoder tests
+# ═══════════════════════════════════════════════════════════════════
+
+class TestEncoders:
+ def test_heartbeat_structure(self):
+ frame = encode_heartbeat()
+ # STX TYPE LEN CRC_HI CRC_LO ETX (no payload → 6 bytes)
+ assert len(frame) == 6
+ assert frame[0] == STX
+ assert frame[1] == CmdType.HEARTBEAT
+ assert frame[2] == 0 # LEN=0
+ assert frame[-1] == ETX
+
+ def test_heartbeat_crc_valid(self):
+ frame = encode_heartbeat()
+ crc_expected = _crc16_ccitt(bytes([CmdType.HEARTBEAT, 0]))
+ crc_actual = struct.unpack(">H", frame[3:5])[0]
+ assert crc_actual == crc_expected
+
+ def test_speed_steer_structure(self):
+ frame = encode_speed_steer(300, -150)
+ assert frame[0] == STX
+ assert frame[1] == CmdType.SPEED_STEER
+ assert frame[2] == 4 # LEN=4 (two int16)
+ assert frame[-1] == ETX
+ assert len(frame) == 10 # 3 header + 4 payload + 2 CRC + 1 ETX
+
+ def test_speed_steer_payload_values(self):
+ frame = encode_speed_steer(500, -200)
+ speed, steer = struct.unpack(">hh", frame[3:7])
+ assert speed == 500
+ assert steer == -200
+
+ def test_speed_steer_clamped_max(self):
+ frame = encode_speed_steer(9999, 9999)
+ speed, steer = struct.unpack(">hh", frame[3:7])
+ assert speed == 1000
+ assert steer == 1000
+
+ def test_speed_steer_clamped_min(self):
+ frame = encode_speed_steer(-9999, -9999)
+ speed, steer = struct.unpack(">hh", frame[3:7])
+ assert speed == -1000
+ assert steer == -1000
+
+ def test_speed_steer_zero(self):
+ frame = encode_speed_steer(0, 0)
+ speed, steer = struct.unpack(">hh", frame[3:7])
+ assert speed == 0
+ assert steer == 0
+
+ def test_arm_true(self):
+ frame = encode_arm(True)
+ assert frame[1] == CmdType.ARM
+ assert frame[2] == 1 # LEN=1
+ assert frame[3] == 1 # payload = 1 (arm)
+
+ def test_arm_false(self):
+ frame = encode_arm(False)
+ assert frame[3] == 0 # payload = 0 (disarm)
+
+ def test_set_mode(self):
+ frame = encode_set_mode(3)
+ assert frame[1] == CmdType.SET_MODE
+ assert frame[3] == 3
+
+ def test_set_mode_masked(self):
+ frame = encode_set_mode(0x1FF)
+ assert frame[3] == 0xFF # masked to uint8
+
+ def test_pid_update_structure(self):
+ frame = encode_pid_update(1.5, 0.01, 0.1)
+ assert frame[1] == CmdType.PID_UPDATE
+ assert frame[2] == 12 # LEN=12 (three float32)
+ kp, ki, kd = struct.unpack(">fff", frame[3:15])
+ assert kp == pytest.approx(1.5)
+ assert ki == pytest.approx(0.01)
+ assert kd == pytest.approx(0.1)
+
+ def test_pid_update_zero(self):
+ frame = encode_pid_update(0.0, 0.0, 0.0)
+ kp, ki, kd = struct.unpack(">fff", frame[3:15])
+ assert kp == pytest.approx(0.0)
+
+ def test_all_frames_end_with_etx(self):
+ frames = [
+ encode_heartbeat(),
+ encode_speed_steer(100, -100),
+ encode_arm(True),
+ encode_set_mode(1),
+ encode_pid_update(2.0, 0.1, 0.05),
+ ]
+ for f in frames:
+ assert f[-1] == ETX, f"Frame for type 0x{f[1]:02X} doesn't end with ETX"
+
+ def test_all_frames_start_with_stx(self):
+ for f in [encode_heartbeat(), encode_speed_steer(0, 0)]:
+ assert f[0] == STX
+
+
+# ═══════════════════════════════════════════════════════════════════
+# FrameParser tests
+# ═══════════════════════════════════════════════════════════════════
+
+class TestFrameParser:
+
+ # ── IMU telemetry ─────────────────────────────────────────────────────────
+
+ def test_decode_imu_frame(self):
+ payload = struct.pack(">hhhhhh",
+ 1250, -300, 900, # pitch×100, roll×100, yaw×100
+ 981, 0, -50, # ax×100, ay×100, az×100
+ )
+ raw = _build_telem_frame(TelType.IMU, payload)
+ results, parser = _parse_all(raw)
+ assert len(results) == 1
+ f = results[0]
+ assert isinstance(f, ImuFrame)
+ assert f.pitch_deg == pytest.approx(12.5)
+ assert f.roll_deg == pytest.approx(-3.0)
+ assert f.yaw_deg == pytest.approx(9.0)
+ assert f.accel_x == pytest.approx(9.81)
+ assert f.accel_y == pytest.approx(0.0)
+ assert f.accel_z == pytest.approx(-0.5)
+
+ def test_decode_imu_zero(self):
+ payload = struct.pack(">hhhhhh", 0, 0, 0, 0, 0, 0)
+ raw = _build_telem_frame(TelType.IMU, payload)
+ results, _ = _parse_all(raw)
+ f = results[0]
+ assert f.pitch_deg == pytest.approx(0.0)
+ assert f.accel_z == pytest.approx(0.0)
+
+ # ── Battery telemetry ─────────────────────────────────────────────────────
+
+ def test_decode_battery_frame(self):
+ payload = struct.pack(">HhB", 11100, -500, 72) # 11.1V, -500mA (charging), 72%
+ raw = _build_telem_frame(TelType.BATTERY, payload)
+ results, _ = _parse_all(raw)
+ assert len(results) == 1
+ f = results[0]
+ assert isinstance(f, BatteryFrame)
+ assert f.voltage_mv == 11100
+ assert f.current_ma == -500
+ assert f.soc_pct == 72
+
+ def test_decode_battery_discharging(self):
+ payload = struct.pack(">HhB", 10200, 2000, 45) # 10.2V, 2A draw, 45%
+ raw = _build_telem_frame(TelType.BATTERY, payload)
+ results, _ = _parse_all(raw)
+ f = results[0]
+ assert f.current_ma == 2000 # positive = discharging
+
+ # ── Motor RPM telemetry ───────────────────────────────────────────────────
+
+ def test_decode_motor_rpm(self):
+ payload = struct.pack(">hh", 1500, -1500) # left=1500, right=-1500 (spinning in place)
+ raw = _build_telem_frame(TelType.MOTOR_RPM, payload)
+ results, _ = _parse_all(raw)
+ f = results[0]
+ assert isinstance(f, MotorRpmFrame)
+ assert f.left_rpm == 1500
+ assert f.right_rpm == -1500
+
+ def test_decode_motor_rpm_zero(self):
+ payload = struct.pack(">hh", 0, 0)
+ raw = _build_telem_frame(TelType.MOTOR_RPM, payload)
+ results, _ = _parse_all(raw)
+ assert results[0].left_rpm == 0
+
+ # ── Arm state telemetry ───────────────────────────────────────────────────
+
+ def test_decode_arm_state_armed(self):
+ payload = struct.pack("BB", 1, 0) # ARMED, no error flags
+ raw = _build_telem_frame(TelType.ARM_STATE, payload)
+ results, _ = _parse_all(raw)
+ f = results[0]
+ assert isinstance(f, ArmStateFrame)
+ assert f.state == 1
+ assert f.error_flags == 0
+
+ def test_decode_arm_state_tilt_fault(self):
+ payload = struct.pack("BB", 2, 0x04) # TILT_FAULT, bit 2 set
+ raw = _build_telem_frame(TelType.ARM_STATE, payload)
+ results, _ = _parse_all(raw)
+ f = results[0]
+ assert f.state == 2
+ assert f.error_flags == 0x04
+
+ # ── Error telemetry ───────────────────────────────────────────────────────
+
+ def test_decode_error_frame(self):
+ payload = struct.pack("BB", 0x0A, 0x01) # error 0x0A, sub 0x01
+ raw = _build_telem_frame(TelType.ERROR, payload)
+ results, _ = _parse_all(raw)
+ f = results[0]
+ assert isinstance(f, ErrorFrame)
+ assert f.error_code == 0x0A
+ assert f.subcode == 0x01
+
+ # ── CRC errors ────────────────────────────────────────────────────────────
+
+ def test_bad_crc_rejected(self):
+ raw = bytearray(_build_telem_frame(TelType.BATTERY,
+ struct.pack(">HhB", 11000, 0, 80)))
+ raw[-3] ^= 0xFF # flip CRC_HI byte
+ results, parser = _parse_all(bytes(raw))
+ assert results == []
+ assert parser.frames_error == 1
+ assert parser.frames_ok == 0
+
+ def test_bad_crc_both_bytes(self):
+ raw = bytearray(_build_telem_frame(TelType.MOTOR_RPM,
+ struct.pack(">hh", 100, 100)))
+ raw[-3] ^= 0xAA
+ raw[-2] ^= 0x55
+ results, parser = _parse_all(bytes(raw))
+ assert results == []
+ assert parser.frames_error >= 1
+
+ # ── ETX errors ────────────────────────────────────────────────────────────
+
+ def test_bad_etx_rejected(self):
+ raw = bytearray(_build_telem_frame(TelType.ARM_STATE,
+ struct.pack("BB", 1, 0)))
+ raw[-1] = 0xFF # corrupt ETX
+ results, parser = _parse_all(bytes(raw))
+ assert results == []
+ assert parser.frames_error == 1
+
+ # ── Resync after corrupt data ─────────────────────────────────────────────
+
+ def test_resync_after_garbage(self):
+ """Parser must resync and decode a valid frame after leading garbage."""
+ garbage = b"\xDE\xAD\xBE\xEF\xFF\x00\x42"
+ valid = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0))
+ results, parser = _parse_all(garbage + valid)
+ assert len(results) == 1
+ assert isinstance(results[0], ArmStateFrame)
+
+ def test_two_consecutive_frames(self):
+ """Parser must handle two back-to-back valid frames."""
+ f1 = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0))
+ f2 = _build_telem_frame(TelType.MOTOR_RPM, struct.pack(">hh", 500, 500))
+ results, parser = _parse_all(f1 + f2)
+ assert len(results) == 2
+ assert isinstance(results[0], ArmStateFrame)
+ assert isinstance(results[1], MotorRpmFrame)
+ assert parser.frames_error == 0
+
+ def test_three_frames_with_garbage_between(self):
+ """Corrupt bytes between valid frames must not discard subsequent valid frames."""
+ f1 = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0))
+ f2 = _build_telem_frame(TelType.BATTERY, struct.pack(">HhB", 11000, 0, 80))
+ junk = b"\xDE\xAD"
+ results, parser = _parse_all(f1 + junk + f2)
+ assert len(results) == 2
+
+ def test_unknown_type_returns_raw_tuple(self):
+ """Unknown telemetry type must be returned as (type_code, payload) tuple."""
+ raw = _build_telem_frame(0x99, b"\xAB\xCD")
+ results, _ = _parse_all(raw)
+ assert len(results) == 1
+ type_code, payload = results[0]
+ assert type_code == 0x99
+ assert payload == b"\xAB\xCD"
+
+ def test_oversized_payload_rejected(self):
+ """A frame claiming LEN > MAX_PAYLOAD_LEN must be silently skipped."""
+ # Craft a raw frame with LEN=200 (> 64 limit)
+ raw = bytes([STX, TelType.IMU, 200])
+ results, parser = _parse_all(raw)
+ assert results == []
+ assert parser.frames_error == 1
+
+ def test_empty_payload_frame(self):
+ """A frame with LEN=0 (heartbeat style) must decode correctly."""
+ raw = _build_frame(CmdType.HEARTBEAT, b"")
+ results, parser = _parse_all(raw)
+ assert len(results) == 1
+ type_code, payload = results[0]
+ assert type_code == CmdType.HEARTBEAT
+ assert payload == b""
+ assert parser.frames_error == 0
+
+ # ── Streaming byte-by-byte ────────────────────────────────────────────────
+
+ def test_stream_one_byte_at_a_time(self):
+ """Every byte fed individually must produce the same result as bulk."""
+ payload = struct.pack(">HhB", 12000, -100, 90)
+ raw = _build_telem_frame(TelType.BATTERY, payload)
+ parser = FrameParser()
+ results = []
+ for b in raw:
+ r = parser.feed(b)
+ if r is not None:
+ results.append(r)
+ assert len(results) == 1
+ f = results[0]
+ assert isinstance(f, BatteryFrame)
+ assert f.soc_pct == 90
+
+ # ── Parser reset ──────────────────────────────────────────────────────────
+
+ def test_parser_reset_clears_state(self):
+ """After reset(), parser must accept a new valid frame."""
+ payload = struct.pack("BB", 0, 0)
+ raw = _build_telem_frame(TelType.ARM_STATE, payload)
+ parser = FrameParser()
+ # Feed partial frame
+ for b in raw[:3]:
+ parser.feed(b)
+ # Reset mid-frame
+ parser.reset()
+ # Feed full frame
+ results = []
+ for b in raw:
+ r = parser.feed(b)
+ if r is not None:
+ results.append(r)
+ assert len(results) == 1
+
+ def test_frames_ok_counter(self):
+ f1 = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0))
+ f2 = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 0, 0))
+ _, parser = _parse_all(f1 + f2)
+ assert parser.frames_ok == 2
+
+ def test_frames_error_counter(self):
+ raw = bytearray(_build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0)))
+ raw[-3] ^= 0xFF # corrupt CRC
+ _, parser = _parse_all(bytes(raw))
+ assert parser.frames_error == 1
+
+
+# ═══════════════════════════════════════════════════════════════════
+# Decode edge cases
+# ═══════════════════════════════════════════════════════════════════
+
+class TestDecodeEdgeCases:
+ def test_imu_short_payload_returns_none(self):
+ result = _decode_telemetry(TelType.IMU, b"\x00\x01") # only 2 bytes, need 12
+ assert result is None
+
+ def test_battery_short_payload_returns_none(self):
+ result = _decode_telemetry(TelType.BATTERY, b"\x00")
+ assert result is None
+
+ def test_motor_rpm_short_payload_returns_none(self):
+ result = _decode_telemetry(TelType.MOTOR_RPM, b"\x00\x01")
+ assert result is None
+
+ def test_arm_state_short_payload_returns_none(self):
+ result = _decode_telemetry(TelType.ARM_STATE, b"\x01")
+ assert result is None
+
+ def test_error_short_payload_returns_none(self):
+ result = _decode_telemetry(TelType.ERROR, b"\x01")
+ assert result is None
+
+
+# ═══════════════════════════════════════════════════════════════════
+# Round-trip tests (encode command → re-parse frame body)
+# ═══════════════════════════════════════════════════════════════════
+
+class TestRoundTrip:
+ """Verify encoder output satisfies parser expectations (same framing)."""
+
+ def _parse_cmd_as_telem(self, frame: bytes):
+ """Feed a command frame through FrameParser (type codes overlap logic)."""
+ results, parser = _parse_all(frame)
+ return results, parser
+
+ def test_heartbeat_roundtrip(self):
+ frame = encode_heartbeat()
+ results, parser = self._parse_cmd_as_telem(frame)
+ assert len(results) == 1
+ assert parser.frames_error == 0
+
+ def test_speed_steer_roundtrip(self):
+ frame = encode_speed_steer(750, -300)
+ results, _ = self._parse_cmd_as_telem(frame)
+ assert len(results) == 1
+ type_code, payload = results[0]
+ assert type_code == CmdType.SPEED_STEER
+ speed, steer = struct.unpack(">hh", payload)
+ assert speed == 750
+ assert steer == -300
+
+ def test_pid_roundtrip(self):
+ frame = encode_pid_update(2.5, 0.05, 0.08)
+ results, _ = self._parse_cmd_as_telem(frame)
+ type_code, payload = results[0]
+ kp, ki, kd = struct.unpack(">fff", payload)
+ assert kp == pytest.approx(2.5, rel=1e-5)
+ assert ki == pytest.approx(0.05, rel=1e-5)
+ assert kd == pytest.approx(0.08, rel=1e-5)