feat: STM32 serial bridge — USB CDC to ROS2 topics #16

Merged
seb merged 1 commits from sl-jetson/stm32-serial-bridge into main 2026-02-28 17:19:25 -05:00
10 changed files with 508 additions and 0 deletions

View File

@ -0,0 +1,7 @@
__pycache__/
*.pyc
*.pyo
*.egg-info/
build/
install/
log/

View File

@ -0,0 +1,8 @@
stm32_serial_bridge:
ros__parameters:
# STM32 USB CDC device node
# Use /dev/stm32-bridge if udev rule from jetson/docs/pinout.md is applied
serial_port: /dev/ttyACM0
baud_rate: 921600
timeout: 0.1 # serial readline timeout (seconds)
reconnect_delay: 2.0 # seconds between reconnect attempts on disconnect

View File

@ -0,0 +1,34 @@
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
serial_port_arg = DeclareLaunchArgument(
"serial_port",
default_value="/dev/ttyACM0",
description="STM32 USB CDC device node",
)
baud_rate_arg = DeclareLaunchArgument(
"baud_rate",
default_value="921600",
description="Serial baud rate",
)
bridge_node = Node(
package="saltybot_bridge",
executable="serial_bridge_node",
name="stm32_serial_bridge",
output="screen",
parameters=[
{
"serial_port": LaunchConfiguration("serial_port"),
"baud_rate": LaunchConfiguration("baud_rate"),
"timeout": 0.1,
"reconnect_delay": 2.0,
}
],
)
return LaunchDescription([serial_port_arg, baud_rate_arg, bridge_node])

View File

