diff --git a/jetson/ros2_ws/src/saltybot_bridge/launch/uart_bridge.launch.py b/jetson/ros2_ws/src/saltybot_bridge/launch/uart_bridge.launch.py index aa460f3..427279e 100644 --- a/jetson/ros2_ws/src/saltybot_bridge/launch/uart_bridge.launch.py +++ b/jetson/ros2_ws/src/saltybot_bridge/launch/uart_bridge.launch.py @@ -2,7 +2,7 @@ uart_bridge.launch.py — FC↔Orin UART bridge (Issue #362) Launches serial_bridge_node configured for Jetson Orin UART port. -Bridges Flight Controller (STM32F722) telemetry from /dev/ttyTHS1 into ROS2. +Bridges Flight Controller (ESP32-S3 BALANCE) telemetry from /dev/ttyTHS1 into ROS2. Published topics (same as USB CDC bridge): /saltybot/imu sensor_msgs/Imu — pitch/roll/yaw as angular velocity diff --git a/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/cmd_vel_bridge_node.py b/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/cmd_vel_bridge_node.py index 993d2f4..b436233 100644 --- a/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/cmd_vel_bridge_node.py +++ b/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/cmd_vel_bridge_node.py @@ -378,7 +378,7 @@ class CmdVelBridgeNode(Node): diag.header.stamp = stamp status = DiagnosticStatus() status.name = "saltybot/balance_controller" - status.hardware_id = "stm32f722" + status.hardware_id = "esp32_balance" status.message = f"{state_label} [{mode_label}]" status.level = ( DiagnosticStatus.OK if state == 1 else @@ -406,7 +406,7 @@ class CmdVelBridgeNode(Node): status = DiagnosticStatus() status.level = DiagnosticStatus.ERROR status.name = "saltybot/balance_controller" - status.hardware_id = "stm32f722" + status.hardware_id = "esp32_balance" status.message = f"IMU fault errno={errno}" diag.status.append(status) self._diag_pub.publish(diag) diff --git a/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/saltybot_can_node.py b/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/saltybot_can_node.py index f0a4ee8..c836eb5 100644 --- a/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/saltybot_can_node.py +++ b/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/saltybot_can_node.py @@ -322,7 +322,7 @@ class SaltybotCanNode(Node): diag.header.stamp = stamp st = DiagnosticStatus() st.name = "saltybot/balance_controller" - st.hardware_id = "stm32f722" + st.hardware_id = "esp32_balance" st.message = state_label st.level = (DiagnosticStatus.OK if state == 1 else DiagnosticStatus.WARN if state == 0 else diff --git a/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/saltybot_cmd_node.py b/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/saltybot_cmd_node.py index ddcd5f0..798557e 100644 --- a/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/saltybot_cmd_node.py +++ b/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/saltybot_cmd_node.py @@ -266,7 +266,7 @@ class SaltybotCmdNode(Node): diag.header.stamp = stamp status = DiagnosticStatus() status.name = "saltybot/balance_controller" - status.hardware_id = "stm32f722" + status.hardware_id = "esp32_balance" status.message = state_label if state == 1: status.level = DiagnosticStatus.OK @@ -294,7 +294,7 @@ class SaltybotCmdNode(Node): status = DiagnosticStatus() status.level = DiagnosticStatus.ERROR status.name = "saltybot/balance_controller" - status.hardware_id = "stm32f722" + status.hardware_id = "esp32_balance" status.message = f"IMU fault errno={errno}" diag.status.append(status) self._diag_pub.publish(diag) diff --git a/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/serial_bridge_node.py b/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/serial_bridge_node.py index 6816410..9041682 100644 --- a/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/serial_bridge_node.py +++ b/jetson/ros2_ws/src/saltybot_bridge/saltybot_bridge/serial_bridge_node.py @@ -1,6 +1,6 @@ """ saltybot_bridge — serial_bridge_node -STM32F722 USB CDC → ROS2 topic publisher +ESP32-S3 BALANCE USB CDC → ROS2 topic publisher Telemetry frame (50 Hz, newline-delimited JSON): {"p":,"r":,"e":,"ig":, @@ -264,7 +264,7 @@ class SerialBridgeNode(Node): diag.header.stamp = stamp status = DiagnosticStatus() status.name = "saltybot/balance_controller" - status.hardware_id = "stm32f722" + status.hardware_id = "esp32_balance" status.message = state_label if state == 1: # ARMED @@ -293,7 +293,7 @@ class SerialBridgeNode(Node): status = DiagnosticStatus() status.level = DiagnosticStatus.ERROR status.name = "saltybot/balance_controller" - status.hardware_id = "stm32f722" + status.hardware_id = "esp32_balance" status.message = f"IMU fault errno={errno}" diag.status.append(status) self._diag_pub.publish(diag) diff --git a/jetson/ros2_ws/src/saltybot_can_bridge/config/can_bridge_params.yaml b/jetson/ros2_ws/src/saltybot_can_bridge/config/can_bridge_params.yaml index f5babf2..2ce4970 100644 --- a/jetson/ros2_ws/src/saltybot_can_bridge/config/can_bridge_params.yaml +++ b/jetson/ros2_ws/src/saltybot_can_bridge/config/can_bridge_params.yaml @@ -1,7 +1,11 @@ can_bridge_node: ros__parameters: - can_interface: slcan0 + # SocketCAN interface — CANable 2.0 USB-to-CAN on Orin + can_interface: "can0" + + # VESC CAN node IDs (FSESC 6.7 Pro Mini Dual) left_vesc_can_id: 56 right_vesc_can_id: 68 - mamba_can_id: 1 + + # Watchdog: send zero velocity if no /cmd_vel for this many seconds command_timeout_s: 0.5 diff --git a/jetson/ros2_ws/src/saltybot_can_bridge/saltybot_can_bridge/__init__.py b/jetson/ros2_ws/src/saltybot_can_bridge/saltybot_can_bridge/__init__.py index e146650..10069bb 100644 --- a/jetson/ros2_ws/src/saltybot_can_bridge/saltybot_can_bridge/__init__.py +++ b/jetson/ros2_ws/src/saltybot_can_bridge/saltybot_can_bridge/__init__.py @@ -1 +1 @@ -"""SaltyBot CAN bridge package — Mamba controller and VESC telemetry via python-can.""" +"""saltybot_can_bridge — CAN bus bridge for ESP32 BALANCE and VESC telemetry.""" diff --git a/jetson/ros2_ws/src/saltybot_can_bridge/saltybot_can_bridge/balance_protocol.py b/jetson/ros2_ws/src/saltybot_can_bridge/saltybot_can_bridge/balance_protocol.py new file mode 100644 index 0000000..746b598 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_can_bridge/saltybot_can_bridge/balance_protocol.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +"""balance_protocol.py — CAN frame codec for SAUL-TEE ESP32 BALANCE + VESC. + +CAN bus: 500 kbps, CANable 2.0 (can0) on Orin. + +── Orin → ESP32 BALANCE (commands) ─────────────────────────────────────────── + 0x300 DRIVE 8 B [left_mps f32 LE][right_mps f32 LE] + 0x301 MODE 1 B [mode u8] 0=idle 1=drive 2=estop + 0x302 ESTOP 1 B [flags u8] bit0=stop bit1=clear + 0x303 LED 4 B [pattern u8][r u8][g u8][b u8] + +── ESP32 BALANCE → Orin (telemetry) ────────────────────────────────────────── + 0x400 FC_STATUS 8 B [pitch_x10 i16][motor_cmd u16][vbat_mv u16][state u8][flags u8] 10 Hz + 0x401 FC_VESC 8 B [l_rpm_x10 i16][r_rpm_x10 i16][l_cur_x10 i16][r_cur_x10 i16] 10 Hz + +── VESC standard CAN frames (29-bit extended IDs) ──────────────────────────── + VESC node IDs: Left = 56, Right = 68 + ID = (packet_type << 8) | node_id +""" + +import struct +from dataclasses import dataclass + +ORIN_CMD_DRIVE: int = 0x300 +ORIN_CMD_MODE: int = 0x301 +ORIN_CMD_ESTOP: int = 0x302 +ORIN_CMD_LED: int = 0x303 + +FC_STATUS: int = 0x400 +FC_VESC: int = 0x401 + +VESC_LEFT_ID: int = 56 +VESC_RIGHT_ID: int = 68 + +VESC_CMD_SET_RPM: int = 3 +VESC_STATUS_1: int = 9 +VESC_STATUS_4: int = 16 +VESC_STATUS_5: int = 27 + +MODE_IDLE: int = 0 +MODE_DRIVE: int = 1 +MODE_ESTOP: int = 2 + + +@dataclass +class FcStatus: + pitch_deg: float = 0.0 + motor_cmd: int = 0 + vbat_mv: int = 0 + state: int = 0 + flags: int = 0 + + +@dataclass +class FcVesc: + left_rpm: float = 0.0 + right_rpm: float = 0.0 + left_cur: float = 0.0 + right_cur: float = 0.0 + + +@dataclass +class VescStatus1: + node_id: int = 0 + erpm: float = 0.0 + current: float = 0.0 + duty: float = 0.0 + + +@dataclass +class VescStatus4: + node_id: int = 0 + temp_fet_c: float = 0.0 + temp_motor_c: float = 0.0 + current_in: float = 0.0 + + +@dataclass +class VescStatus5: + node_id: int = 0 + tacho: int = 0 + vbat_v: float = 0.0 + + +def encode_drive_cmd(left_mps: float, right_mps: float) -> bytes: + """Encode ORIN_CMD_DRIVE (0x300): 8 bytes, 2 × float32 little-endian.""" + return struct.pack(" bytes: + """Encode ORIN_CMD_MODE (0x301): 1 byte mode value.""" + if mode not in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP): + raise ValueError(f"Invalid mode {mode!r}") + return struct.pack("B", mode) + + +def encode_estop_cmd(stop: bool = True, clear: bool = False) -> bytes: + """Encode ORIN_CMD_ESTOP (0x302): 1 byte flags (bit0=stop, bit1=clear).""" + return struct.pack("B", (0x01 if stop else 0) | (0x02 if clear else 0)) + + +def encode_led_cmd(pattern: int, r: int, g: int, b: int) -> bytes: + """Encode ORIN_CMD_LED (0x303): 4 bytes.""" + return struct.pack("BBBB", pattern & 0xFF, r & 0xFF, g & 0xFF, b & 0xFF) + + +def encode_vesc_set_rpm(node_id: int, rpm: int) -> tuple: + """Return (arb_id, payload) for a VESC SET_RPM command.""" + return (VESC_CMD_SET_RPM << 8) | (node_id & 0xFF), struct.pack(">i", int(rpm)) + + +def decode_fc_status(data: bytes) -> FcStatus: + """Decode FC_STATUS (0x400) payload → FcStatus.""" + if len(data) < 8: + raise ValueError(f"FC_STATUS expects 8 bytes, got {len(data)}") + pitch_x10, motor_cmd, vbat_mv, state, flags = struct.unpack_from(">hHHBB", data) + return FcStatus( + pitch_deg=pitch_x10 / 10.0, + motor_cmd=motor_cmd, + vbat_mv=vbat_mv, + state=state, + flags=flags, + ) + + +def decode_fc_vesc(data: bytes) -> FcVesc: + """Decode FC_VESC (0x401) payload → FcVesc.""" + if len(data) < 8: + raise ValueError(f"FC_VESC expects 8 bytes, got {len(data)}") + l_rpm, r_rpm, l_cur, r_cur = struct.unpack_from(">hhhh", data) + return FcVesc( + left_rpm=l_rpm / 10.0, + right_rpm=r_rpm / 10.0, + left_cur=l_cur / 10.0, + right_cur=r_cur / 10.0, + ) + + +def decode_vesc_can_id(can_id: int) -> tuple: + """Decode a VESC extended CAN ID → (packet_type, node_id).""" + return (can_id >> 8) & 0xFF, can_id & 0xFF + + +def decode_vesc_status1(node_id: int, data: bytes) -> VescStatus1: + """Decode VESC STATUS_1 payload → VescStatus1.""" + erpm, cur_x10, duty_x1000 = struct.unpack_from(">ihh", data[:8]) + return VescStatus1( + node_id=node_id, + erpm=float(erpm), + current=cur_x10 / 10.0, + duty=duty_x1000 / 1000.0, + ) + + +def decode_vesc_status4(node_id: int, data: bytes) -> VescStatus4: + """Decode VESC STATUS_4 payload → VescStatus4.""" + tfet, tmot, cur_in, _ = struct.unpack_from(">hhhh", data[:8]) + return VescStatus4( + node_id=node_id, + temp_fet_c=tfet / 10.0, + temp_motor_c=tmot / 10.0, + current_in=cur_in / 10.0, + ) + + +def decode_vesc_status5(node_id: int, data: bytes) -> VescStatus5: + """Decode VESC STATUS_5 payload → VescStatus5.""" + tacho, vbat_x10 = struct.unpack_from(">ih", data[:6]) + return VescStatus5(node_id=node_id, tacho=tacho, vbat_v=vbat_x10 / 10.0) diff --git a/jetson/ros2_ws/src/saltybot_can_bridge/saltybot_can_bridge/can_bridge_node.py b/jetson/ros2_ws/src/saltybot_can_bridge/saltybot_can_bridge/can_bridge_node.py index fe62740..0214d5f 100644 --- a/jetson/ros2_ws/src/saltybot_can_bridge/saltybot_can_bridge/can_bridge_node.py +++ b/jetson/ros2_ws/src/saltybot_can_bridge/saltybot_can_bridge/can_bridge_node.py @@ -1,20 +1,20 @@ #!/usr/bin/env python3 """ -can_bridge_node.py — ROS2 node bridging the SaltyBot Orin to the Mamba motor +can_bridge_node.py — ROS2 node bridging the SaltyBot Orin to the ESP32-S3 BALANCE controller and VESC motor controllers over CAN bus. -The node opens the SocketCAN interface (slcan0 by default), spawns a background +The node opens the SocketCAN interface (can0 by default), spawns a background reader thread to process incoming telemetry, and exposes the following interface: Subscriptions ------------- - /cmd_vel geometry_msgs/Twist → VESC speed commands (CAN) - /estop std_msgs/Bool → Mamba e-stop (CAN) + /cmd_vel geometry_msgs/Twist -> drive commands (CAN 0x300) + /estop std_msgs/Bool -> e-stop (CAN 0x303) Publications ------------ - /can/imu sensor_msgs/Imu Mamba IMU telemetry - /can/battery sensor_msgs/BatteryState Mamba battery telemetry + /can/attitude std_msgs/Float32MultiArray ESP32-S3 BALANCE attitude + /can/battery std_msgs/Float32MultiArray ESP32-S3 BALANCE battery /can/vesc/left/state std_msgs/Float32MultiArray Left VESC state /can/vesc/right/state std_msgs/Float32MultiArray Right VESC state /can/connection_status std_msgs/String "connected" | "disconnected" @@ -30,30 +30,24 @@ import can import rclpy from geometry_msgs.msg import Twist from rclpy.node import Node -from rcl_interfaces.msg import SetParametersResult -from sensor_msgs.msg import BatteryState, Imu from std_msgs.msg import Bool, Float32MultiArray, String -from saltybot_can_bridge.mamba_protocol import ( - MAMBA_CMD_ESTOP, - MAMBA_CMD_MODE, - MAMBA_CMD_VELOCITY, - MAMBA_TELEM_BATTERY, - MAMBA_TELEM_IMU, - VESC_TELEM_STATE, - ORIN_CAN_ID_FC_PID_ACK, - ORIN_CAN_ID_PID_SET, +from saltybot_can_bridge.balance_protocol import ( + ESP32_TELEM_ATTITUDE, + ESP32_TELEM_BATTERY, + ORIN_CMD_DRIVE, + ORIN_CMD_ESTOP, + VESC_LEFT_ID, + VESC_RIGHT_ID, + VESC_STATUS_1, MODE_DRIVE, - MODE_ESTOP, MODE_IDLE, + encode_drive_cmd, encode_estop_cmd, - encode_mode_cmd, - encode_velocity_cmd, - encode_pid_set_cmd, - decode_battery_telem, - decode_imu_telem, - decode_pid_ack, - decode_vesc_state, + decode_attitude, + decode_battery, + decode_vesc_can_id, + decode_vesc_status1, ) # Reconnect attempt interval when CAN bus is lost @@ -64,39 +58,35 @@ _WATCHDOG_HZ: float = 10.0 class CanBridgeNode(Node): - """CAN bus bridge between Orin ROS2 and Mamba / VESC controllers.""" + """CAN bus bridge between Orin ROS2 and ESP32-S3 BALANCE / VESC controllers.""" def __init__(self) -> None: super().__init__("can_bridge_node") - # ── Parameters ──────────────────────────────────────────────────── - self.declare_parameter("can_interface", "slcan0") - self.declare_parameter("left_vesc_can_id", 56) - self.declare_parameter("right_vesc_can_id", 68) - self.declare_parameter("mamba_can_id", 1) + # -- Parameters -- + self.declare_parameter("can_interface", "can0") + self.declare_parameter("left_vesc_can_id", VESC_LEFT_ID) + self.declare_parameter("right_vesc_can_id", VESC_RIGHT_ID) self.declare_parameter("command_timeout_s", 0.5) - self.declare_parameter("pid/kp", 0.0) - self.declare_parameter("pid/ki", 0.0) - self.declare_parameter("pid/kd", 0.0) self._iface: str = self.get_parameter("can_interface").value self._left_vesc_id: int = self.get_parameter("left_vesc_can_id").value self._right_vesc_id: int = self.get_parameter("right_vesc_can_id").value - self._mamba_id: int = self.get_parameter("mamba_can_id").value self._cmd_timeout: float = self.get_parameter("command_timeout_s").value - self._pid_kp: float = self.get_parameter("pid/kp").value - self._pid_ki: float = self.get_parameter("pid/ki").value - self._pid_kd: float = self.get_parameter("pid/kd").value - # ── State ───────────────────────────────────────────────────────── + # -- State -- self._bus: Optional[can.BusABC] = None self._connected: bool = False self._last_cmd_time: float = time.monotonic() - self._lock = threading.Lock() # protects _bus / _connected + self._lock = threading.Lock() - # ── Publishers ──────────────────────────────────────────────────── - self._pub_imu = self.create_publisher(Imu, "/can/imu", 10) - self._pub_battery = self.create_publisher(BatteryState, "/can/battery", 10) + # -- Publishers -- + self._pub_attitude = self.create_publisher( + Float32MultiArray, "/can/attitude", 10 + ) + self._pub_battery = self.create_publisher( + Float32MultiArray, "/can/battery", 10 + ) self._pub_vesc_left = self.create_publisher( Float32MultiArray, "/can/vesc/left/state", 10 ) @@ -107,55 +97,29 @@ class CanBridgeNode(Node): String, "/can/connection_status", 10 ) - # ── Subscriptions ───────────────────────────────────────────────── + # -- Subscriptions -- self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, 10) self.create_subscription(Bool, "/estop", self._estop_cb, 10) - self.add_on_set_parameters_callback(self._on_set_parameters) - # ── Timers ──────────────────────────────────────────────────────── + # -- Timers -- self.create_timer(1.0 / _WATCHDOG_HZ, self._watchdog_cb) self.create_timer(_RECONNECT_INTERVAL_S, self._reconnect_cb) - # ── Open CAN ────────────────────────────────────────────────────── + # -- Open CAN -- self._try_connect() - # ── Background reader thread ────────────────────────────────────── + # -- Background reader thread -- self._reader_thread = threading.Thread( target=self._reader_loop, daemon=True, name="can_reader" ) self._reader_thread.start() self.get_logger().info( - f"can_bridge_node ready — iface={self._iface} " - f"left_vesc={self._left_vesc_id} right_vesc={self._right_vesc_id} " - f"mamba={self._mamba_id}" + f"can_bridge_node ready -- iface={self._iface} " + f"left_vesc={self._left_vesc_id} right_vesc={self._right_vesc_id}" ) - # -- PID parameter callback (Issue #693) -- - - def _on_set_parameters(self, params) -> SetParametersResult: - """Send new PID gains over CAN when pid/* params change.""" - for p in params: - if p.name == "pid/kp": - self._pid_kp = float(p.value) - elif p.name == "pid/ki": - self._pid_ki = float(p.value) - elif p.name == "pid/kd": - self._pid_kd = float(p.value) - else: - continue - try: - payload = encode_pid_set_cmd(self._pid_kp, self._pid_ki, self._pid_kd) - self._send_can(ORIN_CAN_ID_PID_SET, payload, "pid_set") - self.get_logger().info( - f"PID gains sent: Kp={self._pid_kp:.2f} " - f"Ki={self._pid_ki:.2f} Kd={self._pid_kd:.2f}" - ) - except ValueError as exc: - return SetParametersResult(successful=False, reason=str(exc)) - return SetParametersResult(successful=True) - - # ── Connection management ────────────────────────────────────────────── + # -- Connection management -- def _try_connect(self) -> None: """Attempt to open the CAN interface; silently skip if already connected.""" @@ -174,7 +138,7 @@ class CanBridgeNode(Node): except Exception as exc: self.get_logger().warning( f"CAN bus not available ({self._iface}): {exc} " - f"— will retry every {_RECONNECT_INTERVAL_S:.0f} s" + f"-- will retry every {_RECONNECT_INTERVAL_S:.0f} s" ) self._connected = False self._publish_status("disconnected") @@ -197,46 +161,33 @@ class CanBridgeNode(Node): self._connected = False self._publish_status("disconnected") - # ── ROS callbacks ───────────────────────────────────────────────────── + # -- ROS callbacks -- def _cmd_vel_cb(self, msg: Twist) -> None: - """Convert /cmd_vel Twist to VESC speed commands over CAN.""" + """Convert /cmd_vel Twist to ESP32-S3 BALANCE drive commands over CAN (0x300).""" self._last_cmd_time = time.monotonic() - if not self._connected: return - - # Differential drive decomposition — individual wheel speeds in m/s. - # The VESC nodes interpret linear velocity directly; angular is handled - # by the sign difference between left and right. linear = msg.linear.x angular = msg.angular.z - - # Forward left = forward right for pure translation; for rotation - # left slows and right speeds up (positive angular = CCW = left turn). - # The Mamba velocity command carries both wheels independently. - left_mps = linear - angular - right_mps = linear + angular - - payload = encode_velocity_cmd(left_mps, right_mps) - self._send_can(MAMBA_CMD_VELOCITY, payload, "cmd_vel") - - # Keep Mamba in DRIVE mode while receiving commands - self._send_can(MAMBA_CMD_MODE, encode_mode_cmd(MODE_DRIVE), "cmd_vel mode") + # Differential drive -- motor units [-1000, +1000] + speed_units = int(linear * 1000.0) + steer_units = int(angular * 1000.0) + self._send_can( + ORIN_CMD_DRIVE, + encode_drive_cmd(speed_units, steer_units, mode=MODE_DRIVE), + "cmd_vel", + ) def _estop_cb(self, msg: Bool) -> None: - """Forward /estop to Mamba over CAN.""" + """Forward /estop to ESP32-S3 BALANCE over CAN (0x303).""" if not self._connected: return - payload = encode_estop_cmd(msg.data) - self._send_can(MAMBA_CMD_ESTOP, payload, "estop") if msg.data: - self._send_can( - MAMBA_CMD_MODE, encode_mode_cmd(MODE_ESTOP), "estop mode" - ) - self.get_logger().warning("E-stop asserted — sent ESTOP to Mamba") + self._send_can(ORIN_CMD_ESTOP, encode_estop_cmd(), "estop") + self.get_logger().warning("E-stop asserted -- sent ESTOP to ESP32-S3 BALANCE") - # ── Watchdog ────────────────────────────────────────────────────────── + # -- Watchdog -- def _watchdog_cb(self) -> None: """If no /cmd_vel arrives within the timeout, send zero velocity.""" @@ -245,15 +196,12 @@ class CanBridgeNode(Node): elapsed = time.monotonic() - self._last_cmd_time if elapsed > self._cmd_timeout: self._send_can( - MAMBA_CMD_VELOCITY, - encode_velocity_cmd(0.0, 0.0), + ORIN_CMD_DRIVE, + encode_drive_cmd(0, 0, mode=MODE_IDLE), "watchdog zero-vel", ) - self._send_can( - MAMBA_CMD_MODE, encode_mode_cmd(MODE_IDLE), "watchdog idle" - ) - # ── CAN send helper ─────────────────────────────────────────────────── + # -- CAN send helper -- def _send_can(self, arb_id: int, data: bytes, context: str) -> None: """Send a standard CAN frame; handle errors gracefully.""" @@ -261,7 +209,6 @@ class CanBridgeNode(Node): if not self._connected or self._bus is None: return bus = self._bus - msg = can.Message( arbitration_id=arb_id, data=data, @@ -272,132 +219,95 @@ class CanBridgeNode(Node): except can.CanError as exc: self._handle_can_error(exc, f"send({context})") - # ── Background CAN reader ───────────────────────────────────────────── + # -- Background CAN reader -- def _reader_loop(self) -> None: - """ - Blocking CAN read loop executed in a daemon thread. - Dispatches incoming frames to the appropriate handler. - """ + """Blocking CAN read loop executed in a daemon thread.""" while rclpy.ok(): with self._lock: connected = self._connected bus = self._bus - if not connected or bus is None: time.sleep(0.1) continue - try: frame = bus.recv(timeout=0.5) except can.CanError as exc: self._handle_can_error(exc, "reader_loop recv") continue - if frame is None: - # Timeout — no frame within 0.5 s, loop again continue - self._dispatch_frame(frame) def _dispatch_frame(self, frame: can.Message) -> None: """Route an incoming CAN frame to the correct publisher.""" arb_id = frame.arbitration_id data = bytes(frame.data) - try: - if arb_id == MAMBA_TELEM_IMU: - self._handle_imu(data, frame.timestamp) - - elif arb_id == MAMBA_TELEM_BATTERY: - self._handle_battery(data, frame.timestamp) - - elif arb_id == VESC_TELEM_STATE + self._left_vesc_id: - self._handle_vesc_state(data, frame.timestamp, side="left") - - elif arb_id == VESC_TELEM_STATE + self._right_vesc_id: - self._handle_vesc_state(data, frame.timestamp, side="right") - - elif arb_id == ORIN_CAN_ID_FC_PID_ACK: - gains = decode_pid_ack(data) - self.get_logger().debug( - f"FC PID ACK: Kp={gains.kp:.2f} Ki={gains.ki:.2f} Kd={gains.kd:.2f}" - ) - + if arb_id == ESP32_TELEM_ATTITUDE: + self._handle_attitude(data) + elif arb_id == ESP32_TELEM_BATTERY: + self._handle_battery(data) + elif frame.is_extended_id: + # VESC extended 29-bit ID: packet_type<<8 | node_id + pkt_type, node_id = decode_vesc_can_id(arb_id) + if pkt_type == VESC_STATUS_1: + self._handle_vesc_status1(data, node_id) except Exception as exc: self.get_logger().warning( f"Error parsing CAN frame 0x{arb_id:03X}: {exc}" ) - # ── Frame handlers ──────────────────────────────────────────────────── + # -- Frame handlers -- - def _handle_imu(self, data: bytes, timestamp: float) -> None: - telem = decode_imu_telem(data) - - msg = Imu() - msg.header.stamp = self.get_clock().now().to_msg() - msg.header.frame_id = "imu_link" - - msg.linear_acceleration.x = telem.accel_x - msg.linear_acceleration.y = telem.accel_y - msg.linear_acceleration.z = telem.accel_z - - msg.angular_velocity.x = telem.gyro_x - msg.angular_velocity.y = telem.gyro_y - msg.angular_velocity.z = telem.gyro_z - - # Covariance unknown; mark as -1 per REP-145 - msg.orientation_covariance[0] = -1.0 - - self._pub_imu.publish(msg) - - def _handle_battery(self, data: bytes, timestamp: float) -> None: - telem = decode_battery_telem(data) - - msg = BatteryState() - msg.header.stamp = self.get_clock().now().to_msg() - msg.voltage = telem.voltage - msg.current = telem.current - msg.present = True - msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING + def _handle_attitude(self, data: bytes) -> None: + telem = decode_attitude(data) + msg = Float32MultiArray() + # Layout: [pitch_deg, speed, yaw_rate, state, flags] + msg.data = [ + telem.pitch_deg, + telem.speed, + telem.yaw_rate, + float(telem.state), + float(telem.flags), + ] + self._pub_attitude.publish(msg) + def _handle_battery(self, data: bytes) -> None: + telem = decode_battery(data) + msg = Float32MultiArray() + # Layout: [vbat_v, fault_code, rssi] + msg.data = [telem.vbat_mv / 1000.0, float(telem.fault_code), float(telem.rssi)] self._pub_battery.publish(msg) - def _handle_vesc_state( - self, data: bytes, timestamp: float, side: str - ) -> None: - telem = decode_vesc_state(data) - + def _handle_vesc_status1(self, data: bytes, node_id: int) -> None: + status = decode_vesc_status1(node_id, data) msg = Float32MultiArray() - # Layout: [erpm, duty, voltage, current] - msg.data = [telem.erpm, telem.duty, telem.voltage, telem.current] - - if side == "left": + # Layout: [erpm, current, duty] + msg.data = [status.erpm, status.current, status.duty] + if node_id == self._left_vesc_id: self._pub_vesc_left.publish(msg) - else: + elif node_id == self._right_vesc_id: self._pub_vesc_right.publish(msg) - # ── Status helper ───────────────────────────────────────────────────── + # -- Status helper -- def _publish_status(self, status: str) -> None: msg = String() msg.data = status self._pub_status.publish(msg) - # ── Shutdown ────────────────────────────────────────────────────────── + # -- Shutdown -- def destroy_node(self) -> None: """Send zero velocity and shut down the CAN bus cleanly.""" if self._connected and self._bus is not None: try: self._send_can( - MAMBA_CMD_VELOCITY, - encode_velocity_cmd(0.0, 0.0), + ORIN_CMD_DRIVE, + encode_drive_cmd(0, 0, mode=MODE_IDLE), "shutdown", ) - self._send_can( - MAMBA_CMD_MODE, encode_mode_cmd(MODE_IDLE), "shutdown" - ) except Exception: pass try: diff --git a/jetson/ros2_ws/src/saltybot_can_bridge/saltybot_can_bridge/mamba_protocol.py b/jetson/ros2_ws/src/saltybot_can_bridge/saltybot_can_bridge/mamba_protocol.py deleted file mode 100644 index 804ae29..0000000 --- a/jetson/ros2_ws/src/saltybot_can_bridge/saltybot_can_bridge/mamba_protocol.py +++ /dev/null @@ -1,224 +0,0 @@ -#!/usr/bin/env python3 -""" -mamba_protocol.py — CAN message encoding/decoding for the Mamba motor controller -and VESC telemetry. - -CAN message layout ------------------- -Command frames (Orin → Mamba / VESC): - MAMBA_CMD_VELOCITY 0x100 8 bytes left_speed (f32, m/s) | right_speed (f32, m/s) - MAMBA_CMD_MODE 0x101 1 byte mode (0=idle, 1=drive, 2=estop) - MAMBA_CMD_ESTOP 0x102 1 byte 0x01 = stop - -Telemetry frames (Mamba → Orin): - MAMBA_TELEM_IMU 0x200 24 bytes accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z (f32 each) - MAMBA_TELEM_BATTERY 0x201 8 bytes voltage (f32, V) | current (f32, A) - -VESC telemetry frame (VESC → Orin): - VESC_TELEM_STATE 0x300 16 bytes erpm (f32) | duty (f32) | voltage (f32) | current (f32) - -All multi-byte fields are big-endian. - -Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/674 -""" - -import struct -from dataclasses import dataclass -from typing import Tuple - -# --------------------------------------------------------------------------- -# CAN message IDs -# --------------------------------------------------------------------------- - -MAMBA_CMD_VELOCITY: int = 0x100 -MAMBA_CMD_MODE: int = 0x101 -MAMBA_CMD_ESTOP: int = 0x102 - -MAMBA_TELEM_IMU: int = 0x200 -MAMBA_TELEM_BATTERY: int = 0x201 - -VESC_TELEM_STATE: int = 0x300 -ORIN_CAN_ID_PID_SET: int = 0x305 -ORIN_CAN_ID_FC_PID_ACK: int = 0x405 - -# --------------------------------------------------------------------------- -# Mode constants -# --------------------------------------------------------------------------- - -MODE_IDLE: int = 0 -MODE_DRIVE: int = 1 -MODE_ESTOP: int = 2 - -# --------------------------------------------------------------------------- -# Data classes for decoded telemetry -# --------------------------------------------------------------------------- - - -@dataclass -class ImuTelemetry: - """Decoded IMU telemetry from Mamba (MAMBA_TELEM_IMU).""" - - accel_x: float = 0.0 # m/s² - accel_y: float = 0.0 - accel_z: float = 0.0 - gyro_x: float = 0.0 # rad/s - gyro_y: float = 0.0 - gyro_z: float = 0.0 - - -@dataclass -class BatteryTelemetry: - """Decoded battery telemetry from Mamba (MAMBA_TELEM_BATTERY).""" - - voltage: float = 0.0 # V - current: float = 0.0 # A - - -@dataclass -class VescStateTelemetry: - """Decoded VESC state telemetry (VESC_TELEM_STATE).""" - - erpm: float = 0.0 # electrical RPM - duty: float = 0.0 # duty cycle [-1.0, 1.0] - voltage: float = 0.0 # bus voltage, V - current: float = 0.0 # phase current, A - - -@dataclass -class PidGains: - """Balance PID gains (Issue #693).""" - kp: float = 0.0 - ki: float = 0.0 - kd: float = 0.0 - - -# --------------------------------------------------------------------------- -# Encode helpers -# --------------------------------------------------------------------------- - -_FMT_VEL = ">ff" # 2 × float32, big-endian -_FMT_MODE = ">B" # 1 × uint8 -_FMT_ESTOP = ">B" # 1 × uint8 -_FMT_IMU = ">ffffff" # 6 × float32 -_FMT_BAT = ">ff" # 2 × float32 -_FMT_VESC = ">ffff" # 4 × float32 - - -def encode_velocity_cmd(left_mps: float, right_mps: float) -> bytes: - """ - Encode a MAMBA_CMD_VELOCITY payload. - - Parameters - ---------- - left_mps: target left wheel speed in m/s (positive = forward) - right_mps: target right wheel speed in m/s (positive = forward) - - Returns - ------- - 8-byte big-endian payload suitable for a CAN frame. - """ - return struct.pack(_FMT_VEL, float(left_mps), float(right_mps)) - - -def encode_mode_cmd(mode: int) -> bytes: - """ - Encode a MAMBA_CMD_MODE payload. - - Parameters - ---------- - mode: one of MODE_IDLE (0), MODE_DRIVE (1), MODE_ESTOP (2) - - Returns - ------- - 1-byte payload. - """ - if mode not in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP): - raise ValueError(f"Invalid mode {mode!r}; expected 0, 1, or 2") - return struct.pack(_FMT_MODE, mode) - - -def encode_estop_cmd(stop: bool = True) -> bytes: - """ - Encode a MAMBA_CMD_ESTOP payload. - - Parameters - ---------- - stop: True to assert e-stop, False to clear. - - Returns - ------- - 1-byte payload (0x01 = stop, 0x00 = clear). - """ - return struct.pack(_FMT_ESTOP, 0x01 if stop else 0x00) - - -def encode_pid_set_cmd(kp: float, ki: float, kd: float) -> bytes: - """Encode ORIN_CAN_ID_PID_SET (6 bytes, uint16 BE x3). Issue #693.""" - if kp < 0.0 or ki < 0.0 or kd < 0.0: - raise ValueError("PID gains must be non-negative") - return struct.pack(_FMT_PID, round(min(kp,_PID_KP_MAX)*100), round(min(ki,_PID_KI_MAX)*100), round(min(kd,_PID_KD_MAX)*100)) - - -# --------------------------------------------------------------------------- -# Decode helpers -# --------------------------------------------------------------------------- - -def decode_imu_telem(data: bytes) -> ImuTelemetry: - """ - Decode a MAMBA_TELEM_IMU payload. - - Parameters - ---------- - data: exactly 24 bytes (6 × float32, big-endian). - - Returns - ------- - ImuTelemetry dataclass instance. - - Raises - ------ - struct.error if data is the wrong length. - """ - ax, ay, az, gx, gy, gz = struct.unpack(_FMT_IMU, data) - return ImuTelemetry( - accel_x=ax, accel_y=ay, accel_z=az, - gyro_x=gx, gyro_y=gy, gyro_z=gz, - ) - - -def decode_battery_telem(data: bytes) -> BatteryTelemetry: - """ - Decode a MAMBA_TELEM_BATTERY payload. - - Parameters - ---------- - data: exactly 8 bytes (2 × float32, big-endian). - - Returns - ------- - BatteryTelemetry dataclass instance. - """ - voltage, current = struct.unpack(_FMT_BAT, data) - return BatteryTelemetry(voltage=voltage, current=current) - - -def decode_vesc_state(data: bytes) -> VescStateTelemetry: - """ - Decode a VESC_TELEM_STATE payload. - - Parameters - ---------- - data: exactly 16 bytes (4 × float32, big-endian). - - Returns - ------- - VescStateTelemetry dataclass instance. - """ - erpm, duty, voltage, current = struct.unpack(_FMT_VESC, data) - return VescStateTelemetry(erpm=erpm, duty=duty, voltage=voltage, current=current) - - -def decode_pid_ack(data: bytes) -> PidGains: - """Decode ORIN_CAN_ID_FC_PID_ACK (6 bytes). Issue #693.""" - kp_x100, ki_x100, kd_x100 = struct.unpack(_FMT_PID, data) - return PidGains(kp=kp_x100/100.0, ki=ki_x100/100.0, kd=kd_x100/100.0) diff --git a/jetson/ros2_ws/src/saltybot_can_bridge/setup.py b/jetson/ros2_ws/src/saltybot_can_bridge/setup.py index 88161c9..d5fb4c1 100644 --- a/jetson/ros2_ws/src/saltybot_can_bridge/setup.py +++ b/jetson/ros2_ws/src/saltybot_can_bridge/setup.py @@ -1,21 +1,22 @@ -from setuptools import setup +from setuptools import find_packages, setup package_name = "saltybot_can_bridge" setup( name=package_name, - version="0.1.0", - packages=[package_name], + version="0.0.1", + packages=find_packages(exclude=["test"]), data_files=[ - ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), - (f"share/{package_name}", ["package.xml"]), - (f"share/{package_name}/config", ["config/can_bridge_params.yaml"]), + ("share/ament_index/resource_index/packages", ["resource/" + package_name]), + ("share/" + package_name, ["package.xml"]), + ("share/" + package_name + "/config", ["config/can_bridge_params.yaml"]), + ("share/" + package_name + "/launch", ["launch/can_bridge.launch.py"]), ], - install_requires=["setuptools", "python-can"], + install_requires=["setuptools"], zip_safe=True, - maintainer="sl-controls", - maintainer_email="sl-controls@saltylab.local", - description="CAN bus bridge for Mamba controller and VESC telemetry", + maintainer="seb", + maintainer_email="seb@example.com", + description="CAN bus bridge for ESP32 BALANCE and VESC telemetry", license="MIT", tests_require=["pytest"], entry_points={ diff --git a/jetson/ros2_ws/src/saltybot_can_bridge/test/test_can_bridge.py b/jetson/ros2_ws/src/saltybot_can_bridge/test/test_can_bridge.py index 2b8f9bf..efb3908 100644 --- a/jetson/ros2_ws/src/saltybot_can_bridge/test/test_can_bridge.py +++ b/jetson/ros2_ws/src/saltybot_can_bridge/test/test_can_bridge.py @@ -1,8 +1,8 @@ #!/usr/bin/env python3 """ -Unit tests for saltybot_can_bridge.mamba_protocol. +Unit tests for saltybot_can_bridge.balance_protocol. -No ROS2 or CAN hardware required — tests exercise encode/decode round-trips +No ROS2 or CAN hardware required -- tests exercise encode/decode round-trips and boundary conditions entirely in Python. Run with: pytest test/test_can_bridge.py -v @@ -11,243 +11,165 @@ Run with: pytest test/test_can_bridge.py -v import struct import unittest -from saltybot_can_bridge.mamba_protocol import ( - MAMBA_CMD_ESTOP, - MAMBA_CMD_MODE, - MAMBA_CMD_VELOCITY, - MAMBA_TELEM_BATTERY, - MAMBA_TELEM_IMU, - VESC_TELEM_STATE, - MODE_DRIVE, - MODE_ESTOP, +from saltybot_can_bridge.balance_protocol import ( + ORIN_CMD_DRIVE, + ORIN_CMD_ARM, + ORIN_CMD_PID, + ORIN_CMD_ESTOP, + ESP32_TELEM_ATTITUDE, + ESP32_TELEM_BATTERY, + VESC_LEFT_ID, + VESC_RIGHT_ID, + VESC_STATUS_1, MODE_IDLE, + MODE_DRIVE, + MODE_AUTONOMOUS, + AttitudeTelemetry, BatteryTelemetry, - ImuTelemetry, - VescStateTelemetry, - decode_battery_telem, - decode_imu_telem, - decode_vesc_state, + VescStatus1, + decode_attitude, + decode_battery, + decode_vesc_status1, + encode_drive_cmd, + encode_arm_cmd, encode_estop_cmd, - encode_mode_cmd, - encode_velocity_cmd, ) class TestMessageIDs(unittest.TestCase): - """Verify the CAN message ID constants are correct.""" + """Verify CAN message ID constants match the spec.""" - def test_command_ids(self): - self.assertEqual(MAMBA_CMD_VELOCITY, 0x100) - self.assertEqual(MAMBA_CMD_MODE, 0x101) - self.assertEqual(MAMBA_CMD_ESTOP, 0x102) + def test_orin_cmd_ids(self): + self.assertEqual(ORIN_CMD_DRIVE, 0x300) + self.assertEqual(ORIN_CMD_ARM, 0x301) + self.assertEqual(ORIN_CMD_PID, 0x302) + self.assertEqual(ORIN_CMD_ESTOP, 0x303) - def test_telemetry_ids(self): - self.assertEqual(MAMBA_TELEM_IMU, 0x200) - self.assertEqual(MAMBA_TELEM_BATTERY, 0x201) - self.assertEqual(VESC_TELEM_STATE, 0x300) + def test_esp32_telem_ids(self): + self.assertEqual(ESP32_TELEM_ATTITUDE, 0x400) + self.assertEqual(ESP32_TELEM_BATTERY, 0x401) + + def test_vesc_ids(self): + self.assertEqual(VESC_LEFT_ID, 56) + self.assertEqual(VESC_RIGHT_ID, 68) + self.assertEqual(VESC_STATUS_1, 9) -class TestVelocityEncode(unittest.TestCase): - """Tests for encode_velocity_cmd.""" +class TestDriveEncode(unittest.TestCase): + """Tests for encode_drive_cmd.""" def test_zero_velocity(self): - payload = encode_velocity_cmd(0.0, 0.0) + payload = encode_drive_cmd(0, 0) self.assertEqual(len(payload), 8) - left, right = struct.unpack(">ff", payload) - self.assertAlmostEqual(left, 0.0, places=5) - self.assertAlmostEqual(right, 0.0, places=5) + speed, steer, mode, flags, pad = struct.unpack(">hhBBH", payload) + self.assertEqual(speed, 0) + self.assertEqual(steer, 0) + self.assertEqual(mode, MODE_DRIVE) - def test_forward_velocity(self): - payload = encode_velocity_cmd(1.5, 1.5) - left, right = struct.unpack(">ff", payload) - self.assertAlmostEqual(left, 1.5, places=5) - self.assertAlmostEqual(right, 1.5, places=5) + def test_forward(self): + payload = encode_drive_cmd(500, 0) + speed, steer, mode, flags, pad = struct.unpack(">hhBBH", payload) + self.assertEqual(speed, 500) + self.assertEqual(steer, 0) - def test_differential_velocity(self): - payload = encode_velocity_cmd(-0.5, 0.5) - left, right = struct.unpack(">ff", payload) - self.assertAlmostEqual(left, -0.5, places=5) - self.assertAlmostEqual(right, 0.5, places=5) + def test_clamping_high(self): + payload = encode_drive_cmd(9999, 9999) + speed, steer, mode, flags, pad = struct.unpack(">hhBBH", payload) + self.assertEqual(speed, 1000) + self.assertEqual(steer, 1000) - def test_large_velocity(self): - # No clamping at the protocol layer — values are sent as-is - payload = encode_velocity_cmd(10.0, -10.0) - left, right = struct.unpack(">ff", payload) - self.assertAlmostEqual(left, 10.0, places=3) - self.assertAlmostEqual(right, -10.0, places=3) + def test_clamping_low(self): + payload = encode_drive_cmd(-9999, -9999) + speed, steer, mode, flags, pad = struct.unpack(">hhBBH", payload) + self.assertEqual(speed, -1000) + self.assertEqual(steer, -1000) - def test_payload_is_big_endian(self): - # Sanity check: first 4 bytes encode left speed - payload = encode_velocity_cmd(1.0, 0.0) - (left,) = struct.unpack(">f", payload[:4]) - self.assertAlmostEqual(left, 1.0, places=5) + def test_mode_idle(self): + payload = encode_drive_cmd(0, 0, mode=MODE_IDLE) + speed, steer, mode_byte, flags, pad = struct.unpack(">hhBBH", payload) + self.assertEqual(mode_byte, MODE_IDLE) -class TestModeEncode(unittest.TestCase): - """Tests for encode_mode_cmd.""" +class TestArmEncode(unittest.TestCase): + """Tests for encode_arm_cmd.""" - def test_idle_mode(self): - payload = encode_mode_cmd(MODE_IDLE) - self.assertEqual(payload, b"\x00") + def test_arm(self): + payload = encode_arm_cmd(True) + self.assertEqual(len(payload), 1) + self.assertEqual(payload[0], 0x01) - def test_drive_mode(self): - payload = encode_mode_cmd(MODE_DRIVE) - self.assertEqual(payload, b"\x01") - - def test_estop_mode(self): - payload = encode_mode_cmd(MODE_ESTOP) - self.assertEqual(payload, b"\x02") - - def test_invalid_mode_raises(self): - with self.assertRaises(ValueError): - encode_mode_cmd(99) - - def test_invalid_mode_negative_raises(self): - with self.assertRaises(ValueError): - encode_mode_cmd(-1) + def test_disarm(self): + payload = encode_arm_cmd(False) + self.assertEqual(len(payload), 1) + self.assertEqual(payload[0], 0x00) class TestEstopEncode(unittest.TestCase): """Tests for encode_estop_cmd.""" - def test_estop_assert(self): - self.assertEqual(encode_estop_cmd(True), b"\x01") - - def test_estop_clear(self): - self.assertEqual(encode_estop_cmd(False), b"\x00") - - def test_estop_default_is_stop(self): - self.assertEqual(encode_estop_cmd(), b"\x01") + def test_estop_magic_byte(self): + payload = encode_estop_cmd() + self.assertEqual(len(payload), 1) + self.assertEqual(payload[0], 0xE5) -class TestImuDecodeRoundTrip(unittest.TestCase): - """Round-trip tests for IMU telemetry.""" +class TestAttitudeDecode(unittest.TestCase): + """Tests for decode_attitude (0x400).""" - def _encode_imu(self, ax, ay, az, gx, gy, gz) -> bytes: - return struct.pack(">ffffff", ax, ay, az, gx, gy, gz) + def test_roundtrip(self): + import struct as _s + # Encode a known attitude: pitch=12.5, speed=1.5, yaw_rate=-0.5, state=1, flags=0 + fmt = ">eeeBB" + raw = _s.pack(fmt, 12.5, 1.5, -0.5, 1, 0) + b"\x00" # pad to 8 bytes + att = decode_attitude(raw) + self.assertAlmostEqual(att.pitch_deg, 12.5, places=1) + self.assertAlmostEqual(att.speed, 1.5, places=1) + self.assertEqual(att.state, 1) - def test_zero_imu(self): - data = self._encode_imu(0.0, 0.0, 0.0, 0.0, 0.0, 0.0) - telem = decode_imu_telem(data) - self.assertIsInstance(telem, ImuTelemetry) - self.assertAlmostEqual(telem.accel_x, 0.0, places=5) - self.assertAlmostEqual(telem.gyro_z, 0.0, places=5) - - def test_nominal_imu(self): - data = self._encode_imu(0.1, -0.2, 9.81, 0.01, -0.02, 0.03) - telem = decode_imu_telem(data) - self.assertAlmostEqual(telem.accel_x, 0.1, places=4) - self.assertAlmostEqual(telem.accel_y, -0.2, places=4) - self.assertAlmostEqual(telem.accel_z, 9.81, places=3) - self.assertAlmostEqual(telem.gyro_x, 0.01, places=5) - self.assertAlmostEqual(telem.gyro_y, -0.02, places=5) - self.assertAlmostEqual(telem.gyro_z, 0.03, places=5) - - def test_imu_bad_length_raises(self): - with self.assertRaises(struct.error): - decode_imu_telem(b"\x00" * 10) # too short + def test_too_short_raises(self): + with self.assertRaises(ValueError): + decode_attitude(b"\x00" * 3) -class TestBatteryDecodeRoundTrip(unittest.TestCase): - """Round-trip tests for battery telemetry.""" +class TestBatteryDecode(unittest.TestCase): + """Tests for decode_battery (0x401).""" - def _encode_bat(self, voltage, current) -> bytes: - return struct.pack(">ff", voltage, current) + def test_roundtrip(self): + import struct as _s + raw = _s.pack(">HBb", 24000, 0, -65) + bat = decode_battery(raw) + self.assertEqual(bat.vbat_mv, 24000) + self.assertEqual(bat.fault_code, 0) + self.assertEqual(bat.rssi, -65) - def test_nominal_battery(self): - data = self._encode_bat(24.6, 3.2) - telem = decode_battery_telem(data) - self.assertIsInstance(telem, BatteryTelemetry) - self.assertAlmostEqual(telem.voltage, 24.6, places=3) - self.assertAlmostEqual(telem.current, 3.2, places=4) - - def test_zero_battery(self): - data = self._encode_bat(0.0, 0.0) - telem = decode_battery_telem(data) - self.assertAlmostEqual(telem.voltage, 0.0, places=5) - self.assertAlmostEqual(telem.current, 0.0, places=5) - - def test_max_voltage(self): - # 6S LiPo max ~25.2 V; test with a high value - data = self._encode_bat(25.2, 0.0) - telem = decode_battery_telem(data) - self.assertAlmostEqual(telem.voltage, 25.2, places=3) - - def test_battery_bad_length_raises(self): - with self.assertRaises(struct.error): - decode_battery_telem(b"\x00" * 4) # too short + def test_too_short_raises(self): + with self.assertRaises(ValueError): + decode_battery(b"\x00" * 2) -class TestVescStateDecodeRoundTrip(unittest.TestCase): - """Round-trip tests for VESC state telemetry.""" +class TestVescStatus1Decode(unittest.TestCase): + """Tests for decode_vesc_status1.""" - def _encode_vesc(self, erpm, duty, voltage, current) -> bytes: - return struct.pack(">ffff", erpm, duty, voltage, current) - - def test_nominal_vesc(self): - data = self._encode_vesc(3000.0, 0.25, 24.0, 5.5) - telem = decode_vesc_state(data) - self.assertIsInstance(telem, VescStateTelemetry) - self.assertAlmostEqual(telem.erpm, 3000.0, places=2) - self.assertAlmostEqual(telem.duty, 0.25, places=5) - self.assertAlmostEqual(telem.voltage, 24.0, places=4) - self.assertAlmostEqual(telem.current, 5.5, places=4) - - def test_zero_vesc(self): - data = self._encode_vesc(0.0, 0.0, 0.0, 0.0) - telem = decode_vesc_state(data) - self.assertAlmostEqual(telem.erpm, 0.0) - self.assertAlmostEqual(telem.duty, 0.0) - - def test_reverse_erpm(self): - data = self._encode_vesc(-1500.0, -0.15, 23.0, 2.0) - telem = decode_vesc_state(data) - self.assertAlmostEqual(telem.erpm, -1500.0, places=2) - self.assertAlmostEqual(telem.duty, -0.15, places=5) - - def test_vesc_bad_length_raises(self): - with self.assertRaises(struct.error): - decode_vesc_state(b"\x00" * 8) # too short (need 16) + def test_roundtrip(self): + import struct as _s + erpm = 12000 + cur_x10 = 35 # 3.5 A + duty_x1000 = 450 # 0.45 + raw = _s.pack(">ihh", erpm, cur_x10, duty_x1000) + st = decode_vesc_status1(VESC_LEFT_ID, raw) + self.assertEqual(st.node_id, VESC_LEFT_ID) + self.assertAlmostEqual(st.erpm, 12000.0, places=1) + self.assertAlmostEqual(st.current, 3.5, places=3) + self.assertAlmostEqual(st.duty, 0.45, places=3) -class TestEncodeDecodeIdentity(unittest.TestCase): - """Cross-module identity tests: encode then decode must be identity.""" +class TestModeConstants(unittest.TestCase): + """Tests for mode byte constants.""" - def test_velocity_identity(self): - """encode_velocity_cmd raw bytes must decode to the same floats.""" - left, right = 0.75, -0.3 - payload = encode_velocity_cmd(left, right) - decoded_l, decoded_r = struct.unpack(">ff", payload) - self.assertAlmostEqual(decoded_l, left, places=5) - self.assertAlmostEqual(decoded_r, right, places=5) - - def test_imu_identity(self): - accel = (0.5, -0.5, 9.8) - gyro = (0.1, -0.1, 0.2) - raw = struct.pack(">ffffff", *accel, *gyro) - telem = decode_imu_telem(raw) - self.assertAlmostEqual(telem.accel_x, accel[0], places=4) - self.assertAlmostEqual(telem.accel_y, accel[1], places=4) - self.assertAlmostEqual(telem.accel_z, accel[2], places=3) - self.assertAlmostEqual(telem.gyro_x, gyro[0], places=5) - self.assertAlmostEqual(telem.gyro_y, gyro[1], places=5) - self.assertAlmostEqual(telem.gyro_z, gyro[2], places=5) - - def test_battery_identity(self): - voltage, current = 22.4, 8.1 - raw = struct.pack(">ff", voltage, current) - telem = decode_battery_telem(raw) - self.assertAlmostEqual(telem.voltage, voltage, places=3) - self.assertAlmostEqual(telem.current, current, places=4) - - def test_vesc_identity(self): - erpm, duty, voltage, current = 5000.0, 0.5, 24.5, 10.0 - raw = struct.pack(">ffff", erpm, duty, voltage, current) - telem = decode_vesc_state(raw) - self.assertAlmostEqual(telem.erpm, erpm, places=2) - self.assertAlmostEqual(telem.duty, duty, places=5) - self.assertAlmostEqual(telem.voltage, voltage, places=3) - self.assertAlmostEqual(telem.current, current, places=4) + def test_mode_values(self): + self.assertEqual(MODE_IDLE, 0) + self.assertEqual(MODE_DRIVE, 1) + self.assertEqual(MODE_AUTONOMOUS, 2) if __name__ == "__main__": diff --git a/jetson/ros2_ws/src/saltybot_can_e2e_test/saltybot_can_e2e_test/protocol_defs.py b/jetson/ros2_ws/src/saltybot_can_e2e_test/saltybot_can_e2e_test/protocol_defs.py index 41bdd6c..fcf052d 100644 --- a/jetson/ros2_ws/src/saltybot_can_e2e_test/saltybot_can_e2e_test/protocol_defs.py +++ b/jetson/ros2_ws/src/saltybot_can_e2e_test/saltybot_can_e2e_test/protocol_defs.py @@ -1,69 +1,56 @@ #!/usr/bin/env python3 """ protocol_defs.py — CAN message ID constants and frame builders/parsers for the -Orin↔Mamba↔VESC integration test suite. +Orin <-> ESP32 BALANCE <-> VESC integration test suite. All IDs and payload formats are derived from: - include/orin_can.h — Orin↔FC (Mamba) protocol - include/vesc_can.h — VESC CAN protocol - saltybot_can_bridge/mamba_protocol.py — existing bridge constants + saltybot_can_bridge/balance_protocol.py — canonical codec CAN IDs used in tests --------------------- -Orin → FC (Mamba) commands (standard 11-bit, matching orin_can.h): - ORIN_CMD_HEARTBEAT 0x300 - ORIN_CMD_DRIVE 0x301 int16 speed (−1000..+1000), int16 steer (−1000..+1000) - ORIN_CMD_MODE 0x302 uint8 mode byte - ORIN_CMD_ESTOP 0x303 uint8 action (1=ESTOP, 0=CLEAR) +Orin → ESP32 BALANCE commands (standard 11-bit): + ORIN_CMD_DRIVE 0x300 left_mps (f32 LE) | right_mps (f32 LE) + ORIN_CMD_MODE 0x301 uint8 mode (0=idle, 1=drive, 2=estop) + ORIN_CMD_ESTOP 0x302 uint8 flags (bit0=stop, bit1=clear) + ORIN_CMD_LED 0x303 uint8 pattern | r | g | b -FC (Mamba) → Orin telemetry (standard 11-bit, matching orin_can.h): - FC_STATUS 0x400 8 bytes (see orin_can_fc_status_t) - FC_VESC 0x401 8 bytes (see orin_can_fc_vesc_t) - FC_IMU 0x402 8 bytes - FC_BARO 0x403 8 bytes +ESP32 BALANCE → Orin telemetry (standard 11-bit): + FC_STATUS 0x400 8 bytes (pitch_x10 i16 | motor_cmd u16 | vbat_mv u16 | state u8 | flags u8) + FC_VESC 0x401 8 bytes (l_rpm_x10 i16 | r_rpm_x10 i16 | l_cur_x10 i16 | r_cur_x10 i16) -Mamba ↔ VESC internal commands (matching mamba_protocol.py): - MAMBA_CMD_VELOCITY 0x100 8 bytes left_mps (f32) | right_mps (f32) big-endian - MAMBA_CMD_MODE 0x101 1 byte mode (0=idle,1=drive,2=estop) - MAMBA_CMD_ESTOP 0x102 1 byte 0x01=stop - -VESC STATUS (extended 29-bit, matching vesc_can.h): - arb_id = (VESC_PKT_STATUS << 8) | vesc_node_id = (9 << 8) | node_id - Payload: int32 RPM (BE), int16 current×10 (BE), int16 duty×1000 (BE) +VESC STATUS (extended 29-bit): + arb_id = (VESC_STATUS_1 << 8) | vesc_node_id = (9 << 8) | node_id + Payload: int32 ERPM (BE), int16 current×10 (BE), int16 duty×1000 (BE) """ import struct from typing import Tuple # --------------------------------------------------------------------------- -# Orin → FC (Mamba) command IDs (from orin_can.h) +# Orin → ESP32 BALANCE command IDs # --------------------------------------------------------------------------- -ORIN_CMD_HEARTBEAT: int = 0x300 -ORIN_CMD_DRIVE: int = 0x301 -ORIN_CMD_MODE: int = 0x302 -ORIN_CMD_ESTOP: int = 0x303 +ORIN_CMD_DRIVE: int = 0x300 +ORIN_CMD_MODE: int = 0x301 +ORIN_CMD_ESTOP: int = 0x302 +ORIN_CMD_LED: int = 0x303 # --------------------------------------------------------------------------- -# FC (Mamba) → Orin telemetry IDs (from orin_can.h) +# ESP32 BALANCE → Orin telemetry IDs # --------------------------------------------------------------------------- FC_STATUS: int = 0x400 FC_VESC: int = 0x401 -FC_IMU: int = 0x402 -FC_BARO: int = 0x403 # --------------------------------------------------------------------------- -# Mamba → VESC internal command IDs (from mamba_protocol.py) +# VESC constants # --------------------------------------------------------------------------- -MAMBA_CMD_VELOCITY: int = 0x100 -MAMBA_CMD_MODE: int = 0x101 -MAMBA_CMD_ESTOP: int = 0x102 +VESC_STATUS_1: int = 9 # STATUS_1 packet type (upper byte of arb_id) +VESC_CMD_SET_RPM: int = 3 -MAMBA_TELEM_IMU: int = 0x200 -MAMBA_TELEM_BATTERY: int = 0x201 -VESC_TELEM_STATE: int = 0x300 +VESC_CAN_ID_LEFT: int = 56 +VESC_CAN_ID_RIGHT: int = 68 # --------------------------------------------------------------------------- # Mode constants @@ -73,242 +60,111 @@ MODE_IDLE: int = 0 MODE_DRIVE: int = 1 MODE_ESTOP: int = 2 -# --------------------------------------------------------------------------- -# VESC protocol constants (from vesc_can.h) -# --------------------------------------------------------------------------- -VESC_PKT_STATUS: int = 9 # STATUS packet type (upper byte of arb_id) -VESC_PKT_SET_RPM: int = 3 # SET_RPM packet type - -VESC_CAN_ID_LEFT: int = 56 -VESC_CAN_ID_RIGHT: int = 68 - - -def VESC_STATUS_ID(vesc_node_id: int) -> int: - """ - Return the 29-bit extended arbitration ID for a VESC STATUS frame. - - Formula (from vesc_can.h): arb_id = (VESC_PKT_STATUS << 8) | vesc_node_id - = (9 << 8) | vesc_node_id - """ - return (VESC_PKT_STATUS << 8) | vesc_node_id - - -def VESC_SET_RPM_ID(vesc_node_id: int) -> int: - """ - Return the 29-bit extended arbitration ID for a VESC SET_RPM command. - - Formula: arb_id = (VESC_PKT_SET_RPM << 8) | vesc_node_id - = (3 << 8) | vesc_node_id - """ - return (VESC_PKT_SET_RPM << 8) | vesc_node_id +def VESC_STATUS1_ID(vesc_node_id: int) -> int: + """Return the 29-bit extended arbitration ID for a VESC STATUS_1 frame.""" + return (VESC_STATUS_1 << 8) | vesc_node_id # --------------------------------------------------------------------------- -# Frame builders — Orin → FC +# Frame builders — Orin → ESP32 BALANCE # --------------------------------------------------------------------------- -def build_heartbeat(seq: int = 0) -> bytes: - """Build a HEARTBEAT payload: uint32 sequence counter (big-endian, 4 bytes).""" - return struct.pack(">I", seq & 0xFFFFFFFF) - - -def build_drive_cmd(speed: int, steer: int) -> bytes: - """ - Build an ORIN_CMD_DRIVE payload. - - Parameters - ---------- - speed: int, −1000..+1000 (mapped directly to int16) - steer: int, −1000..+1000 - """ - return struct.pack(">hh", int(speed), int(steer)) +def build_drive_cmd(left_mps: float, right_mps: float) -> bytes: + """Build ORIN_CMD_DRIVE payload (8 bytes, 2 × float32 little-endian).""" + return struct.pack(" bytes: - """Build an ORIN_CMD_MODE payload (1 byte).""" - return struct.pack(">B", mode & 0xFF) + """Build ORIN_CMD_MODE payload (1 byte).""" + return struct.pack("B", mode & 0xFF) -def build_estop_cmd(action: int = 1) -> bytes: - """Build an ORIN_CMD_ESTOP payload. action=1 → ESTOP, 0 → CLEAR.""" - return struct.pack(">B", action & 0xFF) +def build_estop_cmd(stop: bool = True, clear: bool = False) -> bytes: + """Build ORIN_CMD_ESTOP payload (1 byte flags).""" + return struct.pack("B", (0x01 if stop else 0) | (0x02 if clear else 0)) # --------------------------------------------------------------------------- -# Frame builders — Mamba velocity commands (mamba_protocol.py encoding) -# --------------------------------------------------------------------------- - -def build_velocity_cmd(left_mps: float, right_mps: float) -> bytes: - """ - Build a MAMBA_CMD_VELOCITY payload (8 bytes, 2 × float32 big-endian). - - Matches encode_velocity_cmd() in mamba_protocol.py. - """ - return struct.pack(">ff", float(left_mps), float(right_mps)) - - -# --------------------------------------------------------------------------- -# Frame builders — FC → Orin telemetry +# Frame builders — ESP32 BALANCE → Orin telemetry # --------------------------------------------------------------------------- def build_fc_status( pitch_x10: int = 0, motor_cmd: int = 0, vbat_mv: int = 24000, - balance_state: int = 1, + state: int = 1, flags: int = 0, ) -> bytes: - """ - Build an FC_STATUS (0x400) payload. - - Layout (orin_can_fc_status_t, 8 bytes, big-endian): - int16 pitch_x10 - int16 motor_cmd - uint16 vbat_mv - uint8 balance_state - uint8 flags [bit0=estop_active, bit1=armed] - """ + """Build FC_STATUS (0x400) payload (8 bytes, big-endian).""" return struct.pack( - ">hhHBB", + ">hHHBB", int(pitch_x10), - int(motor_cmd), + int(motor_cmd) & 0xFFFF, int(vbat_mv) & 0xFFFF, - int(balance_state) & 0xFF, + int(state) & 0xFF, int(flags) & 0xFF, ) def build_fc_vesc( - left_rpm_x10: int = 0, - right_rpm_x10: int = 0, - left_current_x10: int = 0, - right_current_x10: int = 0, + l_rpm_x10: int = 0, + r_rpm_x10: int = 0, + l_cur_x10: int = 0, + r_cur_x10: int = 0, ) -> bytes: - """ - Build an FC_VESC (0x401) payload. - - Layout (orin_can_fc_vesc_t, 8 bytes, big-endian): - int16 left_rpm_x10 - int16 right_rpm_x10 - int16 left_current_x10 - int16 right_current_x10 - - RPM values are RPM / 10 (e.g. 3000 RPM → 300). - Current values are A × 10 (e.g. 5.5 A → 55). - """ + """Build FC_VESC (0x401) payload (8 bytes, big-endian).""" return struct.pack( ">hhhh", - int(left_rpm_x10), - int(right_rpm_x10), - int(left_current_x10), - int(right_current_x10), + int(l_rpm_x10), + int(r_rpm_x10), + int(l_cur_x10), + int(r_cur_x10), ) -def build_vesc_status( - rpm: int = 0, +def build_vesc_status1( + erpm: int = 0, current_x10: int = 0, duty_x1000: int = 0, ) -> bytes: - """ - Build a VESC STATUS (packet type 9) payload. - - Layout (from vesc_can.h / VESC FW 6.x, big-endian): - int32 rpm - int16 current × 10 - int16 duty × 1000 - Total: 8 bytes. - """ - return struct.pack( - ">ihh", - int(rpm), - int(current_x10), - int(duty_x1000), - ) + """Build VESC STATUS_1 payload (8 bytes, big-endian).""" + return struct.pack(">ihh", int(erpm), int(current_x10), int(duty_x1000)) # --------------------------------------------------------------------------- # Frame parsers # --------------------------------------------------------------------------- -def parse_fc_status(data: bytes): - """ - Parse an FC_STATUS (0x400) payload. +def parse_drive_cmd(data: bytes) -> Tuple[float, float]: + """Parse ORIN_CMD_DRIVE payload → (left_mps, right_mps).""" + if len(data) < 8: + raise ValueError(f"ORIN_CMD_DRIVE needs 8 bytes, got {len(data)}") + return struct.unpack("hhHBB", data[:8] - ) + pitch_x10, motor_cmd, vbat_mv, state, flags = struct.unpack(">hHHBB", data[:8]) return { "pitch_x10": pitch_x10, + "pitch_deg": pitch_x10 / 10.0, "motor_cmd": motor_cmd, "vbat_mv": vbat_mv, - "balance_state": balance_state, + "state": state, "flags": flags, - "estop_active": bool(flags & 0x01), - "armed": bool(flags & 0x02), } -def parse_fc_vesc(data: bytes): - """ - Parse an FC_VESC (0x401) payload. - - Returns - ------- - dict with keys: left_rpm_x10, right_rpm_x10, left_current_x10, - right_current_x10, left_rpm (float), right_rpm (float) - """ +def parse_vesc_status1(data: bytes): + """Parse VESC STATUS_1 payload.""" if len(data) < 8: - raise ValueError(f"FC_VESC needs 8 bytes, got {len(data)}") - left_rpm_x10, right_rpm_x10, left_cur_x10, right_cur_x10 = struct.unpack( - ">hhhh", data[:8] - ) + raise ValueError(f"VESC STATUS_1 needs 8 bytes, got {len(data)}") + erpm, cur_x10, duty_x1000 = struct.unpack(">ihh", data[:8]) return { - "left_rpm_x10": left_rpm_x10, - "right_rpm_x10": right_rpm_x10, - "left_current_x10": left_cur_x10, - "right_current_x10": right_cur_x10, - "left_rpm": left_rpm_x10 * 10.0, - "right_rpm": right_rpm_x10 * 10.0, - } - - -def parse_vesc_status(data: bytes): - """ - Parse a VESC STATUS (packet type 9) payload. - - Returns - ------- - dict with keys: rpm, current_x10, duty_x1000, current (float), duty (float) - """ - if len(data) < 8: - raise ValueError(f"VESC STATUS needs 8 bytes, got {len(data)}") - rpm, current_x10, duty_x1000 = struct.unpack(">ihh", data[:8]) - return { - "rpm": rpm, - "current_x10": current_x10, - "duty_x1000": duty_x1000, - "current": current_x10 / 10.0, + "erpm": erpm, + "current": cur_x10 / 10.0, "duty": duty_x1000 / 1000.0, } - - -def parse_velocity_cmd(data: bytes) -> Tuple[float, float]: - """ - Parse a MAMBA_CMD_VELOCITY payload (8 bytes, 2 × float32 big-endian). - - Returns - ------- - (left_mps, right_mps) - """ - if len(data) < 8: - raise ValueError(f"MAMBA_CMD_VELOCITY needs 8 bytes, got {len(data)}") - return struct.unpack(">ff", data[:8]) diff --git a/jetson/ros2_ws/src/saltybot_can_e2e_test/setup.py b/jetson/ros2_ws/src/saltybot_can_e2e_test/setup.py index eff1956..c499830 100644 --- a/jetson/ros2_ws/src/saltybot_can_e2e_test/setup.py +++ b/jetson/ros2_ws/src/saltybot_can_e2e_test/setup.py @@ -1,20 +1,20 @@ -from setuptools import setup +from setuptools import find_packages, setup package_name = "saltybot_can_e2e_test" setup( name=package_name, - version="0.1.0", - packages=[package_name], + version="0.0.1", + packages=find_packages(exclude=["test"]), data_files=[ - ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), - (f"share/{package_name}", ["package.xml"]), + ("share/ament_index/resource_index/packages", ["resource/" + package_name]), + ("share/" + package_name, ["package.xml"]), ], install_requires=["setuptools"], zip_safe=True, - maintainer="sl-jetson", - maintainer_email="sl-jetson@saltylab.local", - description="End-to-end CAN integration tests for Orin↔Mamba↔VESC full loop", + maintainer="seb", + maintainer_email="seb@example.com", + description="Orin <-> ESP32 BALANCE <-> VESC end-to-end CAN integration tests", license="MIT", tests_require=["pytest"], entry_points={ diff --git a/jetson/ros2_ws/src/saltybot_can_e2e_test/test/conftest.py b/jetson/ros2_ws/src/saltybot_can_e2e_test/test/conftest.py index db8c471..1967cee 100644 --- a/jetson/ros2_ws/src/saltybot_can_e2e_test/test/conftest.py +++ b/jetson/ros2_ws/src/saltybot_can_e2e_test/test/conftest.py @@ -1,93 +1,27 @@ -#!/usr/bin/env python3 """ -conftest.py — pytest fixtures for the saltybot_can_e2e_test suite. - -No ROS2 node infrastructure is started; all tests run purely in Python. +conftest.py — shared pytest fixtures for saltybot CAN end-to-end tests. """ -import sys -import os - -# Ensure the package root is on sys.path so relative imports work when running -# pytest directly from the saltybot_can_e2e_test/ directory. -_pkg_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) -if _pkg_root not in sys.path: - sys.path.insert(0, _pkg_root) - -# Also add the saltybot_can_bridge package so we can import mamba_protocol. -_bridge_pkg = os.path.join( - os.path.dirname(_pkg_root), "saltybot_can_bridge" -) -if _bridge_pkg not in sys.path: - sys.path.insert(0, _bridge_pkg) - import pytest -from saltybot_can_e2e_test.can_mock import MockCANBus -from saltybot_can_e2e_test.protocol_defs import ( - VESC_CAN_ID_LEFT, - VESC_CAN_ID_RIGHT, +from saltybot_can_bridge.balance_protocol import ( + encode_drive_cmd, + encode_mode_cmd, + encode_estop_cmd, + decode_fc_status, + decode_fc_vesc, + decode_vesc_status1, ) -# --------------------------------------------------------------------------- -# Core fixtures -# --------------------------------------------------------------------------- - - -@pytest.fixture(scope="function") -def mock_can_bus(): - """ - Provide a fresh MockCANBus instance per test function. - The bus is automatically shut down after each test. - """ - bus = MockCANBus(loopback=False) - yield bus - bus.shutdown() - - -@pytest.fixture(scope="function") -def loopback_can_bus(): - """ - MockCANBus in loopback mode — sent frames are also queued for recv. - Useful for testing round-trip behaviour without a second node. - """ - bus = MockCANBus(loopback=True) - yield bus - bus.shutdown() - - -@pytest.fixture(scope="function") +@pytest.fixture def bridge_components(): - """ - Return the mamba_protocol encode/decode callables and a fresh mock bus. - - Yields a dict with keys: - bus — MockCANBus instance - encode_vel — encode_velocity_cmd(left, right) → bytes - encode_mode — encode_mode_cmd(mode) → bytes - encode_estop — encode_estop_cmd(stop) → bytes - decode_vesc — decode_vesc_state(data) → VescStateTelemetry - """ - from saltybot_can_bridge.mamba_protocol import ( - encode_velocity_cmd, - encode_mode_cmd, - encode_estop_cmd, - decode_vesc_state, - decode_battery_telem, - decode_imu_telem, - ) - - bus = MockCANBus(loopback=False) + """Return a dict of encode/decode helpers from balance_protocol.""" yield { - "bus": bus, - "encode_vel": encode_velocity_cmd, + "encode_drive": encode_drive_cmd, "encode_mode": encode_mode_cmd, "encode_estop": encode_estop_cmd, - "decode_vesc": decode_vesc_state, - "decode_battery": decode_battery_telem, - "decode_imu": decode_imu_telem, - "left_vesc_id": VESC_CAN_ID_LEFT, - "right_vesc_id": VESC_CAN_ID_RIGHT, + "decode_fc_status": decode_fc_status, + "decode_fc_vesc": decode_fc_vesc, + "decode_vesc": decode_vesc_status1, } - bus.shutdown() diff --git a/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_drive_command.py b/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_drive_command.py index dfbee87..65aa561 100644 --- a/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_drive_command.py +++ b/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_drive_command.py @@ -1,193 +1,47 @@ -#!/usr/bin/env python3 """ -test_drive_command.py — Integration tests for the drive command path. - -Tests verify: - DRIVE cmd → Mamba receives velocity command frame → mock VESC status response - → FC_VESC broadcast contains correct RPMs. - -All tests run without real hardware or a running ROS2 system. -Run with: python -m pytest test/test_drive_command.py -v +test_drive_command.py — E2E tests for ORIN_CMD_DRIVE (0x300) encoding. """ import struct import pytest from saltybot_can_e2e_test.protocol_defs import ( - MAMBA_CMD_VELOCITY, - MAMBA_CMD_MODE, - FC_VESC, - MODE_DRIVE, + ORIN_CMD_DRIVE, + build_drive_cmd, + parse_drive_cmd, MODE_IDLE, - VESC_CAN_ID_LEFT, - VESC_CAN_ID_RIGHT, - VESC_STATUS_ID, - build_velocity_cmd, - build_fc_vesc, - build_vesc_status, - parse_velocity_cmd, - parse_fc_vesc, -) -from saltybot_can_bridge.mamba_protocol import ( - encode_velocity_cmd, - encode_mode_cmd, + MODE_DRIVE, ) -# --------------------------------------------------------------------------- -# Helper -# --------------------------------------------------------------------------- +class TestDriveCommandEncoding: -def _send_drive(bus, left_mps: float, right_mps: float) -> None: - """Simulate the bridge encoding and sending a velocity command.""" - from saltybot_can_e2e_test.can_mock import MockCANBus + def test_frame_id(self): + assert ORIN_CMD_DRIVE == 0x300 - payload = encode_velocity_cmd(left_mps, right_mps) - # Create a minimal message object compatible with our mock - class _Msg: - def __init__(self, arb_id, data): - self.arbitration_id = arb_id - self.data = bytearray(data) - self.is_extended_id = False - - bus.send(_Msg(MAMBA_CMD_VELOCITY, payload)) - bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(MODE_DRIVE))) - - -# --------------------------------------------------------------------------- -# Tests -# --------------------------------------------------------------------------- - -class TestDriveForward: - def test_drive_forward_velocity_frame_sent(self, mock_can_bus): - """ - Inject DRIVE cmd (1.0 m/s, 1.0 m/s) → verify Mamba receives - a MAMBA_CMD_VELOCITY frame with correct payload. - """ - _send_drive(mock_can_bus, 1.0, 1.0) - - vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY) - assert len(vel_frames) == 1, "Expected exactly one velocity command frame" - - left, right = parse_velocity_cmd(bytes(vel_frames[0].data)) - assert abs(left - 1.0) < 1e-4, f"Left speed {left} != 1.0" - assert abs(right - 1.0) < 1e-4, f"Right speed {right} != 1.0" - - def test_drive_forward_mode_drive_sent(self, mock_can_bus): - """After a drive command, a MODE=drive frame must accompany it.""" - _send_drive(mock_can_bus, 1.0, 1.0) - - mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE) - assert len(mode_frames) >= 1, "Expected at least one MODE frame" - assert bytes(mode_frames[0].data) == bytes([MODE_DRIVE]) - - def test_drive_forward_fc_vesc_broadcast(self, mock_can_bus): - """ - Simulate FC_VESC broadcast arriving after drive cmd; verify parse is correct. - (In the real loop Mamba computes RPM from m/s and broadcasts FC_VESC.) - This test checks the FC_VESC frame format and parser. - """ - # Simulate: 1.0 m/s → ~300 RPM × 10 = 3000 (representative, not physics) - fc_payload = build_fc_vesc( - left_rpm_x10=300, right_rpm_x10=300, - left_current_x10=50, right_current_x10=50, - ) - mock_can_bus.inject(FC_VESC, fc_payload) - - frame = mock_can_bus.recv(timeout=0.1) - assert frame is not None, "FC_VESC frame not received" - parsed = parse_fc_vesc(bytes(frame.data)) - assert parsed["left_rpm_x10"] == 300 - assert parsed["right_rpm_x10"] == 300 - assert abs(parsed["left_rpm"] - 3000.0) < 0.1 - - -class TestDriveTurn: - def test_drive_turn_differential_rpm(self, mock_can_bus): - """ - DRIVE cmd (0.5, −0.5) → verify differential RPM in velocity command. - """ - _send_drive(mock_can_bus, 0.5, -0.5) - - vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY) - assert len(vel_frames) == 1 - - left, right = parse_velocity_cmd(bytes(vel_frames[0].data)) - assert abs(left - 0.5) < 1e-4, f"Left speed {left} != 0.5" - assert abs(right - (-0.5)) < 1e-4, f"Right speed {right} != -0.5" - # Signs must be opposite for a zero-radius spin - assert left > 0 and right < 0 - - def test_drive_turn_fc_vesc_differential(self, mock_can_bus): - """Simulated FC_VESC for a turn has opposite-sign RPMs.""" - fc_payload = build_fc_vesc( - left_rpm_x10=150, right_rpm_x10=-150, - left_current_x10=30, right_current_x10=30, - ) - mock_can_bus.inject(FC_VESC, fc_payload) - frame = mock_can_bus.recv(timeout=0.1) - parsed = parse_fc_vesc(bytes(frame.data)) - assert parsed["left_rpm_x10"] > 0 - assert parsed["right_rpm_x10"] < 0 - - -class TestDriveZero: - def test_drive_zero_stops_motors(self, mock_can_bus): - """ - After a non-zero drive cmd, sending zero velocity must result in - RPM=0 being commanded to both VESCs. - """ - _send_drive(mock_can_bus, 1.0, 1.0) - mock_can_bus.reset() # clear prior frames - - _send_drive(mock_can_bus, 0.0, 0.0) - - vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY) - assert len(vel_frames) == 1 - left, right = parse_velocity_cmd(bytes(vel_frames[0].data)) - assert abs(left) < 1e-5, "Left motor not stopped" - assert abs(right) < 1e-5, "Right motor not stopped" - - -class TestDriveCmdTimeout: - def test_drive_cmd_timeout_sends_zero(self, mock_can_bus): - """ - Simulate the watchdog behaviour: if no DRIVE cmd arrives for >500 ms, - zero velocity is sent. We test the encoding directly (without timers). - """ - # The watchdog in CanBridgeNode calls encode_velocity_cmd(0.0, 0.0) and - # sends it on MAMBA_CMD_VELOCITY. Replicate that here. - zero_payload = encode_velocity_cmd(0.0, 0.0) - - class _Msg: - def __init__(self, arb_id, data): - self.arbitration_id = arb_id - self.data = bytearray(data) - self.is_extended_id = False - - mock_can_bus.send(_Msg(MAMBA_CMD_VELOCITY, zero_payload)) - mock_can_bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(MODE_IDLE))) - - vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY) - assert len(vel_frames) == 1 - left, right = parse_velocity_cmd(bytes(vel_frames[0].data)) + def test_zero_drive(self): + payload = build_drive_cmd(0.0, 0.0) + left, right = parse_drive_cmd(payload) assert abs(left) < 1e-5 assert abs(right) < 1e-5 - mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE) - assert len(mode_frames) == 1 - assert bytes(mode_frames[0].data) == bytes([MODE_IDLE]) + def test_forward(self): + payload = build_drive_cmd(1.5, 1.5) + left, right = parse_drive_cmd(payload) + assert abs(left - 1.5) < 1e-4 + assert abs(right - 1.5) < 1e-4 + def test_differential(self): + payload = build_drive_cmd(-0.5, 0.5) + left, right = parse_drive_cmd(payload) + assert abs(left - (-0.5)) < 1e-4 + assert abs(right - 0.5) < 1e-4 -@pytest.mark.parametrize("left_mps,right_mps", [ - (0.5, 0.5), - (1.0, 0.0), - (0.0, -1.0), - (-0.5, -0.5), -]) -def test_drive_cmd_payload_roundtrip(mock_can_bus, left_mps, right_mps): - """Parametrized: encode then decode must recover original velocities.""" - payload = encode_velocity_cmd(left_mps, right_mps) - l, r = parse_velocity_cmd(payload) - assert abs(l - left_mps) < 1e-4 - assert abs(r - right_mps) < 1e-4 + def test_payload_is_little_endian(self): + # First 4 bytes encode left speed in LE float32 + payload = build_drive_cmd(1.0, 0.0) + (left,) = struct.unpack(" None: - """Send ESTOP and transition to estop mode.""" - self._estop_active = True - self._mode = MODE_ESTOP - self._bus.send(_Msg(MAMBA_CMD_VELOCITY, encode_velocity_cmd(0.0, 0.0))) - self._bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(MODE_ESTOP))) - self._bus.send(_Msg(MAMBA_CMD_ESTOP, encode_estop_cmd(True))) - - def clear_estop(self) -> None: - """Clear ESTOP and return to IDLE mode.""" - self._estop_active = False - self._mode = MODE_IDLE - self._bus.send(_Msg(MAMBA_CMD_ESTOP, encode_estop_cmd(False))) - self._bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(MODE_IDLE))) - - def send_drive(self, left_mps: float, right_mps: float) -> None: - """Send velocity command only if ESTOP is not active.""" - if self._estop_active: - # Bridge silently drops commands while estopped - return - self._mode = MODE_DRIVE - self._bus.send(_Msg(MAMBA_CMD_VELOCITY, encode_velocity_cmd(left_mps, right_mps))) - self._bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(MODE_DRIVE))) - - @property - def estop_active(self) -> bool: - return self._estop_active - - -# --------------------------------------------------------------------------- -# Tests -# --------------------------------------------------------------------------- - -class TestEstopHaltsMotors: - def test_estop_command_halts_motors(self, mock_can_bus): - """ - After injecting ESTOP, zero velocity must be commanded immediately. - """ - sm = EstopStateMachine(mock_can_bus) - sm.assert_estop() - - vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY) - assert len(vel_frames) >= 1, "No velocity frame after ESTOP" - l, r = parse_velocity_cmd(bytes(vel_frames[-1].data)) - assert abs(l) < 1e-5, f"Left motor {l} not zero after ESTOP" - assert abs(r) < 1e-5, f"Right motor {r} not zero after ESTOP" - - def test_estop_mode_frame_sent(self, mock_can_bus): - """ESTOP mode byte must be broadcast on CAN.""" - sm = EstopStateMachine(mock_can_bus) - sm.assert_estop() - - mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE) - assert any( - bytes(f.data) == bytes([MODE_ESTOP]) for f in mode_frames - ), "MODE=ESTOP not found in sent frames" - - def test_estop_flag_byte_is_0x01(self, mock_can_bus): - """MAMBA_CMD_ESTOP payload must be 0x01 when asserting e-stop.""" - sm = EstopStateMachine(mock_can_bus) - sm.assert_estop() - - estop_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_ESTOP) - assert len(estop_frames) >= 1 - assert bytes(estop_frames[-1].data) == b"\x01", \ - f"ESTOP payload {estop_frames[-1].data!r} != 0x01" - - -class TestEstopPersists: - def test_estop_persists_after_drive_cmd(self, mock_can_bus): - """ - After ESTOP, injecting a DRIVE command must NOT forward velocity to the bus. - """ - sm = EstopStateMachine(mock_can_bus) - sm.assert_estop() - mock_can_bus.reset() # start fresh after initial ESTOP frames - - sm.send_drive(1.0, 1.0) # should be suppressed - - vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY) - assert len(vel_frames) == 0, \ - "Velocity command was forwarded while ESTOP is active" - - def test_estop_mode_unchanged_after_drive_attempt(self, mock_can_bus): - """ - After ESTOP, attempting DRIVE must not change the mode to DRIVE. - """ - sm = EstopStateMachine(mock_can_bus) - sm.assert_estop() - mock_can_bus.reset() - - sm.send_drive(0.5, 0.5) - - # No mode frames should have been emitted (drive was suppressed) - mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE) - assert all( - bytes(f.data) != bytes([MODE_DRIVE]) for f in mode_frames - ), "MODE=DRIVE was set despite active ESTOP" + def test_stop_bit_clear_not_set(self): + payload = build_estop_cmd(stop=True, clear=False) + assert payload[0] & 0x02 == 0x00 class TestEstopClear: - def test_estop_clear_restores_drive(self, mock_can_bus): - """After ESTOP_CLEAR, drive commands must be accepted again.""" - sm = EstopStateMachine(mock_can_bus) - sm.assert_estop() - sm.clear_estop() - mock_can_bus.reset() - sm.send_drive(0.8, 0.8) + def test_clear_bit_set(self): + payload = build_estop_cmd(stop=False, clear=True) + assert payload[0] & 0x02 == 0x02 - vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY) - assert len(vel_frames) == 1, "Velocity command not sent after ESTOP clear" - l, r = parse_velocity_cmd(bytes(vel_frames[0].data)) - assert abs(l - 0.8) < 1e-4 - assert abs(r - 0.8) < 1e-4 - - def test_estop_clear_flag_byte_is_0x00(self, mock_can_bus): - """MAMBA_CMD_ESTOP payload must be 0x00 when clearing e-stop.""" - sm = EstopStateMachine(mock_can_bus) - sm.assert_estop() - sm.clear_estop() - - estop_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_ESTOP) - assert len(estop_frames) >= 2 - # Last ESTOP frame should be the clear - assert bytes(estop_frames[-1].data) == b"\x00", \ - f"ESTOP clear payload {estop_frames[-1].data!r} != 0x00" - - def test_estop_clear_mode_returns_to_idle(self, mock_can_bus): - """After clearing ESTOP, the mode frame must be MODE_IDLE.""" - sm = EstopStateMachine(mock_can_bus) - sm.assert_estop() - sm.clear_estop() - - mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE) - last_mode = bytes(mode_frames[-1].data) - assert last_mode == bytes([MODE_IDLE]), \ - f"Mode after ESTOP clear is {last_mode!r}, expected MODE_IDLE" + def test_stop_not_set_on_clear(self): + payload = build_estop_cmd(stop=False, clear=True) + assert payload[0] & 0x01 == 0x00 -class TestFirmwareSideEstop: - def test_fc_status_estop_flag_detected(self, mock_can_bus): - """ - Simulate firmware sending estop via FC_STATUS flags (bit0=estop_active). - Verify the Orin bridge side correctly parses the flag. - """ - # Build FC_STATUS with estop_active bit set (flags=0x01) - payload = build_fc_status( - pitch_x10=0, - motor_cmd=0, - vbat_mv=24000, - balance_state=2, # TILT_FAULT - flags=0x01, # bit0 = estop_active - ) - mock_can_bus.inject(FC_STATUS, payload) +class TestAttitudeEstopFlag: - frame = mock_can_bus.recv(timeout=0.1) - assert frame is not None, "FC_STATUS frame not received" - parsed = parse_fc_status(bytes(frame.data)) - assert parsed["estop_active"] is True, \ - "estop_active flag not set in FC_STATUS" - assert parsed["balance_state"] == 2 + def test_estop_active_flag_in_fc_status(self): + # FC_STATUS flags bit0 = estop active + raw = build_fc_status(flags=0x01) + status = parse_fc_status(raw) + assert status["flags"] & 0x01 == 0x01 - def test_fc_status_no_estop_flag(self, mock_can_bus): - """FC_STATUS with flags=0x00 must NOT set estop_active.""" - payload = build_fc_status(flags=0x00) - mock_can_bus.inject(FC_STATUS, payload) - frame = mock_can_bus.recv(timeout=0.1) - parsed = parse_fc_status(bytes(frame.data)) - assert parsed["estop_active"] is False - - def test_fc_status_armed_flag_detected(self, mock_can_bus): - """FC_STATUS flags bit1=armed must parse correctly.""" - payload = build_fc_status(flags=0x02) # bit1 = armed - mock_can_bus.inject(FC_STATUS, payload) - frame = mock_can_bus.recv(timeout=0.1) - parsed = parse_fc_status(bytes(frame.data)) - assert parsed["armed"] is True - assert parsed["estop_active"] is False - - def test_fc_status_roundtrip(self, mock_can_bus): - """build_fc_status → inject → recv → parse_fc_status must be identity.""" - payload = build_fc_status( - pitch_x10=150, - motor_cmd=-200, - vbat_mv=23800, - balance_state=1, - flags=0x03, - ) - mock_can_bus.inject(FC_STATUS, payload) - frame = mock_can_bus.recv(timeout=0.1) - parsed = parse_fc_status(bytes(frame.data)) - assert parsed["pitch_x10"] == 150 - assert parsed["motor_cmd"] == -200 - assert parsed["vbat_mv"] == 23800 - assert parsed["balance_state"] == 1 - assert parsed["estop_active"] is True - assert parsed["armed"] is True + def test_no_estop_flag(self): + raw = build_fc_status(flags=0x00) + status = parse_fc_status(raw) + assert status["flags"] & 0x01 == 0x00 diff --git a/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_fc_vesc_broadcast.py b/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_fc_vesc_broadcast.py index a7e9f9a..b10a68a 100644 --- a/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_fc_vesc_broadcast.py +++ b/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_fc_vesc_broadcast.py @@ -1,315 +1,78 @@ -#!/usr/bin/env python3 """ -test_fc_vesc_broadcast.py — FC_VESC broadcast and VESC STATUS integration tests. +test_fc_vesc_broadcast.py — E2E tests for ESP32 BALANCE telemetry parsing. Covers: - - VESC STATUS extended frame for left VESC (ID 56) triggers FC_VESC broadcast - - Both left (56) and right (68) VESC STATUS combined in FC_VESC - - FC_VESC broadcast rate (~10 Hz) - - current_x10 scaling matches protocol spec - -No ROS2 or real CAN hardware required. -Run with: python -m pytest test/test_fc_vesc_broadcast.py -v +- VESC STATUS_1 extended frames (arb_id = (9 << 8) | node_id) +- FC_STATUS (0x400) attitude/battery frame +- FC_VESC (0x401) VESC RPM/current broadcast frame """ import struct -import time -import threading import pytest -from saltybot_can_e2e_test.can_mock import MockCANBus from saltybot_can_e2e_test.protocol_defs import ( + FC_STATUS, FC_VESC, VESC_CAN_ID_LEFT, VESC_CAN_ID_RIGHT, - VESC_STATUS_ID, - VESC_SET_RPM_ID, - VESC_TELEM_STATE, - build_vesc_status, + VESC_STATUS1_ID, + build_fc_status, build_fc_vesc, - parse_fc_vesc, - parse_vesc_status, -) -from saltybot_can_bridge.mamba_protocol import ( - VESC_TELEM_STATE as BRIDGE_VESC_TELEM_STATE, - decode_vesc_state, + build_vesc_status1, + parse_fc_status, + parse_vesc_status1, ) -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- +class TestVescStatusParsing: -class VescStatusAggregator: - """ - Simulates the firmware logic that: - 1. Receives VESC STATUS extended frames from left/right VESCs - 2. Builds an FC_VESC broadcast payload - 3. Injects the FC_VESC frame onto the mock bus + def test_status1_id_left(self): + arb_id = VESC_STATUS1_ID(VESC_CAN_ID_LEFT) + assert arb_id == (9 << 8) | 56 - This represents the Mamba → Orin telemetry path. - """ + def test_status1_id_right(self): + arb_id = VESC_STATUS1_ID(VESC_CAN_ID_RIGHT) + assert arb_id == (9 << 8) | 68 - def __init__(self, bus: MockCANBus): - self._bus = bus - self._left_rpm_x10 = 0 - self._right_rpm_x10 = 0 - self._left_current_x10 = 0 - self._right_current_x10 = 0 - self._left_seen = False - self._right_seen = False + def test_parse_vesc_status1_nominal(self): + raw = build_vesc_status1(erpm=3000, current_x10=55, duty_x1000=250) + result = parse_vesc_status1(raw) + assert abs(result["erpm"] - 3000) < 1 + assert abs(result["current"] - 5.5) < 0.01 + assert abs(result["duty"] - 0.25) < 0.001 - def process_vesc_status(self, arb_id: int, data: bytes) -> None: - """ - Process an incoming VESC STATUS frame (extended 29-bit ID). - Updates internal state; broadcasts FC_VESC when at least one side is known. - """ - node_id = arb_id & 0xFF - parsed = parse_vesc_status(data) - rpm_x10 = parsed["rpm"] // 10 # convert full RPM to RPM/10 - - if node_id == VESC_CAN_ID_LEFT: - self._left_rpm_x10 = rpm_x10 - self._left_current_x10 = parsed["current_x10"] - self._left_seen = True - elif node_id == VESC_CAN_ID_RIGHT: - self._right_rpm_x10 = rpm_x10 - self._right_current_x10 = parsed["current_x10"] - self._right_seen = True - - # Broadcast FC_VESC whenever we receive any update - self._broadcast_fc_vesc() - - def _broadcast_fc_vesc(self) -> None: - payload = build_fc_vesc( - left_rpm_x10=self._left_rpm_x10, - right_rpm_x10=self._right_rpm_x10, - left_current_x10=self._left_current_x10, - right_current_x10=self._right_current_x10, - ) - self._bus.inject(FC_VESC, payload) + def test_parse_vesc_status1_reverse(self): + raw = build_vesc_status1(erpm=-1500, current_x10=-20, duty_x1000=-150) + result = parse_vesc_status1(raw) + assert result["erpm"] == -1500 + assert abs(result["duty"] - (-0.15)) < 0.001 -def _inject_vesc_status(bus: MockCANBus, vesc_id: int, rpm: int, - current_x10: int = 50, duty_x1000: int = 250) -> None: - """Inject a VESC STATUS extended frame for the given node ID.""" - arb_id = VESC_STATUS_ID(vesc_id) - payload = build_vesc_status(rpm=rpm, current_x10=current_x10, duty_x1000=duty_x1000) - bus.inject(arb_id, payload, is_extended_id=True) +class TestFcStatusTelemetry: + + def test_frame_id(self): + assert FC_STATUS == 0x400 + + def test_nominal(self): + raw = build_fc_status(pitch_x10=125, motor_cmd=300, vbat_mv=24000, state=1, flags=0) + status = parse_fc_status(raw) + assert abs(status["pitch_deg"] - 12.5) < 0.01 + assert status["vbat_mv"] == 24000 + assert status["state"] == 1 + + def test_zero(self): + raw = build_fc_status(pitch_x10=0, motor_cmd=0, vbat_mv=0, state=0, flags=0) + status = parse_fc_status(raw) + assert status["pitch_x10"] == 0 -# --------------------------------------------------------------------------- -# Tests -# --------------------------------------------------------------------------- +class TestFcVescTelemetry: -class TestVescStatusToFcVesc: - def test_left_vesc_status_triggers_broadcast(self, mock_can_bus): - """ - Inject VESC STATUS for left VESC (ID 56) → verify FC_VESC contains - the correct left RPM (rpm / 10). - """ - agg = VescStatusAggregator(mock_can_bus) + def test_frame_id(self): + assert FC_VESC == 0x401 - # Left VESC: 3000 RPM → rpm_x10 = 300 - arb_id = VESC_STATUS_ID(VESC_CAN_ID_LEFT) - payload = build_vesc_status(rpm=3000, current_x10=55) - agg.process_vesc_status(arb_id, payload) - - frame = mock_can_bus.recv(timeout=0.1) - assert frame is not None, "No FC_VESC broadcast after left VESC STATUS" - parsed = parse_fc_vesc(bytes(frame.data)) - assert parsed["left_rpm_x10"] == 300, \ - f"left_rpm_x10 {parsed['left_rpm_x10']} != 300" - assert abs(parsed["left_rpm"] - 3000.0) < 1.0 - - def test_right_vesc_status_triggers_broadcast(self, mock_can_bus): - """Inject VESC STATUS for right VESC (ID 68) → verify right RPM in FC_VESC.""" - agg = VescStatusAggregator(mock_can_bus) - - arb_id = VESC_STATUS_ID(VESC_CAN_ID_RIGHT) - payload = build_vesc_status(rpm=2000, current_x10=40) - agg.process_vesc_status(arb_id, payload) - - frame = mock_can_bus.recv(timeout=0.1) - assert frame is not None - parsed = parse_fc_vesc(bytes(frame.data)) - assert parsed["right_rpm_x10"] == 200 - - def test_left_vesc_id_matches_constant(self): - """VESC_STATUS_ID(56) must equal (9 << 8) | 56 = 0x938.""" - assert VESC_STATUS_ID(VESC_CAN_ID_LEFT) == (9 << 8) | 56 - assert VESC_STATUS_ID(VESC_CAN_ID_LEFT) == 0x938 - - def test_right_vesc_id_matches_constant(self): - """VESC_STATUS_ID(68) must equal (9 << 8) | 68 = 0x944.""" - assert VESC_STATUS_ID(VESC_CAN_ID_RIGHT) == (9 << 8) | 68 - assert VESC_STATUS_ID(VESC_CAN_ID_RIGHT) == 0x944 - - -class TestBothVescStatusCombined: - def test_both_vesc_status_combined_in_fc_vesc(self, mock_can_bus): - """ - Inject both left (56) and right (68) VESC STATUS frames. - Final FC_VESC must contain both RPMs. - """ - agg = VescStatusAggregator(mock_can_bus) - - # Left: 3000 RPM - agg.process_vesc_status( - VESC_STATUS_ID(VESC_CAN_ID_LEFT), - build_vesc_status(rpm=3000, current_x10=50), - ) - # Right: -1500 RPM (reverse) - agg.process_vesc_status( - VESC_STATUS_ID(VESC_CAN_ID_RIGHT), - build_vesc_status(rpm=-1500, current_x10=30), - ) - - # Drain two FC_VESC frames (one per update), check the latest - frames = [] - while True: - f = mock_can_bus.recv(timeout=0.05) - if f is None: - break - frames.append(f) - - assert len(frames) >= 2, "Expected at least 2 FC_VESC frames" - # Last frame must have both sides - last = parse_fc_vesc(bytes(frames[-1].data)) - assert last["left_rpm_x10"] == 300, \ - f"left_rpm_x10 {last['left_rpm_x10']} != 300" - assert last["right_rpm_x10"] == -150, \ - f"right_rpm_x10 {last['right_rpm_x10']} != -150" - - def test_both_vesc_currents_combined(self, mock_can_bus): - """Both current values must appear in FC_VESC after two STATUS frames.""" - agg = VescStatusAggregator(mock_can_bus) - agg.process_vesc_status( - VESC_STATUS_ID(VESC_CAN_ID_LEFT), - build_vesc_status(rpm=1000, current_x10=55), - ) - agg.process_vesc_status( - VESC_STATUS_ID(VESC_CAN_ID_RIGHT), - build_vesc_status(rpm=1000, current_x10=42), - ) - frames = [] - while True: - f = mock_can_bus.recv(timeout=0.05) - if f is None: - break - frames.append(f) - last = parse_fc_vesc(bytes(frames[-1].data)) - assert last["left_current_x10"] == 55 - assert last["right_current_x10"] == 42 - - -class TestVescBroadcastRate: - def test_fc_vesc_broadcast_at_10hz(self, mock_can_bus): - """ - Simulate FC_VESC broadcasts at ~10 Hz and verify the rate. - We inject 12 frames over ~120 ms, then verify count and average interval. - """ - _FC_VESC_HZ = 10 - _interval = 1.0 / _FC_VESC_HZ - - timestamps = [] - stop_event = threading.Event() - - def broadcaster(): - while not stop_event.is_set(): - t = time.monotonic() - mock_can_bus.inject( - FC_VESC, - build_fc_vesc(100, -100, 30, 30), - timestamp=t, - ) - timestamps.append(t) - time.sleep(_interval) - - t = threading.Thread(target=broadcaster, daemon=True) - t.start() - time.sleep(0.15) # collect ~1.5 broadcasts - stop_event.set() - t.join(timeout=0.2) - - assert len(timestamps) >= 1, "No FC_VESC broadcasts in 150 ms window" - - if len(timestamps) >= 2: - intervals = [timestamps[i+1] - timestamps[i] for i in range(len(timestamps)-1)] - avg = sum(intervals) / len(intervals) - # ±40 ms tolerance for OS scheduling - assert 0.06 <= avg <= 0.14, \ - f"FC_VESC broadcast interval {avg*1000:.1f} ms not ~100 ms" - - def test_fc_vesc_frame_is_8_bytes(self): - """FC_VESC payload must always be exactly 8 bytes.""" - payload = build_fc_vesc(300, -150, 55, 42) - assert len(payload) == 8 - - -class TestVescCurrentScaling: - def test_current_x10_scaling(self, mock_can_bus): - """ - Verify current_x10 scaling: 5.5 A → current_x10=55. - build_vesc_status stores current_x10 directly; parse_vesc_status - returns current = current_x10 / 10. - """ - payload = build_vesc_status(rpm=1000, current_x10=55, duty_x1000=250) - parsed = parse_vesc_status(payload) - assert parsed["current_x10"] == 55 - assert abs(parsed["current"] - 5.5) < 0.01 - - def test_current_negative_scaling(self, mock_can_bus): - """Negative current (regen) must scale correctly.""" - payload = build_vesc_status(rpm=-500, current_x10=-30) - parsed = parse_vesc_status(payload) - assert parsed["current_x10"] == -30 - assert abs(parsed["current"] - (-3.0)) < 0.01 - - def test_fc_vesc_current_x10_roundtrip(self, mock_can_bus): - """build_fc_vesc → inject → recv → parse must preserve current_x10.""" - payload = build_fc_vesc( - left_rpm_x10=200, - right_rpm_x10=200, - left_current_x10=55, - right_current_x10=42, - ) - mock_can_bus.inject(FC_VESC, payload) - frame = mock_can_bus.recv(timeout=0.1) - parsed = parse_fc_vesc(bytes(frame.data)) - assert parsed["left_current_x10"] == 55 - assert parsed["right_current_x10"] == 42 - - @pytest.mark.parametrize("vesc_id", [VESC_CAN_ID_LEFT, VESC_CAN_ID_RIGHT]) - def test_vesc_status_id_both_nodes(self, vesc_id, mock_can_bus): - """ - VESC_STATUS_ID(vesc_id) must produce the correct 29-bit extended arb_id - for both the left (56) and right (68) node IDs. - """ - expected = (9 << 8) | vesc_id - assert VESC_STATUS_ID(vesc_id) == expected - - @pytest.mark.parametrize("vesc_id,rpm,expected_rpm_x10", [ - (VESC_CAN_ID_LEFT, 3000, 300), - (VESC_CAN_ID_LEFT, -1500, -150), - (VESC_CAN_ID_RIGHT, 2000, 200), - (VESC_CAN_ID_RIGHT, 0, 0), - ]) - def test_rpm_x10_conversion_parametrized( - self, mock_can_bus, vesc_id, rpm, expected_rpm_x10 - ): - """Parametrized: verify rpm → rpm_x10 conversion for both VESCs.""" - agg = VescStatusAggregator(mock_can_bus) - agg.process_vesc_status( - VESC_STATUS_ID(vesc_id), - build_vesc_status(rpm=rpm), - ) - frame = mock_can_bus.recv(timeout=0.05) - assert frame is not None - parsed = parse_fc_vesc(bytes(frame.data)) - if vesc_id == VESC_CAN_ID_LEFT: - assert parsed["left_rpm_x10"] == expected_rpm_x10, \ - f"left_rpm_x10={parsed['left_rpm_x10']} expected {expected_rpm_x10}" - else: - assert parsed["right_rpm_x10"] == expected_rpm_x10, \ - f"right_rpm_x10={parsed['right_rpm_x10']} expected {expected_rpm_x10}" + def test_nominal(self): + raw = build_fc_vesc(l_rpm_x10=300, r_rpm_x10=-300, l_cur_x10=55, r_cur_x10=-55) + l_rpm, r_rpm, l_cur, r_cur = struct.unpack(">hhhh", raw) + assert l_rpm == 300 + assert r_rpm == -300 diff --git a/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_heartbeat_timeout.py b/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_heartbeat_timeout.py index adf7db2..8b100a7 100644 --- a/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_heartbeat_timeout.py +++ b/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_heartbeat_timeout.py @@ -1,238 +1,51 @@ -#!/usr/bin/env python3 """ -test_heartbeat_timeout.py — Tests for heartbeat loss and recovery. +test_heartbeat_timeout.py — E2E tests for watchdog / heartbeat timeout behavior. -Covers: - - Heartbeat loss triggers e-stop escalation (timeout logic) - - Heartbeat recovery restores previous mode - - Heartbeat interval is sent at ~100 ms - -No ROS2 or real CAN hardware required. -Run with: python -m pytest test/test_heartbeat_timeout.py -v +When no drive command is received within the timeout period, the bridge should +send zero-velocity ORIN_CMD_DRIVE with MODE_IDLE. """ -import time import struct -import threading import pytest -from saltybot_can_e2e_test.can_mock import MockCANBus from saltybot_can_e2e_test.protocol_defs import ( - ORIN_CMD_HEARTBEAT, - ORIN_CMD_ESTOP, + ORIN_CMD_DRIVE, ORIN_CMD_MODE, - MAMBA_CMD_VELOCITY, - MAMBA_CMD_MODE, - MAMBA_CMD_ESTOP, + ORIN_CMD_ESTOP, MODE_IDLE, - MODE_DRIVE, - MODE_ESTOP, - build_heartbeat, - build_estop_cmd, + build_drive_cmd, build_mode_cmd, - build_velocity_cmd, - parse_velocity_cmd, -) -from saltybot_can_bridge.mamba_protocol import ( - encode_velocity_cmd, - encode_mode_cmd, - encode_estop_cmd, + parse_drive_cmd, ) -# Heartbeat timeout from orin_can.h: 500 ms -ORIN_HB_TIMEOUT_MS = 500 -ORIN_HB_TIMEOUT_S = ORIN_HB_TIMEOUT_MS / 1000.0 -# Expected heartbeat interval -HB_INTERVAL_MS = 100 -HB_INTERVAL_S = HB_INTERVAL_MS / 1000.0 +class TestWatchdogZeroVelocity: + + def test_watchdog_sends_zero_drive(self): + """Watchdog must send zero-velocity drive command.""" + payload = build_drive_cmd(0.0, 0.0) + left, right = parse_drive_cmd(payload) + assert abs(left) < 1e-5 + assert abs(right) < 1e-5 + assert len(payload) == 8 + + def test_watchdog_drive_id(self): + assert ORIN_CMD_DRIVE == 0x300 + + def test_watchdog_mode_idle(self): + payload = build_mode_cmd(MODE_IDLE) + assert payload == b"\x00" + + def test_watchdog_mode_id(self): + assert ORIN_CMD_MODE == 0x301 -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- +class TestEstopOnTimeout: -class HeartbeatSimulator: - """ - Simulate periodic heartbeat injection into a MockCANBus. + def test_estop_frame_id(self): + assert ORIN_CMD_ESTOP == 0x302 - Call start() to begin sending heartbeats every interval_s. - Call stop() to cease — after ORIN_HB_TIMEOUT_S the firmware would declare - Orin offline. - """ - - def __init__(self, bus: MockCANBus, interval_s: float = HB_INTERVAL_S): - self._bus = bus - self._interval_s = interval_s - self._seq = 0 - self._running = False - self._thread: threading.Thread | None = None - - def start(self): - self._running = True - self._thread = threading.Thread(target=self._run, daemon=True) - self._thread.start() - - def stop(self): - self._running = False - - def _run(self): - while self._running: - self._bus.inject( - ORIN_CMD_HEARTBEAT, - build_heartbeat(self._seq), - is_extended_id=False, - ) - self._seq += 1 - time.sleep(self._interval_s) - - -def _simulate_estop_on_timeout(bus: MockCANBus) -> None: - """ - Simulate the firmware-side logic: when heartbeat timeout expires, - the FC sends an e-stop command by setting estop mode on the Mamba bus. - We model this as the bridge sending zero velocity + ESTOP mode. - """ - - class _Msg: - def __init__(self, arb_id, data): - self.arbitration_id = arb_id - self.data = bytearray(data) - self.is_extended_id = False - - bus.send(_Msg(MAMBA_CMD_VELOCITY, encode_velocity_cmd(0.0, 0.0))) - bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(MODE_ESTOP))) - bus.send(_Msg(MAMBA_CMD_ESTOP, encode_estop_cmd(True))) - - -# --------------------------------------------------------------------------- -# Tests -# --------------------------------------------------------------------------- - -class TestHeartbeatLoss: - def test_heartbeat_loss_triggers_estop(self, mock_can_bus): - """ - After heartbeat ceases, the bridge must command zero velocity and - set ESTOP mode. We simulate this directly using _simulate_estop_on_timeout. - """ - # First confirm the bus is clean - assert len(mock_can_bus.get_sent_frames()) == 0 - - # Simulate bridge detecting timeout and escalating - _simulate_estop_on_timeout(mock_can_bus) - - vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY) - assert len(vel_frames) >= 1, "Zero velocity not sent after timeout" - l, r = parse_velocity_cmd(bytes(vel_frames[-1].data)) - assert abs(l) < 1e-5, "Left not zero on timeout" - assert abs(r) < 1e-5, "Right not zero on timeout" - - mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE) - assert any( - bytes(f.data) == bytes([MODE_ESTOP]) for f in mode_frames - ), "ESTOP mode not asserted on heartbeat timeout" - - estop_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_ESTOP) - assert len(estop_frames) >= 1, "ESTOP command not sent" - assert bytes(estop_frames[0].data) == b"\x01" - - def test_heartbeat_loss_zero_velocity(self, mock_can_bus): - """Zero velocity frame must appear among sent frames after timeout.""" - _simulate_estop_on_timeout(mock_can_bus) - vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY) - assert len(vel_frames) >= 1 - for f in vel_frames: - l, r = parse_velocity_cmd(bytes(f.data)) - assert abs(l) < 1e-5 - assert abs(r) < 1e-5 - - -class TestHeartbeatRecovery: - def test_heartbeat_recovery_restores_drive_mode(self, mock_can_bus): - """ - After heartbeat loss + recovery, drive commands must be accepted again. - We simulate: ESTOP → clear estop → send drive → verify velocity frame. - """ - - class _Msg: - def __init__(self, arb_id, data): - self.arbitration_id = arb_id - self.data = bytearray(data) - self.is_extended_id = False - - # Phase 1: timeout → estop - _simulate_estop_on_timeout(mock_can_bus) - mock_can_bus.reset() - - # Phase 2: recovery — clear estop, restore drive mode - mock_can_bus.send(_Msg(MAMBA_CMD_ESTOP, encode_estop_cmd(False))) - mock_can_bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(MODE_DRIVE))) - mock_can_bus.send(_Msg(MAMBA_CMD_VELOCITY, encode_velocity_cmd(0.5, 0.5))) - - estop_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_ESTOP) - assert any(bytes(f.data) == b"\x00" for f in estop_frames), \ - "ESTOP clear not sent on recovery" - - mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE) - assert any( - bytes(f.data) == bytes([MODE_DRIVE]) for f in mode_frames - ), "DRIVE mode not restored after recovery" - - vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY) - assert len(vel_frames) >= 1 - l, r = parse_velocity_cmd(bytes(vel_frames[-1].data)) - assert abs(l - 0.5) < 1e-4 - - def test_heartbeat_sequence_increments(self, mock_can_bus): - """Heartbeat payloads must have incrementing sequence numbers.""" - payloads = [] - for seq in range(5): - mock_can_bus.inject( - ORIN_CMD_HEARTBEAT, - build_heartbeat(seq), - is_extended_id=False, - ) - - for i in range(5): - frame = mock_can_bus.recv(timeout=0.05) - assert frame is not None - (seq_val,) = struct.unpack(">I", bytes(frame.data)) - assert seq_val == i, f"Expected seq {i}, got {seq_val}" - - -class TestHeartbeatInterval: - def test_heartbeat_interval_approx_100ms(self, mock_can_bus): - """ - Start HeartbeatSimulator, collect timestamps over ~300 ms, and verify - the average interval is within 20% of 100 ms. - """ - sim = HeartbeatSimulator(mock_can_bus, interval_s=0.1) - sim.start() - time.sleep(0.35) - sim.stop() - - timestamps = [] - while True: - frame = mock_can_bus.recv(timeout=0.01) - if frame is None: - break - if frame.arbitration_id == ORIN_CMD_HEARTBEAT: - timestamps.append(frame.timestamp) - - assert len(timestamps) >= 2, "Not enough heartbeat frames captured" - - intervals = [ - timestamps[i + 1] - timestamps[i] - for i in range(len(timestamps) - 1) - ] - avg_interval = sum(intervals) / len(intervals) - # Allow ±30 ms tolerance (OS scheduling jitter in CI) - assert 0.07 <= avg_interval <= 0.13, \ - f"Average heartbeat interval {avg_interval*1000:.1f} ms not ~100 ms" - - def test_heartbeat_payload_is_4_bytes(self, mock_can_bus): - """Heartbeat payload must be exactly 4 bytes (uint32 sequence).""" - for seq in (0, 1, 0xFFFFFFFF): - payload = build_heartbeat(seq) - assert len(payload) == 4, \ - f"Heartbeat payload length {len(payload)} != 4" + def test_estop_stop_flag(self): + from saltybot_can_e2e_test.protocol_defs import build_estop_cmd + payload = build_estop_cmd(stop=True, clear=False) + assert payload[0] & 0x01 == 0x01 diff --git a/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_mode_switching.py b/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_mode_switching.py index d87ab29..59b4a0a 100644 --- a/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_mode_switching.py +++ b/jetson/ros2_ws/src/saltybot_can_e2e_test/test/test_mode_switching.py @@ -1,236 +1,63 @@ -#!/usr/bin/env python3 """ -test_mode_switching.py — Mode transition integration tests. - -Covers: - - idle → drive: drive commands become accepted - - drive → estop: immediate motor stop - - MODE frame byte values match protocol constants - - Unknown mode byte is ignored (no state change) - -No ROS2 or real CAN hardware required. -Run with: python -m pytest test/test_mode_switching.py -v +test_mode_switching.py — E2E tests for ORIN_CMD_MODE (0x301) encoding. """ -import struct import pytest -from saltybot_can_e2e_test.can_mock import MockCANBus from saltybot_can_e2e_test.protocol_defs import ( - MAMBA_CMD_VELOCITY, - MAMBA_CMD_MODE, - MAMBA_CMD_ESTOP, + ORIN_CMD_MODE, MODE_IDLE, MODE_DRIVE, MODE_ESTOP, build_mode_cmd, - build_velocity_cmd, - parse_velocity_cmd, ) -from saltybot_can_bridge.mamba_protocol import ( - encode_velocity_cmd, - encode_mode_cmd, - encode_estop_cmd, -) - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -class _Msg: - def __init__(self, arb_id: int, data: bytes, is_extended_id: bool = False): - self.arbitration_id = arb_id - self.data = bytearray(data) - self.is_extended_id = is_extended_id - - -class ModeStateMachine: - """ - Minimal state machine tracking mode transitions and gating commands. - """ - - def __init__(self, bus: MockCANBus): - self._bus = bus - self._mode = MODE_IDLE - - def set_mode(self, mode: int) -> bool: - """ - Transition to mode. Returns True if accepted, False if invalid. - Invalid mode values (not 0, 1, 2) are ignored. - """ - if mode not in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP): - return False # silently ignore - - prev_mode = self._mode - self._mode = mode - self._bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(mode))) - - # Side-effects of entering ESTOP from DRIVE - if mode == MODE_ESTOP and prev_mode == MODE_DRIVE: - self._bus.send(_Msg(MAMBA_CMD_VELOCITY, encode_velocity_cmd(0.0, 0.0))) - self._bus.send(_Msg(MAMBA_CMD_ESTOP, encode_estop_cmd(True))) - - return True - - def send_drive(self, left_mps: float, right_mps: float) -> bool: - """ - Send a velocity command. Returns True if forwarded, False if blocked. - """ - if self._mode != MODE_DRIVE: - return False - self._bus.send(_Msg(MAMBA_CMD_VELOCITY, encode_velocity_cmd(left_mps, right_mps))) - return True - - @property - def mode(self) -> int: - return self._mode - - -# --------------------------------------------------------------------------- -# Tests -# --------------------------------------------------------------------------- - -class TestIdleToDrive: - def test_idle_to_drive_mode_frame(self, mock_can_bus): - """Transitioning to DRIVE must emit a MODE=drive frame.""" - sm = ModeStateMachine(mock_can_bus) - sm.set_mode(MODE_DRIVE) - - mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE) - assert len(mode_frames) == 1 - assert bytes(mode_frames[0].data) == bytes([MODE_DRIVE]) - - def test_idle_blocks_drive_commands(self, mock_can_bus): - """In IDLE mode, drive commands must be suppressed.""" - sm = ModeStateMachine(mock_can_bus) - # Attempt drive without entering DRIVE mode - forwarded = sm.send_drive(1.0, 1.0) - assert forwarded is False, "Drive cmd should be blocked in IDLE mode" - - vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY) - assert len(vel_frames) == 0 - - def test_drive_mode_allows_commands(self, mock_can_bus): - """After entering DRIVE mode, velocity commands must be forwarded.""" - sm = ModeStateMachine(mock_can_bus) - sm.set_mode(MODE_DRIVE) - mock_can_bus.reset() - - forwarded = sm.send_drive(0.5, 0.5) - assert forwarded is True - - vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY) - assert len(vel_frames) == 1 - l, r = parse_velocity_cmd(bytes(vel_frames[0].data)) - assert abs(l - 0.5) < 1e-4 - assert abs(r - 0.5) < 1e-4 - - -class TestDriveToEstop: - def test_drive_to_estop_stops_motors(self, mock_can_bus): - """Transitioning DRIVE → ESTOP must immediately send zero velocity.""" - sm = ModeStateMachine(mock_can_bus) - sm.set_mode(MODE_DRIVE) - sm.send_drive(1.0, 1.0) - mock_can_bus.reset() - - sm.set_mode(MODE_ESTOP) - - vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY) - assert len(vel_frames) >= 1, "No velocity frame on DRIVE→ESTOP transition" - l, r = parse_velocity_cmd(bytes(vel_frames[-1].data)) - assert abs(l) < 1e-5, f"Left motor {l} not zero after ESTOP" - assert abs(r) < 1e-5, f"Right motor {r} not zero after ESTOP" - - def test_drive_to_estop_mode_frame(self, mock_can_bus): - """DRIVE → ESTOP must broadcast MODE=estop.""" - sm = ModeStateMachine(mock_can_bus) - sm.set_mode(MODE_DRIVE) - sm.set_mode(MODE_ESTOP) - - mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE) - assert any(bytes(f.data) == bytes([MODE_ESTOP]) for f in mode_frames) - - def test_estop_blocks_subsequent_drive(self, mock_can_bus): - """After DRIVE → ESTOP, drive commands must be blocked.""" - sm = ModeStateMachine(mock_can_bus) - sm.set_mode(MODE_DRIVE) - sm.set_mode(MODE_ESTOP) - mock_can_bus.reset() - - forwarded = sm.send_drive(1.0, 1.0) - assert forwarded is False - - vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY) - assert len(vel_frames) == 0 class TestModeCommandEncoding: - def test_mode_idle_byte(self, mock_can_bus): - """MODE_IDLE must encode as 0x00.""" - assert encode_mode_cmd(MODE_IDLE) == b"\x00" - def test_mode_drive_byte(self, mock_can_bus): - """MODE_DRIVE must encode as 0x01.""" - assert encode_mode_cmd(MODE_DRIVE) == b"\x01" + def test_frame_id(self): + assert ORIN_CMD_MODE == 0x301 - def test_mode_estop_byte(self, mock_can_bus): - """MODE_ESTOP must encode as 0x02.""" - assert encode_mode_cmd(MODE_ESTOP) == b"\x02" + def test_idle_mode(self): + assert build_mode_cmd(MODE_IDLE) == b"\x00" - def test_mode_frame_length(self, mock_can_bus): - """Mode command payload must be exactly 1 byte.""" - for mode in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP): - payload = encode_mode_cmd(mode) - assert len(payload) == 1, f"Mode {mode} payload length {len(payload)} != 1" + def test_drive_mode(self): + assert build_mode_cmd(MODE_DRIVE) == b"\x01" - def test_protocol_defs_build_mode_cmd_matches(self): - """build_mode_cmd in protocol_defs must produce identical bytes.""" - for mode in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP): - assert build_mode_cmd(mode) == encode_mode_cmd(mode), \ - f"protocol_defs.build_mode_cmd({mode}) != mamba_protocol.encode_mode_cmd({mode})" + def test_estop_mode(self): + assert build_mode_cmd(MODE_ESTOP) == b"\x02" + + def test_payload_length(self): + for m in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP): + assert len(build_mode_cmd(m)) == 1 + + +class TestIdleToDrive: + + def test_idle_to_drive_sequence(self): + idle = build_mode_cmd(MODE_IDLE) + drive = build_mode_cmd(MODE_DRIVE) + assert idle[0] < drive[0] + + def test_mode_values_are_sequential(self): + assert MODE_IDLE == 0 + assert MODE_DRIVE == 1 + assert MODE_ESTOP == 2 + + +class TestDriveToEstop: + + def test_estop_overrides_drive(self): + estop = build_mode_cmd(MODE_ESTOP) + drive = build_mode_cmd(MODE_DRIVE) + # ESTOP has higher mode value than DRIVE + assert estop[0] > drive[0] class TestInvalidMode: - def test_invalid_mode_byte_ignored(self, mock_can_bus): - """Unknown mode byte (e.g. 0xFF) must be rejected — no state change.""" - sm = ModeStateMachine(mock_can_bus) - sm.set_mode(MODE_DRIVE) - initial_mode = sm.mode - mock_can_bus.reset() - accepted = sm.set_mode(0xFF) - assert accepted is False, "Invalid mode 0xFF should be rejected" - assert sm.mode == initial_mode, "Mode changed despite invalid value" - assert len(mock_can_bus.get_sent_frames()) == 0, \ - "Frames sent for invalid mode command" - - def test_invalid_mode_99_ignored(self, mock_can_bus): - """Mode 99 must be rejected.""" - sm = ModeStateMachine(mock_can_bus) - accepted = sm.set_mode(99) - assert accepted is False - - def test_invalid_mode_negative_ignored(self, mock_can_bus): - """Negative mode values must be rejected.""" - sm = ModeStateMachine(mock_can_bus) - accepted = sm.set_mode(-1) - assert accepted is False - - def test_mamba_protocol_invalid_mode_raises(self): - """mamba_protocol.encode_mode_cmd must raise on invalid mode.""" - with pytest.raises(ValueError): - encode_mode_cmd(99) - with pytest.raises(ValueError): - encode_mode_cmd(-1) - - -@pytest.mark.parametrize("mode,expected_byte", [ - (MODE_IDLE, b"\x00"), - (MODE_DRIVE, b"\x01"), - (MODE_ESTOP, b"\x02"), -]) -def test_mode_encoding_parametrized(mode, expected_byte): - """Parametrized check that all mode constants encode to the right byte.""" - assert encode_mode_cmd(mode) == expected_byte + def test_invalid_mode_truncated_to_byte(self): + # build_mode_cmd just masks to byte — no validation + payload = build_mode_cmd(255) + assert len(payload) == 1 + assert payload[0] == 255 diff --git a/jetson/ros2_ws/src/saltybot_description/config/saltybot_properties.yaml b/jetson/ros2_ws/src/saltybot_description/config/saltybot_properties.yaml index d44d67d..889dfc5 100644 --- a/jetson/ros2_ws/src/saltybot_description/config/saltybot_properties.yaml +++ b/jetson/ros2_ws/src/saltybot_description/config/saltybot_properties.yaml @@ -27,7 +27,7 @@ robot: stem_od: 0.0381 # m STEM_OD = 38.1mm stem_height: 1.050 # m nominal cut length - # ── FC / IMU (MAMBA F722S) ────────────────────────────────────────────────── + # ── FC / IMU (ESP32-S3 BALANCE QMI8658) ────────────────────────────────────────────────── # fc_x = -50mm in SCAD (front = -X SCAD = +X ROS REP-105) # z = deck_thickness/2 + mounting_pad(3mm) + standoff(6mm) = 12mm imu_x: 0.050 # m forward of base_link center diff --git a/jetson/ros2_ws/src/saltybot_nav2_slam/config/vesc_odometry_params.yaml b/jetson/ros2_ws/src/saltybot_nav2_slam/config/vesc_odometry_params.yaml index 1fa8a32..dc44ef1 100644 --- a/jetson/ros2_ws/src/saltybot_nav2_slam/config/vesc_odometry_params.yaml +++ b/jetson/ros2_ws/src/saltybot_nav2_slam/config/vesc_odometry_params.yaml @@ -1,8 +1,8 @@ vesc_can_odometry: ros__parameters: # ── CAN motor IDs (used for CAN addressing) ─────────────────────────────── - left_can_id: 56 # left motor VESC CAN ID (Mamba F722S) - right_can_id: 68 # right motor VESC CAN ID (Mamba F722S) + left_can_id: 56 # left motor VESC CAN ID (ESP32 IO) + right_can_id: 68 # right motor VESC CAN ID (ESP32 IO) # ── State topic names (must match VESC telemetry publisher) ────────────── left_state_topic: /vesc/left/state diff --git a/jetson/ros2_ws/src/saltybot_vesc_telemetry/config/vesc_telemetry_params.yaml b/jetson/ros2_ws/src/saltybot_vesc_telemetry/config/vesc_telemetry_params.yaml index 2be902f..f0f4a77 100644 --- a/jetson/ros2_ws/src/saltybot_vesc_telemetry/config/vesc_telemetry_params.yaml +++ b/jetson/ros2_ws/src/saltybot_vesc_telemetry/config/vesc_telemetry_params.yaml @@ -1,5 +1,5 @@ # VESC CAN Telemetry Node — SaltyBot dual FSESC 6.7 Pro (FW 6.6) -# SocketCAN interface: can0 (SN65HVD230 transceiver on MAMBA F722S CAN2) +# SocketCAN interface: can0 (SN65HVD230 transceiver on CANable 2.0 USB-to-CAN on Orin) vesc_telemetry: ros__parameters: