Merge pull request 'feat: Orin UART/USB serial comms with ESP32 Balance (bd-wim1)' (#727) from sl-perception/bd-wim1-orin-uart-esp32 into main

This commit is contained in:
sl-jetson 2026-04-17 23:09:53 -04:00
commit 61f241ae1d
11 changed files with 1201 additions and 0 deletions

View File

@ -0,0 +1,20 @@
esp32_balance_node:
ros__parameters:
# USB-CDC serial port: CH343 chip, VID:PID 1a86:55d3
# Udev symlink set by jetson/docs/pinout.md udev rules
serial_port: /dev/esp32-balance
baud_rate: 460800
# /cmd_vel linear.x (m/s) → speed units sent to ESP32-S3 BALANCE
speed_scale: 1000.0
# /cmd_vel angular.z (rad/s) → steer units (negative = flip turn convention)
steer_scale: -500.0
# Safety: send CMD_DRIVE(0,0) if /cmd_vel silent for this many seconds
command_timeout_s: 0.5
# CMD_HEARTBEAT TX interval in seconds (ESP32-S3 watchdog fires at 500 ms)
heartbeat_period: 0.2
# Reconnect attempt interval when serial is lost
reconnect_delay: 2.0

View File

@ -0,0 +1,73 @@
"""esp32_balance.launch.py — Launch the Orin↔ESP32-S3 BALANCE USB-serial bridge.
bd-wim1: replaces can_bridge_node (CANable2/python-can) with USB-CDC serial.
The freed CANable2 slot is used by Here4 GPS (bd-p47c).
USB device
----------
CH343 USB-CDC chip on ESP32-S3 BALANCE board
VID:PID 1a86:55d3
Symlink /dev/esp32-balance (via udev rule in jetson/docs/pinout.md)
Baud 460800
Usage
-----
# Default:
ros2 launch saltybot_esp32_serial esp32_balance.launch.py
# Override port (e.g. direct ACM path):
ros2 launch saltybot_esp32_serial esp32_balance.launch.py serial_port:=/dev/ttyACM0
# Custom velocity scales:
ros2 launch saltybot_esp32_serial esp32_balance.launch.py speed_scale:=800.0 steer_scale:=-400.0
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description() -> LaunchDescription:
pkg_share = get_package_share_directory("saltybot_esp32_serial")
params_file = os.path.join(pkg_share, "config", "esp32_balance_params.yaml")
serial_port_arg = DeclareLaunchArgument(
"serial_port",
default_value="/dev/esp32-balance",
description="USB-CDC serial port for ESP32-S3 BALANCE (CH343, 1a86:55d3)",
)
speed_scale_arg = DeclareLaunchArgument(
"speed_scale",
default_value="1000.0",
description="linear.x (m/s) → motor speed units",
)
steer_scale_arg = DeclareLaunchArgument(
"steer_scale",
default_value="-500.0",
description="angular.z (rad/s) → steer units",
)
node = Node(
package="saltybot_esp32_serial",
executable="esp32_balance_node",
name="esp32_balance_node",
output="screen",
parameters=[
params_file,
{
"serial_port": LaunchConfiguration("serial_port"),
"speed_scale": LaunchConfiguration("speed_scale"),
"steer_scale": LaunchConfiguration("steer_scale"),
},
],
)
return LaunchDescription([
serial_port_arg,
speed_scale_arg,
steer_scale_arg,
node,
])

View File

@ -0,0 +1,40 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_esp32_serial</name>
<version>0.1.0</version>
<description>
UART/USB-CDC serial bridge for SaltyBot Orin — interfaces with the
ESP32-S3 BALANCE board (CH343, 1a86:55d3) using a binary framing protocol
[0xAA][LEN][TYPE][PAYLOAD][CRC8]. Replaces saltybot_can_bridge (CANable2)
to free the CAN bus for Here4 GPS (bd-p47c).
Publishes the same ROS2 topics as saltybot_can_bridge for backward compat:
/saltybot/attitude, /can/battery, /can/vesc/{left,right}/state,
/can/connection_status
System dependency: pyserial (apt: python3-serial or pip: pyserial)
Bead: bd-wim1 Counterpart firmware: bd-66hx (hal)
</description>
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
<license>MIT</license>
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
<exec_depend>sensor_msgs</exec_depend>
<!-- pyserial: apt install python3-serial or pip install pyserial -->
<exec_depend>python3-serial</exec_depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1 @@
# saltybot_esp32_serial — UART/USB-CDC bridge for ESP32-S3 BALANCE (bd-wim1)

View File

@ -0,0 +1,369 @@
"""esp32_balance_node.py — ROS2 bridge: Orin ↔ ESP32-S3 BALANCE via USB-CDC serial.
bd-wim1: replaces saltybot_can_bridge (CANable2/python-can) with USB serial.
The CANable2 is freed for Here4 GPS (bd-p47c).
Physical connection
-------------------
Device : /dev/esp32-balance (CH343 USB-CDC, VID:PID 1a86:55d3)
Baud : 460800, 8N1
Protocol: [0xAA][LEN][TYPE][PAYLOAD][CRC8] (see esp32_balance_protocol.py)
Subscriptions
-------------
/cmd_vel geometry_msgs/Twist CMD_DRIVE (at up to 50 Hz)
/estop std_msgs/Bool CMD_ESTOP
/saltybot/arm std_msgs/Bool CMD_ARM
/saltybot/pid_update std_msgs/String JSON CMD_PID {"kp":,"ki":,"kd":}
Publications
------------
/saltybot/attitude std_msgs/String JSON pitch, motor_cmd, state, etc.
/saltybot/balance_state std_msgs/String same as /saltybot/attitude (alias)
/can/battery sensor_msgs/BatteryState from TELEM_STATUS.vbat_mv
/can/vesc/left/state std_msgs/Float32MultiArray [erpm, duty0, voltage_mv/1000, current_ma/1000]
/can/vesc/right/state std_msgs/Float32MultiArray same layout
/can/connection_status std_msgs/String "connected" | "disconnected"
NOTE: /can/* topic names are preserved for backward compatibility with
nodes that subscribed to saltybot_can_bridge outputs.
Parameters
----------
serial_port str USB-CDC port default: /dev/esp32-balance
baud_rate int Baud rate default: 460800
speed_scale float linear.x (m/s) units default: 1000.0
steer_scale float angular.z (rad/s) units default: -500.0
command_timeout_s float /cmd_vel watchdog default: 0.5
heartbeat_period float CMD_HEARTBEAT interval default: 0.2
reconnect_delay float Retry interval, s default: 2.0
"""
from __future__ import annotations
import json
import threading
import time
from typing import Optional
import rclpy
import serial
from geometry_msgs.msg import Twist
from rclpy.node import Node
from sensor_msgs.msg import BatteryState
from std_msgs.msg import Bool, Float32MultiArray, String
from .esp32_balance_protocol import (
BAUD_RATE,
SERIAL_PORT_DEFAULT,
AckFrame,
BalanceState,
BALANCE_STATE_LABEL,
FrameParser,
NackFrame,
StatusFrame,
VescFrame,
encode_arm,
encode_drive,
encode_estop,
encode_heartbeat,
encode_pid,
)
_WATCHDOG_HZ: float = 20.0 # TX watchdog check rate
class Esp32BalanceNode(Node):
"""Serial bridge replacing CANable2 CAN comms with USB-CDC to ESP32-S3 BALANCE."""
def __init__(self) -> None:
super().__init__("esp32_balance_node")
# ── Parameters ────────────────────────────────────────────────────
self.declare_parameter("serial_port", SERIAL_PORT_DEFAULT)
self.declare_parameter("baud_rate", BAUD_RATE)
self.declare_parameter("speed_scale", 1000.0)
self.declare_parameter("steer_scale", -500.0)
self.declare_parameter("command_timeout_s", 0.5)
self.declare_parameter("heartbeat_period", 0.2)
self.declare_parameter("reconnect_delay", 2.0)
self._port_name = self.get_parameter("serial_port").value
self._baud = self.get_parameter("baud_rate").value
self._speed_scale = self.get_parameter("speed_scale").value
self._steer_scale = self.get_parameter("steer_scale").value
self._cmd_timeout = self.get_parameter("command_timeout_s").value
self._hb_period = self.get_parameter("heartbeat_period").value
self._reconnect_delay = self.get_parameter("reconnect_delay").value
# ── State ─────────────────────────────────────────────────────────
self._ser: Optional[serial.Serial] = None
self._ser_lock = threading.Lock()
self._parser = FrameParser()
self._last_cmd_time = time.monotonic()
self._watchdog_sent = False
# ── Publishers ────────────────────────────────────────────────────
self._pub_attitude = self.create_publisher(String, "/saltybot/attitude", 10)
self._pub_balance = self.create_publisher(String, "/saltybot/balance_state", 10)
self._pub_battery = self.create_publisher(BatteryState, "/can/battery", 10)
self._pub_vesc_left = self.create_publisher(Float32MultiArray, "/can/vesc/left/state", 10)
self._pub_vesc_right = self.create_publisher(Float32MultiArray, "/can/vesc/right/state", 10)
self._pub_status = self.create_publisher(String, "/can/connection_status", 10)
# ── Subscriptions ─────────────────────────────────────────────────
self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, 10)
self.create_subscription(Bool, "/estop", self._estop_cb, 10)
self.create_subscription(Bool, "/saltybot/arm", self._arm_cb, 10)
self.create_subscription(String, "/saltybot/pid_update", self._pid_cb, 10)
# ── Timers ────────────────────────────────────────────────────────
self.create_timer(self._hb_period, self._heartbeat_cb)
self.create_timer(1.0 / _WATCHDOG_HZ, self._watchdog_cb)
self.create_timer(self._reconnect_delay, self._reconnect_cb)
# ── Open port + start reader thread ───────────────────────────────
self._try_connect()
self._reader_thread = threading.Thread(
target=self._reader_loop, daemon=True, name="balance_serial_reader"
)
self._reader_thread.start()
self.get_logger().info(
f"esp32_balance_node — {self._port_name} @ {self._baud} baud | "
f"speed_scale={self._speed_scale} steer_scale={self._steer_scale}"
)
# ── Serial management ─────────────────────────────────────────────────
def _try_connect(self) -> None:
with self._ser_lock:
if self._ser is not None and self._ser.is_open:
return
try:
self._ser = serial.Serial(
port=self._port_name,
baudrate=self._baud,
timeout=0.05,
write_timeout=0.1,
)
self._ser.reset_input_buffer()
self._parser.reset()
self.get_logger().info(f"Serial open: {self._port_name}")
self._publish_status("connected")
except serial.SerialException as exc:
self.get_logger().warning(
f"Cannot open {self._port_name}: {exc} — retry in {self._reconnect_delay:.0f} s",
throttle_duration_sec=self._reconnect_delay,
)
self._ser = None
self._publish_status("disconnected")
def _reconnect_cb(self) -> None:
with self._ser_lock:
open_ok = self._ser is not None and self._ser.is_open
if not open_ok:
self._try_connect()
def _handle_serial_error(self, exc: Exception, context: str) -> None:
self.get_logger().warning(f"Serial error in {context}: {exc}")
with self._ser_lock:
if self._ser is not None:
try:
self._ser.close()
except Exception:
pass
self._ser = None
self._publish_status("disconnected")
# ── Serial write helper ───────────────────────────────────────────────
def _write(self, data: bytes, context: str = "") -> bool:
with self._ser_lock:
if self._ser is None or not self._ser.is_open:
return False
ser = self._ser
try:
ser.write(data)
return True
except serial.SerialException as exc:
self._handle_serial_error(exc, f"write({context})")
return False
# ── ROS subscriptions ─────────────────────────────────────────────────
def _cmd_vel_cb(self, msg: Twist) -> None:
"""Convert /cmd_vel to CMD_DRIVE at up to 50 Hz."""
self._last_cmd_time = time.monotonic()
self._watchdog_sent = False
speed = int(msg.linear.x * self._speed_scale)
steer = int(msg.angular.z * self._steer_scale)
self._write(encode_drive(speed, steer), "cmd_vel")
def _estop_cb(self, msg: Bool) -> None:
frame = encode_estop(msg.data)
self._write(frame, "estop")
if msg.data:
self.get_logger().warning("E-stop asserted → CMD_ESTOP sent to ESP32-S3 BALANCE")
def _arm_cb(self, msg: Bool) -> None:
self._write(encode_arm(msg.data), "arm")
def _pid_cb(self, msg: String) -> None:
"""Parse JSON {"kp":…,"ki":…,"kd":…} and send CMD_PID."""
try:
d = json.loads(msg.data)
kp = float(d["kp"])
ki = float(d["ki"])
kd = float(d["kd"])
except (KeyError, ValueError, json.JSONDecodeError) as exc:
self.get_logger().error(f"Bad /saltybot/pid_update JSON: {exc}")
return
self._write(encode_pid(kp, ki, kd), "pid_update")
# ── Timers ────────────────────────────────────────────────────────────
def _heartbeat_cb(self) -> None:
self._write(encode_heartbeat(), "heartbeat")
def _watchdog_cb(self) -> None:
"""Send CMD_DRIVE(0,0) if /cmd_vel has been silent for command_timeout_s."""
if self._watchdog_sent:
return
if time.monotonic() - self._last_cmd_time > self._cmd_timeout:
self._write(encode_drive(0, 0), "watchdog")
self._watchdog_sent = True
# ── Background reader ─────────────────────────────────────────────────
def _reader_loop(self) -> None:
while rclpy.ok():
with self._ser_lock:
ser = self._ser if (self._ser and self._ser.is_open) else None
if ser is None:
time.sleep(0.05)
continue
try:
n = ser.in_waiting
if n:
raw = ser.read(n)
else:
time.sleep(0.002)
continue
except serial.SerialException as exc:
self._handle_serial_error(exc, "reader_loop")
continue
for byte in raw:
frame = self._parser.feed(byte)
if frame is not None:
self._dispatch(frame)
def _dispatch(self, frame) -> None:
try:
if isinstance(frame, StatusFrame):
self._handle_status(frame)
elif isinstance(frame, VescFrame):
self._handle_vesc(frame)
elif isinstance(frame, AckFrame):
self.get_logger().debug(
f"ACK for cmd type 0x{frame.echoed_type:02X}"
)
elif isinstance(frame, NackFrame):
self.get_logger().warning(
f"NACK cmd=0x{frame.cmd_type:02X} err=0x{frame.error_code:02X}"
)
elif isinstance(frame, tuple):
type_code, _ = frame
self.get_logger().debug(f"Unknown telemetry type 0x{type_code:02X}")
except Exception as exc:
self.get_logger().warning(f"Error dispatching frame {type(frame).__name__}: {exc}")
# ── Frame handlers ────────────────────────────────────────────────────
def _handle_status(self, f: StatusFrame) -> None:
"""Publish balance controller status — replaces CAN 0x400 handler."""
now = self.get_clock().now().to_msg()
payload = {
"pitch_deg": round(f.pitch_deg, 2),
"motor_cmd": f.motor_cmd,
"vbat_mv": f.vbat_mv,
"balance_state": f.balance_state,
"state_label": BALANCE_STATE_LABEL.get(f.balance_state,
f"UNKNOWN({f.balance_state})"),
"flags": f.flags,
"estop_active": bool(f.flags & 0x01),
"hb_timeout": bool(f.flags & 0x02),
"ts": f"{now.sec}.{now.nanosec:09d}",
}
msg = String()
msg.data = json.dumps(payload)
self._pub_attitude.publish(msg)
self._pub_balance.publish(msg)
# Battery piggyback on STATUS frame for nodes that use /can/battery
bat = BatteryState()
bat.header.stamp = now
bat.voltage = f.vbat_mv / 1000.0
bat.present = True
bat.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
self._pub_battery.publish(bat)
def _handle_vesc(self, f: VescFrame) -> None:
"""Publish proxied VESC telemetry — replaces CAN VESC_STATUS_1 handler.
Float32MultiArray layout: [erpm, voltage_v, current_a, temp_c]
(duty not available via serial proxy; 0.0 placeholder)
"""
arr = Float32MultiArray()
arr.data = [
float(f.erpm),
float(f.voltage_mv) / 1000.0,
float(f.current_ma) / 1000.0,
f.temp_c,
]
if f.vesc_id == 56:
self._pub_vesc_left.publish(arr)
elif f.vesc_id == 68:
self._pub_vesc_right.publish(arr)
else:
self.get_logger().warning(f"Unknown VESC ID {f.vesc_id} in telemetry frame")
# ── Status helper ─────────────────────────────────────────────────────
def _publish_status(self, status: str) -> None:
msg = String()
msg.data = status
self._pub_status.publish(msg)
# ── Shutdown ──────────────────────────────────────────────────────────
def destroy_node(self) -> None:
# Send zero drive + estop before closing
self._write(encode_drive(0, 0), "shutdown")
self._write(encode_estop(True), "shutdown")
with self._ser_lock:
if self._ser is not None:
try:
self._ser.close()
except Exception:
pass
self._ser = None
super().destroy_node()
def main(args=None) -> None:
rclpy.init(args=args)
node = Esp32BalanceNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,355 @@
"""esp32_balance_protocol.py — Binary frame codec for Orin↔ESP32-S3 BALANCE UART/USB comms.
bd-wim1: replaces CANable2/python-can ESP32 interface with USB-CDC serial.
Counterpart firmware bead: bd-66hx (hal's ESP32-S3 BALANCE side).
Frame layout:
SYNC LEN TYPE PAYLOAD CRC8
0xAA 1B 1B LEN bytes 1B
LEN = number of payload bytes (062)
CRC8 = CRC8-SMBUS (poly=0x07, init=0x00), computed over LEN+TYPE+PAYLOAD
Physical layer
Device : /dev/esp32-balance (CH343 USB-CDC, VID:PID 1a86:55d3)
Baud : 460800, 8N1
Command types (Orin ESP32-S3 BALANCE):
0x01 CMD_HEARTBEAT no payload len=0
0x02 CMD_DRIVE int16 speed_units, int16 steer_units len=4 range 1000..+1000
0x03 CMD_ESTOP uint8 (1=assert, 0=clear) len=1
0x04 CMD_ARM uint8 (1=arm, 0=disarm) len=1
0x05 CMD_PID float32 kp, float32 ki, float32 kd len=12
Telemetry types (ESP32-S3 BALANCE Orin):
0x80 TELEM_STATUS int16 pitch_x10, int16 motor_cmd,
uint16 vbat_mv, uint8 balance_state, uint8 flags len=8
0x81 TELEM_VESC_LEFT int32 erpm, uint16 voltage_mv,
int16 current_ma, uint16 temp_c_x10 len=10
0x82 TELEM_VESC_RIGHT same layout as TELEM_VESC_LEFT len=10
0xA0 RESP_ACK uint8 echoed cmd type len=1
0xA1 RESP_NACK uint8 cmd type, uint8 error_code len=2
Balance state values (TELEM_STATUS.balance_state):
0 DISARMED
1 ARMED
2 TILT_FAULT
3 ESTOP
NACK error codes:
0x01 ERR_BAD_CRC
0x02 ERR_BAD_LEN
0x03 ERR_ESTOP_ACTIVE
0x04 ERR_DISARMED
NOTE: Message type IDs coordinated with hal (bd-66hx) on 2026-04-17.
Confirm against bd-66hx PR before deploying on hardware.
"""
from __future__ import annotations
import struct
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional
# ── Physical layer constants ──────────────────────────────────────────────────
SERIAL_PORT_DEFAULT = "/dev/esp32-balance"
BAUD_RATE = 460800
MAX_PAYLOAD_LEN = 62 # hard cap; larger frames are considered corrupt
# ── Framing ───────────────────────────────────────────────────────────────────
SYNC_BYTE = 0xAA
# ── Command type codes (Orin → ESP32-S3 BALANCE) ─────────────────────────────
class CmdType(IntEnum):
HEARTBEAT = 0x01
DRIVE = 0x02
ESTOP = 0x03
ARM = 0x04
PID = 0x05
# ── Telemetry type codes (ESP32-S3 BALANCE → Orin) ───────────────────────────
class TelType(IntEnum):
STATUS = 0x80
VESC_LEFT = 0x81
VESC_RIGHT = 0x82
ACK = 0xA0
NACK = 0xA1
# ── Balance state enum ────────────────────────────────────────────────────────
class BalanceState(IntEnum):
DISARMED = 0
ARMED = 1
TILT_FAULT = 2
ESTOP = 3
BALANCE_STATE_LABEL = {
BalanceState.DISARMED: "DISARMED",
BalanceState.ARMED: "ARMED",
BalanceState.TILT_FAULT: "TILT_FAULT",
BalanceState.ESTOP: "ESTOP",
}
# ── Parsed telemetry dataclasses ─────────────────────────────────────────────
@dataclass
class StatusFrame:
"""TELEM_STATUS (0x80): balance controller status at ~10 Hz."""
pitch_deg: float # degrees (positive = forward tilt); raw pitch_x10 / 10.0
motor_cmd: int # ESC command, 1000..+1000
vbat_mv: int # battery voltage, millivolts
balance_state: int # BalanceState enum
flags: int # bitmask — bit 0: estop_active, bit 1: heartbeat_timeout
@dataclass
class VescFrame:
"""TELEM_VESC_LEFT (0x81) or TELEM_VESC_RIGHT (0x82): VESC proxied telemetry at ~10 Hz."""
vesc_id: int # 56 = left, 68 = right (maps to CAN IDs from hardware spec)
erpm: int # electrical RPM (signed)
voltage_mv: int # bus voltage, millivolts
current_ma: int # phase current, milliamps (signed)
temp_c: float # motor temperature, °C; raw temp_c_x10 / 10.0
@dataclass
class AckFrame:
"""RESP_ACK (0xA0): acknowledgement for a successfully parsed command."""
echoed_type: int
@dataclass
class NackFrame:
"""RESP_NACK (0xA1): rejection; contains original cmd type and error code."""
cmd_type: int
error_code: int
TelemetryFrame = StatusFrame | VescFrame | AckFrame | NackFrame
# ── CRC8-SMBUS ────────────────────────────────────────────────────────────────
def _crc8(data: bytes) -> int:
"""CRC8-SMBUS: polynomial 0x07, init 0x00, no final XOR.
Coverage: LEN + TYPE + PAYLOAD bytes.
"""
crc = 0
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x80:
crc = (crc << 1) ^ 0x07
else:
crc <<= 1
crc &= 0xFF
return crc
# ── Frame encoder ─────────────────────────────────────────────────────────────
def _build_frame(cmd_type: int, payload: bytes) -> bytes:
"""Build a complete binary frame: [0xAA][LEN][TYPE][PAYLOAD][CRC8]."""
if len(payload) > MAX_PAYLOAD_LEN:
raise ValueError(f"Payload length {len(payload)} exceeds max {MAX_PAYLOAD_LEN}")
length = len(payload)
crc_data = bytes([length, cmd_type]) + payload
crc = _crc8(crc_data)
return bytes([SYNC_BYTE, length, cmd_type]) + payload + bytes([crc])
def encode_heartbeat() -> bytes:
"""CMD_HEARTBEAT — no payload. Send every ≤500 ms to keep watchdog alive."""
return _build_frame(CmdType.HEARTBEAT, b"")
def encode_drive(speed: int, steer: int) -> bytes:
"""CMD_DRIVE — int16 speed_units + int16 steer_units (big-endian, 1000..+1000).
Parameters
----------
speed : target longitudinal speed in motor units (positive = forward)
steer : steering demand in motor units (positive = right turn)
"""
speed = max(-1000, min(1000, int(speed)))
steer = max(-1000, min(1000, int(steer)))
return _build_frame(CmdType.DRIVE, struct.pack(">hh", speed, steer))
def encode_estop(assert_stop: bool) -> bytes:
"""CMD_ESTOP — uint8, 1=assert, 0=clear."""
return _build_frame(CmdType.ESTOP, struct.pack("B", 1 if assert_stop else 0))
def encode_arm(arm: bool) -> bytes:
"""CMD_ARM — uint8, 1=arm, 0=disarm."""
return _build_frame(CmdType.ARM, struct.pack("B", 1 if arm else 0))
def encode_pid(kp: float, ki: float, kd: float) -> bytes:
"""CMD_PID — three float32 values, big-endian."""
if kp < 0.0 or ki < 0.0 or kd < 0.0:
raise ValueError("PID gains must be non-negative")
return _build_frame(CmdType.PID, struct.pack(">fff", kp, ki, kd))
# ── Frame decoder (streaming state-machine) ───────────────────────────────────
class _ParseState(IntEnum):
WAIT_SYNC = 0
WAIT_LEN = 1
WAIT_TYPE = 2
PAYLOAD = 3
WAIT_CRC = 4
class FrameParser:
"""Byte-by-byte streaming parser for ESP32-S3 BALANCE telemetry frames.
Feed individual bytes via ``feed()``; returns a decoded dataclass when a
complete, CRC-valid frame arrives, otherwise returns ``None``.
Not thread-safe guard with a lock if accessed from multiple threads.
Counters
--------
frames_ok : successfully decoded frames
frames_error : CRC failures or oversized payloads
"""
def __init__(self) -> None:
self.frames_ok = 0
self.frames_error = 0
self._reset()
def reset(self) -> None:
"""Reset to initial state — call after serial reconnect."""
self._reset()
def _reset(self) -> None:
self._state = _ParseState.WAIT_SYNC
self._length = 0
self._type = 0
self._payload = bytearray()
def feed(self, byte: int) -> Optional[TelemetryFrame | tuple]:
"""Process one byte.
Returns a decoded dataclass on success, ``None`` while waiting for more
bytes, or a ``(type_code, raw_payload)`` tuple for unrecognised types.
"""
s = self._state
if s == _ParseState.WAIT_SYNC:
if byte == SYNC_BYTE:
self._state = _ParseState.WAIT_LEN
return None
if s == _ParseState.WAIT_LEN:
if byte > MAX_PAYLOAD_LEN:
self.frames_error += 1
self._reset()
return None
self._length = byte
self._state = _ParseState.WAIT_TYPE
return None
if s == _ParseState.WAIT_TYPE:
self._type = byte
self._payload = bytearray()
if self._length == 0:
self._state = _ParseState.WAIT_CRC
else:
self._state = _ParseState.PAYLOAD
return None
if s == _ParseState.PAYLOAD:
self._payload.append(byte)
if len(self._payload) == self._length:
self._state = _ParseState.WAIT_CRC
return None
if s == _ParseState.WAIT_CRC:
# Verify CRC before resetting parser state
crc_data = bytes([self._length, self._type]) + self._payload
expected = _crc8(crc_data)
if byte != expected:
self.frames_error += 1
self._reset()
return None
self.frames_ok += 1
result = _decode(self._type, bytes(self._payload))
self._reset()
return result
self._reset()
return None
# ── Telemetry decoder ─────────────────────────────────────────────────────────
# Maps VESC_LEFT/RIGHT type codes to hardware CAN IDs (from memory)
_VESC_ID_MAP = {
TelType.VESC_LEFT: 56, # left VESC CAN ID
TelType.VESC_RIGHT: 68, # right VESC CAN ID
}
def _decode(type_code: int, payload: bytes) -> Optional[TelemetryFrame | tuple]:
"""Decode a validated telemetry payload into a typed dataclass."""
try:
if type_code == TelType.STATUS:
# int16 pitch_x10, int16 motor_cmd, uint16 vbat_mv, uint8 state, uint8 flags
if len(payload) < 8:
return None
pitch_x10, motor_cmd, vbat_mv, state, flags = struct.unpack_from(">hhHBB", payload)
return StatusFrame(
pitch_deg=pitch_x10 / 10.0,
motor_cmd=motor_cmd,
vbat_mv=vbat_mv,
balance_state=state,
flags=flags,
)
if type_code in (TelType.VESC_LEFT, TelType.VESC_RIGHT):
# int32 erpm, uint16 voltage_mv, int16 current_ma, uint16 temp_c_x10
if len(payload) < 10:
return None
erpm, voltage_mv, current_ma, temp_x10 = struct.unpack_from(">iHhH", payload)
return VescFrame(
vesc_id=_VESC_ID_MAP[type_code],
erpm=erpm,
voltage_mv=voltage_mv,
current_ma=current_ma,
temp_c=temp_x10 / 10.0,
)
if type_code == TelType.ACK:
if len(payload) < 1:
return None
return AckFrame(echoed_type=payload[0])
if type_code == TelType.NACK:
if len(payload) < 2:
return None
return NackFrame(cmd_type=payload[0], error_code=payload[1])
except struct.error:
return None
# Unknown type — return raw for forward-compatibility
return (type_code, payload)

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_esp32_serial
[install]
install_scripts=$base/lib/saltybot_esp32_serial

View File

@ -0,0 +1,27 @@
from setuptools import setup
package_name = "saltybot_esp32_serial"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/config", ["config/esp32_balance_params.yaml"]),
(f"share/{package_name}/launch", ["launch/esp32_balance.launch.py"]),
],
install_requires=["setuptools", "pyserial"],
zip_safe=True,
maintainer="sl-perception",
maintainer_email="sl-perception@saltylab.local",
description="UART/USB-CDC serial bridge for ESP32-S3 BALANCE (bd-wim1)",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"esp32_balance_node = saltybot_esp32_serial.esp32_balance_node:main",
],
},
)

View File

@ -0,0 +1,311 @@
#!/usr/bin/env python3
"""Unit tests for saltybot_esp32_serial.esp32_balance_protocol.
No ROS2 or hardware required exercises encode/decode round-trips,
framing, CRC8, and parser state-machine in pure Python.
Run with: pytest test/ -v
"""
import struct
import unittest
from saltybot_esp32_serial.esp32_balance_protocol import (
SYNC_BYTE,
CmdType,
TelType,
BalanceState,
StatusFrame,
VescFrame,
AckFrame,
NackFrame,
FrameParser,
_crc8,
encode_heartbeat,
encode_drive,
encode_estop,
encode_arm,
encode_pid,
)
# ── CRC8 tests ────────────────────────────────────────────────────────────────
class TestCrc8(unittest.TestCase):
"""Verify CRC8-SMBUS implementation."""
def test_empty(self):
self.assertEqual(_crc8(b""), 0x00)
def test_single_byte_zero(self):
# CRC of 0x00: no XOR, no poly — stays 0
self.assertEqual(_crc8(b"\x00"), 0x00)
def test_single_byte_nonzero(self):
# Pre-computed expected value for 0xFF
result = _crc8(b"\xFF")
self.assertIsInstance(result, int)
self.assertGreaterEqual(result, 0)
self.assertLessEqual(result, 0xFF)
def test_deterministic(self):
data = b"\x01\x02\x04\x08"
self.assertEqual(_crc8(data), _crc8(data))
def test_different_data_different_crc(self):
self.assertNotEqual(_crc8(b"\x01"), _crc8(b"\x02"))
def test_byte_range(self):
for b in range(256):
r = _crc8(bytes([b]))
self.assertGreaterEqual(r, 0)
self.assertLessEqual(r, 255)
# ── Frame structure tests ─────────────────────────────────────────────────────
class TestFrameStructure(unittest.TestCase):
"""Verify encode functions produce correctly structured frames."""
def _check_frame(self, data: bytes, expected_type: int, expected_payload: bytes):
"""Assert frame matches [SYNC][LEN][TYPE][PAYLOAD][CRC8]."""
self.assertGreaterEqual(len(data), 4)
self.assertEqual(data[0], SYNC_BYTE, "SYNC byte mismatch")
length = data[1]
self.assertEqual(data[2], expected_type, "TYPE byte mismatch")
self.assertEqual(length, len(expected_payload), "LEN byte mismatch")
self.assertEqual(data[3:3 + length], expected_payload, "PAYLOAD mismatch")
# Verify CRC over [LEN][TYPE][PAYLOAD]
crc_data = bytes([length, expected_type]) + expected_payload
expected_crc = _crc8(crc_data)
self.assertEqual(data[-1], expected_crc, "CRC8 mismatch")
def test_heartbeat_frame(self):
frame = encode_heartbeat()
self._check_frame(frame, CmdType.HEARTBEAT, b"")
def test_drive_frame_zeros(self):
frame = encode_drive(0, 0)
payload = struct.pack(">hh", 0, 0)
self._check_frame(frame, CmdType.DRIVE, payload)
def test_drive_frame_values(self):
frame = encode_drive(500, -300)
payload = struct.pack(">hh", 500, -300)
self._check_frame(frame, CmdType.DRIVE, payload)
def test_drive_clamping(self):
# Values outside ±1000 must be clamped
frame = encode_drive(5000, -9999)
_, _, _, p0, p1, p2, p3, _ = frame
speed, steer = struct.unpack(">hh", bytes([p0, p1, p2, p3]))
self.assertEqual(speed, 1000)
self.assertEqual(steer, -1000)
def test_estop_assert(self):
frame = encode_estop(True)
self._check_frame(frame, CmdType.ESTOP, b"\x01")
def test_estop_clear(self):
frame = encode_estop(False)
self._check_frame(frame, CmdType.ESTOP, b"\x00")
def test_arm_frame(self):
for arm_val in (True, False):
frame = encode_arm(arm_val)
self._check_frame(frame, CmdType.ARM, bytes([1 if arm_val else 0]))
def test_pid_frame(self):
frame = encode_pid(1.0, 0.1, 0.01)
payload = struct.pack(">fff", 1.0, 0.1, 0.01)
self._check_frame(frame, CmdType.PID, payload)
def test_pid_negative_raises(self):
with self.assertRaises(ValueError):
encode_pid(-1.0, 0.1, 0.01)
# ── Parser round-trip tests ───────────────────────────────────────────────────
class TestFrameParser(unittest.TestCase):
"""Feed encoded frames back through the parser and verify decode."""
def _parse_all(self, data: bytes):
"""Feed all bytes; return list of decoded frames."""
parser = FrameParser()
results = []
for byte in data:
r = parser.feed(byte)
if r is not None:
results.append(r)
return parser, results
# ── Command echo round-trips (parser recognises unknown cmd types as raw tuples) ──
def test_heartbeat_roundtrip(self):
frame = encode_heartbeat()
parser, results = self._parse_all(frame)
self.assertEqual(len(results), 1)
# HEARTBEAT (0x01) is a command type — no dataclass decoder, returns raw tuple
self.assertIsInstance(results[0], tuple)
type_code, payload = results[0]
self.assertEqual(type_code, CmdType.HEARTBEAT)
self.assertEqual(payload, b"")
def test_drive_roundtrip(self):
frame = encode_drive(250, -100)
parser, results = self._parse_all(frame)
self.assertEqual(len(results), 1)
type_code, payload = results[0]
self.assertEqual(type_code, CmdType.DRIVE)
speed, steer = struct.unpack(">hh", payload)
self.assertEqual(speed, 250)
self.assertEqual(steer, -100)
# ── Telemetry decode round-trips ────────────────────────────────────────
def _build_telem_frame(self, type_code: int, payload: bytes) -> bytes:
length = len(payload)
crc = _crc8(bytes([length, type_code]) + payload)
return bytes([SYNC_BYTE, length, type_code]) + payload + bytes([crc])
def test_status_frame_decode(self):
raw = struct.pack(">hhHBB", 152, 300, 11400, BalanceState.ARMED, 0x00)
frame = self._build_telem_frame(TelType.STATUS, raw)
_, results = self._parse_all(frame)
self.assertEqual(len(results), 1)
f = results[0]
self.assertIsInstance(f, StatusFrame)
self.assertAlmostEqual(f.pitch_deg, 15.2, places=5)
self.assertEqual(f.motor_cmd, 300)
self.assertEqual(f.vbat_mv, 11400)
self.assertEqual(f.balance_state, BalanceState.ARMED)
self.assertEqual(f.flags, 0x00)
def test_status_estop_flag(self):
raw = struct.pack(">hhHBB", 0, 0, 10000, BalanceState.ESTOP, 0x01)
frame = self._build_telem_frame(TelType.STATUS, raw)
_, results = self._parse_all(frame)
f = results[0]
self.assertIsInstance(f, StatusFrame)
self.assertTrue(bool(f.flags & 0x01)) # estop_active flag set
def test_vesc_left_frame_decode(self):
# erpm=12000, voltage_mv=22400, current_ma=4500, temp_x10=352
raw = struct.pack(">iHhH", 12000, 22400, 4500, 352)
frame = self._build_telem_frame(TelType.VESC_LEFT, raw)
_, results = self._parse_all(frame)
self.assertEqual(len(results), 1)
f = results[0]
self.assertIsInstance(f, VescFrame)
self.assertEqual(f.vesc_id, 56)
self.assertEqual(f.erpm, 12000)
self.assertEqual(f.voltage_mv, 22400)
self.assertEqual(f.current_ma, 4500)
self.assertAlmostEqual(f.temp_c, 35.2, places=5)
def test_vesc_right_frame_decode(self):
raw = struct.pack(">iHhH", -6000, 22000, -200, 280)
frame = self._build_telem_frame(TelType.VESC_RIGHT, raw)
_, results = self._parse_all(frame)
f = results[0]
self.assertIsInstance(f, VescFrame)
self.assertEqual(f.vesc_id, 68)
self.assertEqual(f.erpm, -6000)
self.assertAlmostEqual(f.temp_c, 28.0, places=5)
def test_ack_frame_decode(self):
raw = bytes([CmdType.DRIVE])
frame = self._build_telem_frame(TelType.ACK, raw)
_, results = self._parse_all(frame)
f = results[0]
self.assertIsInstance(f, AckFrame)
self.assertEqual(f.echoed_type, CmdType.DRIVE)
def test_nack_frame_decode(self):
raw = bytes([CmdType.DRIVE, 0x03]) # ERR_ESTOP_ACTIVE
frame = self._build_telem_frame(TelType.NACK, raw)
_, results = self._parse_all(frame)
f = results[0]
self.assertIsInstance(f, NackFrame)
self.assertEqual(f.cmd_type, CmdType.DRIVE)
self.assertEqual(f.error_code, 0x03)
# ── Parser robustness ───────────────────────────────────────────────────
def test_bad_crc_discarded(self):
frame = bytearray(encode_heartbeat())
frame[-1] ^= 0xFF # corrupt CRC
parser, results = self._parse_all(bytes(frame))
self.assertEqual(results, [])
self.assertEqual(parser.frames_error, 1)
self.assertEqual(parser.frames_ok, 0)
def test_garbage_before_sync(self):
garbage = bytes([0x00, 0xFF, 0x01, 0x22])
frame = encode_heartbeat()
parser, results = self._parse_all(garbage + frame)
self.assertEqual(len(results), 1)
def test_two_frames_sequential(self):
frames = encode_heartbeat() + encode_drive(100, 50)
parser, results = self._parse_all(frames)
self.assertEqual(len(results), 2)
self.assertEqual(parser.frames_ok, 2)
def test_oversized_payload_rejected(self):
# Craft a frame claiming LEN=63 (> MAX_PAYLOAD_LEN=62)
bad = bytes([SYNC_BYTE, 63, CmdType.DRIVE])
parser, results = self._parse_all(bad)
self.assertEqual(results, [])
self.assertEqual(parser.frames_error, 1)
def test_parser_counter_increments(self):
parser = FrameParser()
frame = encode_drive(10, 10)
for b in frame:
parser.feed(b)
self.assertEqual(parser.frames_ok, 1)
self.assertEqual(parser.frames_error, 0)
def test_reset_clears_state(self):
parser = FrameParser()
# Feed partial frame then reset
partial = encode_drive(0, 0)[:-2]
for b in partial:
parser.feed(b)
parser.reset()
# Now a clean frame should decode correctly
for b in encode_heartbeat():
r = parser.feed(b)
self.assertEqual(parser.frames_ok, 1)
# ── Encode parameter edge cases ───────────────────────────────────────────────
class TestEncodeEdgeCases(unittest.TestCase):
def test_drive_clamp_positive(self):
frame = encode_drive(9999, 9999)
payload = frame[3:7]
speed, steer = struct.unpack(">hh", payload)
self.assertEqual(speed, 1000)
self.assertEqual(steer, 1000)
def test_drive_clamp_negative(self):
frame = encode_drive(-9999, -9999)
payload = frame[3:7]
speed, steer = struct.unpack(">hh", payload)
self.assertEqual(speed, -1000)
self.assertEqual(steer, -1000)
def test_pid_zero_gains_valid(self):
frame = encode_pid(0.0, 0.0, 0.0)
self.assertIsNotNone(frame)
def test_pid_large_gains_valid(self):
frame = encode_pid(100.0, 50.0, 25.0)
self.assertIsNotNone(frame)
if __name__ == "__main__":
unittest.main()