@ -0,0 +1,27 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_bridge</name>
<version>0.1.0</version>
<description>
STM32F722 USB CDC serial bridge for saltybot.
Reads JSON telemetry from the STM32 flight controller and publishes
sensor_msgs/Imu, std_msgs/String (balance state), and diagnostic_msgs/DiagnosticArray.
</description>
<maintainer email="sl-jetson@saltylab.local">sl-jetson</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>
<depend>diagnostic_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,304 @@
"""
saltybot_bridge serial_bridge_node
STM32F722 USB CDC ROS2 topic publisher
Telemetry frame (50 Hz, newline-delimited JSON):
{"p":<pitch×10>,"r":<roll×10>,"e":<err×10>,"ig":<integral×10>,
"m":<motor_cmd -1000..1000>,"s":<state 0-2>,"y":<yaw×10>}
Error frame (IMU fault):
{"err":<imu_errno>}
State values (balance_state_t):
0 = DISARMED 1 = ARMED 2 = TILT_FAULT
Published topics:
/saltybot/imu sensor_msgs/Imu pitch/roll/yaw as angular velocity
/saltybot/balance_state std_msgs/String (JSON) full PID diagnostics
"""
import json
import math
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
import serial
from sensor_msgs.msg import Imu
from std_msgs.msg import String
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
# Balance state labels matching STM32 balance_state_t enum
_STATE_LABEL = {0: "DISARMED", 1: "ARMED", 2: "TILT_FAULT"}
# Sensor frame_id published in Imu header
IMU_FRAME_ID = "imu_link"
class SerialBridgeNode(Node):
def __init__(self):
super().__init__("stm32_serial_bridge")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("serial_port", "/dev/ttyACM0")
self.declare_parameter("baud_rate", 921600)
self.declare_parameter("timeout", 0.1)
self.declare_parameter("reconnect_delay", 2.0)
port = self.get_parameter("serial_port").value
baud = self.get_parameter("baud_rate").value
timeout = self.get_parameter("timeout").value
self._reconnect_delay = self.get_parameter("reconnect_delay").value
# ── QoS ───────────────────────────────────────────────────────────────
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
diag_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
# ── Publishers ────────────────────────────────────────────────────────
self._imu_pub = self.create_publisher(Imu, "/saltybot/imu", sensor_qos)
self._balance_pub = self.create_publisher(
String, "/saltybot/balance_state", diag_qos
)
self._diag_pub = self.create_publisher(
DiagnosticArray, "/diagnostics", diag_qos
)
# ── State ─────────────────────────────────────────────────────────────
self._port_name = port
self._baud = baud
self._timeout = timeout
self._ser: serial.Serial | None = None
self._frame_count = 0
self._error_count = 0
self._last_state = -1
# ── Open serial and start read timer ──────────────────────────────────
self._open_serial()
# Poll at 100 Hz — STM32 sends at 50 Hz, so we never miss a frame
self._timer = self.create_timer(0.01, self._read_cb)
self.get_logger().info(
f"stm32_serial_bridge started — {port} @ {baud} baud"
)
# ── Serial management ─────────────────────────────────────────────────────
def _open_serial(self) -> bool:
try:
self._ser = serial.Serial(
port=self._port_name,
baudrate=self._baud,
timeout=self._timeout,
)
self._ser.reset_input_buffer()
self.get_logger().info(f"Serial port open: {self._port_name}")
return True
except serial.SerialException as exc:
self.get_logger().error(f"Cannot open {self._port_name}: {exc}")
self._ser = None
return False
def _close_serial(self):
if self._ser and self._ser.is_open:
try:
self._ser.close()
except Exception:
pass
self._ser = None
# ── Read callback ─────────────────────────────────────────────────────────
def _read_cb(self):
if self._ser is None or not self._ser.is_open:
# Attempt reconnect on next timer fire after delay
self.get_logger().warn(
f"Serial disconnected — retrying in {self._reconnect_delay}s",
throttle_duration_sec=self._reconnect_delay,
)
self._open_serial()
return
try:
# Read all lines buffered since last call
while self._ser.in_waiting:
raw = self._ser.readline()
if raw:
self._parse_and_publish(raw)
except serial.SerialException as exc:
self.get_logger().error(f"Serial read error: {exc}")
self._close_serial()
# ── Parsing + publishing ──────────────────────────────────────────────────
def _parse_and_publish(self, raw: bytes):
try:
text = raw.decode("ascii", errors="ignore").strip()
if not text:
return
data = json.loads(text)
except (ValueError, UnicodeDecodeError) as exc:
self.get_logger().debug(f"Parse error ({exc}): {raw!r}")
self._error_count += 1
return
now = self.get_clock().now().to_msg()
# IMU fault frame — {"err": <errno>}
if "err" in data:
self._publish_imu_fault(data["err"], now)
return
# Normal telemetry frame — validate required keys
required = ("p", "r", "e", "ig", "m", "s", "y")
if not all(k in data for k in required):
self.get_logger().debug(f"Incomplete frame: {text}")
return
# Values are ×10 integers from firmware — convert to float
pitch_deg = data["p"] / 10.0
roll_deg = data["r"] / 10.0
yaw_deg = data["y"] / 10.0
error_deg = data["e"] / 10.0
integral = data["ig"] / 10.0
motor_cmd = float(data["m"]) # -1000..+1000 ESC units
state = int(data["s"])
self._frame_count += 1
self._publish_imu(pitch_deg, roll_deg, yaw_deg, now)
self._publish_balance_state(
pitch_deg, roll_deg, yaw_deg,
error_deg, integral, motor_cmd, state, now,
)
# Log state transitions
if state != self._last_state:
label = _STATE_LABEL.get(state, f"UNKNOWN({state})")
self.get_logger().info(f"Balance state → {label}")
self._last_state = state
def _publish_imu(self, pitch_deg, roll_deg, yaw_deg, stamp):
"""
Publish sensor_msgs/Imu.
The STM32 IMU gives Euler angles (pitch/roll from accelerometer+gyro
fusion, yaw from gyro integration). We publish them as angular_velocity
for immediate use by slam_toolbox / robot_localization.
Note: orientation quaternion is left zeroed (covariance [-1,...]) until
a proper fusion node (e.g. robot_localization EKF) computes it.
"""
msg = Imu()
msg.header.stamp = stamp
msg.header.frame_id = IMU_FRAME_ID
# orientation unknown — signal with -1 in first covariance element
msg.orientation_covariance[0] = -1.0
# angular_velocity: Euler rates approximation (good enough at low freq)
msg.angular_velocity.x = math.radians(pitch_deg) # pitch → x
msg.angular_velocity.y = math.radians(roll_deg) # roll → y
msg.angular_velocity.z = math.radians(yaw_deg) # yaw → z
# angular_velocity covariance (diagonal, degrees converted to rad)
cov_rad = math.radians(0.5) ** 2 # ±0.5° noise estimate
msg.angular_velocity_covariance[0] = cov_rad
msg.angular_velocity_covariance[4] = cov_rad
msg.angular_velocity_covariance[8] = cov_rad
# linear_acceleration: unknown from this frame
msg.linear_acceleration_covariance[0] = -1.0
self._imu_pub.publish(msg)
def _publish_balance_state(
self, pitch, roll, yaw, error, integral, motor_cmd, state, stamp
):
"""Publish full PID diagnostics as JSON string and /diagnostics."""
state_label = _STATE_LABEL.get(state, f"UNKNOWN({state})")
payload = {
"stamp": f"{stamp.sec}.{stamp.nanosec:09d}",
"state": state_label,
"pitch_deg": round(pitch, 1),
"roll_deg": round(roll, 1),
"yaw_deg": round(yaw, 1),
"pid_error_deg": round(error, 1),
"integral": round(integral, 1),
"motor_cmd": int(motor_cmd),
"frames": self._frame_count,
"parse_errors": self._error_count,
}
str_msg = String()
str_msg.data = json.dumps(payload)
self._balance_pub.publish(str_msg)
# /diagnostics
diag = DiagnosticArray()
diag.header.stamp = stamp
status = DiagnosticStatus()
status.name = "saltybot/balance_controller"
status.hardware_id = "stm32f722"
status.message = state_label
if state == 1: # ARMED
status.level = DiagnosticStatus.OK
elif state == 0: # DISARMED
status.level = DiagnosticStatus.WARN
else: # TILT_FAULT
status.level = DiagnosticStatus.ERROR
status.values = [
KeyValue(key="pitch_deg", value=f"{pitch:.1f}"),
KeyValue(key="roll_deg", value=f"{roll:.1f}"),
KeyValue(key="yaw_deg", value=f"{yaw:.1f}"),
KeyValue(key="pid_error_deg", value=f"{error:.1f}"),
KeyValue(key="integral", value=f"{integral:.1f}"),
KeyValue(key="motor_cmd", value=f"{int(motor_cmd)}"),
KeyValue(key="state", value=state_label),
]
diag.status.append(status)
self._diag_pub.publish(diag)
def _publish_imu_fault(self, errno: int, stamp):
"""On IMU fault frame, publish an ERROR diagnostic."""
diag = DiagnosticArray()
diag.header.stamp = stamp
status = DiagnosticStatus()
status.level = DiagnosticStatus.ERROR
status.name = "saltybot/balance_controller"
status.hardware_id = "stm32f722"
status.message = f"IMU fault errno={errno}"
diag.status.append(status)
self._diag_pub.publish(diag)
self.get_logger().error(f"STM32 reported IMU fault: errno={errno}")
def destroy_node(self):
self._close_serial()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = SerialBridgeNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_bridge
[install]
install_scripts=$base/lib/saltybot_bridge

