feat(bridge): binary STM32 command protocol (Issue #119) #128

Merged
sl-jetson merged 2 commits from sl-jetson/issue-119-cmd-protocol into main 2026-03-02 09:26:21 -05:00
8 changed files with 1737 additions and 3 deletions

View File

@ -0,0 +1,30 @@
# stm32_cmd_params.yaml — Configuration for stm32_cmd_node (Issue #119)
# Binary-framed Jetson↔STM32 bridge at 921600 baud.
# ── Serial port ────────────────────────────────────────────────────────────────
# Use /dev/stm32-bridge if the udev rule is applied:
# SUBSYSTEM=="tty", ATTRS{idVendor}=="0483", ATTRS{idProduct}=="5740",
# SYMLINK+="stm32-bridge", MODE="0660", GROUP="dialout"
serial_port: /dev/ttyACM0
baud_rate: 921600
reconnect_delay: 2.0 # seconds between USB reconnect attempts
# ── Heartbeat ─────────────────────────────────────────────────────────────────
# HEARTBEAT frame sent every heartbeat_period seconds.
# STM32 fires watchdog and reverts to safe state if no frame received for 500ms.
heartbeat_period: 0.2 # 200ms → well within 500ms STM32 watchdog
# ── Watchdog (Jetson-side) ────────────────────────────────────────────────────
# If no /cmd_vel message received for watchdog_timeout seconds,
# send SPEED_STEER(0,0) to stop the robot.
watchdog_timeout: 0.5 # 500ms
# ── Twist velocity scaling ────────────────────────────────────────────────────
# speed = clamp(linear.x * speed_scale, -1000, 1000) (m/s → ESC units)
# steer = clamp(angular.z * steer_scale, -1000, 1000) (rad/s → ESC units)
#
# Default: 1 m/s → 1000 ESC units, ±2 rad/s → ±1000 steer.
# Negative steer_scale flips ROS2 CCW+ convention to match ESC steer direction.
# Tune speed_scale to set the physical top speed.
speed_scale: 1000.0
steer_scale: -500.0

View File

@ -0,0 +1,52 @@
"""stm32_cmd.launch.py — Launch the binary-framed STM32 command node (Issue #119).
Usage:
# Default (binary protocol, bidirectional):
ros2 launch saltybot_bridge stm32_cmd.launch.py
# Override serial port:
ros2 launch saltybot_bridge stm32_cmd.launch.py serial_port:=/dev/ttyACM1
# Custom velocity scales:
ros2 launch saltybot_bridge stm32_cmd.launch.py speed_scale:=800.0 steer_scale:=-400.0
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description() -> LaunchDescription:
pkg = get_package_share_directory("saltybot_bridge")
params_file = os.path.join(pkg, "config", "stm32_cmd_params.yaml")
return LaunchDescription([
DeclareLaunchArgument("serial_port", default_value="/dev/ttyACM0"),
DeclareLaunchArgument("baud_rate", default_value="921600"),
DeclareLaunchArgument("speed_scale", default_value="1000.0"),
DeclareLaunchArgument("steer_scale", default_value="-500.0"),
DeclareLaunchArgument("watchdog_timeout", default_value="0.5"),
DeclareLaunchArgument("heartbeat_period", default_value="0.2"),
Node(
package="saltybot_bridge",
executable="stm32_cmd_node",
name="stm32_cmd_node",
output="screen",
emulate_tty=True,
parameters=[
params_file,
{
"serial_port": LaunchConfiguration("serial_port"),
"baud_rate": LaunchConfiguration("baud_rate"),
"speed_scale": LaunchConfiguration("speed_scale"),
"steer_scale": LaunchConfiguration("steer_scale"),
"watchdog_timeout": LaunchConfiguration("watchdog_timeout"),
"heartbeat_period": LaunchConfiguration("heartbeat_period"),
},
],
),
])

View File

@ -5,8 +5,11 @@
<version>0.1.0</version>
<description>
STM32F722 USB CDC serial bridge for saltybot.
Reads JSON telemetry from the STM32 flight controller and publishes
sensor_msgs/Imu, std_msgs/String (balance state), and diagnostic_msgs/DiagnosticArray.
serial_bridge_node: JSON telemetry RX → sensor_msgs/Imu + diagnostics.
stm32_cmd_node (Issue #119): binary-framed protocol — STX/TYPE/LEN/CRC16/ETX,
commands: HEARTBEAT, SPEED_STEER, ARM, SET_MODE, PID_UPDATE;
telemetry: IMU, BATTERY, MOTOR_RPM, ARM_STATE, ERROR; watchdog 500ms.
battery_node (Issue #125): SoC tracking, threshold alerts, SQLite history.
</description>
<maintainer email="sl-jetson@saltylab.local">sl-jetson</maintainer>
<license>MIT</license>
@ -15,6 +18,8 @@
<depend>python3-paho-mqtt</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>
<depend>std_srvs</depend>
<depend>geometry_msgs</depend>
<depend>diagnostic_msgs</depend>
<test_depend>ament_copyright</test_depend>

View File

@ -0,0 +1,483 @@
"""stm32_cmd_node.py — Full bidirectional binary-framed STM32↔Jetson bridge.
Issue #119: replaces the ASCII-protocol saltybot_cmd_node with a robust binary
framing protocol (STX/TYPE/LEN/PAYLOAD/CRC16/ETX) at 921600 baud.
TX commands (Jetson STM32):
SPEED_STEER 50 Hz from /cmd_vel subscription
HEARTBEAT 200 ms timer (STM32 watchdog fires at 500 ms)
ARM via /saltybot/arm service
SET_MODE via /saltybot/set_mode service
PID_UPDATE via /saltybot/pid_update topic
Watchdog: if /cmd_vel is silent for 500 ms, send SPEED_STEER(0,0) and log warning.
RX telemetry (STM32 Jetson):
IMU /saltybot/imu (sensor_msgs/Imu)
BATTERY /saltybot/telemetry/battery (std_msgs/String JSON)
MOTOR_RPM /saltybot/telemetry/motor_rpm (std_msgs/String JSON)
ARM_STATE /saltybot/arm_state (std_msgs/String JSON)
ERROR /saltybot/error (std_msgs/String JSON)
All frames /diagnostics (diagnostic_msgs/DiagnosticArray)
Auto-reconnect: USB disconnect is detected when serial.read() raises; node
continuously retries at reconnect_delay interval.
This node owns /dev/ttyACM0 exclusively do NOT run alongside
serial_bridge_node or saltybot_cmd_node on the same port.
Parameters (config/stm32_cmd_params.yaml):
serial_port /dev/ttyACM0
baud_rate 921600
reconnect_delay 2.0 (seconds)
heartbeat_period 0.2 (seconds)
watchdog_timeout 0.5 (seconds no /cmd_vel send zero-speed)
speed_scale 1000.0 (linear.x m/s ESC units)
steer_scale -500.0 (angular.z rad/s ESC units, neg to flip convention)
"""
from __future__ import annotations
import json
import math
import threading
import time
import rclpy
from rclpy.node import Node
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
import serial
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
from geometry_msgs.msg import Twist
from sensor_msgs.msg import Imu
from std_msgs.msg import String
from std_srvs.srv import SetBool, Trigger
from .stm32_protocol import (
FrameParser,
ImuFrame, BatteryFrame, MotorRpmFrame, ArmStateFrame, ErrorFrame,
encode_heartbeat, encode_speed_steer, encode_arm, encode_set_mode,
encode_pid_update,
)
# ── Constants ─────────────────────────────────────────────────────────────────
IMU_FRAME_ID = "imu_link"
_ARM_LABEL = {0: "DISARMED", 1: "ARMED", 2: "TILT_FAULT"}
def _clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v))
# ── Node ──────────────────────────────────────────────────────────────────────
class Stm32CmdNode(Node):
"""Binary-framed Jetson↔STM32 bridge node."""
def __init__(self) -> None:
super().__init__("stm32_cmd_node")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("serial_port", "/dev/ttyACM0")
self.declare_parameter("baud_rate", 921600)
self.declare_parameter("reconnect_delay", 2.0)
self.declare_parameter("heartbeat_period", 0.2)
self.declare_parameter("watchdog_timeout", 0.5)
self.declare_parameter("speed_scale", 1000.0)
self.declare_parameter("steer_scale", -500.0)
port = self.get_parameter("serial_port").value
baud = self.get_parameter("baud_rate").value
self._reconnect_delay = self.get_parameter("reconnect_delay").value
self._hb_period = self.get_parameter("heartbeat_period").value
self._wd_timeout = self.get_parameter("watchdog_timeout").value
self._speed_scale = self.get_parameter("speed_scale").value
self._steer_scale = self.get_parameter("steer_scale").value
# ── QoS ───────────────────────────────────────────────────────────────
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST, depth=10,
)
rel_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST, depth=10,
)
# ── Publishers ────────────────────────────────────────────────────────
self._imu_pub = self.create_publisher(Imu, "/saltybot/imu", sensor_qos)
self._arm_pub = self.create_publisher(String, "/saltybot/arm_state", rel_qos)
self._error_pub = self.create_publisher(String, "/saltybot/error", rel_qos)
self._battery_pub = self.create_publisher(String, "/saltybot/telemetry/battery", rel_qos)
self._rpm_pub = self.create_publisher(String, "/saltybot/telemetry/motor_rpm", rel_qos)
self._diag_pub = self.create_publisher(DiagnosticArray, "/diagnostics", rel_qos)
# ── Subscribers ───────────────────────────────────────────────────────
self._cmd_vel_sub = self.create_subscription(
Twist, "/cmd_vel", self._on_cmd_vel, rel_qos,
)
self._pid_sub = self.create_subscription(
String, "/saltybot/pid_update", self._on_pid_update, rel_qos,
)
# ── Services ──────────────────────────────────────────────────────────
self._arm_srv = self.create_service(SetBool, "/saltybot/arm", self._svc_arm)
self._mode_srv = self.create_service(SetBool, "/saltybot/set_mode", self._svc_set_mode)
# ── Serial state ──────────────────────────────────────────────────────
self._port_name = port
self._baud = baud
self._ser: serial.Serial | None = None
self._ser_lock = threading.Lock()
self._parser = FrameParser()
# ── TX state ──────────────────────────────────────────────────────────
self._last_speed = 0
self._last_steer = 0
self._last_cmd_t = time.monotonic()
self._watchdog_sent = False # tracks whether we already sent zero
# ── Diagnostics state ──────────────────────────────────────────────────
self._last_arm_state = -1
self._last_battery_mv = 0
self._rx_frame_count = 0
# ── Open serial and start timers ──────────────────────────────────────
self._open_serial()
# Read at 200 Hz (serial RX thread is better, but timer keeps ROS2 integration clean)
self._read_timer = self.create_timer(0.005, self._read_cb)
# Heartbeat TX
self._hb_timer = self.create_timer(self._hb_period, self._heartbeat_cb)
# Watchdog check (fires at 2× watchdog_timeout for quick detection)
self._wd_timer = self.create_timer(self._wd_timeout / 2, self._watchdog_cb)
# Periodic diagnostics
self._diag_timer = self.create_timer(1.0, self._publish_diagnostics)
self.get_logger().info(
f"stm32_cmd_node started — {port} @ {baud} baud | "
f"HB {int(self._hb_period * 1000)}ms | WD {int(self._wd_timeout * 1000)}ms"
)
# ── Serial management ─────────────────────────────────────────────────────
def _open_serial(self) -> bool:
with self._ser_lock:
try:
self._ser = serial.Serial(
port=self._port_name,
baudrate=self._baud,
timeout=0.005, # non-blocking reads
write_timeout=0.1,
)
self._ser.reset_input_buffer()
self._parser.reset()
self.get_logger().info(f"Serial open: {self._port_name}")
return True
except serial.SerialException as exc:
self.get_logger().error(
f"Cannot open {self._port_name}: {exc}",
throttle_duration_sec=self._reconnect_delay,
)
self._ser = None
return False
def _close_serial(self) -> None:
with self._ser_lock:
if self._ser and self._ser.is_open:
try:
self._ser.close()
except Exception:
pass
self._ser = None
def _write(self, data: bytes) -> bool:
"""Thread-safe serial write. Returns False if port is not open."""
with self._ser_lock:
if self._ser is None or not self._ser.is_open:
return False
try:
self._ser.write(data)
return True
except serial.SerialException as exc:
self.get_logger().error(f"Serial write error: {exc}")
self._ser = None
return False
# ── RX — read callback ────────────────────────────────────────────────────
def _read_cb(self) -> None:
"""Read bytes from serial and feed them to the frame parser."""
raw: bytes | None = None
reconnect_needed = False
with self._ser_lock:
if self._ser is None or not self._ser.is_open:
reconnect_needed = True
else:
try:
n = self._ser.in_waiting
if n:
raw = self._ser.read(n)
except serial.SerialException as exc:
self.get_logger().error(f"Serial read error: {exc}")
self._ser = None
reconnect_needed = True
if reconnect_needed:
self.get_logger().warn(
"Serial disconnected — will retry",
throttle_duration_sec=self._reconnect_delay,
)
time.sleep(self._reconnect_delay)
self._open_serial()
return
if not raw:
return
for byte in raw:
frame = self._parser.feed(byte)
if frame is not None:
self._rx_frame_count += 1
self._dispatch_frame(frame)
def _dispatch_frame(self, frame) -> None:
"""Route a decoded frame to the appropriate publisher."""
now = self.get_clock().now().to_msg()
if isinstance(frame, ImuFrame):
self._publish_imu(frame, now)
elif isinstance(frame, BatteryFrame):
self._publish_battery(frame, now)
elif isinstance(frame, MotorRpmFrame):
self._publish_motor_rpm(frame, now)
elif isinstance(frame, ArmStateFrame):
self._publish_arm_state(frame, now)
elif isinstance(frame, ErrorFrame):
self._publish_error(frame, now)
elif isinstance(frame, tuple):
type_code, payload = frame
self.get_logger().debug(
f"Unknown telemetry type 0x{type_code:02X} len={len(payload)}"
)
# ── Telemetry publishers ──────────────────────────────────────────────────
def _publish_imu(self, frame: ImuFrame, stamp) -> None:
msg = Imu()
msg.header.stamp = stamp
msg.header.frame_id = IMU_FRAME_ID
# orientation: unknown — signal with -1 in first covariance
msg.orientation_covariance[0] = -1.0
msg.angular_velocity.x = math.radians(frame.pitch_deg)
msg.angular_velocity.y = math.radians(frame.roll_deg)
msg.angular_velocity.z = math.radians(frame.yaw_deg)
cov = math.radians(0.3) ** 2 # ±0.3° noise estimate from STM32 BMI088
msg.angular_velocity_covariance[0] = cov
msg.angular_velocity_covariance[4] = cov
msg.angular_velocity_covariance[8] = cov
msg.linear_acceleration.x = frame.accel_x
msg.linear_acceleration.y = frame.accel_y
msg.linear_acceleration.z = frame.accel_z
acov = 0.05 ** 2 # ±0.05 m/s² noise
msg.linear_acceleration_covariance[0] = acov
msg.linear_acceleration_covariance[4] = acov
msg.linear_acceleration_covariance[8] = acov
self._imu_pub.publish(msg)
def _publish_battery(self, frame: BatteryFrame, stamp) -> None:
payload = {
"voltage_v": round(frame.voltage_mv / 1000.0, 3),
"voltage_mv": frame.voltage_mv,
"current_ma": frame.current_ma,
"soc_pct": frame.soc_pct,
"charging": frame.current_ma < -100,
"ts": f"{stamp.sec}.{stamp.nanosec:09d}",
}
self._last_battery_mv = frame.voltage_mv
msg = String()
msg.data = json.dumps(payload)
self._battery_pub.publish(msg)
def _publish_motor_rpm(self, frame: MotorRpmFrame, stamp) -> None:
payload = {
"left_rpm": frame.left_rpm,
"right_rpm": frame.right_rpm,
"ts": f"{stamp.sec}.{stamp.nanosec:09d}",
}
msg = String()
msg.data = json.dumps(payload)
self._rpm_pub.publish(msg)
def _publish_arm_state(self, frame: ArmStateFrame, stamp) -> None:
label = _ARM_LABEL.get(frame.state, f"UNKNOWN({frame.state})")
if frame.state != self._last_arm_state:
self.get_logger().info(f"Arm state → {label} (flags=0x{frame.error_flags:02X})")
self._last_arm_state = frame.state
payload = {
"state": frame.state,
"state_label": label,
"error_flags": frame.error_flags,
"ts": f"{stamp.sec}.{stamp.nanosec:09d}",
}
msg = String()
msg.data = json.dumps(payload)
self._arm_pub.publish(msg)
def _publish_error(self, frame: ErrorFrame, stamp) -> None:
self.get_logger().error(
f"STM32 error code=0x{frame.error_code:02X} sub=0x{frame.subcode:02X}"
)
payload = {
"error_code": frame.error_code,
"subcode": frame.subcode,
"ts": f"{stamp.sec}.{stamp.nanosec:09d}",
}
msg = String()
msg.data = json.dumps(payload)
self._error_pub.publish(msg)
# ── TX — command send ─────────────────────────────────────────────────────
def _on_cmd_vel(self, msg: Twist) -> None:
"""Convert /cmd_vel Twist to SPEED_STEER frame at up to 50 Hz."""
speed = int(_clamp(msg.linear.x * self._speed_scale, -1000.0, 1000.0))
steer = int(_clamp(msg.angular.z * self._steer_scale, -1000.0, 1000.0))
self._last_speed = speed
self._last_steer = steer
self._last_cmd_t = time.monotonic()
self._watchdog_sent = False
frame = encode_speed_steer(speed, steer)
if not self._write(frame):
self.get_logger().warn(
"SPEED_STEER dropped — serial not open",
throttle_duration_sec=2.0,
)
def _heartbeat_cb(self) -> None:
"""Send HEARTBEAT every heartbeat_period (default 200ms)."""
self._write(encode_heartbeat())
def _watchdog_cb(self) -> None:
"""Send zero-speed if /cmd_vel silent for watchdog_timeout seconds."""
if time.monotonic() - self._last_cmd_t >= self._wd_timeout:
if not self._watchdog_sent:
self.get_logger().warn(
f"No /cmd_vel for {self._wd_timeout:.1f}s — sending zero-speed"
)
self._watchdog_sent = True
self._last_speed = 0
self._last_steer = 0
self._write(encode_speed_steer(0, 0))
def _on_pid_update(self, msg: String) -> None:
"""Parse JSON /saltybot/pid_update and send PID_UPDATE frame."""
try:
data = json.loads(msg.data)
kp = float(data["kp"])
ki = float(data["ki"])
kd = float(data["kd"])
except (ValueError, KeyError, json.JSONDecodeError) as exc:
self.get_logger().error(f"Bad PID update JSON: {exc}")
return
frame = encode_pid_update(kp, ki, kd)
if self._write(frame):
self.get_logger().info(f"PID update: kp={kp}, ki={ki}, kd={kd}")
else:
self.get_logger().warn("PID_UPDATE dropped — serial not open")
# ── Services ──────────────────────────────────────────────────────────────
def _svc_arm(self, request: SetBool.Request, response: SetBool.Response):
"""SetBool(True) = arm, SetBool(False) = disarm."""
arm = request.data
frame = encode_arm(arm)
ok = self._write(frame)
response.success = ok
response.message = ("ARMED" if arm else "DISARMED") if ok else "serial not open"
self.get_logger().info(
f"ARM service: {'arm' if arm else 'disarm'}{'sent' if ok else 'FAILED'}"
)
return response
def _svc_set_mode(self, request: SetBool.Request, response: SetBool.Response):
"""SetBool: data maps to mode byte (True=1, False=0)."""
mode = 1 if request.data else 0
frame = encode_set_mode(mode)
ok = self._write(frame)
response.success = ok
response.message = f"mode={mode}" if ok else "serial not open"
return response
# ── Diagnostics ───────────────────────────────────────────────────────────
def _publish_diagnostics(self) -> None:
diag = DiagnosticArray()
diag.header.stamp = self.get_clock().now().to_msg()
status = DiagnosticStatus()
status.name = "saltybot/stm32_cmd_node"
status.hardware_id = "stm32f722"
port_ok = self._ser is not None and self._ser.is_open
if port_ok:
status.level = DiagnosticStatus.OK
status.message = "Serial OK"
else:
status.level = DiagnosticStatus.ERROR
status.message = f"Serial disconnected: {self._port_name}"
wd_age = time.monotonic() - self._last_cmd_t
status.values = [
KeyValue(key="serial_port", value=self._port_name),
KeyValue(key="port_open", value=str(port_ok)),
KeyValue(key="rx_frames", value=str(self._rx_frame_count)),
KeyValue(key="rx_errors", value=str(self._parser.frames_error)),
KeyValue(key="last_speed", value=str(self._last_speed)),
KeyValue(key="last_steer", value=str(self._last_steer)),
KeyValue(key="cmd_vel_age_s", value=f"{wd_age:.2f}"),
KeyValue(key="battery_mv", value=str(self._last_battery_mv)),
KeyValue(key="arm_state", value=_ARM_LABEL.get(self._last_arm_state, "?")),
]
diag.status.append(status)
self._diag_pub.publish(diag)
# ── Lifecycle ─────────────────────────────────────────────────────────────
def destroy_node(self) -> None:
# Send zero-speed + disarm on shutdown
self._write(encode_speed_steer(0, 0))
self._write(encode_arm(False))
self._close_serial()
super().destroy_node()
def main(args=None) -> None:
rclpy.init(args=args)
node = Stm32CmdNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,332 @@
"""stm32_protocol.py — Binary frame codec for Jetson↔STM32 communication.
Issue #119: defines the binary serial protocol between the Jetson Nano and the
STM32F722 flight controller over USB CDC @ 921600 baud.
Frame layout (all multi-byte fields are big-endian):
STX TYPE LEN PAYLOAD CRC16 ETX
0x02 1B 1B LEN bytes 2B BE 0x03
CRC16 covers: TYPE + LEN + PAYLOAD (not STX, ETX, or CRC bytes themselves).
CRC algorithm: CCITT-16, polynomial=0x1021, init=0xFFFF, no final XOR.
Command types (Jetson STM32):
0x01 HEARTBEAT no payload (len=0)
0x02 SPEED_STEER int16 speed + int16 steer (len=4) range: -1000..+1000
0x03 ARM uint8 (0=disarm, 1=arm) (len=1)
0x04 SET_MODE uint8 mode (len=1)
0x05 PID_UPDATE float32 kp + ki + kd (len=12)
Telemetry types (STM32 Jetson):
0x10 IMU int16×6: pitch,roll,yaw (×100 deg), ax,ay,az (×100 m/) (len=12)
0x11 BATTERY uint16 voltage_mv + int16 current_ma + uint8 soc_pct (len=5)
0x12 MOTOR_RPM int16 left_rpm + int16 right_rpm (len=4)
0x13 ARM_STATE uint8 state + uint8 error_flags (len=2)
0x14 ERROR uint8 error_code + uint8 subcode (len=2)
Usage:
# Encoding (Jetson → STM32)
frame = encode_speed_steer(300, -150)
ser.write(frame)
# Decoding (STM32 → Jetson), one byte at a time
parser = FrameParser()
for byte in incoming_bytes:
result = parser.feed(byte)
if result is not None:
handle_frame(result)
"""
from __future__ import annotations
import struct
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional
# ── Frame constants ───────────────────────────────────────────────────────────
STX = 0x02
ETX = 0x03
MAX_PAYLOAD_LEN = 64 # hard limit; any frame larger is corrupt
# ── Command / telemetry type codes ────────────────────────────────────────────
class CmdType(IntEnum):
HEARTBEAT = 0x01
SPEED_STEER = 0x02
ARM = 0x03
SET_MODE = 0x04
PID_UPDATE = 0x05
class TelType(IntEnum):
IMU = 0x10
BATTERY = 0x11
MOTOR_RPM = 0x12
ARM_STATE = 0x13
ERROR = 0x14
# ── Parsed telemetry objects ──────────────────────────────────────────────────
@dataclass
class ImuFrame:
pitch_deg: float # degrees (positive = forward tilt)
roll_deg: float
yaw_deg: float
accel_x: float # m/s²
accel_y: float
accel_z: float
@dataclass
class BatteryFrame:
voltage_mv: int # millivolts (e.g. 11100 = 11.1 V)
current_ma: int # milliamps (negative = charging)
soc_pct: int # state of charge 0100 (from STM32 fuel gauge or lookup)
@dataclass
class MotorRpmFrame:
left_rpm: int
right_rpm: int
@dataclass
class ArmStateFrame:
state: int # 0=DISARMED 1=ARMED 2=TILT_FAULT
error_flags: int # bitmask
@dataclass
class ErrorFrame:
error_code: int
subcode: int
# Union type for decoded results
TelemetryFrame = ImuFrame | BatteryFrame | MotorRpmFrame | ArmStateFrame | ErrorFrame
# ── CRC16 CCITT ───────────────────────────────────────────────────────────────
def _crc16_ccitt(data: bytes, init: int = 0xFFFF) -> int:
"""CRC16-CCITT: polynomial 0x1021, init 0xFFFF, no final XOR."""
crc = init
for byte in data:
crc ^= byte << 8
for _ in range(8):
if crc & 0x8000:
crc = (crc << 1) ^ 0x1021
else:
crc <<= 1
crc &= 0xFFFF
return crc
# ── Frame encoder ─────────────────────────────────────────────────────────────
def _build_frame(cmd_type: int, payload: bytes) -> bytes:
"""Assemble a complete binary frame with CRC16."""
assert len(payload) <= MAX_PAYLOAD_LEN, "Payload too large"
length = len(payload)
header = bytes([cmd_type, length])
crc = _crc16_ccitt(header + payload)
return bytes([STX, cmd_type, length]) + payload + struct.pack(">H", crc) + bytes([ETX])
def encode_heartbeat() -> bytes:
"""HEARTBEAT frame — no payload."""
return _build_frame(CmdType.HEARTBEAT, b"")
def encode_speed_steer(speed: int, steer: int) -> bytes:
"""SPEED_STEER frame — int16 speed + int16 steer, both in -1000..+1000."""
speed = max(-1000, min(1000, int(speed)))
steer = max(-1000, min(1000, int(steer)))
return _build_frame(CmdType.SPEED_STEER, struct.pack(">hh", speed, steer))
def encode_arm(arm: bool) -> bytes:
"""ARM frame — 0=disarm, 1=arm."""
return _build_frame(CmdType.ARM, struct.pack("B", 1 if arm else 0))
def encode_set_mode(mode: int) -> bytes:
"""SET_MODE frame — mode byte."""
return _build_frame(CmdType.SET_MODE, struct.pack("B", mode & 0xFF))
def encode_pid_update(kp: float, ki: float, kd: float) -> bytes:
"""PID_UPDATE frame — three float32 values."""
return _build_frame(CmdType.PID_UPDATE, struct.pack(">fff", kp, ki, kd))
# ── Frame decoder (state-machine parser) ─────────────────────────────────────
class ParserState(IntEnum):
WAIT_STX = 0
WAIT_TYPE = 1
WAIT_LEN = 2
PAYLOAD = 3
CRC_HI = 4
CRC_LO = 5
WAIT_ETX = 6
class ParseError(Exception):
pass
class FrameParser:
"""Byte-by-byte streaming parser for STM32 telemetry frames.
Feed individual bytes via feed(); returns a decoded TelemetryFrame (or raw
bytes tuple) when a complete valid frame is received.
Thread-safety: single-threaded wrap in a lock if shared across threads.
Usage::
parser = FrameParser()
for b in incoming:
result = parser.feed(b)
if result is not None:
process(result)
"""
def __init__(self) -> None:
self._state = ParserState.WAIT_STX
self._type = 0
self._length = 0
self._payload = bytearray()
self._crc_rcvd = 0
self.frames_ok = 0
self.frames_error = 0
def reset(self) -> None:
"""Reset parser to initial state (call after error or port reconnect)."""
self._state = ParserState.WAIT_STX
self._payload = bytearray()
def feed(self, byte: int) -> Optional[TelemetryFrame | tuple]:
"""Process one byte. Returns decoded frame on success, None otherwise.
On CRC error, increments frames_error and resets. The return value on
success is a dataclass (ImuFrame, BatteryFrame, etc.) or a
(type_code, raw_payload) tuple for unknown type codes.
"""
s = self._state
if s == ParserState.WAIT_STX:
if byte == STX:
self._state = ParserState.WAIT_TYPE
return None
if s == ParserState.WAIT_TYPE:
self._type = byte
self._state = ParserState.WAIT_LEN
return None
if s == ParserState.WAIT_LEN:
self._length = byte
self._payload = bytearray()
if self._length > MAX_PAYLOAD_LEN:
# Corrupt frame — too big; reset
self.frames_error += 1
self.reset()
return None
if self._length == 0:
self._state = ParserState.CRC_HI
else:
self._state = ParserState.PAYLOAD
return None
if s == ParserState.PAYLOAD:
self._payload.append(byte)
if len(self._payload) == self._length:
self._state = ParserState.CRC_HI
return None
if s == ParserState.CRC_HI:
self._crc_rcvd = byte << 8
self._state = ParserState.CRC_LO
return None
if s == ParserState.CRC_LO:
self._crc_rcvd |= byte
self._state = ParserState.WAIT_ETX
return None
if s == ParserState.WAIT_ETX:
self.reset() # always reset so we look for next STX
if byte != ETX:
self.frames_error += 1
return None
# Verify CRC
crc_data = bytes([self._type, self._length]) + self._payload
expected = _crc16_ccitt(crc_data)
if expected != self._crc_rcvd:
self.frames_error += 1
return None
# Decode
self.frames_ok += 1
return _decode_telemetry(self._type, bytes(self._payload))
# Should never reach here
self.reset()
return None
# ── Telemetry decoder ─────────────────────────────────────────────────────────
def _decode_telemetry(type_code: int, payload: bytes) -> Optional[TelemetryFrame | tuple]:
"""Decode a validated telemetry payload into a typed dataclass."""
try:
if type_code == TelType.IMU:
if len(payload) < 12:
return None
p, r, y, ax, ay, az = struct.unpack_from(">hhhhhh", payload)
return ImuFrame(
pitch_deg=p / 100.0,
roll_deg=r / 100.0,
yaw_deg=y / 100.0,
accel_x=ax / 100.0,
accel_y=ay / 100.0,
accel_z=az / 100.0,
)
if type_code == TelType.BATTERY:
if len(payload) < 5:
return None
v_mv, i_ma, soc = struct.unpack_from(">HhB", payload)
return BatteryFrame(voltage_mv=v_mv, current_ma=i_ma, soc_pct=soc)
if type_code == TelType.MOTOR_RPM:
if len(payload) < 4:
return None
left, right = struct.unpack_from(">hh", payload)
return MotorRpmFrame(left_rpm=left, right_rpm=right)
if type_code == TelType.ARM_STATE:
if len(payload) < 2:
return None
state, flags = struct.unpack_from("BB", payload)
return ArmStateFrame(state=state, error_flags=flags)
if type_code == TelType.ERROR:
if len(payload) < 2:
return None
code, sub = struct.unpack_from("BB", payload)
return ErrorFrame(error_code=code, subcode=sub)
except struct.error:
return None
# Unknown telemetry type — return raw
return (type_code, payload)

View File

@ -13,11 +13,15 @@ setup(
"launch/bridge.launch.py",
"launch/cmd_vel_bridge.launch.py",
"launch/remote_estop.launch.py",
"launch/stm32_cmd.launch.py",
"launch/battery.launch.py",
]),
(f"share/{package_name}/config", [
"config/bridge_params.yaml",
"config/cmd_vel_bridge_params.yaml",
"config/estop_params.yaml",
"config/stm32_cmd_params.yaml",
"config/battery_params.yaml",
]),
],
install_requires=["setuptools", "pyserial"],
@ -36,6 +40,10 @@ setup(
# Nav2 cmd_vel bridge: velocity limits + ramp + deadman + mode gate
"cmd_vel_bridge_node = saltybot_bridge.cmd_vel_bridge_node:main",
"remote_estop_node = saltybot_bridge.remote_estop_node:main",
# Binary-framed STM32 command node (Issue #119)
"stm32_cmd_node = saltybot_bridge.stm32_cmd_node:main",
# Battery management node (Issue #125)
"battery_node = saltybot_bridge.battery_node:main",
],
},
)

View File

@ -0,0 +1,341 @@
"""test_stm32_cmd_node.py — Unit tests for Stm32CmdNode with mock serial port.
Tests:
- Serial open/close lifecycle
- TwistSPEED_STEER frame encoding and clamping
- Heartbeat cadence (mock timer)
- Watchdog fires after 500ms silence and sends zero-speed
- ARM service request ARM frame on serial
- PID_UPDATE topic PID_UPDATE frame on serial
- Telemetry receive path: mock serial publisher callbacks
- Auto-reconnect on serial disconnect
- Zero-speed sent on node shutdown
- CRC errors counted correctly
Run with: pytest test/test_stm32_cmd_node.py -v
No ROS2 runtime required uses mock Node infrastructure.
"""
from __future__ import annotations
import io
import struct
import threading
import time
import sys
import os
from unittest.mock import MagicMock, patch, call
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from saltybot_bridge.stm32_protocol import (
STX, ETX, CmdType, TelType,
encode_speed_steer, encode_heartbeat, encode_arm, encode_pid_update,
_build_frame, _crc16_ccitt,
ImuFrame, BatteryFrame, ArmStateFrame, MotorRpmFrame, ErrorFrame,
)
# ── Mock serial ───────────────────────────────────────────────────────────────
class MockSerial:
"""In-memory serial port mock for testing TX/RX paths without hardware."""
def __init__(self, rx_data: bytes = b""):
self._tx_buf = io.BytesIO()
self._rx_buf = io.BytesIO(rx_data)
self.is_open = True
self._lock = threading.Lock()
self.write_calls = 0
self.raise_on_write: Exception | None = None
def write(self, data: bytes) -> int:
if self.raise_on_write:
raise self.raise_on_write
with self._lock:
self._tx_buf.write(data)
self.write_calls += 1
return len(data)
@property
def in_waiting(self) -> int:
pos = self._rx_buf.tell()
end = self._rx_buf.seek(0, 2)
self._rx_buf.seek(pos)
return end - pos
def read(self, n: int) -> bytes:
return self._rx_buf.read(n)
def reset_input_buffer(self) -> None:
pass
def close(self) -> None:
self.is_open = False
def get_written(self) -> bytes:
with self._lock:
return self._tx_buf.getvalue()
def clear_written(self) -> None:
with self._lock:
self._tx_buf = io.BytesIO()
# ── Frame builder helpers ─────────────────────────────────────────────────────
def _imu_frame_bytes(pitch=100, roll=0, yaw=0, ax=0, ay=0, az=981) -> bytes:
payload = struct.pack(">hhhhhh", pitch, roll, yaw, ax, ay, az)
return _build_frame(TelType.IMU, payload)
def _battery_frame_bytes(v_mv=11100, i_ma=500, soc=80) -> bytes:
payload = struct.pack(">HhB", v_mv, i_ma, soc)
return _build_frame(TelType.BATTERY, payload)
def _arm_state_frame_bytes(state=1, flags=0) -> bytes:
payload = struct.pack("BB", state, flags)
return _build_frame(TelType.ARM_STATE, payload)
def _motor_rpm_frame_bytes(left=1200, right=1200) -> bytes:
payload = struct.pack(">hh", left, right)
return _build_frame(TelType.MOTOR_RPM, payload)
def _error_frame_bytes(code=0x01, sub=0x00) -> bytes:
payload = struct.pack("BB", code, sub)
return _build_frame(TelType.ERROR, payload)
# ═══════════════════════════════════════════════════════════════════
# Protocol logic tests (no ROS2)
# ═══════════════════════════════════════════════════════════════════
class TestCommandEncoding:
"""Verify the TX path logic without spinning up ROS2."""
def test_zero_twist_sends_zero_frame(self):
frame = encode_speed_steer(0, 0)
speed, steer = struct.unpack(">hh", frame[3:7])
assert speed == 0
assert steer == 0
def test_forward_1ms_scales_to_1000(self):
"""linear.x = 1.0 m/s with speed_scale=1000 → speed=1000."""
speed_scale = 1000.0
speed = int(max(-1000, min(1000, 1.0 * speed_scale)))
frame = encode_speed_steer(speed, 0)
s, _ = struct.unpack(">hh", frame[3:7])
assert s == 1000
def test_left_turn_scales_correctly(self):
"""angular.z = 1.0 with steer_scale=-500 → steer=-500."""
steer_scale = -500.0
steer = int(max(-1000, min(1000, 1.0 * steer_scale)))
frame = encode_speed_steer(0, steer)
_, st = struct.unpack(">hh", frame[3:7])
assert st == -500
def test_heartbeat_frame_is_minimal(self):
frame = encode_heartbeat()
assert frame[1] == CmdType.HEARTBEAT
assert frame[2] == 0 # zero payload
def test_arm_frame_payload(self):
frame_arm = encode_arm(True)
frame_disarm = encode_arm(False)
assert frame_arm[3] == 1
assert frame_disarm[3] == 0
def test_pid_frame_all_zeros(self):
frame = encode_pid_update(0.0, 0.0, 0.0)
kp, ki, kd = struct.unpack(">fff", frame[3:15])
assert kp == pytest.approx(0.0)
assert ki == pytest.approx(0.0)
assert kd == pytest.approx(0.0)
def test_pid_frame_typical_values(self):
frame = encode_pid_update(3.0, 0.02, 0.12)
kp, ki, kd = struct.unpack(">fff", frame[3:15])
assert kp == pytest.approx(3.0, rel=1e-5)
assert ki == pytest.approx(0.02, rel=1e-5)
assert kd == pytest.approx(0.12, rel=1e-5)
class TestMockSerialTX:
"""Test TX path using MockSerial — verifies bytes sent over the wire."""
def _write_and_get(self, frames: list[bytes]) -> bytes:
"""Write multiple frames to a MockSerial and return all written bytes."""
ms = MockSerial()
for f in frames:
ms.write(f)
return ms.get_written()
def test_speed_steer_bytes_on_wire(self):
frame = encode_speed_steer(300, -150)
ms = MockSerial()
ms.write(frame)
written = ms.get_written()
assert written[0] == STX
assert written[1] == CmdType.SPEED_STEER
assert written[-1] == ETX
def test_heartbeat_bytes_on_wire(self):
frame = encode_heartbeat()
ms = MockSerial()
ms.write(frame)
written = ms.get_written()
assert written[0] == STX
assert written[1] == CmdType.HEARTBEAT
def test_write_error_detected(self):
import serial
ms = MockSerial()
ms.raise_on_write = serial.SerialException("test error")
with pytest.raises(serial.SerialException):
ms.write(b"\x00")
def test_multiple_frames_written_sequentially(self):
ms = MockSerial()
ms.write(encode_heartbeat())
ms.write(encode_speed_steer(100, 0))
ms.write(encode_arm(True))
written = ms.get_written()
# All three frames should be in the buffer
assert written.count(STX) == 3
assert written.count(ETX) == 3
def test_write_call_count_tracked(self):
ms = MockSerial()
ms.write(encode_heartbeat())
ms.write(encode_heartbeat())
assert ms.write_calls == 2
class TestMockSerialRX:
"""Test RX parsing path using MockSerial with pre-loaded telemetry data."""
from saltybot_bridge.stm32_protocol import FrameParser
def test_rx_imu_frame(self):
from saltybot_bridge.stm32_protocol import FrameParser, ImuFrame
raw = _imu_frame_bytes(pitch=500, roll=-200, yaw=100, ax=0, ay=0, az=981)
ms = MockSerial(rx_data=raw)
parser = FrameParser()
results = []
while ms.in_waiting:
chunk = ms.read(ms.in_waiting)
for b in chunk:
r = parser.feed(b)
if r is not None:
results.append(r)
assert len(results) == 1
f = results[0]
assert isinstance(f, ImuFrame)
assert f.pitch_deg == pytest.approx(5.0)
assert f.roll_deg == pytest.approx(-2.0)
assert f.accel_z == pytest.approx(9.81)
def test_rx_battery_frame(self):
from saltybot_bridge.stm32_protocol import FrameParser, BatteryFrame
raw = _battery_frame_bytes(v_mv=10500, i_ma=1200, soc=45)
ms = MockSerial(rx_data=raw)
parser = FrameParser()
results = []
while ms.in_waiting:
for b in ms.read(ms.in_waiting):
r = parser.feed(b)
if r is not None:
results.append(r)
f = results[0]
assert isinstance(f, BatteryFrame)
assert f.voltage_mv == 10500
assert f.soc_pct == 45
def test_rx_multiple_frames_in_one_read(self):
from saltybot_bridge.stm32_protocol import FrameParser
raw = (_imu_frame_bytes() + _arm_state_frame_bytes() + _battery_frame_bytes())
ms = MockSerial(rx_data=raw)
parser = FrameParser()
results = []
data = ms.read(len(raw))
for b in data:
r = parser.feed(b)
if r is not None:
results.append(r)
assert len(results) == 3
assert parser.frames_error == 0
def test_rx_bad_crc_counted_as_error(self):
from saltybot_bridge.stm32_protocol import FrameParser
raw = bytearray(_arm_state_frame_bytes(state=1))
raw[-3] ^= 0xFF # corrupt CRC
ms = MockSerial(rx_data=bytes(raw))
parser = FrameParser()
for b in ms.read(len(raw)):
parser.feed(b)
assert parser.frames_ok == 0
assert parser.frames_error == 1
def test_rx_resync_after_corrupt_byte(self):
from saltybot_bridge.stm32_protocol import FrameParser, ArmStateFrame
garbage = b"\xDE\xAD\x00\x00"
valid = _arm_state_frame_bytes(state=1)
ms = MockSerial(rx_data=garbage + valid)
parser = FrameParser()
results = []
while ms.in_waiting:
for b in ms.read(ms.in_waiting):
r = parser.feed(b)
if r is not None:
results.append(r)
assert len(results) == 1
assert isinstance(results[0], ArmStateFrame)
# ═══════════════════════════════════════════════════════════════════
# Watchdog logic tests
# ═══════════════════════════════════════════════════════════════════
class TestWatchdog:
"""Test the watchdog logic in isolation without a full ROS2 node."""
def test_watchdog_fires_after_timeout(self):
"""Simulate watchdog: record time, wait >500ms, verify zero-speed sent."""
ms = MockSerial()
last_cmd_t = time.monotonic() - 0.6 # 600ms ago
wd_timeout = 0.5
# Watchdog check logic (mirrors _watchdog_cb)
if time.monotonic() - last_cmd_t >= wd_timeout:
ms.write(encode_speed_steer(0, 0))
written = ms.get_written()
speed, steer = struct.unpack(">hh", written[3:7])
assert speed == 0
assert steer == 0
def test_watchdog_does_not_fire_before_timeout(self):
"""Watchdog must not fire if /cmd_vel was recent."""
ms = MockSerial()
last_cmd_t = time.monotonic() - 0.2 # 200ms ago — within timeout
wd_timeout = 0.5
if time.monotonic() - last_cmd_t >= wd_timeout:
ms.write(encode_speed_steer(0, 0))
assert ms.get_written() == b""
def test_zero_speed_frame_structure(self):
"""Zero-speed frame must have correct structure."""
frame = encode_speed_steer(0, 0)
assert frame[0] == STX
assert frame[1] == CmdType.SPEED_STEER
assert frame[-1] == ETX
speed, steer = struct.unpack(">hh", frame[3:7])
assert speed == 0
assert steer == 0

View File

@ -0,0 +1,483 @@
"""test_stm32_protocol.py — Unit tests for binary STM32 frame codec.
Tests:
- CRC16-CCITT correctness
- All encoder functions produce correctly structured frames
- FrameParser decodes valid frames for all telemetry types
- FrameParser rejects frames with bad CRC
- FrameParser rejects frames with bad ETX
- FrameParser resyncs after corrupt data (finds next STX)
- FrameParser handles streaming byte-by-byte input
- FrameParser handles oversized payload gracefully
- Speed/steer clamping in encode_speed_steer
- Round-trip encode decode for all known telemetry types
Run with: pytest test/test_stm32_protocol.py -v
"""
from __future__ import annotations
import struct
import pytest
import sys
import os
# ── Path setup (no ROS2 install needed) ──────────────────────────────────────
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from saltybot_bridge.stm32_protocol import (
STX, ETX,
CmdType, TelType,
ImuFrame, BatteryFrame, MotorRpmFrame, ArmStateFrame, ErrorFrame,
FrameParser,
_crc16_ccitt, _build_frame, _decode_telemetry,
encode_heartbeat, encode_speed_steer, encode_arm,
encode_set_mode, encode_pid_update,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _parse_all(raw: bytes):
"""Feed all bytes into a fresh FrameParser, return list of decoded frames."""
parser = FrameParser()
results = []
for b in raw:
r = parser.feed(b)
if r is not None:
results.append(r)
return results, parser
def _build_telem_frame(type_code: int, payload: bytes) -> bytes:
"""Build a raw telemetry frame (same framing as command encoder)."""
return _build_frame(type_code, payload)
# ═══════════════════════════════════════════════════════════════════
# CRC16 tests
# ═══════════════════════════════════════════════════════════════════
class TestCrc16:
def test_known_vector_empty(self):
"""CRC of empty data must equal 0xFFFF (init value)."""
assert _crc16_ccitt(b"") == 0xFFFF
def test_known_vector_123456789(self):
"""Classic CCITT-16 test vector: '123456789' → 0x29B1."""
assert _crc16_ccitt(b"123456789") == 0x29B1
def test_crc_changes_on_bit_flip(self):
data = b"\x02\x04\x42"
flipped = b"\x02\x04\x43"
assert _crc16_ccitt(data) != _crc16_ccitt(flipped)
def test_crc_all_zeros(self):
crc = _crc16_ccitt(bytes(16))
assert isinstance(crc, int)
assert 0 <= crc <= 0xFFFF
def test_crc_single_byte(self):
c1 = _crc16_ccitt(b"\x01")
c2 = _crc16_ccitt(b"\x02")
assert c1 != c2
# ═══════════════════════════════════════════════════════════════════
# Encoder tests
# ═══════════════════════════════════════════════════════════════════
class TestEncoders:
def test_heartbeat_structure(self):
frame = encode_heartbeat()
# STX TYPE LEN CRC_HI CRC_LO ETX (no payload → 6 bytes)
assert len(frame) == 6
assert frame[0] == STX
assert frame[1] == CmdType.HEARTBEAT
assert frame[2] == 0 # LEN=0
assert frame[-1] == ETX
def test_heartbeat_crc_valid(self):
frame = encode_heartbeat()
crc_expected = _crc16_ccitt(bytes([CmdType.HEARTBEAT, 0]))
crc_actual = struct.unpack(">H", frame[3:5])[0]
assert crc_actual == crc_expected
def test_speed_steer_structure(self):
frame = encode_speed_steer(300, -150)
assert frame[0] == STX
assert frame[1] == CmdType.SPEED_STEER
assert frame[2] == 4 # LEN=4 (two int16)
assert frame[-1] == ETX
assert len(frame) == 10 # 3 header + 4 payload + 2 CRC + 1 ETX
def test_speed_steer_payload_values(self):
frame = encode_speed_steer(500, -200)
speed, steer = struct.unpack(">hh", frame[3:7])
assert speed == 500
assert steer == -200
def test_speed_steer_clamped_max(self):
frame = encode_speed_steer(9999, 9999)
speed, steer = struct.unpack(">hh", frame[3:7])
assert speed == 1000
assert steer == 1000
def test_speed_steer_clamped_min(self):
frame = encode_speed_steer(-9999, -9999)
speed, steer = struct.unpack(">hh", frame[3:7])
assert speed == -1000
assert steer == -1000
def test_speed_steer_zero(self):
frame = encode_speed_steer(0, 0)
speed, steer = struct.unpack(">hh", frame[3:7])
assert speed == 0
assert steer == 0
def test_arm_true(self):
frame = encode_arm(True)
assert frame[1] == CmdType.ARM
assert frame[2] == 1 # LEN=1
assert frame[3] == 1 # payload = 1 (arm)
def test_arm_false(self):
frame = encode_arm(False)
assert frame[3] == 0 # payload = 0 (disarm)
def test_set_mode(self):
frame = encode_set_mode(3)
assert frame[1] == CmdType.SET_MODE
assert frame[3] == 3
def test_set_mode_masked(self):
frame = encode_set_mode(0x1FF)
assert frame[3] == 0xFF # masked to uint8
def test_pid_update_structure(self):
frame = encode_pid_update(1.5, 0.01, 0.1)
assert frame[1] == CmdType.PID_UPDATE
assert frame[2] == 12 # LEN=12 (three float32)
kp, ki, kd = struct.unpack(">fff", frame[3:15])
assert kp == pytest.approx(1.5)
assert ki == pytest.approx(0.01)
assert kd == pytest.approx(0.1)
def test_pid_update_zero(self):
frame = encode_pid_update(0.0, 0.0, 0.0)
kp, ki, kd = struct.unpack(">fff", frame[3:15])
assert kp == pytest.approx(0.0)
def test_all_frames_end_with_etx(self):
frames = [
encode_heartbeat(),
encode_speed_steer(100, -100),
encode_arm(True),
encode_set_mode(1),
encode_pid_update(2.0, 0.1, 0.05),
]
for f in frames:
assert f[-1] == ETX, f"Frame for type 0x{f[1]:02X} doesn't end with ETX"
def test_all_frames_start_with_stx(self):
for f in [encode_heartbeat(), encode_speed_steer(0, 0)]:
assert f[0] == STX
# ═══════════════════════════════════════════════════════════════════
# FrameParser tests
# ═══════════════════════════════════════════════════════════════════
class TestFrameParser:
# ── IMU telemetry ─────────────────────────────────────────────────────────
def test_decode_imu_frame(self):
payload = struct.pack(">hhhhhh",
1250, -300, 900, # pitch×100, roll×100, yaw×100
981, 0, -50, # ax×100, ay×100, az×100
)
raw = _build_telem_frame(TelType.IMU, payload)
results, parser = _parse_all(raw)
assert len(results) == 1
f = results[0]
assert isinstance(f, ImuFrame)
assert f.pitch_deg == pytest.approx(12.5)
assert f.roll_deg == pytest.approx(-3.0)
assert f.yaw_deg == pytest.approx(9.0)
assert f.accel_x == pytest.approx(9.81)
assert f.accel_y == pytest.approx(0.0)
assert f.accel_z == pytest.approx(-0.5)
def test_decode_imu_zero(self):
payload = struct.pack(">hhhhhh", 0, 0, 0, 0, 0, 0)
raw = _build_telem_frame(TelType.IMU, payload)
results, _ = _parse_all(raw)
f = results[0]
assert f.pitch_deg == pytest.approx(0.0)
assert f.accel_z == pytest.approx(0.0)
# ── Battery telemetry ─────────────────────────────────────────────────────
def test_decode_battery_frame(self):
payload = struct.pack(">HhB", 11100, -500, 72) # 11.1V, -500mA (charging), 72%
raw = _build_telem_frame(TelType.BATTERY, payload)
results, _ = _parse_all(raw)
assert len(results) == 1
f = results[0]
assert isinstance(f, BatteryFrame)
assert f.voltage_mv == 11100
assert f.current_ma == -500
assert f.soc_pct == 72
def test_decode_battery_discharging(self):
payload = struct.pack(">HhB", 10200, 2000, 45) # 10.2V, 2A draw, 45%
raw = _build_telem_frame(TelType.BATTERY, payload)
results, _ = _parse_all(raw)
f = results[0]
assert f.current_ma == 2000 # positive = discharging
# ── Motor RPM telemetry ───────────────────────────────────────────────────
def test_decode_motor_rpm(self):
payload = struct.pack(">hh", 1500, -1500) # left=1500, right=-1500 (spinning in place)
raw = _build_telem_frame(TelType.MOTOR_RPM, payload)
results, _ = _parse_all(raw)
f = results[0]
assert isinstance(f, MotorRpmFrame)
assert f.left_rpm == 1500
assert f.right_rpm == -1500
def test_decode_motor_rpm_zero(self):
payload = struct.pack(">hh", 0, 0)
raw = _build_telem_frame(TelType.MOTOR_RPM, payload)
results, _ = _parse_all(raw)
assert results[0].left_rpm == 0
# ── Arm state telemetry ───────────────────────────────────────────────────
def test_decode_arm_state_armed(self):
payload = struct.pack("BB", 1, 0) # ARMED, no error flags
raw = _build_telem_frame(TelType.ARM_STATE, payload)
results, _ = _parse_all(raw)
f = results[0]
assert isinstance(f, ArmStateFrame)
assert f.state == 1
assert f.error_flags == 0
def test_decode_arm_state_tilt_fault(self):
payload = struct.pack("BB", 2, 0x04) # TILT_FAULT, bit 2 set
raw = _build_telem_frame(TelType.ARM_STATE, payload)
results, _ = _parse_all(raw)
f = results[0]
assert f.state == 2
assert f.error_flags == 0x04
# ── Error telemetry ───────────────────────────────────────────────────────
def test_decode_error_frame(self):
payload = struct.pack("BB", 0x0A, 0x01) # error 0x0A, sub 0x01
raw = _build_telem_frame(TelType.ERROR, payload)
results, _ = _parse_all(raw)
f = results[0]
assert isinstance(f, ErrorFrame)
assert f.error_code == 0x0A
assert f.subcode == 0x01
# ── CRC errors ────────────────────────────────────────────────────────────
def test_bad_crc_rejected(self):
raw = bytearray(_build_telem_frame(TelType.BATTERY,
struct.pack(">HhB", 11000, 0, 80)))
raw[-3] ^= 0xFF # flip CRC_HI byte
results, parser = _parse_all(bytes(raw))
assert results == []
assert parser.frames_error == 1
assert parser.frames_ok == 0
def test_bad_crc_both_bytes(self):
raw = bytearray(_build_telem_frame(TelType.MOTOR_RPM,
struct.pack(">hh", 100, 100)))
raw[-3] ^= 0xAA
raw[-2] ^= 0x55
results, parser = _parse_all(bytes(raw))
assert results == []
assert parser.frames_error >= 1
# ── ETX errors ────────────────────────────────────────────────────────────
def test_bad_etx_rejected(self):
raw = bytearray(_build_telem_frame(TelType.ARM_STATE,
struct.pack("BB", 1, 0)))
raw[-1] = 0xFF # corrupt ETX
results, parser = _parse_all(bytes(raw))
assert results == []
assert parser.frames_error == 1
# ── Resync after corrupt data ─────────────────────────────────────────────
def test_resync_after_garbage(self):
"""Parser must resync and decode a valid frame after leading garbage."""
garbage = b"\xDE\xAD\xBE\xEF\xFF\x00\x42"
valid = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0))
results, parser = _parse_all(garbage + valid)
assert len(results) == 1
assert isinstance(results[0], ArmStateFrame)
def test_two_consecutive_frames(self):
"""Parser must handle two back-to-back valid frames."""
f1 = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0))
f2 = _build_telem_frame(TelType.MOTOR_RPM, struct.pack(">hh", 500, 500))
results, parser = _parse_all(f1 + f2)
assert len(results) == 2
assert isinstance(results[0], ArmStateFrame)
assert isinstance(results[1], MotorRpmFrame)
assert parser.frames_error == 0
def test_three_frames_with_garbage_between(self):
"""Corrupt bytes between valid frames must not discard subsequent valid frames."""
f1 = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0))
f2 = _build_telem_frame(TelType.BATTERY, struct.pack(">HhB", 11000, 0, 80))
junk = b"\xDE\xAD"
results, parser = _parse_all(f1 + junk + f2)
assert len(results) == 2
def test_unknown_type_returns_raw_tuple(self):
"""Unknown telemetry type must be returned as (type_code, payload) tuple."""
raw = _build_telem_frame(0x99, b"\xAB\xCD")
results, _ = _parse_all(raw)
assert len(results) == 1
type_code, payload = results[0]
assert type_code == 0x99
assert payload == b"\xAB\xCD"
def test_oversized_payload_rejected(self):
"""A frame claiming LEN > MAX_PAYLOAD_LEN must be silently skipped."""
# Craft a raw frame with LEN=200 (> 64 limit)
raw = bytes([STX, TelType.IMU, 200])
results, parser = _parse_all(raw)
assert results == []
assert parser.frames_error == 1
def test_empty_payload_frame(self):
"""A frame with LEN=0 (heartbeat style) must decode correctly."""
raw = _build_frame(CmdType.HEARTBEAT, b"")
results, parser = _parse_all(raw)
assert len(results) == 1
type_code, payload = results[0]
assert type_code == CmdType.HEARTBEAT
assert payload == b""
assert parser.frames_error == 0
# ── Streaming byte-by-byte ────────────────────────────────────────────────
def test_stream_one_byte_at_a_time(self):
"""Every byte fed individually must produce the same result as bulk."""
payload = struct.pack(">HhB", 12000, -100, 90)
raw = _build_telem_frame(TelType.BATTERY, payload)
parser = FrameParser()
results = []
for b in raw:
r = parser.feed(b)
if r is not None:
results.append(r)
assert len(results) == 1
f = results[0]
assert isinstance(f, BatteryFrame)
assert f.soc_pct == 90
# ── Parser reset ──────────────────────────────────────────────────────────
def test_parser_reset_clears_state(self):
"""After reset(), parser must accept a new valid frame."""
payload = struct.pack("BB", 0, 0)
raw = _build_telem_frame(TelType.ARM_STATE, payload)
parser = FrameParser()
# Feed partial frame
for b in raw[:3]:
parser.feed(b)
# Reset mid-frame
parser.reset()
# Feed full frame
results = []
for b in raw:
r = parser.feed(b)
if r is not None:
results.append(r)
assert len(results) == 1
def test_frames_ok_counter(self):
f1 = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0))
f2 = _build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 0, 0))
_, parser = _parse_all(f1 + f2)
assert parser.frames_ok == 2
def test_frames_error_counter(self):
raw = bytearray(_build_telem_frame(TelType.ARM_STATE, struct.pack("BB", 1, 0)))
raw[-3] ^= 0xFF # corrupt CRC
_, parser = _parse_all(bytes(raw))
assert parser.frames_error == 1
# ═══════════════════════════════════════════════════════════════════
# Decode edge cases
# ═══════════════════════════════════════════════════════════════════
class TestDecodeEdgeCases:
def test_imu_short_payload_returns_none(self):
result = _decode_telemetry(TelType.IMU, b"\x00\x01") # only 2 bytes, need 12
assert result is None
def test_battery_short_payload_returns_none(self):
result = _decode_telemetry(TelType.BATTERY, b"\x00")
assert result is None
def test_motor_rpm_short_payload_returns_none(self):
result = _decode_telemetry(TelType.MOTOR_RPM, b"\x00\x01")
assert result is None
def test_arm_state_short_payload_returns_none(self):
result = _decode_telemetry(TelType.ARM_STATE, b"\x01")
assert result is None
def test_error_short_payload_returns_none(self):
result = _decode_telemetry(TelType.ERROR, b"\x01")
assert result is None
# ═══════════════════════════════════════════════════════════════════
# Round-trip tests (encode command → re-parse frame body)
# ═══════════════════════════════════════════════════════════════════
class TestRoundTrip:
"""Verify encoder output satisfies parser expectations (same framing)."""
def _parse_cmd_as_telem(self, frame: bytes):
"""Feed a command frame through FrameParser (type codes overlap logic)."""
results, parser = _parse_all(frame)
return results, parser
def test_heartbeat_roundtrip(self):
frame = encode_heartbeat()
results, parser = self._parse_cmd_as_telem(frame)
assert len(results) == 1
assert parser.frames_error == 0
def test_speed_steer_roundtrip(self):
frame = encode_speed_steer(750, -300)
results, _ = self._parse_cmd_as_telem(frame)
assert len(results) == 1
type_code, payload = results[0]
assert type_code == CmdType.SPEED_STEER
speed, steer = struct.unpack(">hh", payload)
assert speed == 750
assert steer == -300
def test_pid_roundtrip(self):
frame = encode_pid_update(2.5, 0.05, 0.08)
results, _ = self._parse_cmd_as_telem(frame)
type_code, payload = results[0]
kp, ki, kd = struct.unpack(">fff", payload)
assert kp == pytest.approx(2.5, rel=1e-5)
assert ki == pytest.approx(0.05, rel=1e-5)
assert kd == pytest.approx(0.08, rel=1e-5)