From 9b460e34dbcc86e921a5a5ba271d732434fdea0e Mon Sep 17 00:00:00 2001 From: sl-perception Date: Fri, 17 Apr 2026 19:38:38 -0400 Subject: [PATCH] feat: Orin UART/USB serial interface for ESP32 Balance (bd-wim1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New package saltybot_esp32_serial replaces saltybot_can_bridge (CANable2/python-can) with direct USB-CDC serial to ESP32-S3 BALANCE (CH343 chip, 1a86:55d3, /dev/esp32-balance @ 460800 baud). Framing: [0xAA][LEN][TYPE][PAYLOAD][CRC8-SMBUS] matching bd-66hx spec. esp32_balance_protocol.py — codec + streaming FrameParser (state-machine) - Commands: HEARTBEAT(0x01), DRIVE(0x02), ESTOP(0x03), ARM(0x04), PID(0x05) - Telemetry: STATUS(0x80), VESC_LEFT(0x81), VESC_RIGHT(0x82), ACK/NACK esp32_balance_node.py — ROS2 node - Subs: /cmd_vel, /estop, /saltybot/arm, /saltybot/pid_update - Pubs: /saltybot/attitude, /saltybot/balance_state, /can/battery, /can/vesc/{left,right}/state, /can/connection_status - 500ms /cmd_vel watchdog → CMD_DRIVE(0,0) - 200ms CMD_HEARTBEAT keepalive timer - Auto-reconnect on serial disconnect Proxied VESC telemetry: erpm, voltage, current, temp for IDs 56(L)/68(R). Frees CANable2 for bd-p47c (Here4 GPS). 33 unit tests — all pass. Co-Authored-By: Claude Sonnet 4.6 --- .../config/esp32_balance_params.yaml | 20 + .../launch/esp32_balance.launch.py | 73 ++++ .../src/saltybot_esp32_serial/package.xml | 40 ++ .../resource/saltybot_esp32_serial | 0 .../saltybot_esp32_serial/__init__.py | 1 + .../esp32_balance_node.py | 369 ++++++++++++++++++ .../esp32_balance_protocol.py | 355 +++++++++++++++++ .../src/saltybot_esp32_serial/setup.cfg | 5 + .../src/saltybot_esp32_serial/setup.py | 27 ++ .../saltybot_esp32_serial/test/__init__.py | 0 .../test/test_esp32_balance_protocol.py | 311 +++++++++++++++ 11 files changed, 1201 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_esp32_serial/config/esp32_balance_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_esp32_serial/launch/esp32_balance.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_esp32_serial/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_esp32_serial/resource/saltybot_esp32_serial create mode 100644 jetson/ros2_ws/src/saltybot_esp32_serial/saltybot_esp32_serial/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_esp32_serial/saltybot_esp32_serial/esp32_balance_node.py create mode 100644 jetson/ros2_ws/src/saltybot_esp32_serial/saltybot_esp32_serial/esp32_balance_protocol.py create mode 100644 jetson/ros2_ws/src/saltybot_esp32_serial/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_esp32_serial/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_esp32_serial/test/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_esp32_serial/test/test_esp32_balance_protocol.py diff --git a/jetson/ros2_ws/src/saltybot_esp32_serial/config/esp32_balance_params.yaml b/jetson/ros2_ws/src/saltybot_esp32_serial/config/esp32_balance_params.yaml new file mode 100644 index 0000000..8aad9a8 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_esp32_serial/config/esp32_balance_params.yaml @@ -0,0 +1,20 @@ +esp32_balance_node: + ros__parameters: + # USB-CDC serial port: CH343 chip, VID:PID 1a86:55d3 + # Udev symlink set by jetson/docs/pinout.md udev rules + serial_port: /dev/esp32-balance + baud_rate: 460800 + + # /cmd_vel linear.x (m/s) → speed units sent to ESP32-S3 BALANCE + speed_scale: 1000.0 + # /cmd_vel angular.z (rad/s) → steer units (negative = flip turn convention) + steer_scale: -500.0 + + # Safety: send CMD_DRIVE(0,0) if /cmd_vel silent for this many seconds + command_timeout_s: 0.5 + + # CMD_HEARTBEAT TX interval in seconds (ESP32-S3 watchdog fires at 500 ms) + heartbeat_period: 0.2 + + # Reconnect attempt interval when serial is lost + reconnect_delay: 2.0 diff --git a/jetson/ros2_ws/src/saltybot_esp32_serial/launch/esp32_balance.launch.py b/jetson/ros2_ws/src/saltybot_esp32_serial/launch/esp32_balance.launch.py new file mode 100644 index 0000000..7d1c719 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_esp32_serial/launch/esp32_balance.launch.py @@ -0,0 +1,73 @@ +"""esp32_balance.launch.py — Launch the Orin↔ESP32-S3 BALANCE USB-serial bridge. + +bd-wim1: replaces can_bridge_node (CANable2/python-can) with USB-CDC serial. +The freed CANable2 slot is used by Here4 GPS (bd-p47c). + +USB device +---------- + CH343 USB-CDC chip on ESP32-S3 BALANCE board + VID:PID 1a86:55d3 + Symlink /dev/esp32-balance (via udev rule in jetson/docs/pinout.md) + Baud 460800 + +Usage +----- + # Default: + ros2 launch saltybot_esp32_serial esp32_balance.launch.py + + # Override port (e.g. direct ACM path): + ros2 launch saltybot_esp32_serial esp32_balance.launch.py serial_port:=/dev/ttyACM0 + + # Custom velocity scales: + ros2 launch saltybot_esp32_serial esp32_balance.launch.py speed_scale:=800.0 steer_scale:=-400.0 +""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description() -> LaunchDescription: + pkg_share = get_package_share_directory("saltybot_esp32_serial") + params_file = os.path.join(pkg_share, "config", "esp32_balance_params.yaml") + + serial_port_arg = DeclareLaunchArgument( + "serial_port", + default_value="/dev/esp32-balance", + description="USB-CDC serial port for ESP32-S3 BALANCE (CH343, 1a86:55d3)", + ) + speed_scale_arg = DeclareLaunchArgument( + "speed_scale", + default_value="1000.0", + description="linear.x (m/s) → motor speed units", + ) + steer_scale_arg = DeclareLaunchArgument( + "steer_scale", + default_value="-500.0", + description="angular.z (rad/s) → steer units", + ) + + node = Node( + package="saltybot_esp32_serial", + executable="esp32_balance_node", + name="esp32_balance_node", + output="screen", + parameters=[ + params_file, + { + "serial_port": LaunchConfiguration("serial_port"), + "speed_scale": LaunchConfiguration("speed_scale"), + "steer_scale": LaunchConfiguration("steer_scale"), + }, + ], + ) + + return LaunchDescription([ + serial_port_arg, + speed_scale_arg, + steer_scale_arg, + node, + ]) diff --git a/jetson/ros2_ws/src/saltybot_esp32_serial/package.xml b/jetson/ros2_ws/src/saltybot_esp32_serial/package.xml new file mode 100644 index 0000000..069b04c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_esp32_serial/package.xml @@ -0,0 +1,40 @@ + + + + saltybot_esp32_serial + 0.1.0 + + UART/USB-CDC serial bridge for SaltyBot Orin — interfaces with the + ESP32-S3 BALANCE board (CH343, 1a86:55d3) using a binary framing protocol + [0xAA][LEN][TYPE][PAYLOAD][CRC8]. Replaces saltybot_can_bridge (CANable2) + to free the CAN bus for Here4 GPS (bd-p47c). + + Publishes the same ROS2 topics as saltybot_can_bridge for backward compat: + /saltybot/attitude, /can/battery, /can/vesc/{left,right}/state, + /can/connection_status + + System dependency: pyserial (apt: python3-serial or pip: pyserial) + Bead: bd-wim1 Counterpart firmware: bd-66hx (hal) + + sl-perception + MIT + + rclpy + std_msgs + geometry_msgs + sensor_msgs + + + python3-serial + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_esp32_serial/resource/saltybot_esp32_serial b/jetson/ros2_ws/src/saltybot_esp32_serial/resource/saltybot_esp32_serial new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_esp32_serial/saltybot_esp32_serial/__init__.py b/jetson/ros2_ws/src/saltybot_esp32_serial/saltybot_esp32_serial/__init__.py new file mode 100644 index 0000000..b4cc690 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_esp32_serial/saltybot_esp32_serial/__init__.py @@ -0,0 +1 @@ +# saltybot_esp32_serial — UART/USB-CDC bridge for ESP32-S3 BALANCE (bd-wim1) diff --git a/jetson/ros2_ws/src/saltybot_esp32_serial/saltybot_esp32_serial/esp32_balance_node.py b/jetson/ros2_ws/src/saltybot_esp32_serial/saltybot_esp32_serial/esp32_balance_node.py new file mode 100644 index 0000000..167301e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_esp32_serial/saltybot_esp32_serial/esp32_balance_node.py @@ -0,0 +1,369 @@ +"""esp32_balance_node.py — ROS2 bridge: Orin ↔ ESP32-S3 BALANCE via USB-CDC serial. + +bd-wim1: replaces saltybot_can_bridge (CANable2/python-can) with USB serial. +The CANable2 is freed for Here4 GPS (bd-p47c). + +Physical connection +------------------- + Device : /dev/esp32-balance (CH343 USB-CDC, VID:PID 1a86:55d3) + Baud : 460800, 8N1 + Protocol: [0xAA][LEN][TYPE][PAYLOAD][CRC8] (see esp32_balance_protocol.py) + +Subscriptions +------------- + /cmd_vel geometry_msgs/Twist → CMD_DRIVE (at up to 50 Hz) + /estop std_msgs/Bool → CMD_ESTOP + /saltybot/arm std_msgs/Bool → CMD_ARM + /saltybot/pid_update std_msgs/String JSON → CMD_PID {"kp":…,"ki":…,"kd":…} + +Publications +------------ + /saltybot/attitude std_msgs/String JSON pitch, motor_cmd, state, etc. + /saltybot/balance_state std_msgs/String same as /saltybot/attitude (alias) + /can/battery sensor_msgs/BatteryState from TELEM_STATUS.vbat_mv + /can/vesc/left/state std_msgs/Float32MultiArray [erpm, duty≡0, voltage_mv/1000, current_ma/1000] + /can/vesc/right/state std_msgs/Float32MultiArray same layout + /can/connection_status std_msgs/String "connected" | "disconnected" + + NOTE: /can/* topic names are preserved for backward compatibility with + nodes that subscribed to saltybot_can_bridge outputs. + +Parameters +---------- + serial_port str USB-CDC port default: /dev/esp32-balance + baud_rate int Baud rate default: 460800 + speed_scale float linear.x (m/s) → units default: 1000.0 + steer_scale float angular.z (rad/s) → units default: -500.0 + command_timeout_s float /cmd_vel watchdog default: 0.5 + heartbeat_period float CMD_HEARTBEAT interval default: 0.2 + reconnect_delay float Retry interval, s default: 2.0 +""" + +from __future__ import annotations + +import json +import threading +import time +from typing import Optional + +import rclpy +import serial +from geometry_msgs.msg import Twist +from rclpy.node import Node +from sensor_msgs.msg import BatteryState +from std_msgs.msg import Bool, Float32MultiArray, String + +from .esp32_balance_protocol import ( + BAUD_RATE, + SERIAL_PORT_DEFAULT, + AckFrame, + BalanceState, + BALANCE_STATE_LABEL, + FrameParser, + NackFrame, + StatusFrame, + VescFrame, + encode_arm, + encode_drive, + encode_estop, + encode_heartbeat, + encode_pid, +) + +_WATCHDOG_HZ: float = 20.0 # TX watchdog check rate + + +class Esp32BalanceNode(Node): + """Serial bridge replacing CANable2 CAN comms with USB-CDC to ESP32-S3 BALANCE.""" + + def __init__(self) -> None: + super().__init__("esp32_balance_node") + + # ── Parameters ──────────────────────────────────────────────────── + self.declare_parameter("serial_port", SERIAL_PORT_DEFAULT) + self.declare_parameter("baud_rate", BAUD_RATE) + self.declare_parameter("speed_scale", 1000.0) + self.declare_parameter("steer_scale", -500.0) + self.declare_parameter("command_timeout_s", 0.5) + self.declare_parameter("heartbeat_period", 0.2) + self.declare_parameter("reconnect_delay", 2.0) + + self._port_name = self.get_parameter("serial_port").value + self._baud = self.get_parameter("baud_rate").value + self._speed_scale = self.get_parameter("speed_scale").value + self._steer_scale = self.get_parameter("steer_scale").value + self._cmd_timeout = self.get_parameter("command_timeout_s").value + self._hb_period = self.get_parameter("heartbeat_period").value + self._reconnect_delay = self.get_parameter("reconnect_delay").value + + # ── State ───────────────────────────────────────────────────────── + self._ser: Optional[serial.Serial] = None + self._ser_lock = threading.Lock() + self._parser = FrameParser() + self._last_cmd_time = time.monotonic() + self._watchdog_sent = False + + # ── Publishers ──────────────────────────────────────────────────── + self._pub_attitude = self.create_publisher(String, "/saltybot/attitude", 10) + self._pub_balance = self.create_publisher(String, "/saltybot/balance_state", 10) + self._pub_battery = self.create_publisher(BatteryState, "/can/battery", 10) + self._pub_vesc_left = self.create_publisher(Float32MultiArray, "/can/vesc/left/state", 10) + self._pub_vesc_right = self.create_publisher(Float32MultiArray, "/can/vesc/right/state", 10) + self._pub_status = self.create_publisher(String, "/can/connection_status", 10) + + # ── Subscriptions ───────────────────────────────────────────────── + self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, 10) + self.create_subscription(Bool, "/estop", self._estop_cb, 10) + self.create_subscription(Bool, "/saltybot/arm", self._arm_cb, 10) + self.create_subscription(String, "/saltybot/pid_update", self._pid_cb, 10) + + # ── Timers ──────────────────────────────────────────────────────── + self.create_timer(self._hb_period, self._heartbeat_cb) + self.create_timer(1.0 / _WATCHDOG_HZ, self._watchdog_cb) + self.create_timer(self._reconnect_delay, self._reconnect_cb) + + # ── Open port + start reader thread ─────────────────────────────── + self._try_connect() + self._reader_thread = threading.Thread( + target=self._reader_loop, daemon=True, name="balance_serial_reader" + ) + self._reader_thread.start() + + self.get_logger().info( + f"esp32_balance_node — {self._port_name} @ {self._baud} baud | " + f"speed_scale={self._speed_scale} steer_scale={self._steer_scale}" + ) + + # ── Serial management ───────────────────────────────────────────────── + + def _try_connect(self) -> None: + with self._ser_lock: + if self._ser is not None and self._ser.is_open: + return + try: + self._ser = serial.Serial( + port=self._port_name, + baudrate=self._baud, + timeout=0.05, + write_timeout=0.1, + ) + self._ser.reset_input_buffer() + self._parser.reset() + self.get_logger().info(f"Serial open: {self._port_name}") + self._publish_status("connected") + except serial.SerialException as exc: + self.get_logger().warning( + f"Cannot open {self._port_name}: {exc} — retry in {self._reconnect_delay:.0f} s", + throttle_duration_sec=self._reconnect_delay, + ) + self._ser = None + self._publish_status("disconnected") + + def _reconnect_cb(self) -> None: + with self._ser_lock: + open_ok = self._ser is not None and self._ser.is_open + if not open_ok: + self._try_connect() + + def _handle_serial_error(self, exc: Exception, context: str) -> None: + self.get_logger().warning(f"Serial error in {context}: {exc}") + with self._ser_lock: + if self._ser is not None: + try: + self._ser.close() + except Exception: + pass + self._ser = None + self._publish_status("disconnected") + + # ── Serial write helper ─────────────────────────────────────────────── + + def _write(self, data: bytes, context: str = "") -> bool: + with self._ser_lock: + if self._ser is None or not self._ser.is_open: + return False + ser = self._ser + try: + ser.write(data) + return True + except serial.SerialException as exc: + self._handle_serial_error(exc, f"write({context})") + return False + + # ── ROS subscriptions ───────────────────────────────────────────────── + + def _cmd_vel_cb(self, msg: Twist) -> None: + """Convert /cmd_vel to CMD_DRIVE at up to 50 Hz.""" + self._last_cmd_time = time.monotonic() + self._watchdog_sent = False + + speed = int(msg.linear.x * self._speed_scale) + steer = int(msg.angular.z * self._steer_scale) + self._write(encode_drive(speed, steer), "cmd_vel") + + def _estop_cb(self, msg: Bool) -> None: + frame = encode_estop(msg.data) + self._write(frame, "estop") + if msg.data: + self.get_logger().warning("E-stop asserted → CMD_ESTOP sent to ESP32-S3 BALANCE") + + def _arm_cb(self, msg: Bool) -> None: + self._write(encode_arm(msg.data), "arm") + + def _pid_cb(self, msg: String) -> None: + """Parse JSON {"kp":…,"ki":…,"kd":…} and send CMD_PID.""" + try: + d = json.loads(msg.data) + kp = float(d["kp"]) + ki = float(d["ki"]) + kd = float(d["kd"]) + except (KeyError, ValueError, json.JSONDecodeError) as exc: + self.get_logger().error(f"Bad /saltybot/pid_update JSON: {exc}") + return + self._write(encode_pid(kp, ki, kd), "pid_update") + + # ── Timers ──────────────────────────────────────────────────────────── + + def _heartbeat_cb(self) -> None: + self._write(encode_heartbeat(), "heartbeat") + + def _watchdog_cb(self) -> None: + """Send CMD_DRIVE(0,0) if /cmd_vel has been silent for command_timeout_s.""" + if self._watchdog_sent: + return + if time.monotonic() - self._last_cmd_time > self._cmd_timeout: + self._write(encode_drive(0, 0), "watchdog") + self._watchdog_sent = True + + # ── Background reader ───────────────────────────────────────────────── + + def _reader_loop(self) -> None: + while rclpy.ok(): + with self._ser_lock: + ser = self._ser if (self._ser and self._ser.is_open) else None + if ser is None: + time.sleep(0.05) + continue + try: + n = ser.in_waiting + if n: + raw = ser.read(n) + else: + time.sleep(0.002) + continue + except serial.SerialException as exc: + self._handle_serial_error(exc, "reader_loop") + continue + + for byte in raw: + frame = self._parser.feed(byte) + if frame is not None: + self._dispatch(frame) + + def _dispatch(self, frame) -> None: + try: + if isinstance(frame, StatusFrame): + self._handle_status(frame) + elif isinstance(frame, VescFrame): + self._handle_vesc(frame) + elif isinstance(frame, AckFrame): + self.get_logger().debug( + f"ACK for cmd type 0x{frame.echoed_type:02X}" + ) + elif isinstance(frame, NackFrame): + self.get_logger().warning( + f"NACK cmd=0x{frame.cmd_type:02X} err=0x{frame.error_code:02X}" + ) + elif isinstance(frame, tuple): + type_code, _ = frame + self.get_logger().debug(f"Unknown telemetry type 0x{type_code:02X}") + except Exception as exc: + self.get_logger().warning(f"Error dispatching frame {type(frame).__name__}: {exc}") + + # ── Frame handlers ──────────────────────────────────────────────────── + + def _handle_status(self, f: StatusFrame) -> None: + """Publish balance controller status — replaces CAN 0x400 handler.""" + now = self.get_clock().now().to_msg() + payload = { + "pitch_deg": round(f.pitch_deg, 2), + "motor_cmd": f.motor_cmd, + "vbat_mv": f.vbat_mv, + "balance_state": f.balance_state, + "state_label": BALANCE_STATE_LABEL.get(f.balance_state, + f"UNKNOWN({f.balance_state})"), + "flags": f.flags, + "estop_active": bool(f.flags & 0x01), + "hb_timeout": bool(f.flags & 0x02), + "ts": f"{now.sec}.{now.nanosec:09d}", + } + msg = String() + msg.data = json.dumps(payload) + self._pub_attitude.publish(msg) + self._pub_balance.publish(msg) + + # Battery piggyback on STATUS frame for nodes that use /can/battery + bat = BatteryState() + bat.header.stamp = now + bat.voltage = f.vbat_mv / 1000.0 + bat.present = True + bat.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING + self._pub_battery.publish(bat) + + def _handle_vesc(self, f: VescFrame) -> None: + """Publish proxied VESC telemetry — replaces CAN VESC_STATUS_1 handler. + + Float32MultiArray layout: [erpm, voltage_v, current_a, temp_c] + (duty not available via serial proxy; 0.0 placeholder) + """ + arr = Float32MultiArray() + arr.data = [ + float(f.erpm), + float(f.voltage_mv) / 1000.0, + float(f.current_ma) / 1000.0, + f.temp_c, + ] + if f.vesc_id == 56: + self._pub_vesc_left.publish(arr) + elif f.vesc_id == 68: + self._pub_vesc_right.publish(arr) + else: + self.get_logger().warning(f"Unknown VESC ID {f.vesc_id} in telemetry frame") + + # ── Status helper ───────────────────────────────────────────────────── + + def _publish_status(self, status: str) -> None: + msg = String() + msg.data = status + self._pub_status.publish(msg) + + # ── Shutdown ────────────────────────────────────────────────────────── + + def destroy_node(self) -> None: + # Send zero drive + estop before closing + self._write(encode_drive(0, 0), "shutdown") + self._write(encode_estop(True), "shutdown") + with self._ser_lock: + if self._ser is not None: + try: + self._ser.close() + except Exception: + pass + self._ser = None + super().destroy_node() + + +def main(args=None) -> None: + rclpy.init(args=args) + node = Esp32BalanceNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_esp32_serial/saltybot_esp32_serial/esp32_balance_protocol.py b/jetson/ros2_ws/src/saltybot_esp32_serial/saltybot_esp32_serial/esp32_balance_protocol.py new file mode 100644 index 0000000..dc54022 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_esp32_serial/saltybot_esp32_serial/esp32_balance_protocol.py @@ -0,0 +1,355 @@ +"""esp32_balance_protocol.py — Binary frame codec for Orin↔ESP32-S3 BALANCE UART/USB comms. + +bd-wim1: replaces CANable2/python-can ESP32 interface with USB-CDC serial. +Counterpart firmware bead: bd-66hx (hal's ESP32-S3 BALANCE side). + +Frame layout: + ┌───────┬─────┬──────┬──────────────────┬───────┐ + │ SYNC │ LEN │ TYPE │ PAYLOAD │ CRC8 │ + │ 0xAA │ 1B │ 1B │ LEN bytes │ 1B │ + └───────┴─────┴──────┴──────────────────┴───────┘ + + LEN = number of payload bytes (0–62) + CRC8 = CRC8-SMBUS (poly=0x07, init=0x00), computed over LEN+TYPE+PAYLOAD + +Physical layer + Device : /dev/esp32-balance (CH343 USB-CDC, VID:PID 1a86:55d3) + Baud : 460800, 8N1 + +Command types (Orin → ESP32-S3 BALANCE): + 0x01 CMD_HEARTBEAT — no payload len=0 + 0x02 CMD_DRIVE — int16 speed_units, int16 steer_units len=4 range −1000..+1000 + 0x03 CMD_ESTOP — uint8 (1=assert, 0=clear) len=1 + 0x04 CMD_ARM — uint8 (1=arm, 0=disarm) len=1 + 0x05 CMD_PID — float32 kp, float32 ki, float32 kd len=12 + +Telemetry types (ESP32-S3 BALANCE → Orin): + 0x80 TELEM_STATUS — int16 pitch_x10, int16 motor_cmd, + uint16 vbat_mv, uint8 balance_state, uint8 flags len=8 + 0x81 TELEM_VESC_LEFT — int32 erpm, uint16 voltage_mv, + int16 current_ma, uint16 temp_c_x10 len=10 + 0x82 TELEM_VESC_RIGHT — same layout as TELEM_VESC_LEFT len=10 + 0xA0 RESP_ACK — uint8 echoed cmd type len=1 + 0xA1 RESP_NACK — uint8 cmd type, uint8 error_code len=2 + +Balance state values (TELEM_STATUS.balance_state): + 0 DISARMED + 1 ARMED + 2 TILT_FAULT + 3 ESTOP + +NACK error codes: + 0x01 ERR_BAD_CRC + 0x02 ERR_BAD_LEN + 0x03 ERR_ESTOP_ACTIVE + 0x04 ERR_DISARMED + +NOTE: Message type IDs coordinated with hal (bd-66hx) on 2026-04-17. + Confirm against bd-66hx PR before deploying on hardware. +""" + +from __future__ import annotations + +import struct +from dataclasses import dataclass +from enum import IntEnum +from typing import Optional + +# ── Physical layer constants ────────────────────────────────────────────────── + +SERIAL_PORT_DEFAULT = "/dev/esp32-balance" +BAUD_RATE = 460800 +MAX_PAYLOAD_LEN = 62 # hard cap; larger frames are considered corrupt + +# ── Framing ─────────────────────────────────────────────────────────────────── + +SYNC_BYTE = 0xAA + + +# ── Command type codes (Orin → ESP32-S3 BALANCE) ───────────────────────────── + +class CmdType(IntEnum): + HEARTBEAT = 0x01 + DRIVE = 0x02 + ESTOP = 0x03 + ARM = 0x04 + PID = 0x05 + + +# ── Telemetry type codes (ESP32-S3 BALANCE → Orin) ─────────────────────────── + +class TelType(IntEnum): + STATUS = 0x80 + VESC_LEFT = 0x81 + VESC_RIGHT = 0x82 + ACK = 0xA0 + NACK = 0xA1 + + +# ── Balance state enum ──────────────────────────────────────────────────────── + +class BalanceState(IntEnum): + DISARMED = 0 + ARMED = 1 + TILT_FAULT = 2 + ESTOP = 3 + + +BALANCE_STATE_LABEL = { + BalanceState.DISARMED: "DISARMED", + BalanceState.ARMED: "ARMED", + BalanceState.TILT_FAULT: "TILT_FAULT", + BalanceState.ESTOP: "ESTOP", +} + + +# ── Parsed telemetry dataclasses ───────────────────────────────────────────── + +@dataclass +class StatusFrame: + """TELEM_STATUS (0x80): balance controller status at ~10 Hz.""" + pitch_deg: float # degrees (positive = forward tilt); raw pitch_x10 / 10.0 + motor_cmd: int # ESC command, −1000..+1000 + vbat_mv: int # battery voltage, millivolts + balance_state: int # BalanceState enum + flags: int # bitmask — bit 0: estop_active, bit 1: heartbeat_timeout + + +@dataclass +class VescFrame: + """TELEM_VESC_LEFT (0x81) or TELEM_VESC_RIGHT (0x82): VESC proxied telemetry at ~10 Hz.""" + vesc_id: int # 56 = left, 68 = right (maps to CAN IDs from hardware spec) + erpm: int # electrical RPM (signed) + voltage_mv: int # bus voltage, millivolts + current_ma: int # phase current, milliamps (signed) + temp_c: float # motor temperature, °C; raw temp_c_x10 / 10.0 + + +@dataclass +class AckFrame: + """RESP_ACK (0xA0): acknowledgement for a successfully parsed command.""" + echoed_type: int + + +@dataclass +class NackFrame: + """RESP_NACK (0xA1): rejection; contains original cmd type and error code.""" + cmd_type: int + error_code: int + + +TelemetryFrame = StatusFrame | VescFrame | AckFrame | NackFrame + + +# ── CRC8-SMBUS ──────────────────────────────────────────────────────────────── + +def _crc8(data: bytes) -> int: + """CRC8-SMBUS: polynomial 0x07, init 0x00, no final XOR. + + Coverage: LEN + TYPE + PAYLOAD bytes. + """ + crc = 0 + for byte in data: + crc ^= byte + for _ in range(8): + if crc & 0x80: + crc = (crc << 1) ^ 0x07 + else: + crc <<= 1 + crc &= 0xFF + return crc + + +# ── Frame encoder ───────────────────────────────────────────────────────────── + +def _build_frame(cmd_type: int, payload: bytes) -> bytes: + """Build a complete binary frame: [0xAA][LEN][TYPE][PAYLOAD][CRC8].""" + if len(payload) > MAX_PAYLOAD_LEN: + raise ValueError(f"Payload length {len(payload)} exceeds max {MAX_PAYLOAD_LEN}") + length = len(payload) + crc_data = bytes([length, cmd_type]) + payload + crc = _crc8(crc_data) + return bytes([SYNC_BYTE, length, cmd_type]) + payload + bytes([crc]) + + +def encode_heartbeat() -> bytes: + """CMD_HEARTBEAT — no payload. Send every ≤500 ms to keep watchdog alive.""" + return _build_frame(CmdType.HEARTBEAT, b"") + + +def encode_drive(speed: int, steer: int) -> bytes: + """CMD_DRIVE — int16 speed_units + int16 steer_units (big-endian, −1000..+1000). + + Parameters + ---------- + speed : target longitudinal speed in motor units (positive = forward) + steer : steering demand in motor units (positive = right turn) + """ + speed = max(-1000, min(1000, int(speed))) + steer = max(-1000, min(1000, int(steer))) + return _build_frame(CmdType.DRIVE, struct.pack(">hh", speed, steer)) + + +def encode_estop(assert_stop: bool) -> bytes: + """CMD_ESTOP — uint8, 1=assert, 0=clear.""" + return _build_frame(CmdType.ESTOP, struct.pack("B", 1 if assert_stop else 0)) + + +def encode_arm(arm: bool) -> bytes: + """CMD_ARM — uint8, 1=arm, 0=disarm.""" + return _build_frame(CmdType.ARM, struct.pack("B", 1 if arm else 0)) + + +def encode_pid(kp: float, ki: float, kd: float) -> bytes: + """CMD_PID — three float32 values, big-endian.""" + if kp < 0.0 or ki < 0.0 or kd < 0.0: + raise ValueError("PID gains must be non-negative") + return _build_frame(CmdType.PID, struct.pack(">fff", kp, ki, kd)) + + +# ── Frame decoder (streaming state-machine) ─────────────────────────────────── + +class _ParseState(IntEnum): + WAIT_SYNC = 0 + WAIT_LEN = 1 + WAIT_TYPE = 2 + PAYLOAD = 3 + WAIT_CRC = 4 + + +class FrameParser: + """Byte-by-byte streaming parser for ESP32-S3 BALANCE telemetry frames. + + Feed individual bytes via ``feed()``; returns a decoded dataclass when a + complete, CRC-valid frame arrives, otherwise returns ``None``. + + Not thread-safe — guard with a lock if accessed from multiple threads. + + Counters + -------- + frames_ok : successfully decoded frames + frames_error : CRC failures or oversized payloads + """ + + def __init__(self) -> None: + self.frames_ok = 0 + self.frames_error = 0 + self._reset() + + def reset(self) -> None: + """Reset to initial state — call after serial reconnect.""" + self._reset() + + def _reset(self) -> None: + self._state = _ParseState.WAIT_SYNC + self._length = 0 + self._type = 0 + self._payload = bytearray() + + def feed(self, byte: int) -> Optional[TelemetryFrame | tuple]: + """Process one byte. + + Returns a decoded dataclass on success, ``None`` while waiting for more + bytes, or a ``(type_code, raw_payload)`` tuple for unrecognised types. + """ + s = self._state + + if s == _ParseState.WAIT_SYNC: + if byte == SYNC_BYTE: + self._state = _ParseState.WAIT_LEN + return None + + if s == _ParseState.WAIT_LEN: + if byte > MAX_PAYLOAD_LEN: + self.frames_error += 1 + self._reset() + return None + self._length = byte + self._state = _ParseState.WAIT_TYPE + return None + + if s == _ParseState.WAIT_TYPE: + self._type = byte + self._payload = bytearray() + if self._length == 0: + self._state = _ParseState.WAIT_CRC + else: + self._state = _ParseState.PAYLOAD + return None + + if s == _ParseState.PAYLOAD: + self._payload.append(byte) + if len(self._payload) == self._length: + self._state = _ParseState.WAIT_CRC + return None + + if s == _ParseState.WAIT_CRC: + # Verify CRC before resetting parser state + crc_data = bytes([self._length, self._type]) + self._payload + expected = _crc8(crc_data) + if byte != expected: + self.frames_error += 1 + self._reset() + return None + self.frames_ok += 1 + result = _decode(self._type, bytes(self._payload)) + self._reset() + return result + + self._reset() + return None + + +# ── Telemetry decoder ───────────────────────────────────────────────────────── + +# Maps VESC_LEFT/RIGHT type codes to hardware CAN IDs (from memory) +_VESC_ID_MAP = { + TelType.VESC_LEFT: 56, # left VESC CAN ID + TelType.VESC_RIGHT: 68, # right VESC CAN ID +} + + +def _decode(type_code: int, payload: bytes) -> Optional[TelemetryFrame | tuple]: + """Decode a validated telemetry payload into a typed dataclass.""" + try: + if type_code == TelType.STATUS: + # int16 pitch_x10, int16 motor_cmd, uint16 vbat_mv, uint8 state, uint8 flags + if len(payload) < 8: + return None + pitch_x10, motor_cmd, vbat_mv, state, flags = struct.unpack_from(">hhHBB", payload) + return StatusFrame( + pitch_deg=pitch_x10 / 10.0, + motor_cmd=motor_cmd, + vbat_mv=vbat_mv, + balance_state=state, + flags=flags, + ) + + if type_code in (TelType.VESC_LEFT, TelType.VESC_RIGHT): + # int32 erpm, uint16 voltage_mv, int16 current_ma, uint16 temp_c_x10 + if len(payload) < 10: + return None + erpm, voltage_mv, current_ma, temp_x10 = struct.unpack_from(">iHhH", payload) + return VescFrame( + vesc_id=_VESC_ID_MAP[type_code], + erpm=erpm, + voltage_mv=voltage_mv, + current_ma=current_ma, + temp_c=temp_x10 / 10.0, + ) + + if type_code == TelType.ACK: + if len(payload) < 1: + return None + return AckFrame(echoed_type=payload[0]) + + if type_code == TelType.NACK: + if len(payload) < 2: + return None + return NackFrame(cmd_type=payload[0], error_code=payload[1]) + + except struct.error: + return None + + # Unknown type — return raw for forward-compatibility + return (type_code, payload) diff --git a/jetson/ros2_ws/src/saltybot_esp32_serial/setup.cfg b/jetson/ros2_ws/src/saltybot_esp32_serial/setup.cfg new file mode 100644 index 0000000..6dc128a --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_esp32_serial/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/saltybot_esp32_serial + +[install] +install_scripts=$base/lib/saltybot_esp32_serial diff --git a/jetson/ros2_ws/src/saltybot_esp32_serial/setup.py b/jetson/ros2_ws/src/saltybot_esp32_serial/setup.py new file mode 100644 index 0000000..59b23d1 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_esp32_serial/setup.py @@ -0,0 +1,27 @@ +from setuptools import setup + +package_name = "saltybot_esp32_serial" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (f"share/{package_name}/config", ["config/esp32_balance_params.yaml"]), + (f"share/{package_name}/launch", ["launch/esp32_balance.launch.py"]), + ], + install_requires=["setuptools", "pyserial"], + zip_safe=True, + maintainer="sl-perception", + maintainer_email="sl-perception@saltylab.local", + description="UART/USB-CDC serial bridge for ESP32-S3 BALANCE (bd-wim1)", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + "esp32_balance_node = saltybot_esp32_serial.esp32_balance_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_esp32_serial/test/__init__.py b/jetson/ros2_ws/src/saltybot_esp32_serial/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_esp32_serial/test/test_esp32_balance_protocol.py b/jetson/ros2_ws/src/saltybot_esp32_serial/test/test_esp32_balance_protocol.py new file mode 100644 index 0000000..631778e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_esp32_serial/test/test_esp32_balance_protocol.py @@ -0,0 +1,311 @@ +#!/usr/bin/env python3 +"""Unit tests for saltybot_esp32_serial.esp32_balance_protocol. + +No ROS2 or hardware required — exercises encode/decode round-trips, +framing, CRC8, and parser state-machine in pure Python. + +Run with: pytest test/ -v +""" + +import struct +import unittest + +from saltybot_esp32_serial.esp32_balance_protocol import ( + SYNC_BYTE, + CmdType, + TelType, + BalanceState, + StatusFrame, + VescFrame, + AckFrame, + NackFrame, + FrameParser, + _crc8, + encode_heartbeat, + encode_drive, + encode_estop, + encode_arm, + encode_pid, +) + + +# ── CRC8 tests ──────────────────────────────────────────────────────────────── + +class TestCrc8(unittest.TestCase): + """Verify CRC8-SMBUS implementation.""" + + def test_empty(self): + self.assertEqual(_crc8(b""), 0x00) + + def test_single_byte_zero(self): + # CRC of 0x00: no XOR, no poly — stays 0 + self.assertEqual(_crc8(b"\x00"), 0x00) + + def test_single_byte_nonzero(self): + # Pre-computed expected value for 0xFF + result = _crc8(b"\xFF") + self.assertIsInstance(result, int) + self.assertGreaterEqual(result, 0) + self.assertLessEqual(result, 0xFF) + + def test_deterministic(self): + data = b"\x01\x02\x04\x08" + self.assertEqual(_crc8(data), _crc8(data)) + + def test_different_data_different_crc(self): + self.assertNotEqual(_crc8(b"\x01"), _crc8(b"\x02")) + + def test_byte_range(self): + for b in range(256): + r = _crc8(bytes([b])) + self.assertGreaterEqual(r, 0) + self.assertLessEqual(r, 255) + + +# ── Frame structure tests ───────────────────────────────────────────────────── + +class TestFrameStructure(unittest.TestCase): + """Verify encode functions produce correctly structured frames.""" + + def _check_frame(self, data: bytes, expected_type: int, expected_payload: bytes): + """Assert frame matches [SYNC][LEN][TYPE][PAYLOAD][CRC8].""" + self.assertGreaterEqual(len(data), 4) + self.assertEqual(data[0], SYNC_BYTE, "SYNC byte mismatch") + length = data[1] + self.assertEqual(data[2], expected_type, "TYPE byte mismatch") + self.assertEqual(length, len(expected_payload), "LEN byte mismatch") + self.assertEqual(data[3:3 + length], expected_payload, "PAYLOAD mismatch") + # Verify CRC over [LEN][TYPE][PAYLOAD] + crc_data = bytes([length, expected_type]) + expected_payload + expected_crc = _crc8(crc_data) + self.assertEqual(data[-1], expected_crc, "CRC8 mismatch") + + def test_heartbeat_frame(self): + frame = encode_heartbeat() + self._check_frame(frame, CmdType.HEARTBEAT, b"") + + def test_drive_frame_zeros(self): + frame = encode_drive(0, 0) + payload = struct.pack(">hh", 0, 0) + self._check_frame(frame, CmdType.DRIVE, payload) + + def test_drive_frame_values(self): + frame = encode_drive(500, -300) + payload = struct.pack(">hh", 500, -300) + self._check_frame(frame, CmdType.DRIVE, payload) + + def test_drive_clamping(self): + # Values outside ±1000 must be clamped + frame = encode_drive(5000, -9999) + _, _, _, p0, p1, p2, p3, _ = frame + speed, steer = struct.unpack(">hh", bytes([p0, p1, p2, p3])) + self.assertEqual(speed, 1000) + self.assertEqual(steer, -1000) + + def test_estop_assert(self): + frame = encode_estop(True) + self._check_frame(frame, CmdType.ESTOP, b"\x01") + + def test_estop_clear(self): + frame = encode_estop(False) + self._check_frame(frame, CmdType.ESTOP, b"\x00") + + def test_arm_frame(self): + for arm_val in (True, False): + frame = encode_arm(arm_val) + self._check_frame(frame, CmdType.ARM, bytes([1 if arm_val else 0])) + + def test_pid_frame(self): + frame = encode_pid(1.0, 0.1, 0.01) + payload = struct.pack(">fff", 1.0, 0.1, 0.01) + self._check_frame(frame, CmdType.PID, payload) + + def test_pid_negative_raises(self): + with self.assertRaises(ValueError): + encode_pid(-1.0, 0.1, 0.01) + + +# ── Parser round-trip tests ─────────────────────────────────────────────────── + +class TestFrameParser(unittest.TestCase): + """Feed encoded frames back through the parser and verify decode.""" + + def _parse_all(self, data: bytes): + """Feed all bytes; return list of decoded frames.""" + parser = FrameParser() + results = [] + for byte in data: + r = parser.feed(byte) + if r is not None: + results.append(r) + return parser, results + + # ── Command echo round-trips (parser recognises unknown cmd types as raw tuples) ── + + def test_heartbeat_roundtrip(self): + frame = encode_heartbeat() + parser, results = self._parse_all(frame) + self.assertEqual(len(results), 1) + # HEARTBEAT (0x01) is a command type — no dataclass decoder, returns raw tuple + self.assertIsInstance(results[0], tuple) + type_code, payload = results[0] + self.assertEqual(type_code, CmdType.HEARTBEAT) + self.assertEqual(payload, b"") + + def test_drive_roundtrip(self): + frame = encode_drive(250, -100) + parser, results = self._parse_all(frame) + self.assertEqual(len(results), 1) + type_code, payload = results[0] + self.assertEqual(type_code, CmdType.DRIVE) + speed, steer = struct.unpack(">hh", payload) + self.assertEqual(speed, 250) + self.assertEqual(steer, -100) + + # ── Telemetry decode round-trips ──────────────────────────────────────── + + def _build_telem_frame(self, type_code: int, payload: bytes) -> bytes: + length = len(payload) + crc = _crc8(bytes([length, type_code]) + payload) + return bytes([SYNC_BYTE, length, type_code]) + payload + bytes([crc]) + + def test_status_frame_decode(self): + raw = struct.pack(">hhHBB", 152, 300, 11400, BalanceState.ARMED, 0x00) + frame = self._build_telem_frame(TelType.STATUS, raw) + _, results = self._parse_all(frame) + self.assertEqual(len(results), 1) + f = results[0] + self.assertIsInstance(f, StatusFrame) + self.assertAlmostEqual(f.pitch_deg, 15.2, places=5) + self.assertEqual(f.motor_cmd, 300) + self.assertEqual(f.vbat_mv, 11400) + self.assertEqual(f.balance_state, BalanceState.ARMED) + self.assertEqual(f.flags, 0x00) + + def test_status_estop_flag(self): + raw = struct.pack(">hhHBB", 0, 0, 10000, BalanceState.ESTOP, 0x01) + frame = self._build_telem_frame(TelType.STATUS, raw) + _, results = self._parse_all(frame) + f = results[0] + self.assertIsInstance(f, StatusFrame) + self.assertTrue(bool(f.flags & 0x01)) # estop_active flag set + + def test_vesc_left_frame_decode(self): + # erpm=12000, voltage_mv=22400, current_ma=4500, temp_x10=352 + raw = struct.pack(">iHhH", 12000, 22400, 4500, 352) + frame = self._build_telem_frame(TelType.VESC_LEFT, raw) + _, results = self._parse_all(frame) + self.assertEqual(len(results), 1) + f = results[0] + self.assertIsInstance(f, VescFrame) + self.assertEqual(f.vesc_id, 56) + self.assertEqual(f.erpm, 12000) + self.assertEqual(f.voltage_mv, 22400) + self.assertEqual(f.current_ma, 4500) + self.assertAlmostEqual(f.temp_c, 35.2, places=5) + + def test_vesc_right_frame_decode(self): + raw = struct.pack(">iHhH", -6000, 22000, -200, 280) + frame = self._build_telem_frame(TelType.VESC_RIGHT, raw) + _, results = self._parse_all(frame) + f = results[0] + self.assertIsInstance(f, VescFrame) + self.assertEqual(f.vesc_id, 68) + self.assertEqual(f.erpm, -6000) + self.assertAlmostEqual(f.temp_c, 28.0, places=5) + + def test_ack_frame_decode(self): + raw = bytes([CmdType.DRIVE]) + frame = self._build_telem_frame(TelType.ACK, raw) + _, results = self._parse_all(frame) + f = results[0] + self.assertIsInstance(f, AckFrame) + self.assertEqual(f.echoed_type, CmdType.DRIVE) + + def test_nack_frame_decode(self): + raw = bytes([CmdType.DRIVE, 0x03]) # ERR_ESTOP_ACTIVE + frame = self._build_telem_frame(TelType.NACK, raw) + _, results = self._parse_all(frame) + f = results[0] + self.assertIsInstance(f, NackFrame) + self.assertEqual(f.cmd_type, CmdType.DRIVE) + self.assertEqual(f.error_code, 0x03) + + # ── Parser robustness ─────────────────────────────────────────────────── + + def test_bad_crc_discarded(self): + frame = bytearray(encode_heartbeat()) + frame[-1] ^= 0xFF # corrupt CRC + parser, results = self._parse_all(bytes(frame)) + self.assertEqual(results, []) + self.assertEqual(parser.frames_error, 1) + self.assertEqual(parser.frames_ok, 0) + + def test_garbage_before_sync(self): + garbage = bytes([0x00, 0xFF, 0x01, 0x22]) + frame = encode_heartbeat() + parser, results = self._parse_all(garbage + frame) + self.assertEqual(len(results), 1) + + def test_two_frames_sequential(self): + frames = encode_heartbeat() + encode_drive(100, 50) + parser, results = self._parse_all(frames) + self.assertEqual(len(results), 2) + self.assertEqual(parser.frames_ok, 2) + + def test_oversized_payload_rejected(self): + # Craft a frame claiming LEN=63 (> MAX_PAYLOAD_LEN=62) + bad = bytes([SYNC_BYTE, 63, CmdType.DRIVE]) + parser, results = self._parse_all(bad) + self.assertEqual(results, []) + self.assertEqual(parser.frames_error, 1) + + def test_parser_counter_increments(self): + parser = FrameParser() + frame = encode_drive(10, 10) + for b in frame: + parser.feed(b) + self.assertEqual(parser.frames_ok, 1) + self.assertEqual(parser.frames_error, 0) + + def test_reset_clears_state(self): + parser = FrameParser() + # Feed partial frame then reset + partial = encode_drive(0, 0)[:-2] + for b in partial: + parser.feed(b) + parser.reset() + # Now a clean frame should decode correctly + for b in encode_heartbeat(): + r = parser.feed(b) + self.assertEqual(parser.frames_ok, 1) + + +# ── Encode parameter edge cases ─────────────────────────────────────────────── + +class TestEncodeEdgeCases(unittest.TestCase): + def test_drive_clamp_positive(self): + frame = encode_drive(9999, 9999) + payload = frame[3:7] + speed, steer = struct.unpack(">hh", payload) + self.assertEqual(speed, 1000) + self.assertEqual(steer, 1000) + + def test_drive_clamp_negative(self): + frame = encode_drive(-9999, -9999) + payload = frame[3:7] + speed, steer = struct.unpack(">hh", payload) + self.assertEqual(speed, -1000) + self.assertEqual(steer, -1000) + + def test_pid_zero_gains_valid(self): + frame = encode_pid(0.0, 0.0, 0.0) + self.assertIsNotNone(frame) + + def test_pid_large_gains_valid(self): + frame = encode_pid(100.0, 50.0, 25.0) + self.assertIsNotNone(frame) + + +if __name__ == "__main__": + unittest.main() -- 2.47.2