View File

@ -0,0 +1,27 @@
from setuptools import setup
package_name = "saltybot_bridge"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/bridge.launch.py"]),
(f"share/{package_name}/config", ["config/bridge_params.yaml"]),
],
install_requires=["setuptools", "pyserial"],
zip_safe=True,
maintainer="sl-jetson",
maintainer_email="sl-jetson@saltylab.local",
description="STM32 USB CDC → ROS2 serial bridge for saltybot",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"serial_bridge_node = saltybot_bridge.serial_bridge_node:main",
],
},
)

View File

@ -0,0 +1,97 @@
"""
Unit tests for STM32 telemetry parsing logic.
Run with: pytest jetson/ros2_ws/src/saltybot_bridge/test/test_parse.py
"""
import json
import pytest
# ── Minimal stub so we can test parsing without a ROS2 runtime ───────────────
def parse_frame(raw: bytes):
"""Mirror of the parsing logic in serial_bridge_node._parse_and_publish."""
text = raw.decode("ascii", errors="ignore").strip()
if not text:
return None
data = json.loads(text)
if "err" in data:
return {"type": "fault", "errno": data["err"]}
required = ("p", "r", "e", "ig", "m", "s", "y")
if not all(k in data for k in required):
raise ValueError(f"Incomplete frame: {data}")
return {
"type": "telemetry",
"pitch_deg": data["p"] / 10.0,
"roll_deg": data["r"] / 10.0,
"yaw_deg": data["y"] / 10.0,
"pid_error_deg": data["e"] / 10.0,
"integral": data["ig"] / 10.0,
"motor_cmd": int(data["m"]),
"state": int(data["s"]),
}
# ── Tests ─────────────────────────────────────────────────────────────────────
def test_normal_frame():
raw = b'{"p":125,"r":-30,"e":50,"ig":20,"m":350,"s":1,"y":0}\n'
result = parse_frame(raw)
assert result["type"] == "telemetry"
assert result["pitch_deg"] == pytest.approx(12.5)
assert result["roll_deg"] == pytest.approx(-3.0)
assert result["yaw_deg"] == pytest.approx(0.0)
assert result["pid_error_deg"] == pytest.approx(5.0)
assert result["integral"] == pytest.approx(2.0)
assert result["motor_cmd"] == 350
assert result["state"] == 1 # ARMED
def test_fault_frame():
raw = b'{"err":-1}\n'
result = parse_frame(raw)
assert result["type"] == "fault"
assert result["errno"] == -1
def test_zero_frame():
raw = b'{"p":0,"r":0,"e":0,"ig":0,"m":0,"s":0,"y":0}\n'
result = parse_frame(raw)
assert result["type"] == "telemetry"
assert result["pitch_deg"] == pytest.approx(0.0)
assert result["state"] == 0 # DISARMED
def test_tilt_fault_state():
raw = b'{"p":450,"r":0,"e":400,"ig":999,"m":1000,"s":2,"y":0}\n'
result = parse_frame(raw)
assert result["state"] == 2 # TILT_FAULT
assert result["pitch_deg"] == pytest.approx(45.0)
assert result["motor_cmd"] == 1000
def test_negative_motor_cmd():
raw = b'{"p":-100,"r":0,"e":-100,"ig":-50,"m":-750,"s":1,"y":10}\n'
result = parse_frame(raw)
assert result["motor_cmd"] == -750
assert result["pitch_deg"] == pytest.approx(-10.0)
def test_incomplete_frame_raises():
raw = b'{"p":100,"r":0}\n'
with pytest.raises(ValueError, match="Incomplete frame"):
parse_frame(raw)
def test_empty_line_returns_none():
assert parse_frame(b"\n") is None
assert parse_frame(b" \n") is None
def test_corrupt_frame_raises_json_error():
raw = b'not_json\n'
with pytest.raises(json.JSONDecodeError):
parse_frame(raw)