Compare commits

..

No commits in common. "ec4527b8f39d8acbbdcb699f3c8a82b59d03c8bd" and "7db6158adaf50cd76ee98d3c29dfcaa9e8b96b54" have entirely different histories.

24 changed files with 2185 additions and 764 deletions

View File

@ -5,18 +5,17 @@ You're working on **SaltyLab**, a self-balancing two-wheeled indoor robot. Read
## Project Overview
A hoverboard-based balancing robot with two compute layers:
1. **ESP32-S3 BALANCE** — runs the PID balance loop. Safety-critical, operates independently of the Orin.
2. **ESP32-S3 IO** — handles I/O: motor commands to VESCs, sensor polling, CAN/UART comms.
3. **Orin Nano Super** — AI brain. ROS2, SLAM, person tracking. Sends velocity commands to ESP32-S3 BALANCE via UART. Not safety-critical.
1. **FC (Flight Controller)** — MAMBA F722S (STM32F722RET6 + MPU6000 IMU). Runs a lean C balance loop at up to 8kHz. Talks UART to the hoverboard ESC. This is the safety-critical layer.
2. **Jetson Nano** — AI brain. ROS2, SLAM, person tracking. Sends velocity commands to FC via UART. Not safety-critical — FC operates independently.
```
Orin Nano Super (speed+steer via UART) ←→ ELRS RC (kill switch)
Jetson (speed+steer via UART1) ←→ ELRS RC (UART3, kill switch)
ESP32-S3 BALANCE (IMU, PID balance loop)
MAMBA F722S (MPU6000 IMU, PID balance)
CAN / UART
ESP32-S3 IO → VESC 68 (left) + VESC 56 (right)
▼ UART2
Hoverboard ESC (FOC) → 2× 8" hub motors
```
## ⚠️ SAFETY — READ THIS OR PEOPLE GET HURT
@ -27,7 +26,7 @@ This is not a toy. 8" hub motors + 36V battery can crush fingers, break toes, an
2. **Tilt cutoff at ±25°** — motors to zero, require manual re-arm. No retry, no recovery.
3. **Hardware watchdog (50ms)** — if firmware hangs, motors cut.
4. **RC kill switch** — dedicated ELRS channel, checked every loop iteration. Always overrides.
5. **Orin UART timeout (200ms)** — if Orin disconnects, motors cut.
5. **Jetson UART timeout (200ms)** — if Jetson disconnects, motors cut.
6. **Speed hard cap** — firmware limit, start at 10%. Increase only after proven stable.
7. **Never test untethered** until PID is stable for 5+ minutes on a tether.
@ -36,16 +35,31 @@ This is not a toy. 8" hub motors + 36V battery can crush fingers, break toes, an
## Repository Layout
```
esp32/ # ESP32-S3 firmware (ESP-IDF)
├── balance/ # ESP32-S3 BALANCE — PID loop, IMU, safety
└── io/ # ESP32-S3 IO — VESC CAN, sensors, comms
firmware/ # STM32 HAL firmware (PlatformIO)
├── src/
│ ├── main.c # Entry point, clock config, main loop
│ ├── icm42688.c # ICM-42688-P SPI driver (backup IMU — currently broken)
│ ├── bmp280.c # Barometer driver (disabled)
│ └── status.c # LED + buzzer status patterns
├── include/
│ ├── config.h # Pin definitions, constants
│ ├── icm42688.h
│ ├── mpu6000.h # MPU6000 driver header (primary IMU)
│ ├── hoverboard.h # Hoverboard ESC UART protocol
│ ├── crsf.h # ELRS CRSF protocol
│ ├── bmp280.h
│ └── status.h
├── lib/USB_CDC/ # USB CDC stack (serial over USB)
│ ├── src/ # CDC implementation, USB descriptors, PCD config
│ └── include/
└── platformio.ini # Build config
cad/ # OpenSCAD parametric parts (16 files)
├── dimensions.scad # ALL measurements live here — single source of truth
├── assembly.scad # Full robot assembly visualization
├── motor_mount_plate.scad
├── battery_shelf.scad
├── esp32_balance_mount.scad # Vibration-isolated ESP32-S3 BALANCE mount
├── fc_mount.scad # Vibration-isolated FC mount
├── jetson_shelf.scad
├── esc_mount.scad
├── sensor_tower_top.scad
@ -68,55 +82,55 @@ PLATFORM.md # Hardware platform reference
## Hardware Quick Reference
### ESP32-S3 BALANCE
### MAMBA F722S Flight Controller
| Spec | Value |
|------|-------|
| MCU | ESP32-S3 (dual-core Xtensa LX7, 240MHz, 512KB SRAM, 8MB flash) |
| Primary IMU | MPU6000 (SPI) |
| Role | PID balance loop, tilt cutoff, arming |
| Comms to Orin | UART (velocity commands in, telemetry out) |
| Flash | `idf.py -p /dev/ttyUSB0 flash` |
| MCU | STM32F722RET6 (Cortex-M7, 216MHz, 512KB flash, 256KB RAM) |
| Primary IMU | MPU6000 (WHO_AM_I = 0x68) |
| IMU Bus | SPI1: PA5=SCK, PA6=MISO, PA7=MOSI, CS=PA4 |
| IMU EXTI | PC4 (data ready interrupt) |
| IMU Orientation | CW270 (Betaflight convention) |
| Secondary IMU | ICM-42688-P (on same SPI1, CS unknown — currently non-functional) |
| Betaflight Target | DIAT-MAMBAF722_2022B |
| USB | OTG FS (PA11/PA12), enumerates as /dev/cu.usbmodemSALTY0011 |
| VID/PID | 0x0483/0x5740 |
| LEDs | PC15 (LED1), PC14 (LED2), active low |
| Buzzer | PB2 (inverted push-pull) |
| Battery ADC | PC1=VBAT, PC3=CURR (ADC3) |
| DFU | Hold yellow BOOT button + plug USB (or send 'R' over CDC) |
### ESP32-S3 IO
### UART Assignments
| Spec | Value |
|------|-------|
| MCU | ESP32-S3 |
| Role | VESC CAN driver, sensor polling, peripheral I/O |
| VESC IDs | 68 (left), 56 (right) |
| Motor bus | CAN 1Mbit/s |
| Flash | `idf.py -p /dev/ttyUSB1 flash` |
### UART Assignments (ESP32-S3 BALANCE)
| UART | Connected To | Baud |
|------|-------------|------|
| UART0 | Orin Nano Super | 115200 |
| UART1 | ESP32-S3 IO | 115200 |
| UART2 | ELRS Receiver | 420000 (CRSF) |
| UART | Pins | Connected To | Baud |
|------|------|-------------|------|
| USART1 | PA9/PA10 | Jetson Nano | 115200 |
| USART2 | PA2/PA3 | Hoverboard ESC | 115200 |
| USART3 | PB10/PB11 | ELRS Receiver | 420000 (CRSF) |
| UART4 | — | Spare | — |
| UART5 | — | Spare | — |
### Motor/ESC
- 2× 8" pneumatic hub motors (36V, hoverboard type)
- 2× VESC motor controllers (CAN IDs 68, 56)
- VESC CAN protocol: standard SET_DUTY / SET_CURRENT / SET_RPM
- Speed range: -1.0 to +1.0 (duty cycle)
- Hoverboard ESC with FOC firmware
- UART protocol: `{0xABCD, int16 speed, int16 steer, uint16 checksum}` at 115200
- Speed range: -1000 to +1000
### Physical Dimensions (from `cad/dimensions.scad`)
| Part | Key Measurement |
|------|----------------|
| ESP32-S3 BALANCE board | ~55×28mm (DevKit form factor) |
| ESP32-S3 IO board | ~55×28mm (DevKit form factor) |
| FC mounting holes | 25.5mm spacing (NOT standard 30.5mm!) |
| FC board size | ~36mm square |
| Hub motor body | Ø200mm (~8") |
| Motor axle | Ø12mm, 45mm long |
| Orin Nano Super | 100×79mm, M2.5 holes at 86×58mm |
| Jetson Nano | 100×80×29mm, M2.5 holes at 86×58mm |
| RealSense D435i | 90×25×25mm, 1/4-20 tripod mount |
| RPLIDAR A1 | Ø70×41mm, 4× M2.5 on Ø67mm circle |
| Kill switch hole | Ø22mm panel mount |
| Battery pack | ~180×80×40mm |
| VESC (each) | ~70×50×15mm |
| Hoverboard ESC | ~80×50×15mm |
| 2020 extrusion | 20mm square, M5 center bore |
| Frame width | ~350mm (axle to axle) |
| Frame height | ~500-550mm total |
@ -133,7 +147,7 @@ PLATFORM.md # Hardware platform reference
| sensor_tower_top | ASA | 80% |
| lidar_standoff (Ø80×80mm) | ASA | 40% |
| realsense_bracket | PETG | 60% |
| esp32_balance_mount (vibration isolated) | TPU+PETG | — |
| fc_mount (vibration isolated) | TPU+PETG | — |
| bumper front + rear (350×50×30mm) | TPU | 30% |
| handle | PETG | 80% |
| kill_switch_mount | PETG | 80% |
@ -145,75 +159,99 @@ PLATFORM.md # Hardware platform reference
### Critical Lessons Learned (DON'T REPEAT THESE)
1. **NEVER auto-run untested code on_boot** — we bricked the NSPanel 3x doing this. Test manually first.
2. **`-(int)0 == 0`** — checking `if (-result)` to detect errors doesn't work when result is 0. Always use explicit error codes.
3. **USB CDC needs RX primed in init** — without it, the OUT endpoint never starts listening.
4. **Watchdog must be fed every loop iteration** — if balance loop stalls, motors must cut within 50ms.
5. **Never change PID and speed limit in the same test** — one variable at a time.
1. **SysTick_Handler with HAL_IncTick() is MANDATORY** — without it, HAL_Delay() and every HAL timeout hangs forever. This bricked us multiple times.
2. **DCache breaks SPI on STM32F7** — disable DCache or use cache-aligned DMA buffers with clean/invalidate. We disable it.
3. **`-(int)0 == 0`** — checking `if (-result)` to detect errors doesn't work when result is 0 (success and failure look the same). Always use explicit error codes.
4. **NEVER auto-run untested code on_boot** — we bricked the NSPanel 3x doing this. Test manually first.
5. **USB CDC needs ReceivePacket() primed in CDC_Init** — without it, the OUT endpoint never starts listening. No data reception.
### Build & Flash (ESP32-S3)
### DFU Reboot (Betaflight Method)
The firmware supports reboot-to-DFU via USB command:
1. Send `R` byte over USB CDC
2. Firmware writes `0xDEADBEEF` to RTC backup register 0
3. `NVIC_SystemReset()` — clean hardware reset
4. On boot, `checkForBootloader()` (called after `HAL_Init()`) reads the magic
5. If magic found: clears it, remaps system memory, jumps to STM32 bootloader at `0x1FF00000`
6. Board appears as DFU device, ready for `dfu-util` flash
### Build & Flash
```bash
# Balance board
cd esp32/balance/
idf.py build && idf.py -p /dev/ttyUSB0 flash monitor
# IO board
cd esp32/io/
idf.py build && idf.py -p /dev/ttyUSB1 flash monitor
cd firmware/
python3 -m platformio run # Build
dfu-util -a 0 -s 0x08000000:leave -D .pio/build/f722/firmware.bin # Flash
```
Dev machine: mbpm4 (seb@192.168.87.40)
Dev machine: mbpm4 (seb@192.168.87.40), PlatformIO project at `~/Projects/saltylab-firmware/`
### Clock Configuration
```
HSE 8MHz → PLL (M=8, N=432, P=2, Q=9) → SYSCLK 216MHz
PLLSAI (N=384, P=8) → CLK48 48MHz (USB)
APB1 = HCLK/4 = 54MHz
APB2 = HCLK/2 = 108MHz
Fallback: HSI 16MHz if HSE fails (PLL M=16)
```
## Current Status & Known Issues
### Working
- IMU streaming (50Hz JSON: `{"ax":...,"ay":...,"az":...,"gx":...,"gy":...,"gz":...}`)
- VESC CAN communication (IDs 68, 56)
- LED status patterns
- USB CDC serial streaming (50Hz JSON: `{"ax":...,"ay":...,"az":...,"gx":...,"gy":...,"gz":...}`)
- Clock config with HSE + HSI fallback
- Reboot-to-DFU via USB 'R' command
- LED status patterns (status.c)
- Web UI with WebSerial + Three.js 3D visualization
### In Progress
- PID balance loop tuning
- ELRS CRSF receiver integration
- Orin UART integration
### Broken / In Progress
- **ICM-42688-P SPI reads return all zeros** — was the original IMU target, but SPI communication completely non-functional despite correct pin config. May be dead silicon. Switched to MPU6000 as primary.
- **MPU6000 driver** — header exists but implementation needs completion
- **PID balance loop** — not yet implemented
- **Hoverboard ESC UART** — protocol defined, driver not written
- **ELRS CRSF receiver** — protocol defined, driver not written
- **Barometer (BMP280)** — I2C init hangs, disabled
### TODO (Priority Order)
1. Tune PID balance loop on ESP32-S3 BALANCE
1. Get MPU6000 streaming accel+gyro data
2. Implement complementary filter (pitch angle)
3. Wire ELRS receiver, implement CRSF parser
4. Bench test (VESCs disconnected, verify PID output)
5. First tethered balance test at 10% speed
6. Orin UART integration
7. LED subsystem (ESP32-S3 IO)
3. Write hoverboard ESC UART driver
4. Write PID balance loop with safety checks
5. Wire ELRS receiver, implement CRSF parser
6. Bench test (ESC disconnected, verify PID output)
7. First tethered balance test at 10% speed
8. Jetson UART integration
9. LED subsystem (ESP32-C3)
## Communication Protocols
### Orin → ESP32-S3 BALANCE (UART0, 50Hz)
### Jetson → FC (UART1, 50Hz)
```c
struct { uint8_t header=0xAA; int16_t speed; int16_t steer; uint8_t mode; uint8_t checksum; };
// mode: 0=idle, 1=balance, 2=follow, 3=RC
```
### ESP32-S3 IO → VESC (CAN, loop rate)
- Standard VESC CAN protocol (SET_DUTY / SET_CURRENT / SET_RPM)
- Node IDs: VESC 68 (left), VESC 56 (right)
### FC → Hoverboard ESC (UART2, loop rate)
```c
struct { uint16_t start=0xABCD; int16_t speed; int16_t steer; uint16_t checksum; };
// speed/steer: -1000 to +1000
```
### ESP32-S3 BALANCE → Orin Telemetry (UART0 TX, 50Hz)
### FC → Jetson Telemetry (UART1 TX, 50Hz)
```
T:12.3,P:45,L:100,R:-80,S:3\n
// T=tilt°, P=PID output, L/R=motor commands, S=state (0-3)
```
### ESP32-S3 → USB (50Hz JSON, debug/tuning)
### FC → USB CDC (50Hz JSON)
```json
{"ax":123,"ay":-456,"az":16384,"gx":10,"gy":-5,"gz":3,"t":250,"p":0}
// Raw IMU values (int16), t=temp×10, p=PID output
{"ax":123,"ay":-456,"az":16384,"gx":10,"gy":-5,"gz":3,"t":250,"p":0,"bt":0}
// Raw IMU values (int16), t=temp×10, p=pressure, bt=baro temp
```
## LED Subsystem (ESP32-S3 IO)
## LED Subsystem (ESP32-C3)
ESP32-S3 IO eavesdrops on BALANCE→Orin telemetry (listen-only). No extra UART needed.
ESP32-C3 eavesdrops on FC→Jetson telemetry (listen-only tap on UART1 TX). No extra FC UART needed.
| State | Pattern | Color |
|-------|---------|-------|
@ -237,7 +275,7 @@ ESP32-S3 IO eavesdrops on BALANCE→Orin telemetry (listen-only). No extra UART
1. **Read SALTYLAB.md fully** before making any design decisions
2. **Never remove safety checks** from firmware — add more if needed
3. **All measurements go in `cad/dimensions.scad`** — single source of truth
4. **Test firmware on bench before any motor test**VESCs disconnected, verify outputs on serial
4. **Test firmware on bench before any motor test** — ESC disconnected, verify outputs on serial
5. **One variable at a time** — don't change PID and speed limit in the same test
6. **Document what you change** — update this file if you add pins, change protocols, or discover hardware quirks
7. **Ask before wiring changes** — wrong connections can fry an ESP32 or VESC
7. **Ask before wiring changes** — wrong connections can fry the FC ($50+ board)

View File

@ -2,7 +2,7 @@
uart_bridge.launch.py FCOrin UART bridge (Issue #362)
Launches serial_bridge_node configured for Jetson Orin UART port.
Bridges Flight Controller (ESP32-S3 BALANCE) telemetry from /dev/ttyTHS1 into ROS2.
Bridges Flight Controller (STM32F722) telemetry from /dev/ttyTHS1 into ROS2.
Published topics (same as USB CDC bridge):
/saltybot/imu sensor_msgs/Imu pitch/roll/yaw as angular velocity

View File

@ -378,7 +378,7 @@ class CmdVelBridgeNode(Node):
diag.header.stamp = stamp
status = DiagnosticStatus()
status.name = "saltybot/balance_controller"
status.hardware_id = "esp32_balance"
status.hardware_id = "stm32f722"
status.message = f"{state_label} [{mode_label}]"
status.level = (
DiagnosticStatus.OK if state == 1 else
@ -406,7 +406,7 @@ class CmdVelBridgeNode(Node):
status = DiagnosticStatus()
status.level = DiagnosticStatus.ERROR
status.name = "saltybot/balance_controller"
status.hardware_id = "esp32_balance"
status.hardware_id = "stm32f722"
status.message = f"IMU fault errno={errno}"
diag.status.append(status)
self._diag_pub.publish(diag)

View File

@ -322,7 +322,7 @@ class SaltybotCanNode(Node):
diag.header.stamp = stamp
st = DiagnosticStatus()
st.name = "saltybot/balance_controller"
st.hardware_id = "esp32_balance"
st.hardware_id = "stm32f722"
st.message = state_label
st.level = (DiagnosticStatus.OK if state == 1 else
DiagnosticStatus.WARN if state == 0 else

View File

@ -266,7 +266,7 @@ class SaltybotCmdNode(Node):
diag.header.stamp = stamp
status = DiagnosticStatus()
status.name = "saltybot/balance_controller"
status.hardware_id = "esp32_balance"
status.hardware_id = "stm32f722"
status.message = state_label
if state == 1:
status.level = DiagnosticStatus.OK
@ -294,7 +294,7 @@ class SaltybotCmdNode(Node):
status = DiagnosticStatus()
status.level = DiagnosticStatus.ERROR
status.name = "saltybot/balance_controller"
status.hardware_id = "esp32_balance"
status.hardware_id = "stm32f722"
status.message = f"IMU fault errno={errno}"
diag.status.append(status)
self._diag_pub.publish(diag)

View File

@ -1,6 +1,6 @@
"""
saltybot_bridge serial_bridge_node
ESP32-S3 BALANCE USB CDC ROS2 topic publisher
STM32F722 USB CDC ROS2 topic publisher
Telemetry frame (50 Hz, newline-delimited JSON):
{"p":<pitch×10>,"r":<roll×10>,"e":<err×10>,"ig":<integral×10>,
@ -264,7 +264,7 @@ class SerialBridgeNode(Node):
diag.header.stamp = stamp
status = DiagnosticStatus()
status.name = "saltybot/balance_controller"
status.hardware_id = "esp32_balance"
status.hardware_id = "stm32f722"
status.message = state_label
if state == 1: # ARMED
@ -293,7 +293,7 @@ class SerialBridgeNode(Node):
status = DiagnosticStatus()
status.level = DiagnosticStatus.ERROR
status.name = "saltybot/balance_controller"
status.hardware_id = "esp32_balance"
status.hardware_id = "stm32f722"
status.message = f"IMU fault errno={errno}"
diag.status.append(status)
self._diag_pub.publish(diag)

View File

@ -1,11 +1,7 @@
can_bridge_node:
ros__parameters:
# SocketCAN interface — CANable 2.0 USB-to-CAN on Orin
can_interface: "can0"
# VESC CAN node IDs (FSESC 6.7 Pro Mini Dual)
can_interface: slcan0
left_vesc_can_id: 56
right_vesc_can_id: 68
# Watchdog: send zero velocity if no /cmd_vel for this many seconds
mamba_can_id: 1
command_timeout_s: 0.5

View File

@ -1 +1 @@
"""saltybot_can_bridge — CAN bus bridge for ESP32 BALANCE and VESC telemetry."""
"""SaltyBot CAN bridge package — Mamba controller and VESC telemetry via python-can."""

View File

@ -1,169 +0,0 @@
#!/usr/bin/env python3
"""balance_protocol.py — CAN frame codec for SAUL-TEE ESP32 BALANCE + VESC.
CAN bus: 500 kbps, CANable 2.0 (can0) on Orin.
Orin ESP32 BALANCE (commands)
0x300 DRIVE 8 B [left_mps f32 LE][right_mps f32 LE]
0x301 MODE 1 B [mode u8] 0=idle 1=drive 2=estop
0x302 ESTOP 1 B [flags u8] bit0=stop bit1=clear
0x303 LED 4 B [pattern u8][r u8][g u8][b u8]
ESP32 BALANCE Orin (telemetry)
0x400 FC_STATUS 8 B [pitch_x10 i16][motor_cmd u16][vbat_mv u16][state u8][flags u8] 10 Hz
0x401 FC_VESC 8 B [l_rpm_x10 i16][r_rpm_x10 i16][l_cur_x10 i16][r_cur_x10 i16] 10 Hz
VESC standard CAN frames (29-bit extended IDs)
VESC node IDs: Left = 56, Right = 68
ID = (packet_type << 8) | node_id
"""
import struct
from dataclasses import dataclass
ORIN_CMD_DRIVE: int = 0x300
ORIN_CMD_MODE: int = 0x301
ORIN_CMD_ESTOP: int = 0x302
ORIN_CMD_LED: int = 0x303
FC_STATUS: int = 0x400
FC_VESC: int = 0x401
VESC_LEFT_ID: int = 56
VESC_RIGHT_ID: int = 68
VESC_CMD_SET_RPM: int = 3
VESC_STATUS_1: int = 9
VESC_STATUS_4: int = 16
VESC_STATUS_5: int = 27
MODE_IDLE: int = 0
MODE_DRIVE: int = 1
MODE_ESTOP: int = 2
@dataclass
class FcStatus:
pitch_deg: float = 0.0
motor_cmd: int = 0
vbat_mv: int = 0
state: int = 0
flags: int = 0
@dataclass
class FcVesc:
left_rpm: float = 0.0
right_rpm: float = 0.0
left_cur: float = 0.0
right_cur: float = 0.0
@dataclass
class VescStatus1:
node_id: int = 0
erpm: float = 0.0
current: float = 0.0
duty: float = 0.0
@dataclass
class VescStatus4:
node_id: int = 0
temp_fet_c: float = 0.0
temp_motor_c: float = 0.0
current_in: float = 0.0
@dataclass
class VescStatus5:
node_id: int = 0
tacho: int = 0
vbat_v: float = 0.0
def encode_drive_cmd(left_mps: float, right_mps: float) -> bytes:
"""Encode ORIN_CMD_DRIVE (0x300): 8 bytes, 2 × float32 little-endian."""
return struct.pack("<ff", float(left_mps), float(right_mps))
def encode_mode_cmd(mode: int) -> bytes:
"""Encode ORIN_CMD_MODE (0x301): 1 byte mode value."""
if mode not in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP):
raise ValueError(f"Invalid mode {mode!r}")
return struct.pack("B", mode)
def encode_estop_cmd(stop: bool = True, clear: bool = False) -> bytes:
"""Encode ORIN_CMD_ESTOP (0x302): 1 byte flags (bit0=stop, bit1=clear)."""
return struct.pack("B", (0x01 if stop else 0) | (0x02 if clear else 0))
def encode_led_cmd(pattern: int, r: int, g: int, b: int) -> bytes:
"""Encode ORIN_CMD_LED (0x303): 4 bytes."""
return struct.pack("BBBB", pattern & 0xFF, r & 0xFF, g & 0xFF, b & 0xFF)
def encode_vesc_set_rpm(node_id: int, rpm: int) -> tuple:
"""Return (arb_id, payload) for a VESC SET_RPM command."""
return (VESC_CMD_SET_RPM << 8) | (node_id & 0xFF), struct.pack(">i", int(rpm))
def decode_fc_status(data: bytes) -> FcStatus:
"""Decode FC_STATUS (0x400) payload → FcStatus."""
if len(data) < 8:
raise ValueError(f"FC_STATUS expects 8 bytes, got {len(data)}")
pitch_x10, motor_cmd, vbat_mv, state, flags = struct.unpack_from(">hHHBB", data)
return FcStatus(
pitch_deg=pitch_x10 / 10.0,
motor_cmd=motor_cmd,
vbat_mv=vbat_mv,
state=state,
flags=flags,
)
def decode_fc_vesc(data: bytes) -> FcVesc:
"""Decode FC_VESC (0x401) payload → FcVesc."""
if len(data) < 8:
raise ValueError(f"FC_VESC expects 8 bytes, got {len(data)}")
l_rpm, r_rpm, l_cur, r_cur = struct.unpack_from(">hhhh", data)
return FcVesc(
left_rpm=l_rpm / 10.0,
right_rpm=r_rpm / 10.0,
left_cur=l_cur / 10.0,
right_cur=r_cur / 10.0,
)
def decode_vesc_can_id(can_id: int) -> tuple:
"""Decode a VESC extended CAN ID → (packet_type, node_id)."""
return (can_id >> 8) & 0xFF, can_id & 0xFF
def decode_vesc_status1(node_id: int, data: bytes) -> VescStatus1:
"""Decode VESC STATUS_1 payload → VescStatus1."""
erpm, cur_x10, duty_x1000 = struct.unpack_from(">ihh", data[:8])
return VescStatus1(
node_id=node_id,
erpm=float(erpm),
current=cur_x10 / 10.0,
duty=duty_x1000 / 1000.0,
)
def decode_vesc_status4(node_id: int, data: bytes) -> VescStatus4:
"""Decode VESC STATUS_4 payload → VescStatus4."""
tfet, tmot, cur_in, _ = struct.unpack_from(">hhhh", data[:8])
return VescStatus4(
node_id=node_id,
temp_fet_c=tfet / 10.0,
temp_motor_c=tmot / 10.0,
current_in=cur_in / 10.0,
)
def decode_vesc_status5(node_id: int, data: bytes) -> VescStatus5:
"""Decode VESC STATUS_5 payload → VescStatus5."""
tacho, vbat_x10 = struct.unpack_from(">ih", data[:6])
return VescStatus5(node_id=node_id, tacho=tacho, vbat_v=vbat_x10 / 10.0)

View File

@ -1,20 +1,20 @@
#!/usr/bin/env python3
"""
can_bridge_node.py ROS2 node bridging the SaltyBot Orin to the ESP32-S3 BALANCE
can_bridge_node.py ROS2 node bridging the SaltyBot Orin to the Mamba motor
controller and VESC motor controllers over CAN bus.
The node opens the SocketCAN interface (can0 by default), spawns a background
The node opens the SocketCAN interface (slcan0 by default), spawns a background
reader thread to process incoming telemetry, and exposes the following interface:
Subscriptions
-------------
/cmd_vel geometry_msgs/Twist -> drive commands (CAN 0x300)
/estop std_msgs/Bool -> e-stop (CAN 0x303)
/cmd_vel geometry_msgs/Twist VESC speed commands (CAN)
/estop std_msgs/Bool Mamba e-stop (CAN)
Publications
------------
/can/attitude std_msgs/Float32MultiArray ESP32-S3 BALANCE attitude
/can/battery std_msgs/Float32MultiArray ESP32-S3 BALANCE battery
/can/imu sensor_msgs/Imu Mamba IMU telemetry
/can/battery sensor_msgs/BatteryState Mamba battery telemetry
/can/vesc/left/state std_msgs/Float32MultiArray Left VESC state
/can/vesc/right/state std_msgs/Float32MultiArray Right VESC state
/can/connection_status std_msgs/String "connected" | "disconnected"
@ -30,24 +30,30 @@ import can
import rclpy
from geometry_msgs.msg import Twist
from rclpy.node import Node
from rcl_interfaces.msg import SetParametersResult
from sensor_msgs.msg import BatteryState, Imu
from std_msgs.msg import Bool, Float32MultiArray, String
from saltybot_can_bridge.balance_protocol import (
ESP32_TELEM_ATTITUDE,
ESP32_TELEM_BATTERY,
ORIN_CMD_DRIVE,
ORIN_CMD_ESTOP,
VESC_LEFT_ID,
VESC_RIGHT_ID,
VESC_STATUS_1,
from saltybot_can_bridge.mamba_protocol import (
MAMBA_CMD_ESTOP,
MAMBA_CMD_MODE,
MAMBA_CMD_VELOCITY,
MAMBA_TELEM_BATTERY,
MAMBA_TELEM_IMU,
VESC_TELEM_STATE,
ORIN_CAN_ID_FC_PID_ACK,
ORIN_CAN_ID_PID_SET,
MODE_DRIVE,
MODE_ESTOP,
MODE_IDLE,
encode_drive_cmd,
encode_estop_cmd,
decode_attitude,
decode_battery,
decode_vesc_can_id,
decode_vesc_status1,
encode_mode_cmd,
encode_velocity_cmd,
encode_pid_set_cmd,
decode_battery_telem,
decode_imu_telem,
decode_pid_ack,
decode_vesc_state,
)
# Reconnect attempt interval when CAN bus is lost
@ -58,35 +64,39 @@ _WATCHDOG_HZ: float = 10.0
class CanBridgeNode(Node):
"""CAN bus bridge between Orin ROS2 and ESP32-S3 BALANCE / VESC controllers."""
"""CAN bus bridge between Orin ROS2 and Mamba / VESC controllers."""
def __init__(self) -> None:
super().__init__("can_bridge_node")
# -- Parameters --
self.declare_parameter("can_interface", "can0")
self.declare_parameter("left_vesc_can_id", VESC_LEFT_ID)
self.declare_parameter("right_vesc_can_id", VESC_RIGHT_ID)
# ── Parameters ────────────────────────────────────────────────────
self.declare_parameter("can_interface", "slcan0")
self.declare_parameter("left_vesc_can_id", 56)
self.declare_parameter("right_vesc_can_id", 68)
self.declare_parameter("mamba_can_id", 1)
self.declare_parameter("command_timeout_s", 0.5)
self.declare_parameter("pid/kp", 0.0)
self.declare_parameter("pid/ki", 0.0)
self.declare_parameter("pid/kd", 0.0)
self._iface: str = self.get_parameter("can_interface").value
self._left_vesc_id: int = self.get_parameter("left_vesc_can_id").value
self._right_vesc_id: int = self.get_parameter("right_vesc_can_id").value
self._mamba_id: int = self.get_parameter("mamba_can_id").value
self._cmd_timeout: float = self.get_parameter("command_timeout_s").value
self._pid_kp: float = self.get_parameter("pid/kp").value
self._pid_ki: float = self.get_parameter("pid/ki").value
self._pid_kd: float = self.get_parameter("pid/kd").value
# -- State --
# ── State ─────────────────────────────────────────────────────────
self._bus: Optional[can.BusABC] = None
self._connected: bool = False
self._last_cmd_time: float = time.monotonic()
self._lock = threading.Lock()
self._lock = threading.Lock() # protects _bus / _connected
# -- Publishers --
self._pub_attitude = self.create_publisher(
Float32MultiArray, "/can/attitude", 10
)
self._pub_battery = self.create_publisher(
Float32MultiArray, "/can/battery", 10
)
# ── Publishers ────────────────────────────────────────────────────
self._pub_imu = self.create_publisher(Imu, "/can/imu", 10)
self._pub_battery = self.create_publisher(BatteryState, "/can/battery", 10)
self._pub_vesc_left = self.create_publisher(
Float32MultiArray, "/can/vesc/left/state", 10
)
@ -97,29 +107,55 @@ class CanBridgeNode(Node):
String, "/can/connection_status", 10
)
# -- Subscriptions --
# ── Subscriptions ─────────────────────────────────────────────────
self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, 10)
self.create_subscription(Bool, "/estop", self._estop_cb, 10)
self.add_on_set_parameters_callback(self._on_set_parameters)
# -- Timers --
# ── Timers ────────────────────────────────────────────────────────
self.create_timer(1.0 / _WATCHDOG_HZ, self._watchdog_cb)
self.create_timer(_RECONNECT_INTERVAL_S, self._reconnect_cb)
# -- Open CAN --
# ── Open CAN ──────────────────────────────────────────────────────
self._try_connect()
# -- Background reader thread --
# ── Background reader thread ──────────────────────────────────────
self._reader_thread = threading.Thread(
target=self._reader_loop, daemon=True, name="can_reader"
)
self._reader_thread.start()
self.get_logger().info(
f"can_bridge_node ready -- iface={self._iface} "
f"left_vesc={self._left_vesc_id} right_vesc={self._right_vesc_id}"
f"can_bridge_node ready — iface={self._iface} "
f"left_vesc={self._left_vesc_id} right_vesc={self._right_vesc_id} "
f"mamba={self._mamba_id}"
)
# -- Connection management --
# -- PID parameter callback (Issue #693) --
def _on_set_parameters(self, params) -> SetParametersResult:
"""Send new PID gains over CAN when pid/* params change."""
for p in params:
if p.name == "pid/kp":
self._pid_kp = float(p.value)
elif p.name == "pid/ki":
self._pid_ki = float(p.value)
elif p.name == "pid/kd":
self._pid_kd = float(p.value)
else:
continue
try:
payload = encode_pid_set_cmd(self._pid_kp, self._pid_ki, self._pid_kd)
self._send_can(ORIN_CAN_ID_PID_SET, payload, "pid_set")
self.get_logger().info(
f"PID gains sent: Kp={self._pid_kp:.2f} "
f"Ki={self._pid_ki:.2f} Kd={self._pid_kd:.2f}"
)
except ValueError as exc:
return SetParametersResult(successful=False, reason=str(exc))
return SetParametersResult(successful=True)
# ── Connection management ──────────────────────────────────────────────
def _try_connect(self) -> None:
"""Attempt to open the CAN interface; silently skip if already connected."""
@ -138,7 +174,7 @@ class CanBridgeNode(Node):
except Exception as exc:
self.get_logger().warning(
f"CAN bus not available ({self._iface}): {exc} "
f"-- will retry every {_RECONNECT_INTERVAL_S:.0f} s"
f" will retry every {_RECONNECT_INTERVAL_S:.0f} s"
)
self._connected = False
self._publish_status("disconnected")
@ -161,33 +197,46 @@ class CanBridgeNode(Node):
self._connected = False
self._publish_status("disconnected")
# -- ROS callbacks --
# ── ROS callbacks ─────────────────────────────────────────────────────
def _cmd_vel_cb(self, msg: Twist) -> None:
"""Convert /cmd_vel Twist to ESP32-S3 BALANCE drive commands over CAN (0x300)."""
"""Convert /cmd_vel Twist to VESC speed commands over CAN."""
self._last_cmd_time = time.monotonic()
if not self._connected:
return
# Differential drive decomposition — individual wheel speeds in m/s.
# The VESC nodes interpret linear velocity directly; angular is handled
# by the sign difference between left and right.
linear = msg.linear.x
angular = msg.angular.z
# Differential drive -- motor units [-1000, +1000]
speed_units = int(linear * 1000.0)
steer_units = int(angular * 1000.0)
self._send_can(
ORIN_CMD_DRIVE,
encode_drive_cmd(speed_units, steer_units, mode=MODE_DRIVE),
"cmd_vel",
)
# Forward left = forward right for pure translation; for rotation
# left slows and right speeds up (positive angular = CCW = left turn).
# The Mamba velocity command carries both wheels independently.
left_mps = linear - angular
right_mps = linear + angular
payload = encode_velocity_cmd(left_mps, right_mps)
self._send_can(MAMBA_CMD_VELOCITY, payload, "cmd_vel")
# Keep Mamba in DRIVE mode while receiving commands
self._send_can(MAMBA_CMD_MODE, encode_mode_cmd(MODE_DRIVE), "cmd_vel mode")
def _estop_cb(self, msg: Bool) -> None:
"""Forward /estop to ESP32-S3 BALANCE over CAN (0x303)."""
"""Forward /estop to Mamba over CAN."""
if not self._connected:
return
payload = encode_estop_cmd(msg.data)
self._send_can(MAMBA_CMD_ESTOP, payload, "estop")
if msg.data:
self._send_can(ORIN_CMD_ESTOP, encode_estop_cmd(), "estop")
self.get_logger().warning("E-stop asserted -- sent ESTOP to ESP32-S3 BALANCE")
self._send_can(
MAMBA_CMD_MODE, encode_mode_cmd(MODE_ESTOP), "estop mode"
)
self.get_logger().warning("E-stop asserted — sent ESTOP to Mamba")
# -- Watchdog --
# ── Watchdog ──────────────────────────────────────────────────────────
def _watchdog_cb(self) -> None:
"""If no /cmd_vel arrives within the timeout, send zero velocity."""
@ -196,12 +245,15 @@ class CanBridgeNode(Node):
elapsed = time.monotonic() - self._last_cmd_time
if elapsed > self._cmd_timeout:
self._send_can(
ORIN_CMD_DRIVE,
encode_drive_cmd(0, 0, mode=MODE_IDLE),
MAMBA_CMD_VELOCITY,
encode_velocity_cmd(0.0, 0.0),
"watchdog zero-vel",
)
self._send_can(
MAMBA_CMD_MODE, encode_mode_cmd(MODE_IDLE), "watchdog idle"
)
# -- CAN send helper --
# ── CAN send helper ───────────────────────────────────────────────────
def _send_can(self, arb_id: int, data: bytes, context: str) -> None:
"""Send a standard CAN frame; handle errors gracefully."""
@ -209,6 +261,7 @@ class CanBridgeNode(Node):
if not self._connected or self._bus is None:
return
bus = self._bus
msg = can.Message(
arbitration_id=arb_id,
data=data,
@ -219,95 +272,132 @@ class CanBridgeNode(Node):
except can.CanError as exc:
self._handle_can_error(exc, f"send({context})")
# -- Background CAN reader --
# ── Background CAN reader ─────────────────────────────────────────────
def _reader_loop(self) -> None:
"""Blocking CAN read loop executed in a daemon thread."""
"""
Blocking CAN read loop executed in a daemon thread.
Dispatches incoming frames to the appropriate handler.
"""
while rclpy.ok():
with self._lock:
connected = self._connected
bus = self._bus
if not connected or bus is None:
time.sleep(0.1)
continue
try:
frame = bus.recv(timeout=0.5)
except can.CanError as exc:
self._handle_can_error(exc, "reader_loop recv")
continue
if frame is None:
# Timeout — no frame within 0.5 s, loop again
continue
self._dispatch_frame(frame)
def _dispatch_frame(self, frame: can.Message) -> None:
"""Route an incoming CAN frame to the correct publisher."""
arb_id = frame.arbitration_id
data = bytes(frame.data)
try:
if arb_id == ESP32_TELEM_ATTITUDE:
self._handle_attitude(data)
elif arb_id == ESP32_TELEM_BATTERY:
self._handle_battery(data)
elif frame.is_extended_id:
# VESC extended 29-bit ID: packet_type<<8 | node_id
pkt_type, node_id = decode_vesc_can_id(arb_id)
if pkt_type == VESC_STATUS_1:
self._handle_vesc_status1(data, node_id)
if arb_id == MAMBA_TELEM_IMU:
self._handle_imu(data, frame.timestamp)
elif arb_id == MAMBA_TELEM_BATTERY:
self._handle_battery(data, frame.timestamp)
elif arb_id == VESC_TELEM_STATE + self._left_vesc_id:
self._handle_vesc_state(data, frame.timestamp, side="left")
elif arb_id == VESC_TELEM_STATE + self._right_vesc_id:
self._handle_vesc_state(data, frame.timestamp, side="right")
elif arb_id == ORIN_CAN_ID_FC_PID_ACK:
gains = decode_pid_ack(data)
self.get_logger().debug(
f"FC PID ACK: Kp={gains.kp:.2f} Ki={gains.ki:.2f} Kd={gains.kd:.2f}"
)
except Exception as exc:
self.get_logger().warning(
f"Error parsing CAN frame 0x{arb_id:03X}: {exc}"
)
# -- Frame handlers --
# ── Frame handlers ────────────────────────────────────────────────────
def _handle_attitude(self, data: bytes) -> None:
telem = decode_attitude(data)
msg = Float32MultiArray()
# Layout: [pitch_deg, speed, yaw_rate, state, flags]
msg.data = [
telem.pitch_deg,
telem.speed,
telem.yaw_rate,
float(telem.state),
float(telem.flags),
]
self._pub_attitude.publish(msg)
def _handle_imu(self, data: bytes, timestamp: float) -> None:
telem = decode_imu_telem(data)
msg = Imu()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = "imu_link"
msg.linear_acceleration.x = telem.accel_x
msg.linear_acceleration.y = telem.accel_y
msg.linear_acceleration.z = telem.accel_z
msg.angular_velocity.x = telem.gyro_x
msg.angular_velocity.y = telem.gyro_y
msg.angular_velocity.z = telem.gyro_z
# Covariance unknown; mark as -1 per REP-145
msg.orientation_covariance[0] = -1.0
self._pub_imu.publish(msg)
def _handle_battery(self, data: bytes, timestamp: float) -> None:
telem = decode_battery_telem(data)
msg = BatteryState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.voltage = telem.voltage
msg.current = telem.current
msg.present = True
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
def _handle_battery(self, data: bytes) -> None:
telem = decode_battery(data)
msg = Float32MultiArray()
# Layout: [vbat_v, fault_code, rssi]
msg.data = [telem.vbat_mv / 1000.0, float(telem.fault_code), float(telem.rssi)]
self._pub_battery.publish(msg)
def _handle_vesc_status1(self, data: bytes, node_id: int) -> None:
status = decode_vesc_status1(node_id, data)
def _handle_vesc_state(
self, data: bytes, timestamp: float, side: str
) -> None:
telem = decode_vesc_state(data)
msg = Float32MultiArray()
# Layout: [erpm, current, duty]
msg.data = [status.erpm, status.current, status.duty]
if node_id == self._left_vesc_id:
# Layout: [erpm, duty, voltage, current]
msg.data = [telem.erpm, telem.duty, telem.voltage, telem.current]
if side == "left":
self._pub_vesc_left.publish(msg)
elif node_id == self._right_vesc_id:
else:
self._pub_vesc_right.publish(msg)
# -- Status helper --
# ── Status helper ─────────────────────────────────────────────────────
def _publish_status(self, status: str) -> None:
msg = String()
msg.data = status
self._pub_status.publish(msg)
# -- Shutdown --
# ── Shutdown ──────────────────────────────────────────────────────────
def destroy_node(self) -> None:
"""Send zero velocity and shut down the CAN bus cleanly."""
if self._connected and self._bus is not None:
try:
self._send_can(
ORIN_CMD_DRIVE,
encode_drive_cmd(0, 0, mode=MODE_IDLE),
MAMBA_CMD_VELOCITY,
encode_velocity_cmd(0.0, 0.0),
"shutdown",
)
self._send_can(
MAMBA_CMD_MODE, encode_mode_cmd(MODE_IDLE), "shutdown"
)
except Exception:
pass
try:

View File

@ -0,0 +1,224 @@
#!/usr/bin/env python3
"""
mamba_protocol.py CAN message encoding/decoding for the Mamba motor controller
and VESC telemetry.
CAN message layout
------------------
Command frames (Orin Mamba / VESC):
MAMBA_CMD_VELOCITY 0x100 8 bytes left_speed (f32, m/s) | right_speed (f32, m/s)
MAMBA_CMD_MODE 0x101 1 byte mode (0=idle, 1=drive, 2=estop)
MAMBA_CMD_ESTOP 0x102 1 byte 0x01 = stop
Telemetry frames (Mamba Orin):
MAMBA_TELEM_IMU 0x200 24 bytes accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z (f32 each)
MAMBA_TELEM_BATTERY 0x201 8 bytes voltage (f32, V) | current (f32, A)
VESC telemetry frame (VESC Orin):
VESC_TELEM_STATE 0x300 16 bytes erpm (f32) | duty (f32) | voltage (f32) | current (f32)
All multi-byte fields are big-endian.
Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/674
"""
import struct
from dataclasses import dataclass
from typing import Tuple
# ---------------------------------------------------------------------------
# CAN message IDs
# ---------------------------------------------------------------------------
MAMBA_CMD_VELOCITY: int = 0x100
MAMBA_CMD_MODE: int = 0x101
MAMBA_CMD_ESTOP: int = 0x102
MAMBA_TELEM_IMU: int = 0x200
MAMBA_TELEM_BATTERY: int = 0x201
VESC_TELEM_STATE: int = 0x300
ORIN_CAN_ID_PID_SET: int = 0x305
ORIN_CAN_ID_FC_PID_ACK: int = 0x405
# ---------------------------------------------------------------------------
# Mode constants
# ---------------------------------------------------------------------------
MODE_IDLE: int = 0
MODE_DRIVE: int = 1
MODE_ESTOP: int = 2
# ---------------------------------------------------------------------------
# Data classes for decoded telemetry
# ---------------------------------------------------------------------------
@dataclass
class ImuTelemetry:
"""Decoded IMU telemetry from Mamba (MAMBA_TELEM_IMU)."""
accel_x: float = 0.0 # m/s²
accel_y: float = 0.0
accel_z: float = 0.0
gyro_x: float = 0.0 # rad/s
gyro_y: float = 0.0
gyro_z: float = 0.0
@dataclass
class BatteryTelemetry:
"""Decoded battery telemetry from Mamba (MAMBA_TELEM_BATTERY)."""
voltage: float = 0.0 # V
current: float = 0.0 # A
@dataclass
class VescStateTelemetry:
"""Decoded VESC state telemetry (VESC_TELEM_STATE)."""
erpm: float = 0.0 # electrical RPM
duty: float = 0.0 # duty cycle [-1.0, 1.0]
voltage: float = 0.0 # bus voltage, V
current: float = 0.0 # phase current, A
@dataclass
class PidGains:
"""Balance PID gains (Issue #693)."""
kp: float = 0.0
ki: float = 0.0
kd: float = 0.0
# ---------------------------------------------------------------------------
# Encode helpers
# ---------------------------------------------------------------------------
_FMT_VEL = ">ff" # 2 × float32, big-endian
_FMT_MODE = ">B" # 1 × uint8
_FMT_ESTOP = ">B" # 1 × uint8
_FMT_IMU = ">ffffff" # 6 × float32
_FMT_BAT = ">ff" # 2 × float32
_FMT_VESC = ">ffff" # 4 × float32
def encode_velocity_cmd(left_mps: float, right_mps: float) -> bytes:
"""
Encode a MAMBA_CMD_VELOCITY payload.
Parameters
----------
left_mps: target left wheel speed in m/s (positive = forward)
right_mps: target right wheel speed in m/s (positive = forward)
Returns
-------
8-byte big-endian payload suitable for a CAN frame.
"""
return struct.pack(_FMT_VEL, float(left_mps), float(right_mps))
def encode_mode_cmd(mode: int) -> bytes:
"""
Encode a MAMBA_CMD_MODE payload.
Parameters
----------
mode: one of MODE_IDLE (0), MODE_DRIVE (1), MODE_ESTOP (2)
Returns
-------
1-byte payload.
"""
if mode not in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP):
raise ValueError(f"Invalid mode {mode!r}; expected 0, 1, or 2")
return struct.pack(_FMT_MODE, mode)
def encode_estop_cmd(stop: bool = True) -> bytes:
"""
Encode a MAMBA_CMD_ESTOP payload.
Parameters
----------
stop: True to assert e-stop, False to clear.
Returns
-------
1-byte payload (0x01 = stop, 0x00 = clear).
"""
return struct.pack(_FMT_ESTOP, 0x01 if stop else 0x00)
def encode_pid_set_cmd(kp: float, ki: float, kd: float) -> bytes:
"""Encode ORIN_CAN_ID_PID_SET (6 bytes, uint16 BE x3). Issue #693."""
if kp < 0.0 or ki < 0.0 or kd < 0.0:
raise ValueError("PID gains must be non-negative")
return struct.pack(_FMT_PID, round(min(kp,_PID_KP_MAX)*100), round(min(ki,_PID_KI_MAX)*100), round(min(kd,_PID_KD_MAX)*100))
# ---------------------------------------------------------------------------
# Decode helpers
# ---------------------------------------------------------------------------
def decode_imu_telem(data: bytes) -> ImuTelemetry:
"""
Decode a MAMBA_TELEM_IMU payload.
Parameters
----------
data: exactly 24 bytes (6 × float32, big-endian).
Returns
-------
ImuTelemetry dataclass instance.
Raises
------
struct.error if data is the wrong length.
"""
ax, ay, az, gx, gy, gz = struct.unpack(_FMT_IMU, data)
return ImuTelemetry(
accel_x=ax, accel_y=ay, accel_z=az,
gyro_x=gx, gyro_y=gy, gyro_z=gz,
)
def decode_battery_telem(data: bytes) -> BatteryTelemetry:
"""
Decode a MAMBA_TELEM_BATTERY payload.
Parameters
----------
data: exactly 8 bytes (2 × float32, big-endian).
Returns
-------
BatteryTelemetry dataclass instance.
"""
voltage, current = struct.unpack(_FMT_BAT, data)
return BatteryTelemetry(voltage=voltage, current=current)
def decode_vesc_state(data: bytes) -> VescStateTelemetry:
"""
Decode a VESC_TELEM_STATE payload.
Parameters
----------
data: exactly 16 bytes (4 × float32, big-endian).
Returns
-------
VescStateTelemetry dataclass instance.
"""
erpm, duty, voltage, current = struct.unpack(_FMT_VESC, data)
return VescStateTelemetry(erpm=erpm, duty=duty, voltage=voltage, current=current)
def decode_pid_ack(data: bytes) -> PidGains:
"""Decode ORIN_CAN_ID_FC_PID_ACK (6 bytes). Issue #693."""
kp_x100, ki_x100, kd_x100 = struct.unpack(_FMT_PID, data)
return PidGains(kp=kp_x100/100.0, ki=ki_x100/100.0, kd=kd_x100/100.0)

View File

@ -1,22 +1,21 @@
from setuptools import find_packages, setup
from setuptools import setup
package_name = "saltybot_can_bridge"
setup(
name=package_name,
version="0.0.1",
packages=find_packages(exclude=["test"]),
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
("share/" + package_name + "/config", ["config/can_bridge_params.yaml"]),
("share/" + package_name + "/launch", ["launch/can_bridge.launch.py"]),
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/config", ["config/can_bridge_params.yaml"]),
],
install_requires=["setuptools"],
install_requires=["setuptools", "python-can"],
zip_safe=True,
maintainer="seb",
maintainer_email="seb@example.com",
description="CAN bus bridge for ESP32 BALANCE and VESC telemetry",
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="CAN bus bridge for Mamba controller and VESC telemetry",
license="MIT",
tests_require=["pytest"],
entry_points={

View File

@ -1,8 +1,8 @@
#!/usr/bin/env python3
"""
Unit tests for saltybot_can_bridge.balance_protocol.
Unit tests for saltybot_can_bridge.mamba_protocol.
No ROS2 or CAN hardware required -- tests exercise encode/decode round-trips
No ROS2 or CAN hardware required tests exercise encode/decode round-trips
and boundary conditions entirely in Python.
Run with: pytest test/test_can_bridge.py -v
@ -11,165 +11,243 @@ Run with: pytest test/test_can_bridge.py -v
import struct
import unittest
from saltybot_can_bridge.balance_protocol import (
ORIN_CMD_DRIVE,
ORIN_CMD_ARM,
ORIN_CMD_PID,
ORIN_CMD_ESTOP,
ESP32_TELEM_ATTITUDE,
ESP32_TELEM_BATTERY,
VESC_LEFT_ID,
VESC_RIGHT_ID,
VESC_STATUS_1,
MODE_IDLE,
from saltybot_can_bridge.mamba_protocol import (
MAMBA_CMD_ESTOP,
MAMBA_CMD_MODE,
MAMBA_CMD_VELOCITY,
MAMBA_TELEM_BATTERY,
MAMBA_TELEM_IMU,
VESC_TELEM_STATE,
MODE_DRIVE,
MODE_AUTONOMOUS,
AttitudeTelemetry,
MODE_ESTOP,
MODE_IDLE,
BatteryTelemetry,
VescStatus1,
decode_attitude,
decode_battery,
decode_vesc_status1,
encode_drive_cmd,
encode_arm_cmd,
ImuTelemetry,
VescStateTelemetry,
decode_battery_telem,
decode_imu_telem,
decode_vesc_state,
encode_estop_cmd,
encode_mode_cmd,
encode_velocity_cmd,
)
class TestMessageIDs(unittest.TestCase):
"""Verify CAN message ID constants match the spec."""
"""Verify the CAN message ID constants are correct."""
def test_orin_cmd_ids(self):
self.assertEqual(ORIN_CMD_DRIVE, 0x300)
self.assertEqual(ORIN_CMD_ARM, 0x301)
self.assertEqual(ORIN_CMD_PID, 0x302)
self.assertEqual(ORIN_CMD_ESTOP, 0x303)
def test_command_ids(self):
self.assertEqual(MAMBA_CMD_VELOCITY, 0x100)
self.assertEqual(MAMBA_CMD_MODE, 0x101)
self.assertEqual(MAMBA_CMD_ESTOP, 0x102)
def test_esp32_telem_ids(self):
self.assertEqual(ESP32_TELEM_ATTITUDE, 0x400)
self.assertEqual(ESP32_TELEM_BATTERY, 0x401)
def test_vesc_ids(self):
self.assertEqual(VESC_LEFT_ID, 56)
self.assertEqual(VESC_RIGHT_ID, 68)
self.assertEqual(VESC_STATUS_1, 9)
def test_telemetry_ids(self):
self.assertEqual(MAMBA_TELEM_IMU, 0x200)
self.assertEqual(MAMBA_TELEM_BATTERY, 0x201)
self.assertEqual(VESC_TELEM_STATE, 0x300)
class TestDriveEncode(unittest.TestCase):
"""Tests for encode_drive_cmd."""
class TestVelocityEncode(unittest.TestCase):
"""Tests for encode_velocity_cmd."""
def test_zero_velocity(self):
payload = encode_drive_cmd(0, 0)
payload = encode_velocity_cmd(0.0, 0.0)
self.assertEqual(len(payload), 8)
speed, steer, mode, flags, pad = struct.unpack(">hhBBH", payload)
self.assertEqual(speed, 0)
self.assertEqual(steer, 0)
self.assertEqual(mode, MODE_DRIVE)
left, right = struct.unpack(">ff", payload)
self.assertAlmostEqual(left, 0.0, places=5)
self.assertAlmostEqual(right, 0.0, places=5)
def test_forward(self):
payload = encode_drive_cmd(500, 0)
speed, steer, mode, flags, pad = struct.unpack(">hhBBH", payload)
self.assertEqual(speed, 500)
self.assertEqual(steer, 0)
def test_forward_velocity(self):
payload = encode_velocity_cmd(1.5, 1.5)
left, right = struct.unpack(">ff", payload)
self.assertAlmostEqual(left, 1.5, places=5)
self.assertAlmostEqual(right, 1.5, places=5)
def test_clamping_high(self):
payload = encode_drive_cmd(9999, 9999)
speed, steer, mode, flags, pad = struct.unpack(">hhBBH", payload)
self.assertEqual(speed, 1000)
self.assertEqual(steer, 1000)
def test_differential_velocity(self):
payload = encode_velocity_cmd(-0.5, 0.5)
left, right = struct.unpack(">ff", payload)
self.assertAlmostEqual(left, -0.5, places=5)
self.assertAlmostEqual(right, 0.5, places=5)
def test_clamping_low(self):
payload = encode_drive_cmd(-9999, -9999)
speed, steer, mode, flags, pad = struct.unpack(">hhBBH", payload)
self.assertEqual(speed, -1000)
self.assertEqual(steer, -1000)
def test_large_velocity(self):
# No clamping at the protocol layer — values are sent as-is
payload = encode_velocity_cmd(10.0, -10.0)
left, right = struct.unpack(">ff", payload)
self.assertAlmostEqual(left, 10.0, places=3)
self.assertAlmostEqual(right, -10.0, places=3)
def test_mode_idle(self):
payload = encode_drive_cmd(0, 0, mode=MODE_IDLE)
speed, steer, mode_byte, flags, pad = struct.unpack(">hhBBH", payload)
self.assertEqual(mode_byte, MODE_IDLE)
def test_payload_is_big_endian(self):
# Sanity check: first 4 bytes encode left speed
payload = encode_velocity_cmd(1.0, 0.0)
(left,) = struct.unpack(">f", payload[:4])
self.assertAlmostEqual(left, 1.0, places=5)
class TestArmEncode(unittest.TestCase):
"""Tests for encode_arm_cmd."""
class TestModeEncode(unittest.TestCase):
"""Tests for encode_mode_cmd."""
def test_arm(self):
payload = encode_arm_cmd(True)
self.assertEqual(len(payload), 1)
self.assertEqual(payload[0], 0x01)
def test_idle_mode(self):
payload = encode_mode_cmd(MODE_IDLE)
self.assertEqual(payload, b"\x00")
def test_disarm(self):
payload = encode_arm_cmd(False)
self.assertEqual(len(payload), 1)
self.assertEqual(payload[0], 0x00)
def test_drive_mode(self):
payload = encode_mode_cmd(MODE_DRIVE)
self.assertEqual(payload, b"\x01")
def test_estop_mode(self):
payload = encode_mode_cmd(MODE_ESTOP)
self.assertEqual(payload, b"\x02")
def test_invalid_mode_raises(self):
with self.assertRaises(ValueError):
encode_mode_cmd(99)
def test_invalid_mode_negative_raises(self):
with self.assertRaises(ValueError):
encode_mode_cmd(-1)
class TestEstopEncode(unittest.TestCase):
"""Tests for encode_estop_cmd."""
def test_estop_magic_byte(self):
payload = encode_estop_cmd()
self.assertEqual(len(payload), 1)
self.assertEqual(payload[0], 0xE5)
def test_estop_assert(self):
self.assertEqual(encode_estop_cmd(True), b"\x01")
def test_estop_clear(self):
self.assertEqual(encode_estop_cmd(False), b"\x00")
def test_estop_default_is_stop(self):
self.assertEqual(encode_estop_cmd(), b"\x01")
class TestAttitudeDecode(unittest.TestCase):
"""Tests for decode_attitude (0x400)."""
class TestImuDecodeRoundTrip(unittest.TestCase):
"""Round-trip tests for IMU telemetry."""
def test_roundtrip(self):
import struct as _s
# Encode a known attitude: pitch=12.5, speed=1.5, yaw_rate=-0.5, state=1, flags=0
fmt = ">eeeBB"
raw = _s.pack(fmt, 12.5, 1.5, -0.5, 1, 0) + b"\x00" # pad to 8 bytes
att = decode_attitude(raw)
self.assertAlmostEqual(att.pitch_deg, 12.5, places=1)
self.assertAlmostEqual(att.speed, 1.5, places=1)
self.assertEqual(att.state, 1)
def _encode_imu(self, ax, ay, az, gx, gy, gz) -> bytes:
return struct.pack(">ffffff", ax, ay, az, gx, gy, gz)
def test_too_short_raises(self):
with self.assertRaises(ValueError):
decode_attitude(b"\x00" * 3)
def test_zero_imu(self):
data = self._encode_imu(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
telem = decode_imu_telem(data)
self.assertIsInstance(telem, ImuTelemetry)
self.assertAlmostEqual(telem.accel_x, 0.0, places=5)
self.assertAlmostEqual(telem.gyro_z, 0.0, places=5)
def test_nominal_imu(self):
data = self._encode_imu(0.1, -0.2, 9.81, 0.01, -0.02, 0.03)
telem = decode_imu_telem(data)
self.assertAlmostEqual(telem.accel_x, 0.1, places=4)
self.assertAlmostEqual(telem.accel_y, -0.2, places=4)
self.assertAlmostEqual(telem.accel_z, 9.81, places=3)
self.assertAlmostEqual(telem.gyro_x, 0.01, places=5)
self.assertAlmostEqual(telem.gyro_y, -0.02, places=5)
self.assertAlmostEqual(telem.gyro_z, 0.03, places=5)
def test_imu_bad_length_raises(self):
with self.assertRaises(struct.error):
decode_imu_telem(b"\x00" * 10) # too short
class TestBatteryDecode(unittest.TestCase):
"""Tests for decode_battery (0x401)."""
class TestBatteryDecodeRoundTrip(unittest.TestCase):
"""Round-trip tests for battery telemetry."""
def test_roundtrip(self):
import struct as _s
raw = _s.pack(">HBb", 24000, 0, -65)
bat = decode_battery(raw)
self.assertEqual(bat.vbat_mv, 24000)
self.assertEqual(bat.fault_code, 0)
self.assertEqual(bat.rssi, -65)
def _encode_bat(self, voltage, current) -> bytes:
return struct.pack(">ff", voltage, current)
def test_too_short_raises(self):
with self.assertRaises(ValueError):
decode_battery(b"\x00" * 2)
def test_nominal_battery(self):
data = self._encode_bat(24.6, 3.2)
telem = decode_battery_telem(data)
self.assertIsInstance(telem, BatteryTelemetry)
self.assertAlmostEqual(telem.voltage, 24.6, places=3)
self.assertAlmostEqual(telem.current, 3.2, places=4)
def test_zero_battery(self):
data = self._encode_bat(0.0, 0.0)
telem = decode_battery_telem(data)
self.assertAlmostEqual(telem.voltage, 0.0, places=5)
self.assertAlmostEqual(telem.current, 0.0, places=5)
def test_max_voltage(self):
# 6S LiPo max ~25.2 V; test with a high value
data = self._encode_bat(25.2, 0.0)
telem = decode_battery_telem(data)
self.assertAlmostEqual(telem.voltage, 25.2, places=3)
def test_battery_bad_length_raises(self):
with self.assertRaises(struct.error):
decode_battery_telem(b"\x00" * 4) # too short
class TestVescStatus1Decode(unittest.TestCase):
"""Tests for decode_vesc_status1."""
class TestVescStateDecodeRoundTrip(unittest.TestCase):
"""Round-trip tests for VESC state telemetry."""
def test_roundtrip(self):
import struct as _s
erpm = 12000
cur_x10 = 35 # 3.5 A
duty_x1000 = 450 # 0.45
raw = _s.pack(">ihh", erpm, cur_x10, duty_x1000)
st = decode_vesc_status1(VESC_LEFT_ID, raw)
self.assertEqual(st.node_id, VESC_LEFT_ID)
self.assertAlmostEqual(st.erpm, 12000.0, places=1)
self.assertAlmostEqual(st.current, 3.5, places=3)
self.assertAlmostEqual(st.duty, 0.45, places=3)
def _encode_vesc(self, erpm, duty, voltage, current) -> bytes:
return struct.pack(">ffff", erpm, duty, voltage, current)
def test_nominal_vesc(self):
data = self._encode_vesc(3000.0, 0.25, 24.0, 5.5)
telem = decode_vesc_state(data)
self.assertIsInstance(telem, VescStateTelemetry)
self.assertAlmostEqual(telem.erpm, 3000.0, places=2)
self.assertAlmostEqual(telem.duty, 0.25, places=5)
self.assertAlmostEqual(telem.voltage, 24.0, places=4)
self.assertAlmostEqual(telem.current, 5.5, places=4)
def test_zero_vesc(self):
data = self._encode_vesc(0.0, 0.0, 0.0, 0.0)
telem = decode_vesc_state(data)
self.assertAlmostEqual(telem.erpm, 0.0)
self.assertAlmostEqual(telem.duty, 0.0)
def test_reverse_erpm(self):
data = self._encode_vesc(-1500.0, -0.15, 23.0, 2.0)
telem = decode_vesc_state(data)
self.assertAlmostEqual(telem.erpm, -1500.0, places=2)
self.assertAlmostEqual(telem.duty, -0.15, places=5)
def test_vesc_bad_length_raises(self):
with self.assertRaises(struct.error):
decode_vesc_state(b"\x00" * 8) # too short (need 16)
class TestModeConstants(unittest.TestCase):
"""Tests for mode byte constants."""
class TestEncodeDecodeIdentity(unittest.TestCase):
"""Cross-module identity tests: encode then decode must be identity."""
def test_mode_values(self):
self.assertEqual(MODE_IDLE, 0)
self.assertEqual(MODE_DRIVE, 1)
self.assertEqual(MODE_AUTONOMOUS, 2)
def test_velocity_identity(self):
"""encode_velocity_cmd raw bytes must decode to the same floats."""
left, right = 0.75, -0.3
payload = encode_velocity_cmd(left, right)
decoded_l, decoded_r = struct.unpack(">ff", payload)
self.assertAlmostEqual(decoded_l, left, places=5)
self.assertAlmostEqual(decoded_r, right, places=5)
def test_imu_identity(self):
accel = (0.5, -0.5, 9.8)
gyro = (0.1, -0.1, 0.2)
raw = struct.pack(">ffffff", *accel, *gyro)
telem = decode_imu_telem(raw)
self.assertAlmostEqual(telem.accel_x, accel[0], places=4)
self.assertAlmostEqual(telem.accel_y, accel[1], places=4)
self.assertAlmostEqual(telem.accel_z, accel[2], places=3)
self.assertAlmostEqual(telem.gyro_x, gyro[0], places=5)
self.assertAlmostEqual(telem.gyro_y, gyro[1], places=5)
self.assertAlmostEqual(telem.gyro_z, gyro[2], places=5)
def test_battery_identity(self):
voltage, current = 22.4, 8.1
raw = struct.pack(">ff", voltage, current)
telem = decode_battery_telem(raw)
self.assertAlmostEqual(telem.voltage, voltage, places=3)
self.assertAlmostEqual(telem.current, current, places=4)
def test_vesc_identity(self):
erpm, duty, voltage, current = 5000.0, 0.5, 24.5, 10.0
raw = struct.pack(">ffff", erpm, duty, voltage, current)
telem = decode_vesc_state(raw)
self.assertAlmostEqual(telem.erpm, erpm, places=2)
self.assertAlmostEqual(telem.duty, duty, places=5)
self.assertAlmostEqual(telem.voltage, voltage, places=3)
self.assertAlmostEqual(telem.current, current, places=4)
if __name__ == "__main__":

View File

@ -1,56 +1,69 @@
#!/usr/bin/env python3
"""
protocol_defs.py CAN message ID constants and frame builders/parsers for the
Orin <-> ESP32 BALANCE <-> VESC integration test suite.
OrinMambaVESC integration test suite.
All IDs and payload formats are derived from:
saltybot_can_bridge/balance_protocol.py canonical codec
include/orin_can.h OrinFC (Mamba) protocol
include/vesc_can.h VESC CAN protocol
saltybot_can_bridge/mamba_protocol.py existing bridge constants
CAN IDs used in tests
---------------------
Orin ESP32 BALANCE commands (standard 11-bit):
ORIN_CMD_DRIVE 0x300 left_mps (f32 LE) | right_mps (f32 LE)
ORIN_CMD_MODE 0x301 uint8 mode (0=idle, 1=drive, 2=estop)
ORIN_CMD_ESTOP 0x302 uint8 flags (bit0=stop, bit1=clear)
ORIN_CMD_LED 0x303 uint8 pattern | r | g | b
Orin FC (Mamba) commands (standard 11-bit, matching orin_can.h):
ORIN_CMD_HEARTBEAT 0x300
ORIN_CMD_DRIVE 0x301 int16 speed (1000..+1000), int16 steer (1000..+1000)
ORIN_CMD_MODE 0x302 uint8 mode byte
ORIN_CMD_ESTOP 0x303 uint8 action (1=ESTOP, 0=CLEAR)
ESP32 BALANCE Orin telemetry (standard 11-bit):
FC_STATUS 0x400 8 bytes (pitch_x10 i16 | motor_cmd u16 | vbat_mv u16 | state u8 | flags u8)
FC_VESC 0x401 8 bytes (l_rpm_x10 i16 | r_rpm_x10 i16 | l_cur_x10 i16 | r_cur_x10 i16)
FC (Mamba) Orin telemetry (standard 11-bit, matching orin_can.h):
FC_STATUS 0x400 8 bytes (see orin_can_fc_status_t)
FC_VESC 0x401 8 bytes (see orin_can_fc_vesc_t)
FC_IMU 0x402 8 bytes
FC_BARO 0x403 8 bytes
VESC STATUS (extended 29-bit):
arb_id = (VESC_STATUS_1 << 8) | vesc_node_id = (9 << 8) | node_id
Payload: int32 ERPM (BE), int16 current×10 (BE), int16 duty×1000 (BE)
Mamba VESC internal commands (matching mamba_protocol.py):
MAMBA_CMD_VELOCITY 0x100 8 bytes left_mps (f32) | right_mps (f32) big-endian
MAMBA_CMD_MODE 0x101 1 byte mode (0=idle,1=drive,2=estop)
MAMBA_CMD_ESTOP 0x102 1 byte 0x01=stop
VESC STATUS (extended 29-bit, matching vesc_can.h):
arb_id = (VESC_PKT_STATUS << 8) | vesc_node_id = (9 << 8) | node_id
Payload: int32 RPM (BE), int16 current×10 (BE), int16 duty×1000 (BE)
"""
import struct
from typing import Tuple
# ---------------------------------------------------------------------------
# Orin → ESP32 BALANCE command IDs
# Orin → FC (Mamba) command IDs (from orin_can.h)
# ---------------------------------------------------------------------------
ORIN_CMD_DRIVE: int = 0x300
ORIN_CMD_MODE: int = 0x301
ORIN_CMD_ESTOP: int = 0x302
ORIN_CMD_LED: int = 0x303
ORIN_CMD_HEARTBEAT: int = 0x300
ORIN_CMD_DRIVE: int = 0x301
ORIN_CMD_MODE: int = 0x302
ORIN_CMD_ESTOP: int = 0x303
# ---------------------------------------------------------------------------
# ESP32 BALANCE → Orin telemetry IDs
# FC (Mamba) → Orin telemetry IDs (from orin_can.h)
# ---------------------------------------------------------------------------
FC_STATUS: int = 0x400
FC_VESC: int = 0x401
FC_IMU: int = 0x402
FC_BARO: int = 0x403
# ---------------------------------------------------------------------------
# VESC constants
# Mamba → VESC internal command IDs (from mamba_protocol.py)
# ---------------------------------------------------------------------------
VESC_STATUS_1: int = 9 # STATUS_1 packet type (upper byte of arb_id)
VESC_CMD_SET_RPM: int = 3
MAMBA_CMD_VELOCITY: int = 0x100
MAMBA_CMD_MODE: int = 0x101
MAMBA_CMD_ESTOP: int = 0x102
VESC_CAN_ID_LEFT: int = 56
VESC_CAN_ID_RIGHT: int = 68
MAMBA_TELEM_IMU: int = 0x200
MAMBA_TELEM_BATTERY: int = 0x201
VESC_TELEM_STATE: int = 0x300
# ---------------------------------------------------------------------------
# Mode constants
@ -60,111 +73,242 @@ MODE_IDLE: int = 0
MODE_DRIVE: int = 1
MODE_ESTOP: int = 2
# ---------------------------------------------------------------------------
# VESC protocol constants (from vesc_can.h)
# ---------------------------------------------------------------------------
def VESC_STATUS1_ID(vesc_node_id: int) -> int:
"""Return the 29-bit extended arbitration ID for a VESC STATUS_1 frame."""
return (VESC_STATUS_1 << 8) | vesc_node_id
VESC_PKT_STATUS: int = 9 # STATUS packet type (upper byte of arb_id)
VESC_PKT_SET_RPM: int = 3 # SET_RPM packet type
VESC_CAN_ID_LEFT: int = 56
VESC_CAN_ID_RIGHT: int = 68
def VESC_STATUS_ID(vesc_node_id: int) -> int:
"""
Return the 29-bit extended arbitration ID for a VESC STATUS frame.
Formula (from vesc_can.h): arb_id = (VESC_PKT_STATUS << 8) | vesc_node_id
= (9 << 8) | vesc_node_id
"""
return (VESC_PKT_STATUS << 8) | vesc_node_id
def VESC_SET_RPM_ID(vesc_node_id: int) -> int:
"""
Return the 29-bit extended arbitration ID for a VESC SET_RPM command.
Formula: arb_id = (VESC_PKT_SET_RPM << 8) | vesc_node_id
= (3 << 8) | vesc_node_id
"""
return (VESC_PKT_SET_RPM << 8) | vesc_node_id
# ---------------------------------------------------------------------------
# Frame builders — Orin → ESP32 BALANCE
# Frame builders — Orin → FC
# ---------------------------------------------------------------------------
def build_drive_cmd(left_mps: float, right_mps: float) -> bytes:
"""Build ORIN_CMD_DRIVE payload (8 bytes, 2 × float32 little-endian)."""
return struct.pack("<ff", float(left_mps), float(right_mps))
def build_heartbeat(seq: int = 0) -> bytes:
"""Build a HEARTBEAT payload: uint32 sequence counter (big-endian, 4 bytes)."""
return struct.pack(">I", seq & 0xFFFFFFFF)
def build_drive_cmd(speed: int, steer: int) -> bytes:
"""
Build an ORIN_CMD_DRIVE payload.
Parameters
----------
speed: int, 1000..+1000 (mapped directly to int16)
steer: int, 1000..+1000
"""
return struct.pack(">hh", int(speed), int(steer))
def build_mode_cmd(mode: int) -> bytes:
"""Build ORIN_CMD_MODE payload (1 byte)."""
return struct.pack("B", mode & 0xFF)
"""Build an ORIN_CMD_MODE payload (1 byte)."""
return struct.pack(">B", mode & 0xFF)
def build_estop_cmd(stop: bool = True, clear: bool = False) -> bytes:
"""Build ORIN_CMD_ESTOP payload (1 byte flags)."""
return struct.pack("B", (0x01 if stop else 0) | (0x02 if clear else 0))
def build_estop_cmd(action: int = 1) -> bytes:
"""Build an ORIN_CMD_ESTOP payload. action=1 → ESTOP, 0 → CLEAR."""
return struct.pack(">B", action & 0xFF)
# ---------------------------------------------------------------------------
# Frame builders — ESP32 BALANCE → Orin telemetry
# Frame builders — Mamba velocity commands (mamba_protocol.py encoding)
# ---------------------------------------------------------------------------
def build_velocity_cmd(left_mps: float, right_mps: float) -> bytes:
"""
Build a MAMBA_CMD_VELOCITY payload (8 bytes, 2 × float32 big-endian).
Matches encode_velocity_cmd() in mamba_protocol.py.
"""
return struct.pack(">ff", float(left_mps), float(right_mps))
# ---------------------------------------------------------------------------
# Frame builders — FC → Orin telemetry
# ---------------------------------------------------------------------------
def build_fc_status(
pitch_x10: int = 0,
motor_cmd: int = 0,
vbat_mv: int = 24000,
state: int = 1,
balance_state: int = 1,
flags: int = 0,
) -> bytes:
"""Build FC_STATUS (0x400) payload (8 bytes, big-endian)."""
"""
Build an FC_STATUS (0x400) payload.
Layout (orin_can_fc_status_t, 8 bytes, big-endian):
int16 pitch_x10
int16 motor_cmd
uint16 vbat_mv
uint8 balance_state
uint8 flags [bit0=estop_active, bit1=armed]
"""
return struct.pack(
">hHHBB",
">hhHBB",
int(pitch_x10),
int(motor_cmd) & 0xFFFF,
int(motor_cmd),
int(vbat_mv) & 0xFFFF,
int(state) & 0xFF,
int(balance_state) & 0xFF,
int(flags) & 0xFF,
)
def build_fc_vesc(
l_rpm_x10: int = 0,
r_rpm_x10: int = 0,
l_cur_x10: int = 0,
r_cur_x10: int = 0,
left_rpm_x10: int = 0,
right_rpm_x10: int = 0,
left_current_x10: int = 0,
right_current_x10: int = 0,
) -> bytes:
"""Build FC_VESC (0x401) payload (8 bytes, big-endian)."""
"""
Build an FC_VESC (0x401) payload.
Layout (orin_can_fc_vesc_t, 8 bytes, big-endian):
int16 left_rpm_x10
int16 right_rpm_x10
int16 left_current_x10
int16 right_current_x10
RPM values are RPM / 10 (e.g. 3000 RPM 300).
Current values are A × 10 (e.g. 5.5 A 55).
"""
return struct.pack(
">hhhh",
int(l_rpm_x10),
int(r_rpm_x10),
int(l_cur_x10),
int(r_cur_x10),
int(left_rpm_x10),
int(right_rpm_x10),
int(left_current_x10),
int(right_current_x10),
)
def build_vesc_status1(
erpm: int = 0,
def build_vesc_status(
rpm: int = 0,
current_x10: int = 0,
duty_x1000: int = 0,
) -> bytes:
"""Build VESC STATUS_1 payload (8 bytes, big-endian)."""
return struct.pack(">ihh", int(erpm), int(current_x10), int(duty_x1000))
"""
Build a VESC STATUS (packet type 9) payload.
Layout (from vesc_can.h / VESC FW 6.x, big-endian):
int32 rpm
int16 current × 10
int16 duty × 1000
Total: 8 bytes.
"""
return struct.pack(
">ihh",
int(rpm),
int(current_x10),
int(duty_x1000),
)
# ---------------------------------------------------------------------------
# Frame parsers
# ---------------------------------------------------------------------------
def parse_drive_cmd(data: bytes) -> Tuple[float, float]:
"""Parse ORIN_CMD_DRIVE payload → (left_mps, right_mps)."""
if len(data) < 8:
raise ValueError(f"ORIN_CMD_DRIVE needs 8 bytes, got {len(data)}")
return struct.unpack("<ff", data[:8])
def parse_fc_status(data: bytes):
"""Parse FC_STATUS (0x400) payload."""
"""
Parse an FC_STATUS (0x400) payload.
Returns
-------
dict with keys: pitch_x10, motor_cmd, vbat_mv, balance_state, flags,
estop_active (bool), armed (bool)
"""
if len(data) < 8:
raise ValueError(f"FC_STATUS needs 8 bytes, got {len(data)}")
pitch_x10, motor_cmd, vbat_mv, state, flags = struct.unpack(">hHHBB", data[:8])
pitch_x10, motor_cmd, vbat_mv, balance_state, flags = struct.unpack(
">hhHBB", data[:8]
)
return {
"pitch_x10": pitch_x10,
"pitch_deg": pitch_x10 / 10.0,
"motor_cmd": motor_cmd,
"vbat_mv": vbat_mv,
"state": state,
"balance_state": balance_state,
"flags": flags,
"estop_active": bool(flags & 0x01),
"armed": bool(flags & 0x02),
}
def parse_vesc_status1(data: bytes):
"""Parse VESC STATUS_1 payload."""
def parse_fc_vesc(data: bytes):
"""
Parse an FC_VESC (0x401) payload.
Returns
-------
dict with keys: left_rpm_x10, right_rpm_x10, left_current_x10,
right_current_x10, left_rpm (float), right_rpm (float)
"""
if len(data) < 8:
raise ValueError(f"VESC STATUS_1 needs 8 bytes, got {len(data)}")
erpm, cur_x10, duty_x1000 = struct.unpack(">ihh", data[:8])
raise ValueError(f"FC_VESC needs 8 bytes, got {len(data)}")
left_rpm_x10, right_rpm_x10, left_cur_x10, right_cur_x10 = struct.unpack(
">hhhh", data[:8]
)
return {
"erpm": erpm,
"current": cur_x10 / 10.0,
"left_rpm_x10": left_rpm_x10,
"right_rpm_x10": right_rpm_x10,
"left_current_x10": left_cur_x10,
"right_current_x10": right_cur_x10,
"left_rpm": left_rpm_x10 * 10.0,
"right_rpm": right_rpm_x10 * 10.0,
}
def parse_vesc_status(data: bytes):
"""
Parse a VESC STATUS (packet type 9) payload.
Returns
-------
dict with keys: rpm, current_x10, duty_x1000, current (float), duty (float)
"""
if len(data) < 8:
raise ValueError(f"VESC STATUS needs 8 bytes, got {len(data)}")
rpm, current_x10, duty_x1000 = struct.unpack(">ihh", data[:8])
return {
"rpm": rpm,
"current_x10": current_x10,
"duty_x1000": duty_x1000,
"current": current_x10 / 10.0,
"duty": duty_x1000 / 1000.0,
}
def parse_velocity_cmd(data: bytes) -> Tuple[float, float]:
"""
Parse a MAMBA_CMD_VELOCITY payload (8 bytes, 2 × float32 big-endian).
Returns
-------
(left_mps, right_mps)
"""
if len(data) < 8:
raise ValueError(f"MAMBA_CMD_VELOCITY needs 8 bytes, got {len(data)}")
return struct.unpack(">ff", data[:8])

View File

@ -1,20 +1,20 @@
from setuptools import find_packages, setup
from setuptools import setup
package_name = "saltybot_can_e2e_test"
setup(
name=package_name,
version="0.0.1",
packages=find_packages(exclude=["test"]),
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="seb",
maintainer_email="seb@example.com",
description="Orin <-> ESP32 BALANCE <-> VESC end-to-end CAN integration tests",
maintainer="sl-jetson",
maintainer_email="sl-jetson@saltylab.local",
description="End-to-end CAN integration tests for Orin↔Mamba↔VESC full loop",
license="MIT",
tests_require=["pytest"],
entry_points={

View File

@ -1,27 +1,93 @@
#!/usr/bin/env python3
"""
conftest.py shared pytest fixtures for saltybot CAN end-to-end tests.
conftest.py pytest fixtures for the saltybot_can_e2e_test suite.
No ROS2 node infrastructure is started; all tests run purely in Python.
"""
import sys
import os
# Ensure the package root is on sys.path so relative imports work when running
# pytest directly from the saltybot_can_e2e_test/ directory.
_pkg_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _pkg_root not in sys.path:
sys.path.insert(0, _pkg_root)
# Also add the saltybot_can_bridge package so we can import mamba_protocol.
_bridge_pkg = os.path.join(
os.path.dirname(_pkg_root), "saltybot_can_bridge"
)
if _bridge_pkg not in sys.path:
sys.path.insert(0, _bridge_pkg)
import pytest
from saltybot_can_bridge.balance_protocol import (
encode_drive_cmd,
encode_mode_cmd,
encode_estop_cmd,
decode_fc_status,
decode_fc_vesc,
decode_vesc_status1,
from saltybot_can_e2e_test.can_mock import MockCANBus
from saltybot_can_e2e_test.protocol_defs import (
VESC_CAN_ID_LEFT,
VESC_CAN_ID_RIGHT,
)
@pytest.fixture
# ---------------------------------------------------------------------------
# Core fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="function")
def mock_can_bus():
"""
Provide a fresh MockCANBus instance per test function.
The bus is automatically shut down after each test.
"""
bus = MockCANBus(loopback=False)
yield bus
bus.shutdown()
@pytest.fixture(scope="function")
def loopback_can_bus():
"""
MockCANBus in loopback mode sent frames are also queued for recv.
Useful for testing round-trip behaviour without a second node.
"""
bus = MockCANBus(loopback=True)
yield bus
bus.shutdown()
@pytest.fixture(scope="function")
def bridge_components():
"""Return a dict of encode/decode helpers from balance_protocol."""
"""
Return the mamba_protocol encode/decode callables and a fresh mock bus.
Yields a dict with keys:
bus MockCANBus instance
encode_vel encode_velocity_cmd(left, right) bytes
encode_mode encode_mode_cmd(mode) bytes
encode_estop encode_estop_cmd(stop) bytes
decode_vesc decode_vesc_state(data) VescStateTelemetry
"""
from saltybot_can_bridge.mamba_protocol import (
encode_velocity_cmd,
encode_mode_cmd,
encode_estop_cmd,
decode_vesc_state,
decode_battery_telem,
decode_imu_telem,
)
bus = MockCANBus(loopback=False)
yield {
"encode_drive": encode_drive_cmd,
"bus": bus,
"encode_vel": encode_velocity_cmd,
"encode_mode": encode_mode_cmd,
"encode_estop": encode_estop_cmd,
"decode_fc_status": decode_fc_status,
"decode_fc_vesc": decode_fc_vesc,
"decode_vesc": decode_vesc_status1,
"decode_vesc": decode_vesc_state,
"decode_battery": decode_battery_telem,
"decode_imu": decode_imu_telem,
"left_vesc_id": VESC_CAN_ID_LEFT,
"right_vesc_id": VESC_CAN_ID_RIGHT,
}
bus.shutdown()

View File

@ -1,47 +1,193 @@
#!/usr/bin/env python3
"""
test_drive_command.py E2E tests for ORIN_CMD_DRIVE (0x300) encoding.
test_drive_command.py Integration tests for the drive command path.
Tests verify:
DRIVE cmd Mamba receives velocity command frame mock VESC status response
FC_VESC broadcast contains correct RPMs.
All tests run without real hardware or a running ROS2 system.
Run with: python -m pytest test/test_drive_command.py -v
"""
import struct
import pytest
from saltybot_can_e2e_test.protocol_defs import (
ORIN_CMD_DRIVE,
build_drive_cmd,
parse_drive_cmd,
MODE_IDLE,
MAMBA_CMD_VELOCITY,
MAMBA_CMD_MODE,
FC_VESC,
MODE_DRIVE,
MODE_IDLE,
VESC_CAN_ID_LEFT,
VESC_CAN_ID_RIGHT,
VESC_STATUS_ID,
build_velocity_cmd,
build_fc_vesc,
build_vesc_status,
parse_velocity_cmd,
parse_fc_vesc,
)
from saltybot_can_bridge.mamba_protocol import (
encode_velocity_cmd,
encode_mode_cmd,
)
class TestDriveCommandEncoding:
# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------
def test_frame_id(self):
assert ORIN_CMD_DRIVE == 0x300
def _send_drive(bus, left_mps: float, right_mps: float) -> None:
"""Simulate the bridge encoding and sending a velocity command."""
from saltybot_can_e2e_test.can_mock import MockCANBus
def test_zero_drive(self):
payload = build_drive_cmd(0.0, 0.0)
left, right = parse_drive_cmd(payload)
payload = encode_velocity_cmd(left_mps, right_mps)
# Create a minimal message object compatible with our mock
class _Msg:
def __init__(self, arb_id, data):
self.arbitration_id = arb_id
self.data = bytearray(data)
self.is_extended_id = False
bus.send(_Msg(MAMBA_CMD_VELOCITY, payload))
bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(MODE_DRIVE)))
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestDriveForward:
def test_drive_forward_velocity_frame_sent(self, mock_can_bus):
"""
Inject DRIVE cmd (1.0 m/s, 1.0 m/s) verify Mamba receives
a MAMBA_CMD_VELOCITY frame with correct payload.
"""
_send_drive(mock_can_bus, 1.0, 1.0)
vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY)
assert len(vel_frames) == 1, "Expected exactly one velocity command frame"
left, right = parse_velocity_cmd(bytes(vel_frames[0].data))
assert abs(left - 1.0) < 1e-4, f"Left speed {left} != 1.0"
assert abs(right - 1.0) < 1e-4, f"Right speed {right} != 1.0"
def test_drive_forward_mode_drive_sent(self, mock_can_bus):
"""After a drive command, a MODE=drive frame must accompany it."""
_send_drive(mock_can_bus, 1.0, 1.0)
mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE)
assert len(mode_frames) >= 1, "Expected at least one MODE frame"
assert bytes(mode_frames[0].data) == bytes([MODE_DRIVE])
def test_drive_forward_fc_vesc_broadcast(self, mock_can_bus):
"""
Simulate FC_VESC broadcast arriving after drive cmd; verify parse is correct.
(In the real loop Mamba computes RPM from m/s and broadcasts FC_VESC.)
This test checks the FC_VESC frame format and parser.
"""
# Simulate: 1.0 m/s → ~300 RPM × 10 = 3000 (representative, not physics)
fc_payload = build_fc_vesc(
left_rpm_x10=300, right_rpm_x10=300,
left_current_x10=50, right_current_x10=50,
)
mock_can_bus.inject(FC_VESC, fc_payload)
frame = mock_can_bus.recv(timeout=0.1)
assert frame is not None, "FC_VESC frame not received"
parsed = parse_fc_vesc(bytes(frame.data))
assert parsed["left_rpm_x10"] == 300
assert parsed["right_rpm_x10"] == 300
assert abs(parsed["left_rpm"] - 3000.0) < 0.1
class TestDriveTurn:
def test_drive_turn_differential_rpm(self, mock_can_bus):
"""
DRIVE cmd (0.5, 0.5) verify differential RPM in velocity command.
"""
_send_drive(mock_can_bus, 0.5, -0.5)
vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY)
assert len(vel_frames) == 1
left, right = parse_velocity_cmd(bytes(vel_frames[0].data))
assert abs(left - 0.5) < 1e-4, f"Left speed {left} != 0.5"
assert abs(right - (-0.5)) < 1e-4, f"Right speed {right} != -0.5"
# Signs must be opposite for a zero-radius spin
assert left > 0 and right < 0
def test_drive_turn_fc_vesc_differential(self, mock_can_bus):
"""Simulated FC_VESC for a turn has opposite-sign RPMs."""
fc_payload = build_fc_vesc(
left_rpm_x10=150, right_rpm_x10=-150,
left_current_x10=30, right_current_x10=30,
)
mock_can_bus.inject(FC_VESC, fc_payload)
frame = mock_can_bus.recv(timeout=0.1)
parsed = parse_fc_vesc(bytes(frame.data))
assert parsed["left_rpm_x10"] > 0
assert parsed["right_rpm_x10"] < 0
class TestDriveZero:
def test_drive_zero_stops_motors(self, mock_can_bus):
"""
After a non-zero drive cmd, sending zero velocity must result in
RPM=0 being commanded to both VESCs.
"""
_send_drive(mock_can_bus, 1.0, 1.0)
mock_can_bus.reset() # clear prior frames
_send_drive(mock_can_bus, 0.0, 0.0)
vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY)
assert len(vel_frames) == 1
left, right = parse_velocity_cmd(bytes(vel_frames[0].data))
assert abs(left) < 1e-5, "Left motor not stopped"
assert abs(right) < 1e-5, "Right motor not stopped"
class TestDriveCmdTimeout:
def test_drive_cmd_timeout_sends_zero(self, mock_can_bus):
"""
Simulate the watchdog behaviour: if no DRIVE cmd arrives for >500 ms,
zero velocity is sent. We test the encoding directly (without timers).
"""
# The watchdog in CanBridgeNode calls encode_velocity_cmd(0.0, 0.0) and
# sends it on MAMBA_CMD_VELOCITY. Replicate that here.
zero_payload = encode_velocity_cmd(0.0, 0.0)
class _Msg:
def __init__(self, arb_id, data):
self.arbitration_id = arb_id
self.data = bytearray(data)
self.is_extended_id = False
mock_can_bus.send(_Msg(MAMBA_CMD_VELOCITY, zero_payload))
mock_can_bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(MODE_IDLE)))
vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY)
assert len(vel_frames) == 1
left, right = parse_velocity_cmd(bytes(vel_frames[0].data))
assert abs(left) < 1e-5
assert abs(right) < 1e-5
def test_forward(self):
payload = build_drive_cmd(1.5, 1.5)
left, right = parse_drive_cmd(payload)
assert abs(left - 1.5) < 1e-4
assert abs(right - 1.5) < 1e-4
mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE)
assert len(mode_frames) == 1
assert bytes(mode_frames[0].data) == bytes([MODE_IDLE])
def test_differential(self):
payload = build_drive_cmd(-0.5, 0.5)
left, right = parse_drive_cmd(payload)
assert abs(left - (-0.5)) < 1e-4
assert abs(right - 0.5) < 1e-4
def test_payload_is_little_endian(self):
# First 4 bytes encode left speed in LE float32
payload = build_drive_cmd(1.0, 0.0)
(left,) = struct.unpack("<f", payload[:4])
assert abs(left - 1.0) < 1e-5
def test_payload_length(self):
assert len(build_drive_cmd(1.0, -1.0)) == 8
@pytest.mark.parametrize("left_mps,right_mps", [
(0.5, 0.5),
(1.0, 0.0),
(0.0, -1.0),
(-0.5, -0.5),
])
def test_drive_cmd_payload_roundtrip(mock_can_bus, left_mps, right_mps):
"""Parametrized: encode then decode must recover original velocities."""
payload = encode_velocity_cmd(left_mps, right_mps)
l, r = parse_velocity_cmd(payload)
assert abs(l - left_mps) < 1e-4
assert abs(r - right_mps) < 1e-4

View File

@ -1,52 +1,264 @@
#!/usr/bin/env python3
"""
test_estop.py E2E tests for ORIN_CMD_ESTOP (0x302) and FC_STATUS flags.
test_estop.py E-stop command integration tests.
Covers:
- ESTOP command halts motors immediately
- ESTOP persists: DRIVE commands ignored while ESTOP is active
- ESTOP clear restores normal drive operation
- Firmware-side estop via FC_STATUS flags is detected correctly
No ROS2 or real CAN hardware required.
Run with: python -m pytest test/test_estop.py -v
"""
import struct
import pytest
from saltybot_can_e2e_test.can_mock import MockCANBus
from saltybot_can_e2e_test.protocol_defs import (
MAMBA_CMD_VELOCITY,
MAMBA_CMD_MODE,
MAMBA_CMD_ESTOP,
ORIN_CMD_ESTOP,
FC_STATUS,
MODE_IDLE,
MODE_DRIVE,
MODE_ESTOP,
build_estop_cmd,
build_mode_cmd,
build_velocity_cmd,
build_fc_status,
parse_velocity_cmd,
parse_fc_status,
)
from saltybot_can_bridge.mamba_protocol import (
encode_velocity_cmd,
encode_mode_cmd,
encode_estop_cmd,
)
class TestEstopAssert:
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def test_frame_id(self):
assert ORIN_CMD_ESTOP == 0x302
class _Msg:
"""Minimal CAN message stand-in."""
def __init__(self, arb_id: int, data: bytes, is_extended_id: bool = False):
self.arbitration_id = arb_id
self.data = bytearray(data)
self.is_extended_id = is_extended_id
def test_stop_bit_set(self):
payload = build_estop_cmd(stop=True, clear=False)
assert payload[0] & 0x01 == 0x01
def test_stop_bit_clear_not_set(self):
payload = build_estop_cmd(stop=True, clear=False)
assert payload[0] & 0x02 == 0x00
class EstopStateMachine:
"""
Lightweight state machine that mirrors the bridge estop logic.
Tracks whether ESTOP is active and gates velocity commands accordingly.
Sends frames to the supplied MockCANBus.
"""
def __init__(self, bus: MockCANBus):
self._bus = bus
self._estop_active = False
self._mode = MODE_IDLE
def assert_estop(self) -> None:
"""Send ESTOP and transition to estop mode."""
self._estop_active = True
self._mode = MODE_ESTOP
self._bus.send(_Msg(MAMBA_CMD_VELOCITY, encode_velocity_cmd(0.0, 0.0)))
self._bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(MODE_ESTOP)))
self._bus.send(_Msg(MAMBA_CMD_ESTOP, encode_estop_cmd(True)))
def clear_estop(self) -> None:
"""Clear ESTOP and return to IDLE mode."""
self._estop_active = False
self._mode = MODE_IDLE
self._bus.send(_Msg(MAMBA_CMD_ESTOP, encode_estop_cmd(False)))
self._bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(MODE_IDLE)))
def send_drive(self, left_mps: float, right_mps: float) -> None:
"""Send velocity command only if ESTOP is not active."""
if self._estop_active:
# Bridge silently drops commands while estopped
return
self._mode = MODE_DRIVE
self._bus.send(_Msg(MAMBA_CMD_VELOCITY, encode_velocity_cmd(left_mps, right_mps)))
self._bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(MODE_DRIVE)))
@property
def estop_active(self) -> bool:
return self._estop_active
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestEstopHaltsMotors:
def test_estop_command_halts_motors(self, mock_can_bus):
"""
After injecting ESTOP, zero velocity must be commanded immediately.
"""
sm = EstopStateMachine(mock_can_bus)
sm.assert_estop()
vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY)
assert len(vel_frames) >= 1, "No velocity frame after ESTOP"
l, r = parse_velocity_cmd(bytes(vel_frames[-1].data))
assert abs(l) < 1e-5, f"Left motor {l} not zero after ESTOP"
assert abs(r) < 1e-5, f"Right motor {r} not zero after ESTOP"
def test_estop_mode_frame_sent(self, mock_can_bus):
"""ESTOP mode byte must be broadcast on CAN."""
sm = EstopStateMachine(mock_can_bus)
sm.assert_estop()
mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE)
assert any(
bytes(f.data) == bytes([MODE_ESTOP]) for f in mode_frames
), "MODE=ESTOP not found in sent frames"
def test_estop_flag_byte_is_0x01(self, mock_can_bus):
"""MAMBA_CMD_ESTOP payload must be 0x01 when asserting e-stop."""
sm = EstopStateMachine(mock_can_bus)
sm.assert_estop()
estop_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_ESTOP)
assert len(estop_frames) >= 1
assert bytes(estop_frames[-1].data) == b"\x01", \
f"ESTOP payload {estop_frames[-1].data!r} != 0x01"
class TestEstopPersists:
def test_estop_persists_after_drive_cmd(self, mock_can_bus):
"""
After ESTOP, injecting a DRIVE command must NOT forward velocity to the bus.
"""
sm = EstopStateMachine(mock_can_bus)
sm.assert_estop()
mock_can_bus.reset() # start fresh after initial ESTOP frames
sm.send_drive(1.0, 1.0) # should be suppressed
vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY)
assert len(vel_frames) == 0, \
"Velocity command was forwarded while ESTOP is active"
def test_estop_mode_unchanged_after_drive_attempt(self, mock_can_bus):
"""
After ESTOP, attempting DRIVE must not change the mode to DRIVE.
"""
sm = EstopStateMachine(mock_can_bus)
sm.assert_estop()
mock_can_bus.reset()
sm.send_drive(0.5, 0.5)
# No mode frames should have been emitted (drive was suppressed)
mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE)
assert all(
bytes(f.data) != bytes([MODE_DRIVE]) for f in mode_frames
), "MODE=DRIVE was set despite active ESTOP"
class TestEstopClear:
def test_estop_clear_restores_drive(self, mock_can_bus):
"""After ESTOP_CLEAR, drive commands must be accepted again."""
sm = EstopStateMachine(mock_can_bus)
sm.assert_estop()
sm.clear_estop()
mock_can_bus.reset()
def test_clear_bit_set(self):
payload = build_estop_cmd(stop=False, clear=True)
assert payload[0] & 0x02 == 0x02
sm.send_drive(0.8, 0.8)
def test_stop_not_set_on_clear(self):
payload = build_estop_cmd(stop=False, clear=True)
assert payload[0] & 0x01 == 0x00
vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY)
assert len(vel_frames) == 1, "Velocity command not sent after ESTOP clear"
l, r = parse_velocity_cmd(bytes(vel_frames[0].data))
assert abs(l - 0.8) < 1e-4
assert abs(r - 0.8) < 1e-4
def test_estop_clear_flag_byte_is_0x00(self, mock_can_bus):
"""MAMBA_CMD_ESTOP payload must be 0x00 when clearing e-stop."""
sm = EstopStateMachine(mock_can_bus)
sm.assert_estop()
sm.clear_estop()
estop_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_ESTOP)
assert len(estop_frames) >= 2
# Last ESTOP frame should be the clear
assert bytes(estop_frames[-1].data) == b"\x00", \
f"ESTOP clear payload {estop_frames[-1].data!r} != 0x00"
def test_estop_clear_mode_returns_to_idle(self, mock_can_bus):
"""After clearing ESTOP, the mode frame must be MODE_IDLE."""
sm = EstopStateMachine(mock_can_bus)
sm.assert_estop()
sm.clear_estop()
mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE)
last_mode = bytes(mode_frames[-1].data)
assert last_mode == bytes([MODE_IDLE]), \
f"Mode after ESTOP clear is {last_mode!r}, expected MODE_IDLE"
class TestAttitudeEstopFlag:
class TestFirmwareSideEstop:
def test_fc_status_estop_flag_detected(self, mock_can_bus):
"""
Simulate firmware sending estop via FC_STATUS flags (bit0=estop_active).
Verify the Orin bridge side correctly parses the flag.
"""
# Build FC_STATUS with estop_active bit set (flags=0x01)
payload = build_fc_status(
pitch_x10=0,
motor_cmd=0,
vbat_mv=24000,
balance_state=2, # TILT_FAULT
flags=0x01, # bit0 = estop_active
)
mock_can_bus.inject(FC_STATUS, payload)
def test_estop_active_flag_in_fc_status(self):
# FC_STATUS flags bit0 = estop active
raw = build_fc_status(flags=0x01)
status = parse_fc_status(raw)
assert status["flags"] & 0x01 == 0x01
frame = mock_can_bus.recv(timeout=0.1)
assert frame is not None, "FC_STATUS frame not received"
parsed = parse_fc_status(bytes(frame.data))
assert parsed["estop_active"] is True, \
"estop_active flag not set in FC_STATUS"
assert parsed["balance_state"] == 2
def test_no_estop_flag(self):
raw = build_fc_status(flags=0x00)
status = parse_fc_status(raw)
assert status["flags"] & 0x01 == 0x00
def test_fc_status_no_estop_flag(self, mock_can_bus):
"""FC_STATUS with flags=0x00 must NOT set estop_active."""
payload = build_fc_status(flags=0x00)
mock_can_bus.inject(FC_STATUS, payload)
frame = mock_can_bus.recv(timeout=0.1)
parsed = parse_fc_status(bytes(frame.data))
assert parsed["estop_active"] is False
def test_fc_status_armed_flag_detected(self, mock_can_bus):
"""FC_STATUS flags bit1=armed must parse correctly."""
payload = build_fc_status(flags=0x02) # bit1 = armed
mock_can_bus.inject(FC_STATUS, payload)
frame = mock_can_bus.recv(timeout=0.1)
parsed = parse_fc_status(bytes(frame.data))
assert parsed["armed"] is True
assert parsed["estop_active"] is False
def test_fc_status_roundtrip(self, mock_can_bus):
"""build_fc_status → inject → recv → parse_fc_status must be identity."""
payload = build_fc_status(
pitch_x10=150,
motor_cmd=-200,
vbat_mv=23800,
balance_state=1,
flags=0x03,
)
mock_can_bus.inject(FC_STATUS, payload)
frame = mock_can_bus.recv(timeout=0.1)
parsed = parse_fc_status(bytes(frame.data))
assert parsed["pitch_x10"] == 150
assert parsed["motor_cmd"] == -200
assert parsed["vbat_mv"] == 23800
assert parsed["balance_state"] == 1
assert parsed["estop_active"] is True
assert parsed["armed"] is True

View File

@ -1,78 +1,315 @@
#!/usr/bin/env python3
"""
test_fc_vesc_broadcast.py E2E tests for ESP32 BALANCE telemetry parsing.
test_fc_vesc_broadcast.py FC_VESC broadcast and VESC STATUS integration tests.
Covers:
- VESC STATUS_1 extended frames (arb_id = (9 << 8) | node_id)
- FC_STATUS (0x400) attitude/battery frame
- FC_VESC (0x401) VESC RPM/current broadcast frame
- VESC STATUS extended frame for left VESC (ID 56) triggers FC_VESC broadcast
- Both left (56) and right (68) VESC STATUS combined in FC_VESC
- FC_VESC broadcast rate (~10 Hz)
- current_x10 scaling matches protocol spec
No ROS2 or real CAN hardware required.
Run with: python -m pytest test/test_fc_vesc_broadcast.py -v
"""
import struct
import time
import threading
import pytest
from saltybot_can_e2e_test.can_mock import MockCANBus
from saltybot_can_e2e_test.protocol_defs import (
FC_STATUS,
FC_VESC,
VESC_CAN_ID_LEFT,
VESC_CAN_ID_RIGHT,
VESC_STATUS1_ID,
build_fc_status,
VESC_STATUS_ID,
VESC_SET_RPM_ID,
VESC_TELEM_STATE,
build_vesc_status,
build_fc_vesc,
build_vesc_status1,
parse_fc_status,
parse_vesc_status1,
parse_fc_vesc,
parse_vesc_status,
)
from saltybot_can_bridge.mamba_protocol import (
VESC_TELEM_STATE as BRIDGE_VESC_TELEM_STATE,
decode_vesc_state,
)
class TestVescStatusParsing:
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def test_status1_id_left(self):
arb_id = VESC_STATUS1_ID(VESC_CAN_ID_LEFT)
assert arb_id == (9 << 8) | 56
class VescStatusAggregator:
"""
Simulates the firmware logic that:
1. Receives VESC STATUS extended frames from left/right VESCs
2. Builds an FC_VESC broadcast payload
3. Injects the FC_VESC frame onto the mock bus
def test_status1_id_right(self):
arb_id = VESC_STATUS1_ID(VESC_CAN_ID_RIGHT)
assert arb_id == (9 << 8) | 68
This represents the Mamba Orin telemetry path.
"""
def test_parse_vesc_status1_nominal(self):
raw = build_vesc_status1(erpm=3000, current_x10=55, duty_x1000=250)
result = parse_vesc_status1(raw)
assert abs(result["erpm"] - 3000) < 1
assert abs(result["current"] - 5.5) < 0.01
assert abs(result["duty"] - 0.25) < 0.001
def __init__(self, bus: MockCANBus):
self._bus = bus
self._left_rpm_x10 = 0
self._right_rpm_x10 = 0
self._left_current_x10 = 0
self._right_current_x10 = 0
self._left_seen = False
self._right_seen = False
def test_parse_vesc_status1_reverse(self):
raw = build_vesc_status1(erpm=-1500, current_x10=-20, duty_x1000=-150)
result = parse_vesc_status1(raw)
assert result["erpm"] == -1500
assert abs(result["duty"] - (-0.15)) < 0.001
def process_vesc_status(self, arb_id: int, data: bytes) -> None:
"""
Process an incoming VESC STATUS frame (extended 29-bit ID).
Updates internal state; broadcasts FC_VESC when at least one side is known.
"""
node_id = arb_id & 0xFF
parsed = parse_vesc_status(data)
rpm_x10 = parsed["rpm"] // 10 # convert full RPM to RPM/10
if node_id == VESC_CAN_ID_LEFT:
self._left_rpm_x10 = rpm_x10
self._left_current_x10 = parsed["current_x10"]
self._left_seen = True
elif node_id == VESC_CAN_ID_RIGHT:
self._right_rpm_x10 = rpm_x10
self._right_current_x10 = parsed["current_x10"]
self._right_seen = True
# Broadcast FC_VESC whenever we receive any update
self._broadcast_fc_vesc()
def _broadcast_fc_vesc(self) -> None:
payload = build_fc_vesc(
left_rpm_x10=self._left_rpm_x10,
right_rpm_x10=self._right_rpm_x10,
left_current_x10=self._left_current_x10,
right_current_x10=self._right_current_x10,
)
self._bus.inject(FC_VESC, payload)
class TestFcStatusTelemetry:
def test_frame_id(self):
assert FC_STATUS == 0x400
def test_nominal(self):
raw = build_fc_status(pitch_x10=125, motor_cmd=300, vbat_mv=24000, state=1, flags=0)
status = parse_fc_status(raw)
assert abs(status["pitch_deg"] - 12.5) < 0.01
assert status["vbat_mv"] == 24000
assert status["state"] == 1
def test_zero(self):
raw = build_fc_status(pitch_x10=0, motor_cmd=0, vbat_mv=0, state=0, flags=0)
status = parse_fc_status(raw)
assert status["pitch_x10"] == 0
def _inject_vesc_status(bus: MockCANBus, vesc_id: int, rpm: int,
current_x10: int = 50, duty_x1000: int = 250) -> None:
"""Inject a VESC STATUS extended frame for the given node ID."""
arb_id = VESC_STATUS_ID(vesc_id)
payload = build_vesc_status(rpm=rpm, current_x10=current_x10, duty_x1000=duty_x1000)
bus.inject(arb_id, payload, is_extended_id=True)
class TestFcVescTelemetry:
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_frame_id(self):
assert FC_VESC == 0x401
class TestVescStatusToFcVesc:
def test_left_vesc_status_triggers_broadcast(self, mock_can_bus):
"""
Inject VESC STATUS for left VESC (ID 56) verify FC_VESC contains
the correct left RPM (rpm / 10).
"""
agg = VescStatusAggregator(mock_can_bus)
def test_nominal(self):
raw = build_fc_vesc(l_rpm_x10=300, r_rpm_x10=-300, l_cur_x10=55, r_cur_x10=-55)
l_rpm, r_rpm, l_cur, r_cur = struct.unpack(">hhhh", raw)
assert l_rpm == 300
assert r_rpm == -300
# Left VESC: 3000 RPM → rpm_x10 = 300
arb_id = VESC_STATUS_ID(VESC_CAN_ID_LEFT)
payload = build_vesc_status(rpm=3000, current_x10=55)
agg.process_vesc_status(arb_id, payload)
frame = mock_can_bus.recv(timeout=0.1)
assert frame is not None, "No FC_VESC broadcast after left VESC STATUS"
parsed = parse_fc_vesc(bytes(frame.data))
assert parsed["left_rpm_x10"] == 300, \
f"left_rpm_x10 {parsed['left_rpm_x10']} != 300"
assert abs(parsed["left_rpm"] - 3000.0) < 1.0
def test_right_vesc_status_triggers_broadcast(self, mock_can_bus):
"""Inject VESC STATUS for right VESC (ID 68) → verify right RPM in FC_VESC."""
agg = VescStatusAggregator(mock_can_bus)
arb_id = VESC_STATUS_ID(VESC_CAN_ID_RIGHT)
payload = build_vesc_status(rpm=2000, current_x10=40)
agg.process_vesc_status(arb_id, payload)
frame = mock_can_bus.recv(timeout=0.1)
assert frame is not None
parsed = parse_fc_vesc(bytes(frame.data))
assert parsed["right_rpm_x10"] == 200
def test_left_vesc_id_matches_constant(self):
"""VESC_STATUS_ID(56) must equal (9 << 8) | 56 = 0x938."""
assert VESC_STATUS_ID(VESC_CAN_ID_LEFT) == (9 << 8) | 56
assert VESC_STATUS_ID(VESC_CAN_ID_LEFT) == 0x938
def test_right_vesc_id_matches_constant(self):
"""VESC_STATUS_ID(68) must equal (9 << 8) | 68 = 0x944."""
assert VESC_STATUS_ID(VESC_CAN_ID_RIGHT) == (9 << 8) | 68
assert VESC_STATUS_ID(VESC_CAN_ID_RIGHT) == 0x944
class TestBothVescStatusCombined:
def test_both_vesc_status_combined_in_fc_vesc(self, mock_can_bus):
"""
Inject both left (56) and right (68) VESC STATUS frames.
Final FC_VESC must contain both RPMs.
"""
agg = VescStatusAggregator(mock_can_bus)
# Left: 3000 RPM
agg.process_vesc_status(
VESC_STATUS_ID(VESC_CAN_ID_LEFT),
build_vesc_status(rpm=3000, current_x10=50),
)
# Right: -1500 RPM (reverse)
agg.process_vesc_status(
VESC_STATUS_ID(VESC_CAN_ID_RIGHT),
build_vesc_status(rpm=-1500, current_x10=30),
)
# Drain two FC_VESC frames (one per update), check the latest
frames = []
while True:
f = mock_can_bus.recv(timeout=0.05)
if f is None:
break
frames.append(f)
assert len(frames) >= 2, "Expected at least 2 FC_VESC frames"
# Last frame must have both sides
last = parse_fc_vesc(bytes(frames[-1].data))
assert last["left_rpm_x10"] == 300, \
f"left_rpm_x10 {last['left_rpm_x10']} != 300"
assert last["right_rpm_x10"] == -150, \
f"right_rpm_x10 {last['right_rpm_x10']} != -150"
def test_both_vesc_currents_combined(self, mock_can_bus):
"""Both current values must appear in FC_VESC after two STATUS frames."""
agg = VescStatusAggregator(mock_can_bus)
agg.process_vesc_status(
VESC_STATUS_ID(VESC_CAN_ID_LEFT),
build_vesc_status(rpm=1000, current_x10=55),
)
agg.process_vesc_status(
VESC_STATUS_ID(VESC_CAN_ID_RIGHT),
build_vesc_status(rpm=1000, current_x10=42),
)
frames = []
while True:
f = mock_can_bus.recv(timeout=0.05)
if f is None:
break
frames.append(f)
last = parse_fc_vesc(bytes(frames[-1].data))
assert last["left_current_x10"] == 55
assert last["right_current_x10"] == 42
class TestVescBroadcastRate:
def test_fc_vesc_broadcast_at_10hz(self, mock_can_bus):
"""
Simulate FC_VESC broadcasts at ~10 Hz and verify the rate.
We inject 12 frames over ~120 ms, then verify count and average interval.
"""
_FC_VESC_HZ = 10
_interval = 1.0 / _FC_VESC_HZ
timestamps = []
stop_event = threading.Event()
def broadcaster():
while not stop_event.is_set():
t = time.monotonic()
mock_can_bus.inject(
FC_VESC,
build_fc_vesc(100, -100, 30, 30),
timestamp=t,
)
timestamps.append(t)
time.sleep(_interval)
t = threading.Thread(target=broadcaster, daemon=True)
t.start()
time.sleep(0.15) # collect ~1.5 broadcasts
stop_event.set()
t.join(timeout=0.2)
assert len(timestamps) >= 1, "No FC_VESC broadcasts in 150 ms window"
if len(timestamps) >= 2:
intervals = [timestamps[i+1] - timestamps[i] for i in range(len(timestamps)-1)]
avg = sum(intervals) / len(intervals)
# ±40 ms tolerance for OS scheduling
assert 0.06 <= avg <= 0.14, \
f"FC_VESC broadcast interval {avg*1000:.1f} ms not ~100 ms"
def test_fc_vesc_frame_is_8_bytes(self):
"""FC_VESC payload must always be exactly 8 bytes."""
payload = build_fc_vesc(300, -150, 55, 42)
assert len(payload) == 8
class TestVescCurrentScaling:
def test_current_x10_scaling(self, mock_can_bus):
"""
Verify current_x10 scaling: 5.5 A current_x10=55.
build_vesc_status stores current_x10 directly; parse_vesc_status
returns current = current_x10 / 10.
"""
payload = build_vesc_status(rpm=1000, current_x10=55, duty_x1000=250)
parsed = parse_vesc_status(payload)
assert parsed["current_x10"] == 55
assert abs(parsed["current"] - 5.5) < 0.01
def test_current_negative_scaling(self, mock_can_bus):
"""Negative current (regen) must scale correctly."""
payload = build_vesc_status(rpm=-500, current_x10=-30)
parsed = parse_vesc_status(payload)
assert parsed["current_x10"] == -30
assert abs(parsed["current"] - (-3.0)) < 0.01
def test_fc_vesc_current_x10_roundtrip(self, mock_can_bus):
"""build_fc_vesc → inject → recv → parse must preserve current_x10."""
payload = build_fc_vesc(
left_rpm_x10=200,
right_rpm_x10=200,
left_current_x10=55,
right_current_x10=42,
)
mock_can_bus.inject(FC_VESC, payload)
frame = mock_can_bus.recv(timeout=0.1)
parsed = parse_fc_vesc(bytes(frame.data))
assert parsed["left_current_x10"] == 55
assert parsed["right_current_x10"] == 42
@pytest.mark.parametrize("vesc_id", [VESC_CAN_ID_LEFT, VESC_CAN_ID_RIGHT])
def test_vesc_status_id_both_nodes(self, vesc_id, mock_can_bus):
"""
VESC_STATUS_ID(vesc_id) must produce the correct 29-bit extended arb_id
for both the left (56) and right (68) node IDs.
"""
expected = (9 << 8) | vesc_id
assert VESC_STATUS_ID(vesc_id) == expected
@pytest.mark.parametrize("vesc_id,rpm,expected_rpm_x10", [
(VESC_CAN_ID_LEFT, 3000, 300),
(VESC_CAN_ID_LEFT, -1500, -150),
(VESC_CAN_ID_RIGHT, 2000, 200),
(VESC_CAN_ID_RIGHT, 0, 0),
])
def test_rpm_x10_conversion_parametrized(
self, mock_can_bus, vesc_id, rpm, expected_rpm_x10
):
"""Parametrized: verify rpm → rpm_x10 conversion for both VESCs."""
agg = VescStatusAggregator(mock_can_bus)
agg.process_vesc_status(
VESC_STATUS_ID(vesc_id),
build_vesc_status(rpm=rpm),
)
frame = mock_can_bus.recv(timeout=0.05)
assert frame is not None
parsed = parse_fc_vesc(bytes(frame.data))
if vesc_id == VESC_CAN_ID_LEFT:
assert parsed["left_rpm_x10"] == expected_rpm_x10, \
f"left_rpm_x10={parsed['left_rpm_x10']} expected {expected_rpm_x10}"
else:
assert parsed["right_rpm_x10"] == expected_rpm_x10, \
f"right_rpm_x10={parsed['right_rpm_x10']} expected {expected_rpm_x10}"

View File

@ -1,51 +1,238 @@
#!/usr/bin/env python3
"""
test_heartbeat_timeout.py E2E tests for watchdog / heartbeat timeout behavior.
test_heartbeat_timeout.py Tests for heartbeat loss and recovery.
When no drive command is received within the timeout period, the bridge should
send zero-velocity ORIN_CMD_DRIVE with MODE_IDLE.
Covers:
- Heartbeat loss triggers e-stop escalation (timeout logic)
- Heartbeat recovery restores previous mode
- Heartbeat interval is sent at ~100 ms
No ROS2 or real CAN hardware required.
Run with: python -m pytest test/test_heartbeat_timeout.py -v
"""
import time
import struct
import threading
import pytest
from saltybot_can_e2e_test.can_mock import MockCANBus
from saltybot_can_e2e_test.protocol_defs import (
ORIN_CMD_DRIVE,
ORIN_CMD_MODE,
ORIN_CMD_HEARTBEAT,
ORIN_CMD_ESTOP,
ORIN_CMD_MODE,
MAMBA_CMD_VELOCITY,
MAMBA_CMD_MODE,
MAMBA_CMD_ESTOP,
MODE_IDLE,
build_drive_cmd,
MODE_DRIVE,
MODE_ESTOP,
build_heartbeat,
build_estop_cmd,
build_mode_cmd,
parse_drive_cmd,
build_velocity_cmd,
parse_velocity_cmd,
)
from saltybot_can_bridge.mamba_protocol import (
encode_velocity_cmd,
encode_mode_cmd,
encode_estop_cmd,
)
# Heartbeat timeout from orin_can.h: 500 ms
ORIN_HB_TIMEOUT_MS = 500
ORIN_HB_TIMEOUT_S = ORIN_HB_TIMEOUT_MS / 1000.0
class TestWatchdogZeroVelocity:
def test_watchdog_sends_zero_drive(self):
"""Watchdog must send zero-velocity drive command."""
payload = build_drive_cmd(0.0, 0.0)
left, right = parse_drive_cmd(payload)
assert abs(left) < 1e-5
assert abs(right) < 1e-5
assert len(payload) == 8
def test_watchdog_drive_id(self):
assert ORIN_CMD_DRIVE == 0x300
def test_watchdog_mode_idle(self):
payload = build_mode_cmd(MODE_IDLE)
assert payload == b"\x00"
def test_watchdog_mode_id(self):
assert ORIN_CMD_MODE == 0x301
# Expected heartbeat interval
HB_INTERVAL_MS = 100
HB_INTERVAL_S = HB_INTERVAL_MS / 1000.0
class TestEstopOnTimeout:
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def test_estop_frame_id(self):
assert ORIN_CMD_ESTOP == 0x302
class HeartbeatSimulator:
"""
Simulate periodic heartbeat injection into a MockCANBus.
def test_estop_stop_flag(self):
from saltybot_can_e2e_test.protocol_defs import build_estop_cmd
payload = build_estop_cmd(stop=True, clear=False)
assert payload[0] & 0x01 == 0x01
Call start() to begin sending heartbeats every interval_s.
Call stop() to cease after ORIN_HB_TIMEOUT_S the firmware would declare
Orin offline.
"""
def __init__(self, bus: MockCANBus, interval_s: float = HB_INTERVAL_S):
self._bus = bus
self._interval_s = interval_s
self._seq = 0
self._running = False
self._thread: threading.Thread | None = None
def start(self):
self._running = True
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self):
self._running = False
def _run(self):
while self._running:
self._bus.inject(
ORIN_CMD_HEARTBEAT,
build_heartbeat(self._seq),
is_extended_id=False,
)
self._seq += 1
time.sleep(self._interval_s)
def _simulate_estop_on_timeout(bus: MockCANBus) -> None:
"""
Simulate the firmware-side logic: when heartbeat timeout expires,
the FC sends an e-stop command by setting estop mode on the Mamba bus.
We model this as the bridge sending zero velocity + ESTOP mode.
"""
class _Msg:
def __init__(self, arb_id, data):
self.arbitration_id = arb_id
self.data = bytearray(data)
self.is_extended_id = False
bus.send(_Msg(MAMBA_CMD_VELOCITY, encode_velocity_cmd(0.0, 0.0)))
bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(MODE_ESTOP)))
bus.send(_Msg(MAMBA_CMD_ESTOP, encode_estop_cmd(True)))
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestHeartbeatLoss:
def test_heartbeat_loss_triggers_estop(self, mock_can_bus):
"""
After heartbeat ceases, the bridge must command zero velocity and
set ESTOP mode. We simulate this directly using _simulate_estop_on_timeout.
"""
# First confirm the bus is clean
assert len(mock_can_bus.get_sent_frames()) == 0
# Simulate bridge detecting timeout and escalating
_simulate_estop_on_timeout(mock_can_bus)
vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY)
assert len(vel_frames) >= 1, "Zero velocity not sent after timeout"
l, r = parse_velocity_cmd(bytes(vel_frames[-1].data))
assert abs(l) < 1e-5, "Left not zero on timeout"
assert abs(r) < 1e-5, "Right not zero on timeout"
mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE)
assert any(
bytes(f.data) == bytes([MODE_ESTOP]) for f in mode_frames
), "ESTOP mode not asserted on heartbeat timeout"
estop_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_ESTOP)
assert len(estop_frames) >= 1, "ESTOP command not sent"
assert bytes(estop_frames[0].data) == b"\x01"
def test_heartbeat_loss_zero_velocity(self, mock_can_bus):
"""Zero velocity frame must appear among sent frames after timeout."""
_simulate_estop_on_timeout(mock_can_bus)
vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY)
assert len(vel_frames) >= 1
for f in vel_frames:
l, r = parse_velocity_cmd(bytes(f.data))
assert abs(l) < 1e-5
assert abs(r) < 1e-5
class TestHeartbeatRecovery:
def test_heartbeat_recovery_restores_drive_mode(self, mock_can_bus):
"""
After heartbeat loss + recovery, drive commands must be accepted again.
We simulate: ESTOP clear estop send drive verify velocity frame.
"""
class _Msg:
def __init__(self, arb_id, data):
self.arbitration_id = arb_id
self.data = bytearray(data)
self.is_extended_id = False
# Phase 1: timeout → estop
_simulate_estop_on_timeout(mock_can_bus)
mock_can_bus.reset()
# Phase 2: recovery — clear estop, restore drive mode
mock_can_bus.send(_Msg(MAMBA_CMD_ESTOP, encode_estop_cmd(False)))
mock_can_bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(MODE_DRIVE)))
mock_can_bus.send(_Msg(MAMBA_CMD_VELOCITY, encode_velocity_cmd(0.5, 0.5)))
estop_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_ESTOP)
assert any(bytes(f.data) == b"\x00" for f in estop_frames), \
"ESTOP clear not sent on recovery"
mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE)
assert any(
bytes(f.data) == bytes([MODE_DRIVE]) for f in mode_frames
), "DRIVE mode not restored after recovery"
vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY)
assert len(vel_frames) >= 1
l, r = parse_velocity_cmd(bytes(vel_frames[-1].data))
assert abs(l - 0.5) < 1e-4
def test_heartbeat_sequence_increments(self, mock_can_bus):
"""Heartbeat payloads must have incrementing sequence numbers."""
payloads = []
for seq in range(5):
mock_can_bus.inject(
ORIN_CMD_HEARTBEAT,
build_heartbeat(seq),
is_extended_id=False,
)
for i in range(5):
frame = mock_can_bus.recv(timeout=0.05)
assert frame is not None
(seq_val,) = struct.unpack(">I", bytes(frame.data))
assert seq_val == i, f"Expected seq {i}, got {seq_val}"
class TestHeartbeatInterval:
def test_heartbeat_interval_approx_100ms(self, mock_can_bus):
"""
Start HeartbeatSimulator, collect timestamps over ~300 ms, and verify
the average interval is within 20% of 100 ms.
"""
sim = HeartbeatSimulator(mock_can_bus, interval_s=0.1)
sim.start()
time.sleep(0.35)
sim.stop()
timestamps = []
while True:
frame = mock_can_bus.recv(timeout=0.01)
if frame is None:
break
if frame.arbitration_id == ORIN_CMD_HEARTBEAT:
timestamps.append(frame.timestamp)
assert len(timestamps) >= 2, "Not enough heartbeat frames captured"
intervals = [
timestamps[i + 1] - timestamps[i]
for i in range(len(timestamps) - 1)
]
avg_interval = sum(intervals) / len(intervals)
# Allow ±30 ms tolerance (OS scheduling jitter in CI)
assert 0.07 <= avg_interval <= 0.13, \
f"Average heartbeat interval {avg_interval*1000:.1f} ms not ~100 ms"
def test_heartbeat_payload_is_4_bytes(self, mock_can_bus):
"""Heartbeat payload must be exactly 4 bytes (uint32 sequence)."""
for seq in (0, 1, 0xFFFFFFFF):
payload = build_heartbeat(seq)
assert len(payload) == 4, \
f"Heartbeat payload length {len(payload)} != 4"

View File

@ -1,63 +1,236 @@
#!/usr/bin/env python3
"""
test_mode_switching.py E2E tests for ORIN_CMD_MODE (0x301) encoding.
test_mode_switching.py Mode transition integration tests.
Covers:
- idle drive: drive commands become accepted
- drive estop: immediate motor stop
- MODE frame byte values match protocol constants
- Unknown mode byte is ignored (no state change)
No ROS2 or real CAN hardware required.
Run with: python -m pytest test/test_mode_switching.py -v
"""
import struct
import pytest
from saltybot_can_e2e_test.can_mock import MockCANBus
from saltybot_can_e2e_test.protocol_defs import (
ORIN_CMD_MODE,
MAMBA_CMD_VELOCITY,
MAMBA_CMD_MODE,
MAMBA_CMD_ESTOP,
MODE_IDLE,
MODE_DRIVE,
MODE_ESTOP,
build_mode_cmd,
build_velocity_cmd,
parse_velocity_cmd,
)
from saltybot_can_bridge.mamba_protocol import (
encode_velocity_cmd,
encode_mode_cmd,
encode_estop_cmd,
)
class TestModeCommandEncoding:
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def test_frame_id(self):
assert ORIN_CMD_MODE == 0x301
class _Msg:
def __init__(self, arb_id: int, data: bytes, is_extended_id: bool = False):
self.arbitration_id = arb_id
self.data = bytearray(data)
self.is_extended_id = is_extended_id
def test_idle_mode(self):
assert build_mode_cmd(MODE_IDLE) == b"\x00"
def test_drive_mode(self):
assert build_mode_cmd(MODE_DRIVE) == b"\x01"
class ModeStateMachine:
"""
Minimal state machine tracking mode transitions and gating commands.
"""
def test_estop_mode(self):
assert build_mode_cmd(MODE_ESTOP) == b"\x02"
def __init__(self, bus: MockCANBus):
self._bus = bus
self._mode = MODE_IDLE
def test_payload_length(self):
for m in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP):
assert len(build_mode_cmd(m)) == 1
def set_mode(self, mode: int) -> bool:
"""
Transition to mode. Returns True if accepted, False if invalid.
Invalid mode values (not 0, 1, 2) are ignored.
"""
if mode not in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP):
return False # silently ignore
prev_mode = self._mode
self._mode = mode
self._bus.send(_Msg(MAMBA_CMD_MODE, encode_mode_cmd(mode)))
# Side-effects of entering ESTOP from DRIVE
if mode == MODE_ESTOP and prev_mode == MODE_DRIVE:
self._bus.send(_Msg(MAMBA_CMD_VELOCITY, encode_velocity_cmd(0.0, 0.0)))
self._bus.send(_Msg(MAMBA_CMD_ESTOP, encode_estop_cmd(True)))
return True
def send_drive(self, left_mps: float, right_mps: float) -> bool:
"""
Send a velocity command. Returns True if forwarded, False if blocked.
"""
if self._mode != MODE_DRIVE:
return False
self._bus.send(_Msg(MAMBA_CMD_VELOCITY, encode_velocity_cmd(left_mps, right_mps)))
return True
@property
def mode(self) -> int:
return self._mode
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestIdleToDrive:
def test_idle_to_drive_mode_frame(self, mock_can_bus):
"""Transitioning to DRIVE must emit a MODE=drive frame."""
sm = ModeStateMachine(mock_can_bus)
sm.set_mode(MODE_DRIVE)
def test_idle_to_drive_sequence(self):
idle = build_mode_cmd(MODE_IDLE)
drive = build_mode_cmd(MODE_DRIVE)
assert idle[0] < drive[0]
mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE)
assert len(mode_frames) == 1
assert bytes(mode_frames[0].data) == bytes([MODE_DRIVE])
def test_mode_values_are_sequential(self):
assert MODE_IDLE == 0
assert MODE_DRIVE == 1
assert MODE_ESTOP == 2
def test_idle_blocks_drive_commands(self, mock_can_bus):
"""In IDLE mode, drive commands must be suppressed."""
sm = ModeStateMachine(mock_can_bus)
# Attempt drive without entering DRIVE mode
forwarded = sm.send_drive(1.0, 1.0)
assert forwarded is False, "Drive cmd should be blocked in IDLE mode"
vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY)
assert len(vel_frames) == 0
def test_drive_mode_allows_commands(self, mock_can_bus):
"""After entering DRIVE mode, velocity commands must be forwarded."""
sm = ModeStateMachine(mock_can_bus)
sm.set_mode(MODE_DRIVE)
mock_can_bus.reset()
forwarded = sm.send_drive(0.5, 0.5)
assert forwarded is True
vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY)
assert len(vel_frames) == 1
l, r = parse_velocity_cmd(bytes(vel_frames[0].data))
assert abs(l - 0.5) < 1e-4
assert abs(r - 0.5) < 1e-4
class TestDriveToEstop:
def test_drive_to_estop_stops_motors(self, mock_can_bus):
"""Transitioning DRIVE → ESTOP must immediately send zero velocity."""
sm = ModeStateMachine(mock_can_bus)
sm.set_mode(MODE_DRIVE)
sm.send_drive(1.0, 1.0)
mock_can_bus.reset()
def test_estop_overrides_drive(self):
estop = build_mode_cmd(MODE_ESTOP)
drive = build_mode_cmd(MODE_DRIVE)
# ESTOP has higher mode value than DRIVE
assert estop[0] > drive[0]
sm.set_mode(MODE_ESTOP)
vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY)
assert len(vel_frames) >= 1, "No velocity frame on DRIVE→ESTOP transition"
l, r = parse_velocity_cmd(bytes(vel_frames[-1].data))
assert abs(l) < 1e-5, f"Left motor {l} not zero after ESTOP"
assert abs(r) < 1e-5, f"Right motor {r} not zero after ESTOP"
def test_drive_to_estop_mode_frame(self, mock_can_bus):
"""DRIVE → ESTOP must broadcast MODE=estop."""
sm = ModeStateMachine(mock_can_bus)
sm.set_mode(MODE_DRIVE)
sm.set_mode(MODE_ESTOP)
mode_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_MODE)
assert any(bytes(f.data) == bytes([MODE_ESTOP]) for f in mode_frames)
def test_estop_blocks_subsequent_drive(self, mock_can_bus):
"""After DRIVE → ESTOP, drive commands must be blocked."""
sm = ModeStateMachine(mock_can_bus)
sm.set_mode(MODE_DRIVE)
sm.set_mode(MODE_ESTOP)
mock_can_bus.reset()
forwarded = sm.send_drive(1.0, 1.0)
assert forwarded is False
vel_frames = mock_can_bus.get_sent_frames_by_id(MAMBA_CMD_VELOCITY)
assert len(vel_frames) == 0
class TestModeCommandEncoding:
def test_mode_idle_byte(self, mock_can_bus):
"""MODE_IDLE must encode as 0x00."""
assert encode_mode_cmd(MODE_IDLE) == b"\x00"
def test_mode_drive_byte(self, mock_can_bus):
"""MODE_DRIVE must encode as 0x01."""
assert encode_mode_cmd(MODE_DRIVE) == b"\x01"
def test_mode_estop_byte(self, mock_can_bus):
"""MODE_ESTOP must encode as 0x02."""
assert encode_mode_cmd(MODE_ESTOP) == b"\x02"
def test_mode_frame_length(self, mock_can_bus):
"""Mode command payload must be exactly 1 byte."""
for mode in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP):
payload = encode_mode_cmd(mode)
assert len(payload) == 1, f"Mode {mode} payload length {len(payload)} != 1"
def test_protocol_defs_build_mode_cmd_matches(self):
"""build_mode_cmd in protocol_defs must produce identical bytes."""
for mode in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP):
assert build_mode_cmd(mode) == encode_mode_cmd(mode), \
f"protocol_defs.build_mode_cmd({mode}) != mamba_protocol.encode_mode_cmd({mode})"
class TestInvalidMode:
def test_invalid_mode_byte_ignored(self, mock_can_bus):
"""Unknown mode byte (e.g. 0xFF) must be rejected — no state change."""
sm = ModeStateMachine(mock_can_bus)
sm.set_mode(MODE_DRIVE)
initial_mode = sm.mode
mock_can_bus.reset()
def test_invalid_mode_truncated_to_byte(self):
# build_mode_cmd just masks to byte — no validation
payload = build_mode_cmd(255)
assert len(payload) == 1
assert payload[0] == 255
accepted = sm.set_mode(0xFF)
assert accepted is False, "Invalid mode 0xFF should be rejected"
assert sm.mode == initial_mode, "Mode changed despite invalid value"
assert len(mock_can_bus.get_sent_frames()) == 0, \
"Frames sent for invalid mode command"
def test_invalid_mode_99_ignored(self, mock_can_bus):
"""Mode 99 must be rejected."""
sm = ModeStateMachine(mock_can_bus)
accepted = sm.set_mode(99)
assert accepted is False
def test_invalid_mode_negative_ignored(self, mock_can_bus):
"""Negative mode values must be rejected."""
sm = ModeStateMachine(mock_can_bus)
accepted = sm.set_mode(-1)
assert accepted is False
def test_mamba_protocol_invalid_mode_raises(self):
"""mamba_protocol.encode_mode_cmd must raise on invalid mode."""
with pytest.raises(ValueError):
encode_mode_cmd(99)
with pytest.raises(ValueError):
encode_mode_cmd(-1)
@pytest.mark.parametrize("mode,expected_byte", [
(MODE_IDLE, b"\x00"),
(MODE_DRIVE, b"\x01"),
(MODE_ESTOP, b"\x02"),
])
def test_mode_encoding_parametrized(mode, expected_byte):
"""Parametrized check that all mode constants encode to the right byte."""
assert encode_mode_cmd(mode) == expected_byte

View File

@ -27,7 +27,7 @@ robot:
stem_od: 0.0381 # m STEM_OD = 38.1mm
stem_height: 1.050 # m nominal cut length
# ── FC / IMU (ESP32-S3 BALANCE QMI8658) ──────────────────────────────────────────────────
# ── FC / IMU (MAMBA F722S) ──────────────────────────────────────────────────
# fc_x = -50mm in SCAD (front = -X SCAD = +X ROS REP-105)
# z = deck_thickness/2 + mounting_pad(3mm) + standoff(6mm) = 12mm
imu_x: 0.050 # m forward of base_link center

View File

@ -1,8 +1,8 @@
vesc_can_odometry:
ros__parameters:
# ── CAN motor IDs (used for CAN addressing) ───────────────────────────────
left_can_id: 56 # left motor VESC CAN ID (ESP32 IO)
right_can_id: 68 # right motor VESC CAN ID (ESP32 IO)
left_can_id: 56 # left motor VESC CAN ID (Mamba F722S)
right_can_id: 68 # right motor VESC CAN ID (Mamba F722S)
# ── State topic names (must match VESC telemetry publisher) ──────────────
left_state_topic: /vesc/left/state

View File

@ -1,5 +1,5 @@
# VESC CAN Telemetry Node — SaltyBot dual FSESC 6.7 Pro (FW 6.6)
# SocketCAN interface: can0 (SN65HVD230 transceiver on CANable 2.0 USB-to-CAN on Orin)
# SocketCAN interface: can0 (SN65HVD230 transceiver on MAMBA F722S CAN2)
vesc_telemetry:
ros__parameters: