Merge pull request 'feat(bridge): binary STM32 command protocol (Issue #119)' (#128) from sl-jetson/issue-119-cmd-protocol into main
This commit is contained in:
commit
5cfacdda3f
@ -0,0 +1,30 @@
|
||||
# stm32_cmd_params.yaml — Configuration for stm32_cmd_node (Issue #119)
|
||||
# Binary-framed Jetson↔STM32 bridge at 921600 baud.
|
||||
|
||||
# ── Serial port ────────────────────────────────────────────────────────────────
|
||||
# Use /dev/stm32-bridge if the udev rule is applied:
|
||||
# SUBSYSTEM=="tty", ATTRS{idVendor}=="0483", ATTRS{idProduct}=="5740",
|
||||
# SYMLINK+="stm32-bridge", MODE="0660", GROUP="dialout"
|
||||
serial_port: /dev/ttyACM0
|
||||
baud_rate: 921600
|
||||
reconnect_delay: 2.0 # seconds between USB reconnect attempts
|
||||
|
||||
# ── Heartbeat ─────────────────────────────────────────────────────────────────
|
||||
# HEARTBEAT frame sent every heartbeat_period seconds.
|
||||
# STM32 fires watchdog and reverts to safe state if no frame received for 500ms.
|
||||
heartbeat_period: 0.2 # 200ms → well within 500ms STM32 watchdog
|
||||
|
||||
# ── Watchdog (Jetson-side) ────────────────────────────────────────────────────
|
||||
# If no /cmd_vel message received for watchdog_timeout seconds,
|
||||
# send SPEED_STEER(0,0) to stop the robot.
|
||||
watchdog_timeout: 0.5 # 500ms
|
||||
|
||||
# ── Twist velocity scaling ────────────────────────────────────────────────────
|
||||
# speed = clamp(linear.x * speed_scale, -1000, 1000) (m/s → ESC units)
|
||||
# steer = clamp(angular.z * steer_scale, -1000, 1000) (rad/s → ESC units)
|
||||
#
|
||||
# Default: 1 m/s → 1000 ESC units, ±2 rad/s → ±1000 steer.
|
||||
# Negative steer_scale flips ROS2 CCW+ convention to match ESC steer direction.
|
||||
# Tune speed_scale to set the physical top speed.
|
||||
speed_scale: 1000.0
|
||||
steer_scale: -500.0
|
||||
@ -0,0 +1,52 @@
|
||||
"""stm32_cmd.launch.py — Launch the binary-framed STM32 command node (Issue #119).
|
||||
|
||||
Usage:
|
||||
# Default (binary protocol, bidirectional):
|
||||
ros2 launch saltybot_bridge stm32_cmd.launch.py
|
||||
|
||||
# Override serial port:
|
||||
ros2 launch saltybot_bridge stm32_cmd.launch.py serial_port:=/dev/ttyACM1
|
||||
|
||||
# Custom velocity scales:
|
||||
ros2 launch saltybot_bridge stm32_cmd.launch.py speed_scale:=800.0 steer_scale:=-400.0
|
||||
"""
|
||||
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def generate_launch_description() -> LaunchDescription:
|
||||
pkg = get_package_share_directory("saltybot_bridge")
|
||||
params_file = os.path.join(pkg, "config", "stm32_cmd_params.yaml")
|
||||
|
||||
return LaunchDescription([
|
||||
DeclareLaunchArgument("serial_port", default_value="/dev/ttyACM0"),
|
||||
DeclareLaunchArgument("baud_rate", default_value="921600"),
|
||||
DeclareLaunchArgument("speed_scale", default_value="1000.0"),
|
||||
DeclareLaunchArgument("steer_scale", default_value="-500.0"),
|
||||
DeclareLaunchArgument("watchdog_timeout", default_value="0.5"),
|
||||
DeclareLaunchArgument("heartbeat_period", default_value="0.2"),
|
||||
|
||||
Node(
|
||||
package="saltybot_bridge",
|
||||
executable="stm32_cmd_node",
|
||||
name="stm32_cmd_node",
|
||||
output="screen",
|
||||
emulate_tty=True,
|
||||
parameters=[
|
||||
params_file,
|
||||
{
|
||||
"serial_port": LaunchConfiguration("serial_port"),
|
||||
"baud_rate": LaunchConfiguration("baud_rate"),
|
||||
"speed_scale": LaunchConfiguration("speed_scale"),
|
||||
"steer_scale": LaunchConfiguration("steer_scale"),
|
||||
"watchdog_timeout": LaunchConfiguration("watchdog_timeout"),
|
||||
"heartbeat_period": LaunchConfiguration("heartbeat_period"),
|
||||
},
|
||||
],
|
||||
),
|
||||
])
|
||||
@ -5,8 +5,11 @@
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
STM32F722 USB CDC serial bridge for saltybot.
|
||||
Reads JSON telemetry from the STM32 flight controller and publishes
|
||||
sensor_msgs/Imu, std_msgs/String (balance state), and diagnostic_msgs/DiagnosticArray.
|
||||
serial_bridge_node: JSON telemetry RX → sensor_msgs/Imu + diagnostics.
|
||||
stm32_cmd_node (Issue #119): binary-framed protocol — STX/TYPE/LEN/CRC16/ETX,
|
||||
commands: HEARTBEAT, SPEED_STEER, ARM, SET_MODE, PID_UPDATE;
|
||||
telemetry: IMU, BATTERY, MOTOR_RPM, ARM_STATE, ERROR; watchdog 500ms.
|
||||
battery_node (Issue #125): SoC tracking, threshold alerts, SQLite history.
|
||||
</description>
|
||||
<maintainer email="sl-jetson@saltylab.local">sl-jetson</maintainer>
|
||||
<license>MIT</license>
|
||||
@ -15,6 +18,8 @@
|
||||
<depend>python3-paho-mqtt</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>std_srvs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>diagnostic_msgs</depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
|
||||
@ -0,0 +1,483 @@
|
||||
"""stm32_cmd_node.py — Full bidirectional binary-framed STM32↔Jetson bridge.
|
||||
|
||||
Issue #119: replaces the ASCII-protocol saltybot_cmd_node with a robust binary
|
||||
framing protocol (STX/TYPE/LEN/PAYLOAD/CRC16/ETX) at 921600 baud.
|
||||
|
||||
TX commands (Jetson → STM32):
|
||||
SPEED_STEER — 50 Hz from /cmd_vel subscription
|
||||
HEARTBEAT — 200 ms timer (STM32 watchdog fires at 500 ms)
|
||||
ARM — via /saltybot/arm service
|
||||
SET_MODE — via /saltybot/set_mode service
|
||||
PID_UPDATE — via /saltybot/pid_update topic
|
||||
|
||||
Watchdog: if /cmd_vel is silent for 500 ms, send SPEED_STEER(0,0) and log warning.
|
||||
|
||||
RX telemetry (STM32 → Jetson):
|
||||
IMU → /saltybot/imu (sensor_msgs/Imu)
|
||||
BATTERY → /saltybot/telemetry/battery (std_msgs/String JSON)
|
||||
MOTOR_RPM → /saltybot/telemetry/motor_rpm (std_msgs/String JSON)
|
||||
ARM_STATE → /saltybot/arm_state (std_msgs/String JSON)
|
||||
ERROR → /saltybot/error (std_msgs/String JSON)
|
||||
All frames → /diagnostics (diagnostic_msgs/DiagnosticArray)
|
||||
|
||||
Auto-reconnect: USB disconnect is detected when serial.read() raises; node
|
||||
continuously retries at reconnect_delay interval.
|
||||
|
||||
This node owns /dev/ttyACM0 exclusively — do NOT run alongside
|
||||
serial_bridge_node or saltybot_cmd_node on the same port.
|
||||
|
||||
Parameters (config/stm32_cmd_params.yaml):
|
||||
serial_port /dev/ttyACM0
|
||||
baud_rate 921600
|
||||
reconnect_delay 2.0 (seconds)
|
||||
heartbeat_period 0.2 (seconds)
|
||||
watchdog_timeout 0.5 (seconds — no /cmd_vel → send zero-speed)
|
||||
speed_scale 1000.0 (linear.x m/s → ESC units)
|
||||
steer_scale -500.0 (angular.z rad/s → ESC units, neg to flip convention)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import math
|
||||
import threading
|
||||
import time
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
|
||||
|
||||
import serial
|
||||
|
||||
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
|
||||
from geometry_msgs.msg import Twist
|
||||
from sensor_msgs.msg import Imu
|
||||
from std_msgs.msg import String
|
||||
from std_srvs.srv import SetBool, Trigger
|
||||
|
||||
from .stm32_protocol import (
|
||||
FrameParser,
|
||||
ImuFrame, BatteryFrame, MotorRpmFrame, ArmStateFrame, ErrorFrame,
|
||||
encode_heartbeat, encode_speed_steer, encode_arm, encode_set_mode,
|
||||
encode_pid_update,
|
||||
)
|
||||
|
||||
# ── Constants ─────────────────────────────────────────────────────────────────
|
||||
|
||||
IMU_FRAME_ID = "imu_link"
|
||||
_ARM_LABEL = {0: "DISARMED", 1: "ARMED", 2: "TILT_FAULT"}
|
||||
|
||||
|
||||
def _clamp(v: float, lo: float, hi: float) -> float:
|
||||
return max(lo, min(hi, v))
|
||||
|
||||
|
||||
# ── Node ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
class Stm32CmdNode(Node):
|
||||
"""Binary-framed Jetson↔STM32 bridge node."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__("stm32_cmd_node")
|
||||
|
||||
# ── Parameters ────────────────────────────────────────────────────────
|
||||
self.declare_parameter("serial_port", "/dev/ttyACM0")
|
||||
self.declare_parameter("baud_rate", 921600)
|
||||
self.declare_parameter("reconnect_delay", 2.0)
|
||||
self.declare_parameter("heartbeat_period", 0.2)
|
||||
self.declare_parameter("watchdog_timeout", 0.5)
|
||||
self.declare_parameter("speed_scale", 1000.0)
|
||||
self.declare_parameter("steer_scale", -500.0)
|
||||
|
||||
port = self.get_parameter("serial_port").value
|
||||
baud = self.get_parameter("baud_rate").value
|
||||
self._reconnect_delay = self.get_parameter("reconnect_delay").value
|
||||
self._hb_period = self.get_parameter("heartbeat_period").value
|
||||
self._wd_timeout = self.get_parameter("watchdog_timeout").value
|
||||
self._speed_scale = self.get_parameter("speed_scale").value
|
||||
self._steer_scale = self.get_parameter("steer_scale").value
|
||||
|
||||
# ── QoS ───────────────────────────────────────────────────────────────
|
||||
sensor_qos = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST, depth=10,
|
||||
)
|
||||
rel_qos = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST, depth=10,
|
||||
)
|
||||
|
||||
# ── Publishers ────────────────────────────────────────────────────────
|
||||
self._imu_pub = self.create_publisher(Imu, "/saltybot/imu", sensor_qos)
|
||||
self._arm_pub = self.create_publisher(String, "/saltybot/arm_state", rel_qos)
|
||||
self._error_pub = self.create_publisher(String, "/saltybot/error", rel_qos)
|
||||
self._battery_pub = self.create_publisher(String, "/saltybot/telemetry/battery", rel_qos)
|
||||
self._rpm_pub = self.create_publisher(String, "/saltybot/telemetry/motor_rpm", rel_qos)
|
||||
self._diag_pub = self.create_publisher(DiagnosticArray, "/diagnostics", rel_qos)
|
||||
|
||||
# ── Subscribers ───────────────────────────────────────────────────────
|
||||
self._cmd_vel_sub = self.create_subscription(
|
||||
Twist, "/cmd_vel", self._on_cmd_vel, rel_qos,
|
||||
)
|
||||
self._pid_sub = self.create_subscription(
|
||||
String, "/saltybot/pid_update", self._on_pid_update, rel_qos,
|
||||
)
|
||||
|
||||
# ── Services ──────────────────────────────────────────────────────────
|
||||
self._arm_srv = self.create_service(SetBool, "/saltybot/arm", self._svc_arm)
|
||||
self._mode_srv = self.create_service(SetBool, "/saltybot/set_mode", self._svc_set_mode)
|
||||
|
||||
# ── Serial state ──────────────────────────────────────────────────────
|
||||
self._port_name = port
|
||||
self._baud = baud
|
||||
self._ser: serial.Serial | None = None
|
||||
self._ser_lock = threading.Lock()
|
||||
self._parser = FrameParser()
|
||||
|
||||
# ── TX state ──────────────────────────────────────────────────────────
|
||||
self._last_speed = 0
|
||||
self._last_steer = 0
|
||||
self._last_cmd_t = time.monotonic()
|
||||
self._watchdog_sent = False # tracks whether we already sent zero
|
||||
|
||||
# ── Diagnostics state ──────────────────────────────────────────────────
|
||||
self._last_arm_state = -1
|
||||
self._last_battery_mv = 0
|
||||
self._rx_frame_count = 0
|
||||
|
||||
# ── Open serial and start timers ──────────────────────────────────────
|
||||
self._open_serial()
|
||||
|
||||
# Read at 200 Hz (serial RX thread is better, but timer keeps ROS2 integration clean)
|
||||
self._read_timer = self.create_timer(0.005, self._read_cb)
|
||||
# Heartbeat TX
|
||||
self._hb_timer = self.create_timer(self._hb_period, self._heartbeat_cb)
|
||||
# Watchdog check (fires at 2× watchdog_timeout for quick detection)
|
||||
self._wd_timer = self.create_timer(self._wd_timeout / 2, self._watchdog_cb)
|
||||
# Periodic diagnostics
|
||||
self._diag_timer = self.create_timer(1.0, self._publish_diagnostics)
|
||||
|
||||
self.get_logger().info(
|
||||
f"stm32_cmd_node started — {port} @ {baud} baud | "
|
||||
f"HB {int(self._hb_period * 1000)}ms | WD {int(self._wd_timeout * 1000)}ms"
|
||||
)
|
||||
|
||||
# ── Serial management ─────────────────────────────────────────────────────
|
||||
|
||||
def _open_serial(self) -> bool:
|
||||
with self._ser_lock:
|
||||
try:
|
||||
self._ser = serial.Serial(
|
||||
port=self._port_name,
|
||||
baudrate=self._baud,
|
||||
timeout=0.005, # non-blocking reads
|
||||
write_timeout=0.1,
|
||||
)
|
||||
self._ser.reset_input_buffer()
|
||||
self._parser.reset()
|
||||
self.get_logger().info(f"Serial open: {self._port_name}")
|
||||
return True
|
||||
except serial.SerialException as exc:
|
||||
self.get_logger().error(
|
||||
f"Cannot open {self._port_name}: {exc}",
|
||||
throttle_duration_sec=self._reconnect_delay,
|
||||
)
|
||||
self._ser = None
|
||||
return False
|
||||
|
||||
def _close_serial(self) -> None:
|
||||
with self._ser_lock:
|
||||
if self._ser and self._ser.is_open:
|
||||
try:
|
||||
self._ser.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._ser = None
|
||||
|
||||
def _write(self, data: bytes) -> bool:
|
||||
"""Thread-safe serial write. Returns False if port is not open."""
|
||||
with self._ser_lock:
|
||||
if self._ser is None or not self._ser.is_open:
|
||||
return False
|
||||
try:
|
||||
self._ser.write(data)
|
||||
return True
|
||||
except serial.SerialException as exc:
|
||||
self.get_logger().error(f"Serial write error: {exc}")
|
||||
self._ser = None
|
||||
return False
|
||||
|
||||
# ── RX — read callback ────────────────────────────────────────────────────
|
||||
|
||||
def _read_cb(self) -> None:
|
||||
"""Read bytes from serial and feed them to the frame parser."""
|
||||
raw: bytes | None = None
|
||||
reconnect_needed = False
|
||||
|
||||
with self._ser_lock:
|
||||
if self._ser is None or not self._ser.is_open:
|
||||
reconnect_needed = True
|
||||
else:
|
||||
try:
|
||||
n = self._ser.in_waiting
|
||||
if n:
|
||||
raw = self._ser.read(n)
|
||||
except serial.SerialException as exc:
|
||||
self.get_logger().error(f"Serial read error: {exc}")
|
||||
self._ser = None
|
||||
reconnect_needed = True
|
||||
|
||||
if reconnect_needed:
|
||||
self.get_logger().warn(
|
||||
"Serial disconnected — will retry",
|
||||
throttle_duration_sec=self._reconnect_delay,
|
||||
)
|
||||
time.sleep(self._reconnect_delay)
|
||||
self._open_serial()
|
||||
return
|
||||
|
||||
if not raw:
|
||||
return
|
||||
|
||||
for byte in raw:
|
||||
frame = self._parser.feed(byte)
|
||||
if frame is not None:
|
||||
self._rx_frame_count += 1
|
||||
self._dispatch_frame(frame)
|
||||
|
||||
def _dispatch_frame(self, frame) -> None:
|
||||
"""Route a decoded frame to the appropriate publisher."""
|
||||
now = self.get_clock().now().to_msg()
|
||||
|
||||
if isinstance(frame, ImuFrame):
|
||||
self._publish_imu(frame, now)
|
||||
|
||||
elif isinstance(frame, BatteryFrame):
|
||||
self._publish_battery(frame, now)
|
||||
|
||||
elif isinstance(frame, MotorRpmFrame):
|
||||
self._publish_motor_rpm(frame, now)
|
||||
|
||||
elif isinstance(frame, ArmStateFrame):
|
||||
self._publish_arm_state(frame, now)
|
||||
|
||||
elif isinstance(frame, ErrorFrame):
|
||||
self._publish_error(frame, now)
|
||||
|
||||
elif isinstance(frame, tuple):
|
||||
type_code, payload = frame
|
||||
self.get_logger().debug(
|
||||
f"Unknown telemetry type 0x{type_code:02X} len={len(payload)}"
|
||||
)
|
||||
|
||||
# ── Telemetry publishers ──────────────────────────────────────────────────
|
||||
|
||||
def _publish_imu(self, frame: ImuFrame, stamp) -> None:
|
||||
msg = Imu()
|
||||
msg.header.stamp = stamp
|
||||
msg.header.frame_id = IMU_FRAME_ID
|
||||
|
||||
# orientation: unknown — signal with -1 in first covariance
|
||||
msg.orientation_covariance[0] = -1.0
|
||||
|
||||
msg.angular_velocity.x = math.radians(frame.pitch_deg)
|
||||
msg.angular_velocity.y = math.radians(frame.roll_deg)
|
||||
msg.angular_velocity.z = math.radians(frame.yaw_deg)
|
||||
cov = math.radians(0.3) ** 2 # ±0.3° noise estimate from STM32 BMI088
|
||||
msg.angular_velocity_covariance[0] = cov
|
||||
msg.angular_velocity_covariance[4] = cov
|
||||
msg.angular_velocity_covariance[8] = cov
|
||||
|
||||
msg.linear_acceleration.x = frame.accel_x
|
||||
msg.linear_acceleration.y = frame.accel_y
|
||||
msg.linear_acceleration.z = frame.accel_z
|
||||
acov = 0.05 ** 2 # ±0.05 m/s² noise
|
||||
msg.linear_acceleration_covariance[0] = acov
|
||||
msg.linear_acceleration_covariance[4] = acov
|
||||
msg.linear_acceleration_covariance[8] = acov
|
||||
|
||||
self._imu_pub.publish(msg)
|
||||
|
||||
def _publish_battery(self, frame: BatteryFrame, stamp) -> None:
|
||||
payload = {
|
||||
"voltage_v": round(frame.voltage_mv / 1000.0, 3),
|
||||
"voltage_mv": frame.voltage_mv,
|
||||
"current_ma": frame.current_ma,
|
||||
"soc_pct": frame.soc_pct,
|
||||
"charging": frame.current_ma < -100,
|
||||
"ts": f"{stamp.sec}.{stamp.nanosec:09d}",
|
||||
}
|
||||
self._last_battery_mv = frame.voltage_mv
|
||||
msg = String()
|
||||
msg.data = json.dumps(payload)
|
||||
self._battery_pub.publish(msg)
|
||||
|
||||
def _publish_motor_rpm(self, frame: MotorRpmFrame, stamp) -> None:
|
||||
payload = {
|
||||
"left_rpm": frame.left_rpm,
|
||||
"right_rpm": frame.right_rpm,
|
||||
"ts": f"{stamp.sec}.{stamp.nanosec:09d}",
|
||||
}
|
||||
msg = String()
|
||||
msg.data = json.dumps(payload)
|
||||
self._rpm_pub.publish(msg)
|
||||
|
||||
def _publish_arm_state(self, frame: ArmStateFrame, stamp) -> None:
|
||||
label = _ARM_LABEL.get(frame.state, f"UNKNOWN({frame.state})")
|
||||
if frame.state != self._last_arm_state:
|
||||
self.get_logger().info(f"Arm state → {label} (flags=0x{frame.error_flags:02X})")
|
||||
self._last_arm_state = frame.state
|
||||
|
||||
payload = {
|
||||
"state": frame.state,
|
||||
"state_label": label,
|
||||
"error_flags": frame.error_flags,
|
||||
"ts": f"{stamp.sec}.{stamp.nanosec:09d}",
|
||||
}
|
||||
msg = String()
|
||||
msg.data = json.dumps(payload)
|
||||
self._arm_pub.publish(msg)
|
||||
|
||||
def _publish_error(self, frame: ErrorFrame, stamp) -> None:
|
||||
self.get_logger().error(
|
||||
f"STM32 error code=0x{frame.error_code:02X} sub=0x{frame.subcode:02X}"
|
||||
)
|
||||
payload = {
|
||||
"error_code": frame.error_code,
|
||||
"subcode": frame.subcode,
|
||||
"ts": f"{stamp.sec}.{stamp.nanosec:09d}",
|
||||
}
|
||||
msg = String()
|
||||
msg.data = json.dumps(payload)
|
||||
self._error_pub.publish(msg)
|
||||
|
||||
# ── TX — command send ─────────────────────────────────────────────────────
|
||||
|
||||
def _on_cmd_vel(self, msg: Twist) -> None:
|
||||
"""Convert /cmd_vel Twist to SPEED_STEER frame at up to 50 Hz."""
|
||||
speed = int(_clamp(msg.linear.x * self._speed_scale, -1000.0, 1000.0))
|
||||
steer = int(_clamp(msg.angular.z * self._steer_scale, -1000.0, 1000.0))
|
||||
self._last_speed = speed
|
||||
self._last_steer = steer
|
||||
self._last_cmd_t = time.monotonic()
|
||||
self._watchdog_sent = False
|
||||
|
||||
frame = encode_speed_steer(speed, steer)
|
||||
if not self._write(frame):
|
||||
self.get_logger().warn(
|
||||
"SPEED_STEER dropped — serial not open",
|
||||
throttle_duration_sec=2.0,
|
||||
)
|
||||
|
||||
def _heartbeat_cb(self) -> None:
|
||||
"""Send HEARTBEAT every heartbeat_period (default 200ms)."""
|
||||
self._write(encode_heartbeat())
|
||||
|
||||
def _watchdog_cb(self) -> None:
|
||||
"""Send zero-speed if /cmd_vel silent for watchdog_timeout seconds."""
|
||||
if time.monotonic() - self._last_cmd_t >= self._wd_timeout:
|
||||
if not self._watchdog_sent:
|
||||
self.get_logger().warn(
|
||||
f"No /cmd_vel for {self._wd_timeout:.1f}s — sending zero-speed"
|
||||
)
|
||||
self._watchdog_sent = True
|
||||
self._last_speed = 0
|
||||
self._last_steer = 0
|
||||
self._write(encode_speed_steer(0, 0))
|
||||
|
||||
def _on_pid_update(self, msg: String) -> None:
|
||||
"""Parse JSON /saltybot/pid_update and send PID_UPDATE frame."""
|
||||
try:
|
||||
data = json.loads(msg.data)
|
||||
kp = float(data["kp"])
|
||||
ki = float(data["ki"])
|
||||
kd = float(data["kd"])
|
||||
except (ValueError, KeyError, json.JSONDecodeError) as exc:
|
||||
self.get_logger().error(f"Bad PID update JSON: {exc}")
|
||||
return
|
||||
frame = encode_pid_update(kp, ki, kd)
|
||||
if self._write(frame):
|
||||
self.get_logger().info(f"PID update: kp={kp}, ki={ki}, kd={kd}")
|
||||
else:
|
||||
self.get_logger().warn("PID_UPDATE dropped — serial not open")
|
||||
|
||||
# ── Services ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _svc_arm(self, request: SetBool.Request, response: SetBool.Response):
|
||||
"""SetBool(True) = arm, SetBool(False) = disarm."""
|
||||
arm = request.data
|
||||
frame = encode_arm(arm)
|
||||
ok = self._write(frame)
|
||||
response.success = ok
|
||||
response.message = ("ARMED" if arm else "DISARMED") if ok else "serial not open"
|
||||
self.get_logger().info(
|
||||
f"ARM service: {'arm' if arm else 'disarm'} — {'sent' if ok else 'FAILED'}"
|
||||
)
|
||||
return response
|
||||
|
||||
def _svc_set_mode(self, request: SetBool.Request, response: SetBool.Response):
|
||||
"""SetBool: data maps to mode byte (True=1, False=0)."""
|
||||
mode = 1 if request.data else 0
|
||||
frame = encode_set_mode(mode)
|
||||
ok = self._write(frame)
|
||||
response.success = ok
|
||||
response.message = f"mode={mode}" if ok else "serial not open"
|
||||
return response
|
||||
|
||||
# ── Diagnostics ───────────────────────────────────────────────────────────
|
||||
|
||||
def _publish_diagnostics(self) -> None:
|
||||
diag = DiagnosticArray()
|
||||
diag.header.stamp = self.get_clock().now().to_msg()
|
||||
|
||||
status = DiagnosticStatus()
|
||||
status.name = "saltybot/stm32_cmd_node"
|
||||
status.hardware_id = "stm32f722"
|
||||
|
||||
port_ok = self._ser is not None and self._ser.is_open
|
||||
if port_ok:
|
||||
status.level = DiagnosticStatus.OK
|
||||
status.message = "Serial OK"
|
||||
else:
|
||||
status.level = DiagnosticStatus.ERROR
|
||||
status.message = f"Serial disconnected: {self._port_name}"
|
||||
|
||||
wd_age = time.monotonic() - self._last_cmd_t
|
||||
status.values = [
|
||||
KeyValue(key="serial_port", value=self._port_name),
|
||||
KeyValue(key="port_open", value=str(port_ok)),
|
||||
KeyValue(key="rx_frames", value=str(self._rx_frame_count)),
|
||||
KeyValue(key="rx_errors", value=str(self._parser.frames_error)),
|
||||
KeyValue(key="last_speed", value=str(self._last_speed)),
|
||||
KeyValue(key="last_steer", value=str(self._last_steer)),
|
||||
KeyValue(key="cmd_vel_age_s", value=f"{wd_age:.2f}"),
|
||||
KeyValue(key="battery_mv", value=str(self._last_battery_mv)),
|
||||
KeyValue(key="arm_state", value=_ARM_LABEL.get(self._last_arm_state, "?")),
|
||||
]
|
||||
diag.status.append(status)
|
||||
self._diag_pub.publish(diag)
|
||||
|
||||
# ── Lifecycle ─────────────────────────────────────────────────────────────
|
||||
|
||||
def destroy_node(self) -> None:
|
||||
# Send zero-speed + disarm on shutdown
|
||||
self._write(encode_speed_steer(0, 0))
|
||||
self._write(encode_arm(False))
|
||||
self._close_serial()
|
||||
super().destroy_node()
|
||||
|
||||
|
||||
def main(args=None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = Stm32CmdNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@ -0,0 +1,332 @@
|
||||
"""stm32_protocol.py — Binary frame codec for Jetson↔STM32 communication.
|
||||
|
||||
Issue #119: defines the binary serial protocol between the Jetson Nano and the
|
||||
STM32F722 flight controller over USB CDC @ 921600 baud.
|
||||
|
||||
Frame layout (all multi-byte fields are big-endian):
|
||||
┌──────┬──────┬──────┬──────────────────┬───────────┬──────┐
|
||||
│ STX │ TYPE │ LEN │ PAYLOAD │ CRC16 │ ETX │
|
||||
│ 0x02 │ 1B │ 1B │ LEN bytes │ 2B BE │ 0x03 │
|
||||
└──────┴──────┴──────┴──────────────────┴───────────┴──────┘
|
||||
|
||||
CRC16 covers: TYPE + LEN + PAYLOAD (not STX, ETX, or CRC bytes themselves).
|
||||
CRC algorithm: CCITT-16, polynomial=0x1021, init=0xFFFF, no final XOR.
|
||||
|
||||
Command types (Jetson → STM32):
|
||||
0x01 HEARTBEAT — no payload (len=0)
|
||||
0x02 SPEED_STEER — int16 speed + int16 steer (len=4) range: -1000..+1000
|
||||
0x03 ARM — uint8 (0=disarm, 1=arm) (len=1)
|
||||
0x04 SET_MODE — uint8 mode (len=1)
|
||||
0x05 PID_UPDATE — float32 kp + ki + kd (len=12)
|
||||
|
||||
Telemetry types (STM32 → Jetson):
|
||||
0x10 IMU — int16×6: pitch,roll,yaw (×100 deg), ax,ay,az (×100 m/s²) (len=12)
|
||||
0x11 BATTERY — uint16 voltage_mv + int16 current_ma + uint8 soc_pct (len=5)
|
||||
0x12 MOTOR_RPM — int16 left_rpm + int16 right_rpm (len=4)
|
||||
0x13 ARM_STATE — uint8 state + uint8 error_flags (len=2)
|
||||
0x14 ERROR — uint8 error_code + uint8 subcode (len=2)
|
||||
|
||||
Usage:
|
||||
# Encoding (Jetson → STM32)
|
||||
frame = encode_speed_steer(300, -150)
|
||||
ser.write(frame)
|
||||
|
||||
# Decoding (STM32 → Jetson), one byte at a time
|
||||
parser = FrameParser()
|
||||
for byte in incoming_bytes:
|
||||
result = parser.feed(byte)
|
||||
if result is not None:
|
||||
handle_frame(result)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import struct
|
||||
from dataclasses import dataclass
|
||||
from enum import IntEnum
|
||||
from typing import Optional
|
||||
|
||||
# ── Frame constants ───────────────────────────────────────────────────────────
|
||||
|
||||
STX = 0x02
|
||||
ETX = 0x03
|
||||
MAX_PAYLOAD_LEN = 64 # hard limit; any frame larger is corrupt
|
||||
|
||||
|
||||
# ── Command / telemetry type codes ────────────────────────────────────────────
|
||||
|
||||
class CmdType(IntEnum):
|
||||
HEARTBEAT = 0x01
|
||||
SPEED_STEER = 0x02
|
||||
ARM = 0x03
|
||||
SET_MODE = 0x04
|
||||
PID_UPDATE = 0x05
|
||||
|
||||
|
||||
class TelType(IntEnum):
|
||||
IMU = 0x10
|
||||
BATTERY = 0x11
|
||||
MOTOR_RPM = 0x12
|
||||
ARM_STATE = 0x13
|
||||
ERROR = 0x14
|
||||
|
||||
|
||||
# ── Parsed telemetry objects ──────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class ImuFrame:
|
||||
pitch_deg: float # degrees (positive = forward tilt)
|
||||
roll_deg: float
|
||||
yaw_deg: float
|
||||
accel_x: float # m/s²
|
||||
accel_y: float
|
||||
accel_z: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class BatteryFrame:
|
||||
voltage_mv: int # millivolts (e.g. 11100 = 11.1 V)
|
||||
current_ma: int # milliamps (negative = charging)
|
||||
soc_pct: int # state of charge 0–100 (from STM32 fuel gauge or lookup)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MotorRpmFrame:
|
||||
left_rpm: int
|
||||
right_rpm: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class ArmStateFrame:
|
||||
state: int # 0=DISARMED 1=ARMED 2=TILT_FAULT
|
||||
error_flags: int # bitmask
|
||||
|
||||
|
||||
@dataclass
|
||||
class ErrorFrame:
|
||||
error_code: int
|
||||
subcode: int
|
||||
|
||||
|
||||
# Union type for decoded results
|
||||
TelemetryFrame = ImuFrame | BatteryFrame | MotorRpmFrame | ArmStateFrame | ErrorFrame
|
||||
|
||||
|
||||
# ── CRC16 CCITT ───────────────────────────────────────────────────────────────
|
||||
|
||||
def _crc16_ccitt(data: bytes, init: int = 0xFFFF) -> int:
|
||||
"""CRC16-CCITT: polynomial 0x1021, init 0xFFFF, no final XOR."""
|
||||
crc = init
|
||||
for byte in data:
|
||||
crc ^= byte << 8
|
||||
for _ in range(8):
|
||||
if crc & 0x8000:
|
||||
crc = (crc << 1) ^ 0x1021
|
||||
else:
|
||||
crc <<= 1
|
||||
crc &= 0xFFFF
|
||||
return crc
|
||||
|
||||
|
||||
# ── Frame encoder ─────────────────────────────────────────────────────────────
|
||||
|
||||
def _build_frame(cmd_type: int, payload: bytes) -> bytes:
|
||||
"""Assemble a complete binary frame with CRC16."""
|
||||
assert len(payload) <= MAX_PAYLOAD_LEN, "Payload too large"
|
||||
length = len(payload)
|
||||
header = bytes([cmd_type, length])
|
||||
crc = _crc16_ccitt(header + payload)
|
||||
return bytes([STX, cmd_type, length]) + payload + struct.pack(">H", crc) + bytes([ETX])
|
||||
|
||||
|
||||
def encode_heartbeat() -> bytes:
|
||||
"""HEARTBEAT frame — no payload."""
|
||||
return _build_frame(CmdType.HEARTBEAT, b"")
|
||||
|
||||
|
||||
def encode_speed_steer(speed: int, steer: int) -> bytes:
|
||||
"""SPEED_STEER frame — int16 speed + int16 steer, both in -1000..+1000."""
|
||||
speed = max(-1000, min(1000, int(speed)))
|
||||
steer = max(-1000, min(1000, int(steer)))
|
||||
return _build_frame(CmdType.SPEED_STEER, struct.pack(">hh", speed, steer))
|
||||
|
||||
|
||||
def encode_arm(arm: bool) -> bytes:
|
||||
"""ARM frame — 0=disarm, 1=arm."""
|
||||
return _build_frame(CmdType.ARM, struct.pack("B", 1 if arm else 0))
|
||||
|
||||
|
||||
def encode_set_mode(mode: int) -> bytes:
|
||||
"""SET_MODE frame — mode byte."""
|
||||
return _build_frame(CmdType.SET_MODE, struct.pack("B", mode & 0xFF))
|
||||
|
||||
|
||||
def encode_pid_update(kp: float, ki: float, kd: float) -> bytes:
|
||||
"""PID_UPDATE frame — three float32 values."""
|
||||
return _build_frame(CmdType.PID_UPDATE, struct.pack(">fff", kp, ki, kd))
|
||||
|
||||
|
||||
# ── Frame decoder (state-machine parser) ─────────────────────────────────────
|
||||
|
||||
class ParserState(IntEnum):
|
||||
WAIT_STX = 0
|
||||
WAIT_TYPE = 1
|
||||
WAIT_LEN = 2
|
||||
PAYLOAD = 3
|
||||
CRC_HI = 4
|
||||
CRC_LO = 5
|
||||
WAIT_ETX = 6
|
||||
|
||||
|
||||
class ParseError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class FrameParser:
|
||||
"""Byte-by-byte streaming parser for STM32 telemetry frames.
|
||||
|
||||
Feed individual bytes via feed(); returns a decoded TelemetryFrame (or raw
|
||||
bytes tuple) when a complete valid frame is received.
|
||||
|
||||
Thread-safety: single-threaded — wrap in a lock if shared across threads.
|
||||
|
||||
Usage::
|
||||
parser = FrameParser()
|
||||
for b in incoming:
|
||||
result = parser.feed(b)
|
||||
if result is not None:
|
||||
process(result)
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._state = ParserState.WAIT_STX
|
||||
self._type = 0
|
||||
self._length = 0
|
||||
self._payload = bytearray()
|
||||
self._crc_rcvd = 0
|
||||
self.frames_ok = 0
|
||||
self.frames_error = 0
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset parser to initial state (call after error or port reconnect)."""
|
||||
self._state = ParserState.WAIT_STX
|
||||
self._payload = bytearray()
|
||||
|
||||
def feed(self, byte: int) -> Optional[TelemetryFrame | tuple]:
|
||||
"""Process one byte. Returns decoded frame on success, None otherwise.
|
||||
|
||||
On CRC error, increments frames_error and resets. The return value on
|
||||
success is a dataclass (ImuFrame, BatteryFrame, etc.) or a
|
||||
(type_code, raw_payload) tuple for unknown type codes.
|
||||
"""
|
||||
s = self._state
|
||||
|
||||
if s == ParserState.WAIT_STX:
|
||||
if byte == STX:
|
||||
self._state = ParserState.WAIT_TYPE
|
||||
return None
|
||||
|
||||
if s == ParserState.WAIT_TYPE:
|
||||
self._type = byte
|
||||
self._state = ParserState.WAIT_LEN
|
||||
return None
|
||||
|
||||
if s == ParserState.WAIT_LEN:
|
||||
self._length = byte
|
||||
self._payload = bytearray()
|
||||
if self._length > MAX_PAYLOAD_LEN:
|
||||
# Corrupt frame — too big; reset
|
||||
self.frames_error += 1
|
||||
self.reset()
|
||||
return None
|
||||
if self._length == 0:
|
||||
self._state = ParserState.CRC_HI
|
||||
else:
|
||||
self._state = ParserState.PAYLOAD
|
||||
return None
|
||||
|
||||
if s == ParserState.PAYLOAD:
|
||||
self._payload.append(byte)
|
||||
if len(self._payload) == self._length:
|
||||
self._state = ParserState.CRC_HI
|
||||
return None
|
||||
|
||||
if s == ParserState.CRC_HI:
|
||||
self._crc_rcvd = byte << 8
|
||||
self._state = ParserState.CRC_LO
|
||||
return None
|
||||
|
||||
if s == ParserState.CRC_LO:
|
||||
self._crc_rcvd |= byte
|
||||
self._state = ParserState.WAIT_ETX
|
||||
return None
|
||||
|
||||
if s == ParserState.WAIT_ETX:
|
||||
self.reset() # always reset so we look for next STX
|
||||
if byte != ETX:
|
||||
self.frames_error += 1
|
||||
return None
|
||||
|
||||
# Verify CRC
|
||||
crc_data = bytes([self._type, self._length]) + self._payload
|
||||
expected = _crc16_ccitt(crc_data)
|
||||
if expected != self._crc_rcvd:
|
||||
self.frames_error += 1
|
||||
return None
|
||||
|
||||
# Decode
|
||||
self.frames_ok += 1
|
||||
return _decode_telemetry(self._type, bytes(self._payload))
|
||||
|
||||
# Should never reach here
|
||||
self.reset()
|
||||
return None
|
||||
|
||||
|
||||
# ── Telemetry decoder ─────────────────────────────────────────────────────────
|
||||
|
||||
def _decode_telemetry(type_code: int, payload: bytes) -> Optional[TelemetryFrame | tuple]:
|
||||
"""Decode a validated telemetry payload into a typed dataclass."""
|
||||
try:
|
||||
if type_code == TelType.IMU:
|
||||
if len(payload) < 12:
|
||||
return None
|
||||
p, r, y, ax, ay, az = struct.unpack_from(">hhhhhh", payload)
|
||||
return ImuFrame(
|
||||
pitch_deg=p / 100.0,
|
||||
roll_deg=r / 100.0,
|
||||
yaw_deg=y / 100.0,
|
||||
accel_x=ax / 100.0,
|
||||
accel_y=ay / 100.0,
|
||||
accel_z=az / 100.0,
|
||||
)
|
||||
|
||||
if type_code == TelType.BATTERY:
|
||||
if len(payload) < 5:
|
||||
return None
|
||||
v_mv, i_ma, soc = struct.unpack_from(">HhB", payload)
|
||||
return BatteryFrame(voltage_mv=v_mv, current_ma=i_ma, soc_pct=soc)
|
||||
|
||||
if type_code == TelType.MOTOR_RPM:
|
||||
if len(payload) < 4:
|
||||
return None
|
||||
left, right = struct.unpack_from(">hh", payload)
|
||||
return MotorRpmFrame(left_rpm=left, right_rpm=right)
|
||||
|
||||
if type_code == TelType.ARM_STATE:
|
||||
if len(payload) < 2:
|
||||
return None
|
||||
state, flags = struct.unpack_from("BB", payload)
|
||||
return ArmStateFrame(state=state, error_flags=flags)
|
||||
|
||||
if type_code == TelType.ERROR:
|
||||
if len(payload) < 2:
|
||||
return None
|
||||
code, sub = struct.unpack_from("BB", payload)
|
||||
return ErrorFrame(error_code=code, subcode=sub)
|
||||
|
||||
except struct.error:
|
||||
return None
|
||||
|
||||
# Unknown telemetry type — return raw
|
||||
return (type_code, payload)
|
||||
@ -13,11 +13,15 @@ setup(
|
||||
"launch/bridge.launch.py",
|
||||
"launch/cmd_vel_bridge.launch.py",
|
||||
"launch/remote_estop.launch.py",
|
||||
"launch/stm32_cmd.launch.py",
|
||||
"launch/battery.launch.py",
|
||||
]),
|
||||
(f"share/{package_name}/config", [
|
||||
"config/bridge_params.yaml",
|
||||
"config/cmd_vel_bridge_params.yaml",
|
||||
"config/estop_params.yaml",
|
||||
"config/stm32_cmd_params.yaml",
|
||||
"config/battery_params.yaml",
|
||||
]),
|
||||
],
|
||||
install_requires=["setuptools", "pyserial"],
|
||||
@ -36,6 +40,10 @@ setup(
|
||||
# Nav2 cmd_vel bridge: velocity limits + ramp + deadman + mode gate
|
||||
"cmd_vel_bridge_node = saltybot_bridge.cmd_vel_bridge_node:main",
|
||||
"remote_estop_node = saltybot_bridge.remote_estop_node:main",
|
||||
# Binary-framed STM32 command node (Issue #119)
|
||||
"stm32_cmd_node = saltybot_bridge.stm32_cmd_node:main",
|
||||
# Battery management node (Issue #125)
|
||||
"battery_node = saltybot_bridge.battery_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
341
jetson/ros2_ws/src/saltybot_bridge/test/test_stm32_cmd_node.py
Normal file
341
jetson/ros2_ws/src/saltybot_bridge/test/test_stm32_cmd_node.py
Normal file
@ -0,0 +1,341 @@
|
||||
"""test_stm32_cmd_node.py — Unit tests for Stm32CmdNode with mock serial port.
|
||||
|
||||
Tests:
|
||||
- Serial open/close lifecycle
|
||||
- Twist→SPEED_STEER frame encoding and clamping
|
||||
- Heartbeat cadence (mock timer)
|
||||
- Watchdog fires after 500ms silence and sends zero-speed
|
||||
- ARM service request → ARM frame on serial
|
||||
- PID_UPDATE topic → PID_UPDATE frame on serial
|
||||
- Telemetry receive path: mock serial → publisher callbacks
|
||||
- Auto-reconnect on serial disconnect
|
||||
- Zero-speed sent on node shutdown
|
||||
- CRC errors counted correctly
|
||||
|
||||
Run with: pytest test/test_stm32_cmd_node.py -v
|
||||
No ROS2 runtime required — uses mock Node infrastructure.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import struct
|
||||
import threading
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
from unittest.mock import MagicMock, patch, call
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
from saltybot_bridge.stm32_protocol import (
|
||||
STX, ETX, CmdType, TelType,
|
||||
encode_speed_steer, encode_heartbeat, encode_arm, encode_pid_update,
|
||||
_build_frame, _crc16_ccitt,
|
||||
ImuFrame, BatteryFrame, ArmStateFrame, MotorRpmFrame, ErrorFrame,
|
||||
)
|
||||
|
||||
|
||||
# ── Mock serial ───────────────────────────────────────────────────────────────
|
||||
|
||||
class MockSerial:
|
||||
"""In-memory serial port mock for testing TX/RX paths without hardware."""
|
||||
|
||||
def __init__(self, rx_data: bytes = b""):
|
||||
self._tx_buf = io.BytesIO()
|
||||
self._rx_buf = io.BytesIO(rx_data)
|
||||
self.is_open = True
|
||||
self._lock = threading.Lock()
|
||||
self.write_calls = 0
|
||||
self.raise_on_write: Exception | None = None
|
||||
|
||||
def write(self, data: bytes) -> int:
|
||||
if self.raise_on_write:
|
||||
raise self.raise_on_write
|
||||
with self._lock:
|
||||
self._tx_buf.write(data)
|
||||
self.write_calls += 1
|
||||
return len(data)
|
||||
|
||||
@property
|
||||
def in_waiting(self) -> int:
|
||||
pos = self._rx_buf.tell()
|
||||
end = self._rx_buf.seek(0, 2)
|
||||
self._rx_buf.seek(pos)
|
||||
return end - pos
|
||||
|
||||
def read(self, n: int) -> bytes:
|
||||
return self._rx_buf.read(n)
|
||||
|
||||
def reset_input_buffer(self) -> None:
|
||||
pass
|
||||
|
||||
def close(self) -> None:
|
||||
self.is_open = False
|
||||
|
||||
def get_written(self) -> bytes:
|
||||
with self._lock:
|
||||
return self._tx_buf.getvalue()
|
||||
|
||||
def clear_written(self) -> None:
|
||||
with self._lock:
|
||||
self._tx_buf = io.BytesIO()
|
||||
|
||||
|
||||
# ── Frame builder helpers ─────────────────────────────────────────────────────
|
||||
|
||||
def _imu_frame_bytes(pitch=100, roll=0, yaw=0, ax=0, ay=0, az=981) -> bytes:
|
||||
payload = struct.pack(">hhhhhh", pitch, roll, yaw, ax, ay, az)
|
||||
return _build_frame(TelType.IMU, payload)
|
||||
|
||||
|
||||
def _battery_frame_bytes(v_mv=11100, i_ma=500, soc=80) -> bytes:
|
||||
payload = struct.pack(">HhB", v_mv, i_ma, soc)
|
||||
return _build_frame(TelType.BATTERY, payload)
|
||||
|
||||
|
||||
def _arm_state_frame_bytes(state=1, flags=0) -> bytes:
|
||||
payload = struct.pack("BB", state, flags)
|
||||
return _build_frame(TelType.ARM_STATE, payload)
|
||||
|
||||
|
||||
def _motor_rpm_frame_bytes(left=1200, right=1200) -> bytes:
|
||||
payload = struct.pack(">hh", left, right)
|
||||
return _build_frame(TelType.MOTOR_RPM, payload)
|
||||
|
||||
|
||||
def _error_frame_bytes(code=0x01, sub=0x00) -> bytes:
|
||||
payload = struct.pack("BB", code, sub)
|
||||
return _build_frame(TelType.ERROR, payload)
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Protocol logic tests (no ROS2)
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestCommandEncoding:
|
||||
"""Verify the TX path logic without spinning up ROS2."""
|
||||
|
||||
def test_zero_twist_sends_zero_frame(self):
|
||||
frame = encode_speed_steer(0, 0)
|
||||
speed, steer = struct.unpack(">hh", frame[3:7])
|
||||
assert speed == 0
|
||||
assert steer == 0
|
||||
|
||||
def test_forward_1ms_scales_to_1000(self):
|
||||
"""linear.x = 1.0 m/s with speed_scale=1000 → speed=1000."""
|
||||
speed_scale = 1000.0
|
||||
speed = int(max(-1000, min(1000, 1.0 * speed_scale)))
|
||||
frame = encode_speed_steer(speed, 0)
|
||||
s, _ = struct.unpack(">hh", frame[3:7])
|
||||
assert s == 1000
|
||||
|
||||
def test_left_turn_scales_correctly(self):
|
||||
"""angular.z = 1.0 with steer_scale=-500 → steer=-500."""
|
||||
steer_scale = -500.0
|
||||
steer = int(max(-1000, min(1000, 1.0 * steer_scale)))
|
||||
frame = encode_speed_steer(0, steer)
|
||||
_, st = struct.unpack(">hh", frame[3:7])
|
||||
assert st == -500
|
||||
|
||||
def test_heartbeat_frame_is_minimal(self):
|
||||
frame = encode_heartbeat()
|
||||
assert frame[1] == CmdType.HEARTBEAT
|
||||
assert frame[2] == 0 # zero payload
|
||||
|
||||
def test_arm_frame_payload(self):
|
||||
frame_arm = encode_arm(True)
|
||||
frame_disarm = encode_arm(False)
|
||||
assert frame_arm[3] == 1
|
||||
assert frame_disarm[3] == 0
|
||||
|
||||
def test_pid_frame_all_zeros(self):
|
||||
frame = encode_pid_update(0.0, 0.0, 0.0)
|
||||
kp, ki, kd = struct.unpack(">fff", frame[3:15])
|
||||
assert kp == pytest.approx(0.0)
|
||||
assert ki == pytest.approx(0.0)
|
||||
assert kd == pytest.approx(0.0)
|
||||
|
||||
def test_pid_frame_typical_values(self):
|
||||
frame = encode_pid_update(3.0, 0.02, 0.12)
|
||||
kp, ki, kd = struct.unpack(">fff", frame[3:15])
|
||||
assert kp == pytest.approx(3.0, rel=1e-5)
|
||||
assert ki == pytest.approx(0.02, rel=1e-5)
|
||||
assert kd == pytest.approx(0.12, rel=1e-5)
|
||||
|
||||
|
||||
class TestMockSerialTX:
|
||||
"""Test TX path using MockSerial — verifies bytes sent over the wire."""
|
||||
|
||||
def _write_and_get(self, frames: list[bytes]) -> bytes:
|
||||
"""Write multiple frames to a MockSerial and return all written bytes."""
|
||||
ms = MockSerial()
|
||||
for f in frames:
|
||||
ms.write(f)
|
||||
return ms.get_written()
|
||||
|
||||
def test_speed_steer_bytes_on_wire(self):
|
||||
frame = encode_speed_steer(300, -150)
|
||||
ms = MockSerial()
|
||||
ms.write(frame)
|
||||
written = ms.get_written()
|
||||
assert written[0] == STX
|
||||
assert written[1] == CmdType.SPEED_STEER
|
||||
assert written[-1] == ETX
|
||||
|
||||
def test_heartbeat_bytes_on_wire(self):
|
||||
frame = encode_heartbeat()
|
||||
ms = MockSerial()
|
||||
ms.write(frame)
|
||||
written = ms.get_written()
|
||||
assert written[0] == STX
|
||||
assert written[1] == CmdType.HEARTBEAT
|
||||
|
||||
def test_write_error_detected(self):
|
||||
import serial
|
||||
ms = MockSerial()
|
||||
ms.raise_on_write = serial.SerialException("test error")
|
||||
with pytest.raises(serial.SerialException):
|
||||
ms.write(b"\x00")
|
||||
|
||||
def test_multiple_frames_written_sequentially(self):
|
||||
ms = MockSerial()
|
||||
ms.write(encode_heartbeat())
|
||||
ms.write(encode_speed_steer(100, 0))
|
||||
ms.write(encode_arm(True))
|
||||
written = ms.get_written()
|
||||
# All three frames should be in the buffer
|
||||
assert written.count(STX) == 3
|
||||
assert written.count(ETX) == 3
|
||||
|
||||
def test_write_call_count_tracked(self):
|
||||
ms = MockSerial()
|
||||
ms.write(encode_heartbeat())
|
||||
ms.write(encode_heartbeat())
|
||||
assert ms.write_calls == 2
|
||||
|
||||
|
||||
class TestMockSerialRX:
|
||||
"""Test RX parsing path using MockSerial with pre-loaded telemetry data."""
|
||||
|
||||
from saltybot_bridge.stm32_protocol import FrameParser
|
||||
|
||||
def test_rx_imu_frame(self):
|
||||
from saltybot_bridge.stm32_protocol import FrameParser, ImuFrame
|
||||
raw = _imu_frame_bytes(pitch=500, roll=-200, yaw=100, ax=0, ay=0, az=981)
|
||||
ms = MockSerial(rx_data=raw)
|
||||
parser = FrameParser()
|
||||
results = []
|
||||
while ms.in_waiting:
|
||||
chunk = ms.read(ms.in_waiting)
|
||||
for b in chunk:
|
||||
r = parser.feed(b)
|
||||
if r is not None:
|
||||
results.append(r)
|
||||
assert len(results) == 1
|
||||
f = results[0]
|
||||
assert isinstance(f, ImuFrame)
|
||||
assert f.pitch_deg == pytest.approx(5.0)
|
||||
assert f.roll_deg == pytest.approx(-2.0)
|
||||
assert f.accel_z == pytest.approx(9.81)
|
||||
|
||||
def test_rx_battery_frame(self):
|
||||
from saltybot_bridge.stm32_protocol import FrameParser, BatteryFrame
|
||||
raw = _battery_frame_bytes(v_mv=10500, i_ma=1200, soc=45)
|
||||
ms = MockSerial(rx_data=raw)
|
||||
parser = FrameParser()
|
||||
results = []
|
||||
while ms.in_waiting:
|
||||
for b in ms.read(ms.in_waiting):
|
||||
r = parser.feed(b)
|
||||
if r is not None:
|
||||
results.append(r)
|
||||
f = results[0]
|
||||
assert isinstance(f, BatteryFrame)
|
||||
assert f.voltage_mv == 10500
|
||||
assert f.soc_pct == 45
|
||||
|
||||
def test_rx_multiple_frames_in_one_read(self):
|
||||
from saltybot_bridge.stm32_protocol import FrameParser
|
||||
raw = (_imu_frame_bytes() + _arm_state_frame_bytes() + _battery_frame_bytes())
|
||||
ms = MockSerial(rx_data=raw)
|
||||
parser = FrameParser()
|
||||
results = []
|
||||
data = ms.read(len(raw))
|
||||
for b in data:
|
||||
r = parser.feed(b)
|
||||
if r is not None:
|
||||
results.append(r)
|
||||
assert len(results) == 3
|
||||
assert parser.frames_error == 0
|
||||
|
||||
def test_rx_bad_crc_counted_as_error(self):
|
||||
from saltybot_bridge.stm32_protocol import FrameParser
|
||||
raw = bytearray(_arm_state_frame_bytes(state=1))
|
||||
raw[-3] ^= 0xFF # corrupt CRC
|
||||
ms = MockSerial(rx_data=bytes(raw))
|
||||
parser = FrameParser()
|
||||
for b in ms.read(len(raw)):
|
||||
parser.feed(b)
|
||||
assert parser.frames_ok == 0
|
||||
assert parser.frames_error == 1
|
||||
|
||||
def test_rx_resync_after_corrupt_byte(self):
|
||||
from saltybot_bridge.stm32_protocol import FrameParser, ArmStateFrame
|
||||
garbage = b"\xDE\xAD\x00\x00"
|
||||
valid = _arm_state_frame_bytes(state=1)
|
||||
ms = MockSerial(rx_data=garbage + valid)
|
||||
parser = FrameParser()
|
||||
results = []
|
||||
while ms.in_waiting:
|
||||
for b in ms.read(ms.in_waiting):
|
||||
r = parser.feed(b)
|
||||
if r is not None:
|
||||
results.append(r)
|
||||
assert len(results) == 1
|
||||
assert isinstance(results[0], ArmStateFrame)
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Watchdog logic tests
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestWatchdog:
|
||||
"""Test the watchdog logic in isolation without a full ROS2 node."""
|
||||
|
||||
def test_watchdog_fires_after_timeout(self):
|
||||
"""Simulate watchdog: record time, wait >500ms, verify zero-speed sent."""
|
||||
ms = MockSerial()
|
||||
last_cmd_t = time.monotonic() - 0.6 # 600ms ago
|
||||
wd_timeout = 0.5
|
||||
|
||||
# Watchdog check logic (mirrors _watchdog_cb)
|
||||
if time.monotonic() - last_cmd_t >= wd_timeout:
|
||||
ms.write(encode_speed_steer(0, 0))
|
||||
|
||||
written = ms.get_written()
|
||||
speed, steer = struct.unpack(">hh", written[3:7])
|
||||
assert speed == 0
|
||||
assert steer == 0
|
||||
|
||||
def test_watchdog_does_not_fire_before_timeout(self):
|
||||
"""Watchdog must not fire if /cmd_vel was recent."""
|
||||
ms = MockSerial()
|
||||
last_cmd_t = time.monotonic() - 0.2 # 200ms ago — within timeout
|
||||
wd_timeout = 0.5
|
||||
|
||||
if time.monotonic() - last_cmd_t >= wd_timeout:
|
||||
ms.write(encode_speed_steer(0, 0))
|
||||
|
||||
assert ms.get_written() == b""
|
||||
|
||||
def test_zero_speed_frame_structure(self):
|
||||
"""Zero-speed frame must have correct structure."""
|
||||
frame = encode_speed_steer(0, 0)
|
||||
assert frame[0] == STX
|
||||
assert frame[1] == CmdType.SPEED_STEER
|
||||
assert frame[-1] == ETX
|
||||
speed, steer = struct.unpack(">hh", frame[3:7])
|
||||
assert speed == 0
|
||||
assert steer == 0
|
||||
483
jetson/ros2_ws/src/saltybot_bridge/test/test_stm32_protocol.py
Normal file
483
jetson/ros2_ws/src/saltybot_bridge/test/test_stm32_protocol.py
Normal file
@ -0,0 +1,483 @@
|
||||
"""test_stm32_protocol.py — Unit tests for binary STM32 frame codec.
|
||||
|
||||
Tests:
|
||||
- CRC16-CCITT correctness
|
||||
- All encoder functions produce correctly structured frames
|
||||
- FrameParser decodes valid frames for all telemetry types
|
||||
- FrameParser rejects frames with bad CRC
|
||||
- FrameParser rejects frames with bad ETX
|
||||
- FrameParser resyncs after corrupt data (finds next STX)
|
||||
- FrameParser handles streaming byte-by-byte input
|
||||
- FrameParser handles oversized payload gracefully
|
||||
- Speed/steer clamping in encode_speed_steer
|
||||
- Round-trip encode → decode for all known telemetry types
|
||||
|
||||
Run with: pytest test/test_stm32_protocol.py -v
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import struct
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
|
||||
# ── Path setup (no ROS2 install needed) ──────────────────────────────────────
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
from saltybot_bridge.stm32_protocol import (
|
||||
STX, ETX,
|
||||
CmdType, TelType,
|
||||
ImuFrame, BatteryFrame, MotorRpmFrame, ArmStateFrame, ErrorFrame,
|
||||
FrameParser,
|
||||
_crc16_ccitt, _build_frame, _decode_telemetry,
|
||||
encode_heartbeat, encode_speed_steer, encode_arm,
|
||||
encode_set_mode, encode_pid_update,
|
||||
)
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _parse_all(raw: bytes):
|
||||
"""Feed all bytes into a fresh FrameParser, return list of decoded frames."""
|
||||
parser = FrameParser()
|
||||
results = []
|
||||
for b in raw:
|
||||
r = parser.feed(b)
|
||||
if r is not None:
|
||||
results.append(r)
|
||||
return results, parser
|
||||
|
||||
|
||||
def _build_telem_frame(type_code: int, payload: bytes) -> bytes:
|
||||
"""Build a raw telemetry frame (same framing as command encoder)."""
|
||||
return _build_frame(type_code, payload)
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# CRC16 tests
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestCrc16:
|
||||
def test_known_vector_empty(self):
|
||||
"""CRC of empty data must equal 0xFFFF (init value)."""
|
||||
assert _crc16_ccitt(b"") == 0xFFFF
|
||||
|
||||
def test_known_vector_123456789(self):
|
||||
"""Classic CCITT-16 test vector: '123456789' → 0x29B1."""
|
||||
assert _crc16_ccitt(b"123456789") == 0x29B1
|
||||
|
||||
def test_crc_changes_on_bit_flip(self):
|
||||
data = b"\x02\x04\x42"
|
||||
flipped = b"\x02\x04\x43"
|
||||
assert _crc16_ccitt(data) != _crc16_ccitt(flipped)
|
||||
|
||||
def test_crc_all_zeros(self):
|
||||
crc = _crc16_ccitt(bytes(16))
|
||||
assert isinstance(crc, int)
|
||||
assert 0 <= crc <= 0xFFFF
|
||||
|
||||
def test_crc_single_byte(self):
|
||||
c1 = _crc16_ccitt(b"\x01")
|
||||
c2 = _crc16_ccitt(b"\x02")
|
||||
assert c1 != c2
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Encoder tests
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestEncoders:
|
||||
def test_heartbeat_structure(self):
|
||||
frame = encode_heartbeat()
|
||||
# STX TYPE LEN CRC_HI CRC_LO ETX (no payload → 6 bytes)
|
||||
assert len(frame) == 6
|
||||
assert frame[0] == STX
|
||||
assert frame[1] == CmdType.HEARTBEAT
|
||||
assert frame[2] == 0 # LEN=0
|
||||
assert frame[-1] == ETX
|
||||
|
||||
def test_heartbeat_crc_valid(self):
|
||||
frame = encode_heartbeat()
|
||||
crc_expected = _crc16_ccitt(bytes([CmdType.HEARTBEAT, 0]))
|
||||
crc_actual = struct.unpack(">H", frame[3:5])[0]
|
||||
assert crc_actual == crc_expected
|
||||
|
||||
def test_speed_steer_structure(self):
|
||||
frame = encode_speed_steer(300, -150)
|
||||
assert frame[0] == STX
|
||||
assert frame[1] == CmdType.SPEED_STEER
|
||||
assert frame[2] == 4 # LEN=4 (two int16)
|
||||
assert frame[-1] == ETX
|
||||
assert len(frame) == 10 # 3 header + 4 payload + 2 CRC + 1 ETX
|
||||
|
||||
def test_speed_steer_payload_values(self):
|
||||
frame = encode_speed_steer(500, -200)
|
||||
speed, steer = struct.unpack(">hh", frame[3:7])
|
||||
assert speed == 500
|
||||
assert steer == -200
|
||||
|
||||
def test_speed_steer_clamped_max(self):
|
||||
frame = encode_speed_steer(9999, 9999)
|
||||
speed, steer = struct.unpack(">hh", frame[3:7])
|
||||
assert speed == 1000
|
||||
assert steer == 1000
|
||||
|
||||
def test_speed_steer_clamped_min(self):
|
||||
frame = encode_speed_steer(-9999, -9999)
|
||||
speed, steer = struct.unpack(">hh", frame[3:7])
|
||||
assert speed == -1000
|
||||
assert steer == -1000
|
||||
|
||||
def test_speed_steer_zero(self):
|
||||
frame = encode_speed_steer(0, 0)
|
||||
speed, steer = struct.unpack(">hh", frame[3:7])
|
||||
assert speed == 0
|
||||
assert steer == 0
|
||||
|
||||
def test_arm_true(self):
|
||||
frame = encode_arm(True)
|
||||
assert frame[1] == CmdType.ARM
|
||||
assert frame[2] == 1 # LEN=1
|
||||
assert frame[3] == 1 # payload = 1 (arm)
|
||||
|
||||
def test_arm_false(self):
|
||||
frame = encode_arm(False)
|
||||
assert frame[3] == 0 # payload = 0 (disarm)
|
||||
|
||||
def test_set_mode(self):
|
||||
frame = encode_set_mode(3)
|
||||
assert frame[1] == CmdType.SET_MODE
|
||||
assert frame[3] == 3
|
||||
|
||||
def test_set_mode_masked(self):
|
||||
frame = encode_set_mode(0x1FF)
|
||||
assert frame[3] == 0xFF # masked to uint8
|
||||
|
||||
def test_pid_update_structure(self):
|
||||
frame = encode_pid_update(1.5, 0.01, 0.1)
|
||||
assert frame[1] == CmdType.PID_UPDATE
|
||||
assert frame[2] == 12 # LEN=12 (three float32)
|
||||
kp, ki, kd = struct.unpack(">fff", frame[3:15])
|
||||
assert kp == pytest.approx(1.5)
|
||||
assert ki == pytest.approx(0.01)
|
||||
assert kd == pytest.approx(0.1)
|
||||
|
||||
def test_pid_update_zero(self):
|
||||
frame = encode_pid_update(0.0, 0.0, 0.0)
|
||||
kp, ki, kd = struct.unpack(">fff", frame[3:15])
|
||||
assert kp == pytest.approx(0.0)
|
||||
|
||||
def test_all_frames_end_with_etx(self):
|
||||
frames = [
|
||||
encode_heartbeat(),
|
||||
encode_speed_steer(100, -100),
|
||||
encode_arm(True),
|
||||
encode_set_mode(1),
|
||||
encode_pid_update(2.0, 0.1, 0.05),
|
||||
]
|
||||
for f in frames:
|
||||
assert f[-1] == ETX, f"Frame for type 0x{f[1]:02X} doesn't end with ETX"
|
||||
|
||||
def test_all_frames_start_with_stx(self):
|
||||
for f in [encode_heartbeat(), encode_speed_steer(0, 0)]:
|
||||
assert f[0] == STX
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# FrameParser tests
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestFrameParser:
|
||||
|
||||
# ── IMU telemetry ─────────────────────────────────────────────────────────
|
||||
|
||||
def test_decode_imu_frame(self):
|
||||
payload = struct.pack(">hhhhhh",
|
||||
1250, -300, 900, # pitch×100, roll×100, yaw×100
|
||||
981, 0, -50, # ax×100, ay×100, az×100
|
||||
)
|
||||
raw = _build_telem_frame(TelType.IMU, payload)
|
||||
results, parser = _parse_all(raw)
|
||||
assert len(results) == 1
|
||||
f = results[0]
|
||||
assert isinstance(f, ImuFrame)
|
||||
assert f.pitch_deg == pytest.approx(12.5)
|
||||
assert f.roll_deg == pytest.approx(-3.0)
|
||||
assert f.yaw_deg == pytest.approx(9.0)
|
||||
assert f.accel_x == pytest.approx(9.81)
|
||||
assert f.accel_y == pytest.approx(0.0)
|
||||
assert f.accel_z == pytest.approx(-0.5)
|
||||
|
||||
def test_decode_imu_zero(self):
|
||||
payload = struct.pack(">hhhhhh", 0, 0, 0, 0, 0, 0)
|
||||
raw = _build_telem_frame(TelType.IMU, payload)
|
||||
results, _ = _parse_all(raw)
|
||||
f = results[0]
|
||||
assert f.pitch_deg == pytest.approx(0.0)
|
||||
assert f.accel_z == pytest.approx(0.0)
|
||||
|
||||
# ── Battery telemetry ─────────────────────────────────────────────────────
|
||||
|
||||
def test_decode_battery_frame(self):
|
||||
payload = struct.pack(">HhB", 11100, -500, 72) # 11.1V, -500mA (charging), 72%
|
||||
raw = _build_telem_frame(TelType.BATTERY, payload)
|
||||
results, _ = _parse_all(raw)
|
||||
assert len(results) == 1
|
||||
f = results[0]
|
||||
assert isinstance(f, BatteryFrame)
|
||||
assert f.voltage_mv == 11100
|
||||
assert f.current_ma == -500
|
||||
assert f.soc_pct == 72
|
||||
|
||||
def test_decode_battery_discharging(self):
|
||||
payload = struct.pack(">HhB", 10200, 2000, 45) # 10.2V, 2A draw, 45%
|
||||
raw = _build_telem_frame(TelType.BATTERY, payload)
|
||||
results, _ = _parse_all(raw)
|
||||
f = results[0]
|
||||
assert f.current_ma == 2000 # positive = discharging
|
||||
|
||||
# ── Motor RPM telemetry ───────────────────────────────────────────────────
|
||||
|
||||
def test_decode_motor_rpm(self):
|
||||
payload = struct.pack(">hh", 1500, -1500) # left=1500, right=-1500 (spinning in place)
|
||||
raw = _build_telem_frame(TelType.MOTOR_RPM, payload)
|
||||
results, _ = _parse_all(raw)
|
||||
f = results[0]
|
||||
assert isinstance(f, MotorRpmFrame)
|
||||
assert f.left_rpm == 1500
|
||||
assert f.right_rpm == -1500
|
||||
|
||||
def test_decode_motor_rpm_zero(self):
|
||||
payload = struct.pack(">hh", 0, 0)
|
||||
raw = _build_telem_frame(TelType.MOTOR_RPM, payload)
|
||||
results, _ = _parse_all(raw)
|
||||
assert results[0].left_rpm == 0
|
||||
|
||||
# ── Arm state telemetry ───────────────────────────────────────────────────
|
||||
|
||||
def test_decode_arm_state_armed(self):
|
||||
payload = struct.pack("BB", 1, 0) # ARMED, no error flags
|
||||
raw = _build_telem_frame(TelType.ARM_STATE, payload)
|
||||
results, _ = _parse_all(raw)
|
||||
f = results[0]
|
||||
assert isinstance(f, ArmStateFrame)
|
||||
assert f.state == 1
|
||||
assert f.error_flags == 0
|
||||
|
||||
def test_decode_arm_state_tilt_fault(self):
|
||||
payload = struct.pack("BB", 2, 0x04) # TILT_FAULT, bit 2 set
|
||||
raw = _build_telem_frame(TelType.ARM_STATE, payload)
|
||||
results, _ = _parse_all(raw)
|
||||
f = results[0]
|
||||
assert f.state == 2
|
||||
assert f.error_flags == 0x04
|
||||
|
||||
# ── Error telemetry ───────────────────────────────────────────────────────
|
||||
|
||||
def test_decode_error_frame(self):
|
||||
payload = struct.pack("BB", 0x0A, 0x01) # error 0x0A, sub 0x01
|
||||
raw = _build_telem_frame(TelType.ERROR, payload)
|
||||
results, _ = _parse_all(raw)
|
||||
f = results[0]
|
||||
assert isinstance(f, ErrorFrame)
|
||||
assert f.error_code == 0x0A
|
||||
assert f.subcode == 0x01
|
||||
|
||||
# ── CRC errors ────────────────────────────────────────────────────────────
|
||||
|
||||
def test_bad_crc_rejected(self):
|
||||
raw = bytearray(_build_telem_frame(TelType.BATTERY,
|
||||
struct.pack(">HhB", 11000, 0, 80)))
|
||||
raw[-3] ^= 0xFF # flip CRC_HI byte
|
||||
results, parser = _parse_all(bytes(raw))
|
||||
assert results == []
|
||||
assert parser.frames_error == 1
|
||||
assert parser.frames_ok == 0
|
||||
|
||||
def test_bad_crc_both_bytes(self):
|
||||
raw = bytearray(_build_telem_frame(TelType.MOTOR_RPM,
|
||||
struct.pack(">hh", 100, 100)))
|
||||
raw[-3] ^= 0xAA
|
||||
raw[-2] ^= 0x55
|
||||
results, parser = _parse_all(bytes(raw))
|
||||
assert results == []
|
||||
assert parser.frames_error >= 1
|
||||
|
||||
# ── ETX errors ────────────────────────────────────────────────────────────
|
||||
|
||||
def test_bad_etx_rejected(self):
|
||||
raw = bytearray(_build_telem_frame(TelType.ARM_STATE,
|
||||
struct.pack("BB", 1, 0)))
|
||||
raw[-1] = 0xFF # corrupt ETX
|
||||
results, parser = _parse_all(bytes(raw))
|
||||
assert results == []
|
||||
assert parser.frames_error == 1
|
||||
|
||||
# ── Resync after corrupt data ─────────────────────────────────────────────
|
||||
|
||||
def test_resync_after_garbage(self):
|
||||
"""Parser must resync and decode a valid frame after leading garbage."""
|
||||
garbage = b"\xDE\xAD\xBE\xEF\xFF\x00\x42"
|
||||
valid = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0))
|
||||
results, parser = _parse_all(garbage + valid)
|
||||
assert len(results) == 1
|
||||
assert isinstance(results[0], ArmStateFrame)
|
||||
|
||||
def test_two_consecutive_frames(self):
|
||||
"""Parser must handle two back-to-back valid frames."""
|
||||
f1 = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0))
|
||||
f2 = _build_telem_frame(TelType.MOTOR_RPM, struct.pack(">hh", 500, 500))
|
||||
results, parser = _parse_all(f1 + f2)
|
||||
assert len(results) == 2
|
||||
assert isinstance(results[0], ArmStateFrame)
|
||||
assert isinstance(results[1], MotorRpmFrame)
|
||||
assert parser.frames_error == 0
|
||||
|
||||
def test_three_frames_with_garbage_between(self):
|
||||
"""Corrupt bytes between valid frames must not discard subsequent valid frames."""
|
||||
f1 = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0))
|
||||
f2 = _build_telem_frame(TelType.BATTERY, struct.pack(">HhB", 11000, 0, 80))
|
||||
junk = b"\xDE\xAD"
|
||||
results, parser = _parse_all(f1 + junk + f2)
|
||||
assert len(results) == 2
|
||||
|
||||
def test_unknown_type_returns_raw_tuple(self):
|
||||
"""Unknown telemetry type must be returned as (type_code, payload) tuple."""
|
||||
raw = _build_telem_frame(0x99, b"\xAB\xCD")
|
||||
results, _ = _parse_all(raw)
|
||||
assert len(results) == 1
|
||||
type_code, payload = results[0]
|
||||
assert type_code == 0x99
|
||||
assert payload == b"\xAB\xCD"
|
||||
|
||||
def test_oversized_payload_rejected(self):
|
||||
"""A frame claiming LEN > MAX_PAYLOAD_LEN must be silently skipped."""
|
||||
# Craft a raw frame with LEN=200 (> 64 limit)
|
||||
raw = bytes([STX, TelType.IMU, 200])
|
||||
results, parser = _parse_all(raw)
|
||||
assert results == []
|
||||
assert parser.frames_error == 1
|
||||
|
||||
def test_empty_payload_frame(self):
|
||||
"""A frame with LEN=0 (heartbeat style) must decode correctly."""
|
||||
raw = _build_frame(CmdType.HEARTBEAT, b"")
|
||||
results, parser = _parse_all(raw)
|
||||
assert len(results) == 1
|
||||
type_code, payload = results[0]
|
||||
assert type_code == CmdType.HEARTBEAT
|
||||
assert payload == b""
|
||||
assert parser.frames_error == 0
|
||||
|
||||
# ── Streaming byte-by-byte ────────────────────────────────────────────────
|
||||
|
||||
def test_stream_one_byte_at_a_time(self):
|
||||
"""Every byte fed individually must produce the same result as bulk."""
|
||||
payload = struct.pack(">HhB", 12000, -100, 90)
|
||||
raw = _build_telem_frame(TelType.BATTERY, payload)
|
||||
parser = FrameParser()
|
||||
results = []
|
||||
for b in raw:
|
||||
r = parser.feed(b)
|
||||
if r is not None:
|
||||
results.append(r)
|
||||
assert len(results) == 1
|
||||
f = results[0]
|
||||
assert isinstance(f, BatteryFrame)
|
||||
assert f.soc_pct == 90
|
||||
|
||||
# ── Parser reset ──────────────────────────────────────────────────────────
|
||||
|
||||
def test_parser_reset_clears_state(self):
|
||||
"""After reset(), parser must accept a new valid frame."""
|
||||
payload = struct.pack("BB", 0, 0)
|
||||
raw = _build_telem_frame(TelType.ARM_STATE, payload)
|
||||
parser = FrameParser()
|
||||
# Feed partial frame
|
||||
for b in raw[:3]:
|
||||
parser.feed(b)
|
||||
# Reset mid-frame
|
||||
parser.reset()
|
||||
# Feed full frame
|
||||
results = []
|
||||
for b in raw:
|
||||
r = parser.feed(b)
|
||||
if r is not None:
|
||||
results.append(r)
|
||||
assert len(results) == 1
|
||||
|
||||
def test_frames_ok_counter(self):
|
||||
f1 = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0))
|
||||
f2 = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 0, 0))
|
||||
_, parser = _parse_all(f1 + f2)
|
||||
assert parser.frames_ok == 2
|
||||
|
||||
def test_frames_error_counter(self):
|
||||
raw = bytearray(_build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0)))
|
||||
raw[-3] ^= 0xFF # corrupt CRC
|
||||
_, parser = _parse_all(bytes(raw))
|
||||
assert parser.frames_error == 1
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Decode edge cases
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestDecodeEdgeCases:
|
||||
def test_imu_short_payload_returns_none(self):
|
||||
result = _decode_telemetry(TelType.IMU, b"\x00\x01") # only 2 bytes, need 12
|
||||
assert result is None
|
||||
|
||||
def test_battery_short_payload_returns_none(self):
|
||||
result = _decode_telemetry(TelType.BATTERY, b"\x00")
|
||||
assert result is None
|
||||
|
||||
def test_motor_rpm_short_payload_returns_none(self):
|
||||
result = _decode_telemetry(TelType.MOTOR_RPM, b"\x00\x01")
|
||||
assert result is None
|
||||
|
||||
def test_arm_state_short_payload_returns_none(self):
|
||||
result = _decode_telemetry(TelType.ARM_STATE, b"\x01")
|
||||
assert result is None
|
||||
|
||||
def test_error_short_payload_returns_none(self):
|
||||
result = _decode_telemetry(TelType.ERROR, b"\x01")
|
||||
assert result is None
|
||||
|
||||
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
# Round-trip tests (encode command → re-parse frame body)
|
||||
# ═══════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestRoundTrip:
|
||||
"""Verify encoder output satisfies parser expectations (same framing)."""
|
||||
|
||||
def _parse_cmd_as_telem(self, frame: bytes):
|
||||
"""Feed a command frame through FrameParser (type codes overlap logic)."""
|
||||
results, parser = _parse_all(frame)
|
||||
return results, parser
|
||||
|
||||
def test_heartbeat_roundtrip(self):
|
||||
frame = encode_heartbeat()
|
||||
results, parser = self._parse_cmd_as_telem(frame)
|
||||
assert len(results) == 1
|
||||
assert parser.frames_error == 0
|
||||
|
||||
def test_speed_steer_roundtrip(self):
|
||||
frame = encode_speed_steer(750, -300)
|
||||
results, _ = self._parse_cmd_as_telem(frame)
|
||||
assert len(results) == 1
|
||||
type_code, payload = results[0]
|
||||
assert type_code == CmdType.SPEED_STEER
|
||||
speed, steer = struct.unpack(">hh", payload)
|
||||
assert speed == 750
|
||||
assert steer == -300
|
||||
|
||||
def test_pid_roundtrip(self):
|
||||
frame = encode_pid_update(2.5, 0.05, 0.08)
|
||||
results, _ = self._parse_cmd_as_telem(frame)
|
||||
type_code, payload = results[0]
|
||||
kp, ki, kd = struct.unpack(">fff", payload)
|
||||
assert kp == pytest.approx(2.5, rel=1e-5)
|
||||
assert ki == pytest.approx(0.05, rel=1e-5)
|
||||
assert kd == pytest.approx(0.08, rel=1e-5)
|
||||
Loading…
x
Reference in New Issue
Block a user