feat(arch): implement SAUL-TEE ESP32 protocol specs from hal reference doc

Spec source: docs/SAUL-TEE-SYSTEM-REFERENCE.md (hal, 2026-04-04)

stm32_protocol.py — rewritten for inter-board UART protocol (ESP32 BALANCE ↔ IO):
- Frame: [0xAA][LEN][TYPE][PAYLOAD][CRC8] @ 460800 baud (was STX/ETX/CRC16)
- CRC-8 poly 0x07 over LEN+TYPE+PAYLOAD
- New message types: RC_CHANNELS(0x01), SENSORS(0x02), LED_CMD(0x10),
  OUTPUT_CMD(0x11), MOTOR_CMD(0x12), HEARTBEAT(0x20)

mamba_protocol.py — updated CAN IDs and frame formats:
- Orin→BALANCE: DRIVE(0x300) f32×2 LE, MODE(0x301), ESTOP(0x302), LED(0x303)
- BALANCE→Orin: FC_STATUS(0x400) pitch/vbat/state, FC_VESC(0x401) rpm/current
- VESC node IDs: Left=56, Right=68 (authoritative per §8)
- VESC extended frames: STATUS1(cmd=9), STATUS4(cmd=16), STATUS5(cmd=27)
- Replaced old MAMBA_CMD_*/MAMBA_TELEM_* constants

can_bridge_node.py — updated to use new IDs:
- ORIN_CMD_DRIVE/MODE/ESTOP replace MAMBA_CMD_VELOCITY/MODE/ESTOP
- FC_STATUS handler: publishes pitch→/can/imu, vbat_mv→/can/battery
- FC_VESC handler: publishes rpm/cur→/can/vesc/left|right/state
- VESC STATUS1 extended frames decoded per node ID (56/68)
- Removed PID CAN command (not in new spec)

CLAUDE.md — updated with ESP32-S3 BALANCE/IO hardware summary + key protocols

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-webui 2026-04-04 08:25:24 -04:00
parent 7db6158ada
commit cfd5a15b3e
5 changed files with 874 additions and 553 deletions

View File

@ -1,17 +1,31 @@
# SaltyLab Firmware — Agent Playbook # SaltyLab Firmware — Agent Playbook
## Project ## Project
Self-balancing two-wheeled robot: STM32F722 flight controller, hoverboard hub motors, Jetson Nano for AI/SLAM. **SAUL-TEE** — 4-wheel wagon robot (870×510×550 mm, 23 kg).
Two ESP32-S3 boards + Jetson Orin for AI/ROS2.
> **Full hardware spec:** `docs/SAUL-TEE-SYSTEM-REFERENCE.md`
### Embedded boards
| Board | Hardware | Role |
|-------|----------|------|
| **ESP32-S3 BALANCE** | Waveshare Touch LCD 1.28 (CH343 USB) | QMI8658 IMU, PID balance loop, CAN→VESCs (SN65HVD230, 500 kbps) |
| **ESP32-S3 IO** | Bare DevKit (JTAG USB) | BTS7960 motors, TBS Crossfire (UART0) + ELRS failover (UART2), NFC/baro/ToF (I2C), WS2812 LEDs, horn/headlight/fan/buzzer |
### Key protocols
- **Orin ↔ BALANCE:** CAN 500 kbps via CANable2 (slcan0). Cmds 0x3000x303, telemetry 0x4000x401
- **BALANCE ↔ IO:** UART 460800 baud, frame `[0xAA][LEN][TYPE][PAYLOAD][CRC8]`
- **VESC IDs:** Left = 56, Right = 68
## Team ## Team
| Agent | Role | Focus | | Agent | Role | Focus |
|-------|------|-------| |-------|------|-------|
| **sl-firmware** | Embedded Firmware Lead | STM32 HAL, USB CDC debugging, SPI/UART, PlatformIO, DFU bootloader | | **sl-firmware** | Embedded Firmware Lead | ESP32-S3 firmware (PlatformIO), QMI8658 IMU, PID, VESC CAN, inter-board UART |
| **sl-controls** | Control Systems Engineer | PID tuning, IMU sensor fusion, real-time control loops, safety systems | | **sl-controls** | Control Systems Engineer | PID tuning, IMU sensor fusion, real-time control loops, safety systems |
| **sl-perception** | Perception / SLAM Engineer | Jetson Nano, RealSense D435i, RPLIDAR, ROS2, Nav2 | | **sl-perception** | Perception / SLAM Engineer | Jetson Orin, RealSense D435i, RPLIDAR, ROS2, Nav2 |
## Status ## Status
USB CDC TX bug resolved (PR #10 — DCache MPU non-cacheable region + IWDG ordering fix). Architecture migrated from Mamba F722S/BlackPill → ESP32-S3 BALANCE + IO (PR #712, 2026-04-04).
## Repo Structure ## Repo Structure
- `projects/saltybot/SALTYLAB.md` — Design doc - `projects/saltybot/SALTYLAB.md` — Design doc

View File

@ -0,0 +1,444 @@
# SAUL-TEE — System Reference
**Rev A — 2026-04-04**
_Authoritative architecture reference for all agents. Source: hal (Orin), max._
---
## 1. Robot Overview
| Parameter | Value |
|-----------|-------|
| **Name** | SAUL-TEE |
| **Config** | 4-wheel wagon (replaces 2-wheel self-balancing bot) |
| **Dimensions** | 870 × 510 × 550 mm (L × W × H) |
| **Mass** | ~23 kg |
| **Drive** | 4× hub motors via 2× VESC 6.7 (dual channel each) |
| **Power** | 36V battery bus |
| **AI brain** | Jetson Orin Nano Super (25W) |
| **CAN bus** | 500 kbps, CANable 2.0 USB↔CAN on Orin |
> ⚠️ **ARCHITECTURE CHANGE (2026-04-03):** Mamba F722S / STM32F722 / BlackPill are retired.
> New embedded stack: **ESP32-S3 BALANCE** + **ESP32-S3 IO** (see §23 below).
---
## 2. ESP32-S3 BALANCE Board
### Hardware
| Item | Detail |
|------|--------|
| **Module** | Waveshare ESP32-S3 Touch LCD 1.28 |
| **MCU** | ESP32-S3 (dual-core 240 MHz, 512KB SRAM, 8MB flash, 8MB PSRAM) |
| **USB** | CH343G USB-UART bridge (UART0 / GPIO43 TX, GPIO44 RX) |
| **Display** | 1.28" round GC9A01 240×240 LCD (SPI) |
| **Touch** | CST816S capacitive touch (I2C) |
| **IMU** | QMI8658 6-axis (gyro + accel), I2C on-board |
| **CAN transceiver** | SN65HVD230, external, wired to ESP32-S3 TWAI peripheral |
### Role
- Runs the **PID balance / drive loop** (or stability assist for wagon mode)
- Reads QMI8658 IMU at high rate for tilt / attitude
- Sends drive commands to VESCs over **500 kbps CAN** (VESC native protocol)
- Receives high-level velocity commands from Orin over CAN (0x3000x303)
- Publishes telemetry to Orin over CAN (0x4000x401)
- Bridges ESP32-IO sensor data over **inter-board UART** @ 460800 baud
### On-Board Pin Map (fixed by Waveshare PCB)
| Signal | GPIO | Notes |
|--------|------|-------|
| LCD SCLK | 10 | GC9A01 SPI clock |
| LCD MOSI | 11 | GC9A01 SPI data |
| LCD CS | 9 | GC9A01 chip select |
| LCD DC | 8 | GC9A01 data/cmd |
| LCD BL | 2 | Backlight PWM |
| Touch SDA | 6 | CST816S / I2C-0 |
| Touch SCL | 7 | CST816S / I2C-0 |
| Touch INT | 5 | Active-low interrupt |
| Touch RST | 4 | Active-low reset |
| IMU SDA | 6 | QMI8658 shares I2C-0 |
| IMU SCL | 7 | QMI8658 shares I2C-0 |
| IMU INT1 | 3 | Data-ready interrupt |
| CH343 USB TX | 43 | UART0 TX (USB serial) |
| CH343 USB RX | 44 | UART0 RX (USB serial) |
### External Wiring (user-soldered, confirm with calipers/multimeter)
| Signal | GPIO | Notes |
|--------|------|-------|
| CAN TX | **TBD** | → SN65HVD230 D pin; use free header GPIO |
| CAN RX | **TBD** | ← SN65HVD230 R pin; use free header GPIO |
| Inter-board UART TX | **TBD** | → ESP32-IO UART RX @ 460800 |
| Inter-board UART RX | **TBD** | ← ESP32-IO UART TX @ 460800 |
> ⚠️ **Verify GPIO assignments in `esp32/balance/src/config.h` before writing firmware.**
### QMI8658 Details
| Item | Value |
|------|-------|
| I2C address | 0x6A (SA0 pin low) or 0x6B (SA0 high) |
| Gyro full-scale | ±2048 °/s (configurable ±16/32/64/128/256/512/1024/2048) |
| Accel full-scale | ±16 g (configurable ±2/4/8/16) |
| Output data rate | Up to 7174.4 Hz |
| Interface | I2C or SPI (board routes I2C) |
---
## 3. ESP32-S3 IO Board
### Hardware
| Item | Detail |
|------|--------|
| **Module** | ESP32-S3 bare dev board (JTAG USB) |
| **MCU** | ESP32-S3 (dual-core 240 MHz) |
| **USB** | Built-in USB-JTAG/Serial (no external bridge) |
### Role
- All **physical I/O**: motors, RC, sensors, indicators, accessories
- TBS Crossfire primary RC (UART0)
- ELRS failover RC (UART2)
- BTS7960 half-bridge motor drivers (4-wheel PWM drive)
- NFC, barometer, ToF range sensors (I2C)
- WS2812B LED strip
- Horn, headlight, cooling fan, buzzer
### Pin Map
| Signal | GPIO | Protocol | Notes |
|--------|------|----------|-------|
| **TBS Crossfire RX** UART TX | 43 | CRSF @ 420000 baud | Telemetry out to TBS TX module |
| **TBS Crossfire RX** UART RX | 44 | CRSF @ 420000 baud | UART0 — RC frames in |
| **ELRS failover** UART TX | **TBD** | CRSF @ 420000 baud | UART2 |
| **ELRS failover** UART RX | **TBD** | CRSF @ 420000 baud | UART2 |
| **Inter-board UART TX** | **TBD** | Binary @ 460800 | → BALANCE UART RX |
| **Inter-board UART RX** | **TBD** | Binary @ 460800 | ← BALANCE UART TX |
| **BTS7960 — FL** RPWM | **TBD** | PWM | Front-left forward |
| **BTS7960 — FL** LPWM | **TBD** | PWM | Front-left reverse |
| **BTS7960 — FL** R_EN | **TBD** | GPIO | Enable H |
| **BTS7960 — FL** L_EN | **TBD** | GPIO | Enable H |
| **BTS7960 — FR** RPWM | **TBD** | PWM | Front-right forward |
| **BTS7960 — FR** LPWM | **TBD** | PWM | Front-right reverse |
| **BTS7960 — FR** R_EN | **TBD** | GPIO | Enable H |
| **BTS7960 — FR** L_EN | **TBD** | GPIO | Enable H |
| **BTS7960 — RL** RPWM | **TBD** | PWM | Rear-left forward |
| **BTS7960 — RL** LPWM | **TBD** | PWM | Rear-left reverse |
| **BTS7960 — RL** R_EN | **TBD** | GPIO | Enable H |
| **BTS7960 — RL** L_EN | **TBD** | GPIO | Enable H |
| **BTS7960 — RR** RPWM | **TBD** | PWM | Rear-right forward |
| **BTS7960 — RR** LPWM | **TBD** | PWM | Rear-right reverse |
| **BTS7960 — RR** R_EN | **TBD** | GPIO | Enable H |
| **BTS7960 — RR** L_EN | **TBD** | GPIO | Enable H |
| **I2C SDA** | **TBD** | I2C | NFC + baro + ToF shared bus |
| **I2C SCL** | **TBD** | I2C | NFC + baro + ToF shared bus |
| **WS2812B data** | **TBD** | RMT | LED strip |
| **Horn** | **TBD** | GPIO/PWM | MOSFET or relay |
| **Headlight** | **TBD** | GPIO/PWM | MOSFET |
| **Fan** | **TBD** | PWM | ESC/electronics cooling |
| **Buzzer** | **TBD** | GPIO/PWM | Piezo or active buzzer |
> ⚠️ **TBD pin assignments** — to be confirmed in `esp32/io/src/config.h` once wiring is set.
### I2C Peripherals
| Device | Address | Function |
|--------|---------|----------|
| NFC module | 0x24 (PN532) or 0x28 | NFC tag read/write |
| Barometer | 0x76 (BMP280/BMP388) | Altitude + temp |
| ToF range | 0x29 (VL53L0X) or 0x52 (VL53L4CD) | Proximity/obstacle |
---
## 4. Inter-Board UART Protocol (BALANCE ↔ IO)
### Link Parameters
| Parameter | Value |
|-----------|-------|
| Baud rate | 460800 |
| Format | 8N1 |
| Direction | Full-duplex |
### Frame Format
```
Byte 0: 0xAA (start-of-frame magic)
Byte 1: LEN (payload length in bytes, uint8)
Byte 2: TYPE (message type, uint8)
Byte 3…N: PAYLOAD (LEN bytes)
Byte N+1: CRC8 (CRC-8/MAXIM over bytes 1..N)
```
### Message Types (draft — confirm in firmware)
| Type | Direction | Payload | Description |
|------|-----------|---------|-------------|
| 0x01 | IO → BAL | `[ch1:u16][ch2:u16]…[ch16:u16][flags:u8]` | RC channels (16× uint16, 10002000 µs) + status flags |
| 0x02 | IO → BAL | `[sensor_id:u8][value:f32]` | Sensor reading (ToF, baro, etc.) |
| 0x03 | IO → BAL | `[nfc_uid:u8×7][len:u8]` | NFC tag detected |
| 0x10 | BAL → IO | `[leds:u8][horn:u8][light:u8][fan:u8][buzz:u8]` | Actuator commands |
| 0x11 | BAL → IO | `[state:u8][pitch:f32][speed:f32]` | Status for LED animations |
| 0xFF | both | `[uptime_ms:u32]` | Heartbeat (watchdog reset) |
> Frame types are **draft** — refer to `esp32/shared/protocol.h` for authoritative definitions.
### CRC-8 Polynomial
```
Poly: 0x31 (CRC-8/MAXIM, also called CRC-8/1-Wire)
Init: 0x00
RefIn/RefOut: true
XorOut: 0x00
```
---
## 5. CAN Bus
### Bus Parameters
| Parameter | Value |
|-----------|-------|
| Bit rate | 500 kbps |
| Topology | Single bus; all nodes share CANH/CANL + GND |
| Termination | 120 Ω at each end of the bus |
| Orin interface | CANable 2.0 USB↔CAN → `/dev/canable0` (or `can0` after `ip link`) |
### Node Addresses
| Node | CAN ID / Role |
|------|--------------|
| VESC left motor | ID **68** (0x44) — FSESC 6.7 Pro Mini Dual, left channel |
| VESC right motor | ID **56** (0x38) — FSESC 6.7 Pro Mini Dual, right channel |
| ESP32-S3 BALANCE | transmits telemetry 0x4000x401; receives Orin cmds 0x3000x303 |
| Orin (CANable2) | transmits cmds 0x3000x303; receives telemetry 0x4000x401 |
### Orin → Robot Command Frames (0x3000x303)
| CAN ID | DLC | Payload | Description |
|--------|-----|---------|-------------|
| **0x300** | 8 | `[speed:i16][steer:i16][mode:u8][flags:u8][_:u16]` | Drive command (speed/steer ±1000, mode byte) |
| **0x301** | 1 | `[arm:u8]` | Arm/disarm (0x01 = arm, 0x00 = disarm) |
| **0x302** | 8 | `[kp:f16][ki:f16][kd:f16][_:u16]` | PID update (half-float) |
| **0x303** | 1 | `[0xE5]` | Emergency stop (magic byte, cuts all motors) |
### Robot → Orin Telemetry Frames (0x4000x401)
| CAN ID | DLC | Payload | Description |
|--------|-----|---------|-------------|
| **0x400** | 8 | `[pitch:f16][speed:f16][yaw_rate:f16][state:u8][flags:u8]` | Attitude + drive state |
| **0x401** | 4 | `[vbat:u16][fault_code:u8][rssi:i8]` | Battery voltage (mV), fault, RC RSSI |
### VESC Native CAN (standard VESC protocol)
ESP32-S3 BALANCE sends VESC commands using the standard VESC CAN protocol:
| Frame type | CAN ID formula | Notes |
|------------|---------------|-------|
| SET_DUTY | `(0x00 << 8) | VESC_ID` | Duty cycle 1.0..+1.0 × 100000 |
| SET_CURRENT | `(0x01 << 8) | VESC_ID` | Current in mA |
| SET_RPM | `(0x03 << 8) | VESC_ID` | Electrical RPM |
| SET_CURRENT_BRAKE | `(0x02 << 8) | VESC_ID` | Braking current in mA |
| STATUS_1 | `(0x09 << 8) | VESC_ID` | ERPMs + current + duty (rx) |
| STATUS_4 | `(0x10 << 8) | VESC_ID` | Temp + input voltage + input current (rx) |
---
## 6. RC Channel Mapping
### Primary: TBS Crossfire (UART0 on ESP32-IO, CRSF @ 420000 baud)
| Channel | Function | Range | Notes |
|---------|----------|-------|-------|
| CH1 | Roll / Steer | 10002000 µs | Left stick X (mode 2) |
| CH2 | Pitch / Speed | 10002000 µs | Left stick Y |
| CH3 | Throttle | 10002000 µs | Right stick Y |
| CH4 | Yaw | 10002000 µs | Right stick X |
| CH5 | Arm | 1000 / 2000 µs | 2-pos switch — <1200=DISARM, >1800=ARM |
| CH6 | Drive mode | 1000/1500/2000 µs | 3-pos: RC / Assisted / Autonomous |
| CH7 | Speed limit | 10002000 µs | Analog knob, scales max speed |
| CH8 | Aux / Horn | 1000 / 2000 µs | Momentary for horn; held = klaxon |
### Failover: ELRS (UART2 on ESP32-IO, CRSF @ 420000 baud)
Same channel mapping as TBS Crossfire. IO board switches to ELRS automatically if Crossfire link lost for > 300 ms.
### Failsafe
- **RC lost > 300 ms**: motors stop, DISARM state, wait for link restoration
- **Arm switch** must be cycled to DISARM→ARM to re-arm after failsafe
---
## 7. Serial Commands (Orin → ESP32-S3 BALANCE via CAN 0x300)
All high-level commands from the Orin go over CAN. The BALANCE board also accepts direct USB-serial commands on its CH343 port for bench debugging:
```
# Arm / disarm
ARM
DISARM
# Set drive (speed -1000..1000, steer -1000..1000)
DRIVE <speed> <steer>
# Set mode
MODE RC # full RC passthrough
MODE ASSISTED # Orin sends velocity, BALANCE does stability
MODE AUTO # full autonomous (Orin CAN commands only)
# PID update
PID <kp> <ki> <kd>
# Emergency stop (same as CAN 0x303)
ESTOP
# Status query
STATUS # returns JSON telemetry line
# IMU calibration
IMU CAL
```
---
## 8. System Wiring Diagram
```
┌──────────────────────────────────────────────────────────┐
│ JETSON ORIN NANO SUPER │
│ (Top plate, 25W) │
│ │
│ USB-A ─── CANable 2.0 USB↔CAN (can0, 500 kbps) │
│ USB-A ─── RealSense D435i (USB 3.1) │
│ USB-A ─── RPLIDAR A1M8 (CP2102, 115200) │
│ USB-C ─── SIM7600A 4G/LTE modem │
│ CSI-A ─── 2× IMX219 cameras │
│ CSI-B ─── 2× IMX219 cameras │
│ 40-pin ── ReSpeaker 2-Mic HAT │
└──────────────────────┬───────────────────────────────────┘
│ USB-A
│ CANable 2.0
│ 500 kbps CAN
┌────────────────────────────────┴──────────────────────────────────┐
│ CAN BUS (CANH / CANL) │
│ 120Ω ───┤ (all nodes) ├─── 120Ω │
└───┬──────────────────────────┬───────────────────┬────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────┐ ┌─────────────────────┐ (CAN ID 56/68)
│ ESP32-S3 BALANCE │ │ VESC left (ID 68)│ VESC right (ID 56)
│ Waveshare LCD 1.28 │ │ FSESC 6.7 Pro Mini│
│ │ │ Dual │
│ QMI8658 IMU (I2C) │ └────────┬────────────┘
│ SN65HVD230 (CAN) │ │ Phase wires
│ CH343 USB (debug) │ ┌─────┴────────────────┐
│ │ │ Hub motors (4×) │
│ UART ─────────────────┐ │ FL / FR / RL / RR │
└─────────────────────┘ │ └──────────────────────┘
↑ 460800 baud binary │
↓ inter-board protocol │
┌─────────────────────────┘
│ ESP32-S3 IO (bare board)
│ JTAG USB (debug)
│ UART0 ── TBS Crossfire RX (CRSF, 420000)
│ UART2 ── ELRS receiver (CRSF failover, 420000)
│ PWM ──── 4× BTS7960 motor drivers
│ I2C ──── NFC + Baro + ToF sensors
│ GPIO ─── WS2812B LEDs
│ GPIO ─── Horn / Headlight / Fan / Buzzer
└─────────────────────────────────────────────
```
---
## 9. Power Architecture
```
36V BATTERY
├── VESCs (36V direct) ──── 4× Hub motors
├── BTS7960 boards (36V → motor logic 5V via onboard reg)
├── DC-DC 12V ──── Fan / Headlight / Accessories
└── DC-DC 5V ─┬── Jetson Orin (USB-C PD or barrel)
├── ESP32-S3 BALANCE (USB 5V or VIN)
├── ESP32-S3 IO (USB 5V or VIN)
├── TBS Crossfire RX (5V)
├── ELRS RX (5V)
├── WS2812B strip (5V)
├── RPLIDAR A1M8 (5V via USB)
└── Sensors / NFC / ToF / Baro (3.3V LDO from ESP32)
```
---
## 10. Device Nodes (Orin)
| Device | Node | Notes |
|--------|------|-------|
| CANable 2.0 | `can0` | `ip link set can0 up type can bitrate 500000` |
| RealSense D435i | `/dev/bus/usb/...` | realsense-ros driver |
| RPLIDAR A1M8 | `/dev/rplidar` | udev symlink from ttyUSB* |
| SIM7600A 4G | `/dev/ttyUSB02` | AT, PPP, GNSS NMEA |
| IMX219 cameras | `/dev/video0,2,4,6` | CSI-A: 0/2, CSI-B: 4/6 |
| ESP32-S3 BAL debug | `/dev/ttyACM0` or `/dev/esp32-balance` | CH343 CDC |
| ESP32-S3 IO debug | `/dev/ttyACM1` or `/dev/esp32-io` | JTAG/CDC |
---
## 11. Safety Systems
| System | Trigger | Action |
|--------|---------|--------|
| **HW kill switch** | Big red button (NC inline with 36V) | Cuts all power instantly |
| **Tilt cutoff** | `|pitch| > 25°` | Motors off → DISARM, manual re-arm required |
| **RC failsafe** | No RC frame > 300 ms | Motors off → DISARM |
| **CAN watchdog** | No Orin heartbeat > 500 ms | Drop to RC-only mode |
| **ESTOP CAN** | CAN frame 0x303 (magic 0xE5) | Immediate motor off, DISARM |
| **ESTOP inter-board** | Type 0xFF heartbeat missing > 200 ms | IO board stops BTS7960 enables |
| **Startup arming** | Cold power-on | Motors NEVER spin; need deliberate ARM |
---
## 12. Firmware Repository Layout
```
esp32/
├── balance/ — ESP32-S3 BALANCE firmware (PlatformIO)
│ ├── src/
│ │ ├── main.cpp
│ │ ├── config.h ← GPIO assignments live here
│ │ ├── imu_qmi8658.cpp/.h
│ │ ├── can_vesc.cpp/.h
│ │ └── protocol.cpp/.h
│ └── platformio.ini
├── io/ — ESP32-S3 IO firmware (PlatformIO)
│ ├── src/
│ │ ├── main.cpp
│ │ ├── config.h ← GPIO assignments live here
│ │ ├── rc_crsf.cpp/.h
│ │ ├── motor_bts7960.cpp/.h
│ │ └── protocol.cpp/.h
│ └── platformio.ini
└── shared/
└── protocol.h — inter-board frame types (authoritative)
src/ — LEGACY STM32 firmware (ARCHIVED — do not extend)
include/ — LEGACY STM32 headers (ARCHIVED — do not extend)
```
---
## 13. Open Questions / TBDs
| Item | Owner | Status |
|------|-------|--------|
| GPIO assignments for CAN TX/RX on BALANCE board | sl-firmware | **TBD** |
| GPIO assignments for IO board (all external pins) | sl-firmware | **TBD** |
| Inter-board protocol message type table (finalized) | sl-firmware | **TBD** |
| BTS7960 wiring — 4WD vs 2WD mode (all 4 or front 2 only) | max | **TBD** |
| UWB ESP-NOW receiver — BALANCE or IO board? | sl-uwb + max | **TBD** |
| VESC CAN IDs confirmed (68/56 left/right) | max | ✅ Confirmed |
| Robot dimensions 870×510×550 mm, 23 kg | max | ✅ Confirmed |
---
_Last updated: 2026-04-04. Contact max or hal on MQTT for corrections._

View File

@ -1,332 +1,260 @@
"""stm32_protocol.py — Binary frame codec for Jetson↔STM32 communication. """stm32_protocol.py — Inter-board UART frame codec (ESP32 BALANCE ↔ ESP32 IO).
Issue #119: defines the binary serial protocol between the Jetson Nano and the File name retained for import compatibility. This module implements the binary
STM32F722 flight controller over USB CDC @ 921600 baud. serial protocol that runs between the two ESP32-S3 embedded boards.
Frame layout (all multi-byte fields are big-endian): Spec source: docs/SAUL-TEE-SYSTEM-REFERENCE.md §5 (2026-04-04)
Physical: UART @ 460800 baud, 8N1
STX TYPE LEN PAYLOAD CRC16 ETX
0x02 1B 1B LEN bytes 2B BE 0x03
CRC16 covers: TYPE + LEN + PAYLOAD (not STX, ETX, or CRC bytes themselves). Frame layout:
CRC algorithm: CCITT-16, polynomial=0x1021, init=0xFFFF, no final XOR.
HEADER LEN TYPE PAYLOAD CRC8
0xAA 1 B 1 B LEN bytes 1 B
Command types (Jetson STM32): CRC8 covers: LEN + TYPE + PAYLOAD (polynomial 0x07, init 0x00).
0x01 HEARTBEAT no payload (len=0) Max payload: 64 bytes. No ETX byte.
0x02 SPEED_STEER int16 speed + int16 steer (len=4) range: -1000..+1000
0x03 ARM uint8 (0=disarm, 1=arm) (len=1)
0x04 SET_MODE uint8 mode (len=1)
0x05 PID_UPDATE float32 kp + ki + kd (len=12)
Telemetry types (STM32 Jetson): Note: the Orin communicates with ESP32 BALANCE via CAN (CANable2/slcan0),
0x10 IMU int16×6: pitch,roll,yaw (×100 deg), ax,ay,az (×100 m/) (len=12) NOT via this serial protocol. See mamba_protocol.py for the CAN frame codec.
0x11 BATTERY uint16 voltage_mv + int16 current_ma + uint8 soc_pct (len=5)
0x12 MOTOR_RPM int16 left_rpm + int16 right_rpm (len=4)
0x13 ARM_STATE uint8 state + uint8 error_flags (len=2)
0x14 ERROR uint8 error_code + uint8 subcode (len=2)
Usage: Message types IO BALANCE:
# Encoding (Jetson → STM32) 0x01 RC_CHANNELS raw RC channel values (CRSF or ELRS)
frame = encode_speed_steer(300, -150) 0x02 SENSORS barometer + ToF data
ser.write(frame)
# Decoding (STM32 → Jetson), one byte at a time Message types BALANCE IO:
parser = FrameParser() 0x10 LED_CMD LED strip pattern [pattern u8][r u8][g u8][b u8]
for byte in incoming_bytes: 0x11 OUTPUT_CMD horn/buzzer/headlight/fan [flags u8][headlight u8][fan u8]
result = parser.feed(byte) 0x12 MOTOR_CMD BTS7960 aux motor [motor_a i16 BE][motor_b i16 BE]
if result is not None: 0x20 HEARTBEAT status keepalive [state u8][error_flags u8]
handle_frame(result)
RC channel mapping (CH1CH8, CRSF range 1721811, centre 992):
CH1 Steer CH2 Drive CH3 Throttle CH4 Yaw
CH5 Arm CH6 Mode CH7 E-stop CH8 Spare
""" """
from __future__ import annotations from __future__ import annotations
import struct import struct
from dataclasses import dataclass from dataclasses import dataclass, field
from enum import IntEnum from enum import IntEnum
from typing import Optional from typing import Optional
# ── Frame constants ─────────────────────────────────────────────────────────── # ── Frame constants ───────────────────────────────────────────────────────────
STX = 0x02 FRAME_HEADER = 0xAA
ETX = 0x03 MAX_PAYLOAD_LEN = 64
MAX_PAYLOAD_LEN = 64 # hard limit; any frame larger is corrupt BAUD_RATE = 460800 # inter-board UART
# ── Message type codes ────────────────────────────────────────────────────────
class IOMsg(IntEnum):
RC_CHANNELS = 0x01
SENSORS = 0x02
# ── Command / telemetry type codes ──────────────────────────────────────────── class BalMsg(IntEnum):
LED_CMD = 0x10
class CmdType(IntEnum): OUTPUT_CMD = 0x11
HEARTBEAT = 0x01 MOTOR_CMD = 0x12
SPEED_STEER = 0x02 HEARTBEAT = 0x20
ARM = 0x03
SET_MODE = 0x04
PID_UPDATE = 0x05
class TelType(IntEnum): # ── Parsed message dataclasses ────────────────────────────────────────────────
IMU = 0x10
BATTERY = 0x11
MOTOR_RPM = 0x12
ARM_STATE = 0x13
ERROR = 0x14
# ── Parsed telemetry objects ──────────────────────────────────────────────────
@dataclass @dataclass
class ImuFrame: class RcChannels:
pitch_deg: float # degrees (positive = forward tilt) """RC channel values from TBS Crossfire (primary) or ELRS (failover).
roll_deg: float CRSF range: 1721811 (centre 992).
yaw_deg: float """
accel_x: float # m/s² channels: list = field(default_factory=lambda: [992] * 8)
accel_y: float source: int = 0 # 0 = CRSF, 1 = ELRS failover
accel_z: float
@dataclass @dataclass
class BatteryFrame: class SensorData:
voltage_mv: int # millivolts (e.g. 11100 = 11.1 V) pressure_pa: float = 0.0 # Pascal (barometer)
current_ma: int # milliamps (negative = charging) temperature_c: float = 0.0 # °C
soc_pct: int # state of charge 0100 (from STM32 fuel gauge or lookup) tof_mm: int = 0 # Time-of-flight distance mm
@dataclass @dataclass
class MotorRpmFrame: class LedCmd:
left_rpm: int pattern: int = 0
right_rpm: int r: int = 0
g: int = 0
b: int = 0
@dataclass @dataclass
class ArmStateFrame: class OutputCmd:
state: int # 0=DISARMED 1=ARMED 2=TILT_FAULT horn: bool = False
error_flags: int # bitmask buzzer: bool = False
headlight: int = 0 # 0255 PWM
fan: int = 0 # 0255 PWM
@dataclass @dataclass
class ErrorFrame: class MotorCmd:
error_code: int motor_a: int = 0 # -255..+255 (BTS7960 channel A)
subcode: int motor_b: int = 0 # -255..+255 (BTS7960 channel B)
# Union type for decoded results @dataclass
TelemetryFrame = ImuFrame | BatteryFrame | MotorRpmFrame | ArmStateFrame | ErrorFrame class Heartbeat:
state: int = 0 # 0=IDLE 1=RUNNING 2=FAULT
error_flags: int = 0
# ── CRC16 CCITT ─────────────────────────────────────────────────────────────── InterboardMsg = RcChannels | SensorData | LedCmd | OutputCmd | MotorCmd | Heartbeat
def _crc16_ccitt(data: bytes, init: int = 0xFFFF) -> int:
"""CRC16-CCITT: polynomial 0x1021, init 0xFFFF, no final XOR.""" # ── CRC-8 (polynomial 0x07, init 0x00) ───────────────────────────────────────
crc = init
def _crc8(data: bytes) -> int:
crc = 0
for byte in data: for byte in data:
crc ^= byte << 8 crc ^= byte
for _ in range(8): for _ in range(8):
if crc & 0x8000: crc = ((crc << 1) ^ 0x07) if (crc & 0x80) else (crc << 1)
crc = (crc << 1) ^ 0x1021 crc &= 0xFF
else:
crc <<= 1
crc &= 0xFFFF
return crc return crc
# ── Frame encoder ───────────────────────────────────────────────────────────── # ── Frame encoder ─────────────────────────────────────────────────────────────
def _build_frame(cmd_type: int, payload: bytes) -> bytes: def _build_frame(msg_type: int, payload: bytes) -> bytes:
"""Assemble a complete binary frame with CRC16.""" assert len(payload) <= MAX_PAYLOAD_LEN
assert len(payload) <= MAX_PAYLOAD_LEN, "Payload too large" crc_data = bytes([len(payload), msg_type]) + payload
length = len(payload) return bytes([FRAME_HEADER, len(payload), msg_type]) + payload + bytes([_crc8(crc_data)])
header = bytes([cmd_type, length])
crc = _crc16_ccitt(header + payload)
return bytes([STX, cmd_type, length]) + payload + struct.pack(">H", crc) + bytes([ETX])
def encode_heartbeat() -> bytes: def encode_heartbeat(state: int = 0, error_flags: int = 0) -> bytes:
"""HEARTBEAT frame — no payload.""" return _build_frame(BalMsg.HEARTBEAT, struct.pack("BB", state & 0xFF, error_flags & 0xFF))
return _build_frame(CmdType.HEARTBEAT, b"")
def encode_speed_steer(speed: int, steer: int) -> bytes: def encode_led_cmd(pattern: int, r: int, g: int, b: int) -> bytes:
"""SPEED_STEER frame — int16 speed + int16 steer, both in -1000..+1000.""" return _build_frame(BalMsg.LED_CMD, struct.pack("BBBB", pattern & 0xFF, r & 0xFF, g & 0xFF, b & 0xFF))
speed = max(-1000, min(1000, int(speed)))
steer = max(-1000, min(1000, int(steer)))
return _build_frame(CmdType.SPEED_STEER, struct.pack(">hh", speed, steer))
def encode_arm(arm: bool) -> bytes: def encode_output_cmd(horn: bool, buzzer: bool, headlight: int, fan: int) -> bytes:
"""ARM frame — 0=disarm, 1=arm.""" flags = (int(horn) & 1) | ((int(buzzer) & 1) << 1)
return _build_frame(CmdType.ARM, struct.pack("B", 1 if arm else 0)) return _build_frame(BalMsg.OUTPUT_CMD, struct.pack("BBB", flags, headlight & 0xFF, fan & 0xFF))
def encode_set_mode(mode: int) -> bytes: def encode_motor_cmd(motor_a: int, motor_b: int) -> bytes:
"""SET_MODE frame — mode byte.""" a = max(-255, min(255, int(motor_a)))
return _build_frame(CmdType.SET_MODE, struct.pack("B", mode & 0xFF)) b = max(-255, min(255, int(motor_b)))
return _build_frame(BalMsg.MOTOR_CMD, struct.pack(">hh", a, b))
def encode_pid_update(kp: float, ki: float, kd: float) -> bytes: # ── Streaming frame parser ────────────────────────────────────────────────────
"""PID_UPDATE frame — three float32 values."""
return _build_frame(CmdType.PID_UPDATE, struct.pack(">fff", kp, ki, kd))
class _St(IntEnum):
# ── Frame decoder (state-machine parser) ───────────────────────────────────── HDR = 0
LEN = 1
class ParserState(IntEnum): TYPE = 2
WAIT_STX = 0 PAY = 3
WAIT_TYPE = 1 CRC = 4
WAIT_LEN = 2
PAYLOAD = 3
CRC_HI = 4
CRC_LO = 5
WAIT_ETX = 6
class ParseError(Exception):
pass
class FrameParser: class FrameParser:
"""Byte-by-byte streaming parser for STM32 telemetry frames. """Byte-by-byte streaming parser for inter-board frames.
Feed individual bytes via feed(); returns a decoded TelemetryFrame (or raw
bytes tuple) when a complete valid frame is received.
Thread-safety: single-threaded wrap in a lock if shared across threads.
Usage:: Usage::
parser = FrameParser() parser = FrameParser()
for b in incoming: for b in incoming:
result = parser.feed(b) msg = parser.feed(b)
if result is not None: if msg is not None:
process(result) handle(msg)
""" """
def __init__(self) -> None: def __init__(self):
self._state = ParserState.WAIT_STX self._s = _St.HDR
self._len = 0
self._type = 0 self._type = 0
self._length = 0 self._pay = bytearray()
self._payload = bytearray()
self._crc_rcvd = 0
self.frames_ok = 0 self.frames_ok = 0
self.frames_error = 0 self.frames_error = 0
def reset(self) -> None: def reset(self):
"""Reset parser to initial state (call after error or port reconnect).""" self._s = _St.HDR
self._state = ParserState.WAIT_STX self._pay = bytearray()
self._payload = bytearray()
def feed(self, byte: int) -> Optional[TelemetryFrame | tuple]: def feed(self, byte: int) -> Optional[InterboardMsg | tuple]:
"""Process one byte. Returns decoded frame on success, None otherwise. s = self._s
On CRC error, increments frames_error and resets. The return value on if s == _St.HDR:
success is a dataclass (ImuFrame, BatteryFrame, etc.) or a if byte == FRAME_HEADER:
(type_code, raw_payload) tuple for unknown type codes. self._s = _St.LEN
"""
s = self._state
if s == ParserState.WAIT_STX:
if byte == STX:
self._state = ParserState.WAIT_TYPE
return None return None
if s == ParserState.WAIT_TYPE: if s == _St.LEN:
if byte > MAX_PAYLOAD_LEN:
self.frames_error += 1; self.reset(); return None
self._len = byte
self._s = _St.TYPE
return None
if s == _St.TYPE:
self._type = byte self._type = byte
self._state = ParserState.WAIT_LEN self._pay = bytearray()
self._s = _St.PAY if self._len > 0 else _St.CRC
return None return None
if s == ParserState.WAIT_LEN: if s == _St.PAY:
self._length = byte self._pay.append(byte)
self._payload = bytearray() if len(self._pay) == self._len:
if self._length > MAX_PAYLOAD_LEN: self._s = _St.CRC
# Corrupt frame — too big; reset return None
self.frames_error += 1
if s == _St.CRC:
self.reset() self.reset()
return None expected = _crc8(bytes([self._len, self._type]) + self._pay)
if self._length == 0: if byte != expected:
self._state = ParserState.CRC_HI self.frames_error += 1; return None
else:
self._state = ParserState.PAYLOAD
return None
if s == ParserState.PAYLOAD:
self._payload.append(byte)
if len(self._payload) == self._length:
self._state = ParserState.CRC_HI
return None
if s == ParserState.CRC_HI:
self._crc_rcvd = byte << 8
self._state = ParserState.CRC_LO
return None
if s == ParserState.CRC_LO:
self._crc_rcvd |= byte
self._state = ParserState.WAIT_ETX
return None
if s == ParserState.WAIT_ETX:
self.reset() # always reset so we look for next STX
if byte != ETX:
self.frames_error += 1
return None
# Verify CRC
crc_data = bytes([self._type, self._length]) + self._payload
expected = _crc16_ccitt(crc_data)
if expected != self._crc_rcvd:
self.frames_error += 1
return None
# Decode
self.frames_ok += 1 self.frames_ok += 1
return _decode_telemetry(self._type, bytes(self._payload)) return _decode(self._type, bytes(self._pay))
# Should never reach here self.reset(); return None
self.reset()
return None
# ── Telemetry decoder ───────────────────────────────────────────────────────── # ── Message decoder ───────────────────────────────────────────────────────────
def _decode_telemetry(type_code: int, payload: bytes) -> Optional[TelemetryFrame | tuple]: def _decode(type_code: int, payload: bytes) -> Optional[InterboardMsg | tuple]:
"""Decode a validated telemetry payload into a typed dataclass."""
try: try:
if type_code == TelType.IMU: if type_code == IOMsg.RC_CHANNELS:
if len(payload) < 12: if len(payload) < 17: return None
return None ch = list(struct.unpack_from("<8H", payload))
p, r, y, ax, ay, az = struct.unpack_from(">hhhhhh", payload) return RcChannels(channels=ch, source=payload[16])
return ImuFrame(
pitch_deg=p / 100.0,
roll_deg=r / 100.0,
yaw_deg=y / 100.0,
accel_x=ax / 100.0,
accel_y=ay / 100.0,
accel_z=az / 100.0,
)
if type_code == TelType.BATTERY: if type_code == IOMsg.SENSORS:
if len(payload) < 5: if len(payload) < 10: return None
return None pres, temp, tof = struct.unpack_from("<ffH", payload)
v_mv, i_ma, soc = struct.unpack_from(">HhB", payload) return SensorData(pressure_pa=pres, temperature_c=temp, tof_mm=tof)
return BatteryFrame(voltage_mv=v_mv, current_ma=i_ma, soc_pct=soc)
if type_code == TelType.MOTOR_RPM: if type_code == BalMsg.LED_CMD:
if len(payload) < 4: if len(payload) < 4: return None
return None pat, r, g, b = struct.unpack_from("BBBB", payload)
left, right = struct.unpack_from(">hh", payload) return LedCmd(pattern=pat, r=r, g=g, b=b)
return MotorRpmFrame(left_rpm=left, right_rpm=right)
if type_code == TelType.ARM_STATE: if type_code == BalMsg.OUTPUT_CMD:
if len(payload) < 2: if len(payload) < 3: return None
return None flags, headlight, fan = struct.unpack_from("BBB", payload)
return OutputCmd(horn=bool(flags & 1), buzzer=bool(flags & 2),
headlight=headlight, fan=fan)
if type_code == BalMsg.MOTOR_CMD:
if len(payload) < 4: return None
a, b = struct.unpack_from(">hh", payload)
return MotorCmd(motor_a=a, motor_b=b)
if type_code == BalMsg.HEARTBEAT:
if len(payload) < 2: return None
state, flags = struct.unpack_from("BB", payload) state, flags = struct.unpack_from("BB", payload)
return ArmStateFrame(state=state, error_flags=flags) return Heartbeat(state=state, error_flags=flags)
if type_code == TelType.ERROR:
if len(payload) < 2:
return None
code, sub = struct.unpack_from("BB", payload)
return ErrorFrame(error_code=code, subcode=sub)
except struct.error: except struct.error:
return None return None
# Unknown telemetry type — return raw
return (type_code, payload) return (type_code, payload)

View File

@ -1,22 +1,21 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
can_bridge_node.py ROS2 node bridging the SaltyBot Orin to the Mamba motor can_bridge_node.py ROS2 node bridging the Jetson Orin to the ESP32-S3 BALANCE
controller and VESC motor controllers over CAN bus. board and VESC motor controllers over CAN bus (CANable2 / slcan0, 500 kbps).
The node opens the SocketCAN interface (slcan0 by default), spawns a background Spec: docs/SAUL-TEE-SYSTEM-REFERENCE.md §4 & §6 (2026-04-04)
reader thread to process incoming telemetry, and exposes the following interface:
Subscriptions Subscriptions
------------- -------------
/cmd_vel geometry_msgs/Twist VESC speed commands (CAN) /cmd_vel geometry_msgs/Twist ORIN_CMD_DRIVE (0x300)
/estop std_msgs/Bool Mamba e-stop (CAN) /estop std_msgs/Bool ORIN_CMD_ESTOP (0x302)
Publications Publications
------------ ------------
/can/imu sensor_msgs/Imu Mamba IMU telemetry /can/imu sensor_msgs/Imu from FC_STATUS (0x400) pitch
/can/battery sensor_msgs/BatteryState Mamba battery telemetry /can/battery sensor_msgs/BatteryState from FC_STATUS (0x400) vbat_mv
/can/vesc/left/state std_msgs/Float32MultiArray Left VESC state /can/vesc/left/state std_msgs/Float32MultiArray from FC_VESC (0x401)
/can/vesc/right/state std_msgs/Float32MultiArray Right VESC state /can/vesc/right/state std_msgs/Float32MultiArray from FC_VESC (0x401)
/can/connection_status std_msgs/String "connected" | "disconnected" /can/connection_status std_msgs/String "connected" | "disconnected"
Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/674 Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/674
@ -30,30 +29,36 @@ import can
import rclpy import rclpy
from geometry_msgs.msg import Twist from geometry_msgs.msg import Twist
from rclpy.node import Node from rclpy.node import Node
from rcl_interfaces.msg import SetParametersResult
from sensor_msgs.msg import BatteryState, Imu from sensor_msgs.msg import BatteryState, Imu
from std_msgs.msg import Bool, Float32MultiArray, String from std_msgs.msg import Bool, Float32MultiArray, String
from saltybot_can_bridge.mamba_protocol import ( from saltybot_can_bridge.mamba_protocol import (
MAMBA_CMD_ESTOP, # Orin → BALANCE command IDs
MAMBA_CMD_MODE, ORIN_CMD_DRIVE,
MAMBA_CMD_VELOCITY, ORIN_CMD_MODE,
MAMBA_TELEM_BATTERY, ORIN_CMD_ESTOP,
MAMBA_TELEM_IMU, # BALANCE → Orin telemetry IDs
VESC_TELEM_STATE, FC_STATUS,
ORIN_CAN_ID_FC_PID_ACK, FC_VESC,
ORIN_CAN_ID_PID_SET, # VESC node IDs
VESC_LEFT_ID,
VESC_RIGHT_ID,
VESC_STATUS_1,
# Mode constants
MODE_DRIVE, MODE_DRIVE,
MODE_ESTOP, MODE_ESTOP,
MODE_IDLE, MODE_IDLE,
encode_estop_cmd, # Encoders
encode_drive_cmd,
encode_mode_cmd, encode_mode_cmd,
encode_velocity_cmd, encode_estop_cmd,
encode_pid_set_cmd, encode_led_cmd,
decode_battery_telem, # Decoders
decode_imu_telem, decode_fc_status,
decode_pid_ack, decode_fc_vesc,
decode_vesc_state, decode_vesc_status1,
decode_vesc_can_id,
) )
# Reconnect attempt interval when CAN bus is lost # Reconnect attempt interval when CAN bus is lost
@ -64,29 +69,21 @@ _WATCHDOG_HZ: float = 10.0
class CanBridgeNode(Node): class CanBridgeNode(Node):
"""CAN bus bridge between Orin ROS2 and Mamba / VESC controllers.""" """CAN bus bridge between Orin ROS2 and ESP32 BALANCE / VESC controllers."""
def __init__(self) -> None: def __init__(self) -> None:
super().__init__("can_bridge_node") super().__init__("can_bridge_node")
# ── Parameters ──────────────────────────────────────────────────── # ── Parameters ────────────────────────────────────────────────────
self.declare_parameter("can_interface", "slcan0") self.declare_parameter("can_interface", "slcan0")
self.declare_parameter("left_vesc_can_id", 56) self.declare_parameter("left_vesc_can_id", VESC_LEFT_ID) # 56
self.declare_parameter("right_vesc_can_id", 68) self.declare_parameter("right_vesc_can_id", VESC_RIGHT_ID) # 68
self.declare_parameter("mamba_can_id", 1)
self.declare_parameter("command_timeout_s", 0.5) self.declare_parameter("command_timeout_s", 0.5)
self.declare_parameter("pid/kp", 0.0)
self.declare_parameter("pid/ki", 0.0)
self.declare_parameter("pid/kd", 0.0)
self._iface: str = self.get_parameter("can_interface").value self._iface: str = self.get_parameter("can_interface").value
self._left_vesc_id: int = self.get_parameter("left_vesc_can_id").value self._left_vesc_id: int = self.get_parameter("left_vesc_can_id").value
self._right_vesc_id: int = self.get_parameter("right_vesc_can_id").value self._right_vesc_id: int = self.get_parameter("right_vesc_can_id").value
self._mamba_id: int = self.get_parameter("mamba_can_id").value
self._cmd_timeout: float = self.get_parameter("command_timeout_s").value self._cmd_timeout: float = self.get_parameter("command_timeout_s").value
self._pid_kp: float = self.get_parameter("pid/kp").value
self._pid_ki: float = self.get_parameter("pid/ki").value
self._pid_kd: float = self.get_parameter("pid/kd").value
# ── State ───────────────────────────────────────────────────────── # ── State ─────────────────────────────────────────────────────────
self._bus: Optional[can.BusABC] = None self._bus: Optional[can.BusABC] = None
@ -110,7 +107,6 @@ class CanBridgeNode(Node):
# ── Subscriptions ───────────────────────────────────────────────── # ── Subscriptions ─────────────────────────────────────────────────
self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, 10) self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, 10)
self.create_subscription(Bool, "/estop", self._estop_cb, 10) self.create_subscription(Bool, "/estop", self._estop_cb, 10)
self.add_on_set_parameters_callback(self._on_set_parameters)
# ── Timers ──────────────────────────────────────────────────────── # ── Timers ────────────────────────────────────────────────────────
self.create_timer(1.0 / _WATCHDOG_HZ, self._watchdog_cb) self.create_timer(1.0 / _WATCHDOG_HZ, self._watchdog_cb)
@ -128,33 +124,8 @@ class CanBridgeNode(Node):
self.get_logger().info( self.get_logger().info(
f"can_bridge_node ready — iface={self._iface} " f"can_bridge_node ready — iface={self._iface} "
f"left_vesc={self._left_vesc_id} right_vesc={self._right_vesc_id}" f"left_vesc={self._left_vesc_id} right_vesc={self._right_vesc_id}"
f"mamba={self._mamba_id}"
) )
# -- PID parameter callback (Issue #693) --
def _on_set_parameters(self, params) -> SetParametersResult:
"""Send new PID gains over CAN when pid/* params change."""
for p in params:
if p.name == "pid/kp":
self._pid_kp = float(p.value)
elif p.name == "pid/ki":
self._pid_ki = float(p.value)
elif p.name == "pid/kd":
self._pid_kd = float(p.value)
else:
continue
try:
payload = encode_pid_set_cmd(self._pid_kp, self._pid_ki, self._pid_kd)
self._send_can(ORIN_CAN_ID_PID_SET, payload, "pid_set")
self.get_logger().info(
f"PID gains sent: Kp={self._pid_kp:.2f} "
f"Ki={self._pid_ki:.2f} Kd={self._pid_kd:.2f}"
)
except ValueError as exc:
return SetParametersResult(successful=False, reason=str(exc))
return SetParametersResult(successful=True)
# ── Connection management ────────────────────────────────────────────── # ── Connection management ──────────────────────────────────────────────
def _try_connect(self) -> None: def _try_connect(self) -> None:
@ -212,29 +183,21 @@ class CanBridgeNode(Node):
linear = msg.linear.x linear = msg.linear.x
angular = msg.angular.z angular = msg.angular.z
# Forward left = forward right for pure translation; for rotation # Differential drive decomposition (positive angular = CCW = left turn).
# left slows and right speeds up (positive angular = CCW = left turn).
# The Mamba velocity command carries both wheels independently.
left_mps = linear - angular left_mps = linear - angular
right_mps = linear + angular right_mps = linear + angular
payload = encode_velocity_cmd(left_mps, right_mps) self._send_can(ORIN_CMD_DRIVE, encode_drive_cmd(left_mps, right_mps), "cmd_vel")
self._send_can(MAMBA_CMD_VELOCITY, payload, "cmd_vel") self._send_can(ORIN_CMD_MODE, encode_mode_cmd(MODE_DRIVE), "cmd_vel mode")
# Keep Mamba in DRIVE mode while receiving commands
self._send_can(MAMBA_CMD_MODE, encode_mode_cmd(MODE_DRIVE), "cmd_vel mode")
def _estop_cb(self, msg: Bool) -> None: def _estop_cb(self, msg: Bool) -> None:
"""Forward /estop to Mamba over CAN.""" """Forward /estop to ESP32 BALANCE over CAN."""
if not self._connected: if not self._connected:
return return
payload = encode_estop_cmd(msg.data) self._send_can(ORIN_CMD_ESTOP, encode_estop_cmd(stop=msg.data), "estop")
self._send_can(MAMBA_CMD_ESTOP, payload, "estop")
if msg.data: if msg.data:
self._send_can( self._send_can(ORIN_CMD_MODE, encode_mode_cmd(MODE_ESTOP), "estop mode")
MAMBA_CMD_MODE, encode_mode_cmd(MODE_ESTOP), "estop mode" self.get_logger().warning("E-stop asserted — sent ESTOP to ESP32 BALANCE")
)
self.get_logger().warning("E-stop asserted — sent ESTOP to Mamba")
# ── Watchdog ────────────────────────────────────────────────────────── # ── Watchdog ──────────────────────────────────────────────────────────
@ -244,14 +207,8 @@ class CanBridgeNode(Node):
return return
elapsed = time.monotonic() - self._last_cmd_time elapsed = time.monotonic() - self._last_cmd_time
if elapsed > self._cmd_timeout: if elapsed > self._cmd_timeout:
self._send_can( self._send_can(ORIN_CMD_DRIVE, encode_drive_cmd(0.0, 0.0), "watchdog zero-vel")
MAMBA_CMD_VELOCITY, self._send_can(ORIN_CMD_MODE, encode_mode_cmd(MODE_IDLE), "watchdog idle")
encode_velocity_cmd(0.0, 0.0),
"watchdog zero-vel",
)
self._send_can(
MAMBA_CMD_MODE, encode_mode_cmd(MODE_IDLE), "watchdog idle"
)
# ── CAN send helper ─────────────────────────────────────────────────── # ── CAN send helper ───────────────────────────────────────────────────
@ -305,24 +262,28 @@ class CanBridgeNode(Node):
arb_id = frame.arbitration_id arb_id = frame.arbitration_id
data = bytes(frame.data) data = bytes(frame.data)
# VESC STATUS_1 CAN IDs: (VESC_STATUS_1 << 8) | node_id
_vesc_left_status1 = (VESC_STATUS_1 << 8) | self._left_vesc_id
_vesc_right_status1 = (VESC_STATUS_1 << 8) | self._right_vesc_id
try: try:
if arb_id == MAMBA_TELEM_IMU: if arb_id == FC_STATUS:
self._handle_imu(data, frame.timestamp) self._handle_fc_status(data)
elif arb_id == MAMBA_TELEM_BATTERY: elif arb_id == FC_VESC:
self._handle_battery(data, frame.timestamp) self._handle_fc_vesc(data)
elif arb_id == VESC_TELEM_STATE + self._left_vesc_id: elif arb_id == _vesc_left_status1:
self._handle_vesc_state(data, frame.timestamp, side="left") telem = decode_vesc_status1(self._left_vesc_id, data)
msg = Float32MultiArray()
msg.data = [telem.erpm, telem.duty, 0.0, telem.current]
self._pub_vesc_left.publish(msg)
elif arb_id == VESC_TELEM_STATE + self._right_vesc_id: elif arb_id == _vesc_right_status1:
self._handle_vesc_state(data, frame.timestamp, side="right") telem = decode_vesc_status1(self._right_vesc_id, data)
msg = Float32MultiArray()
elif arb_id == ORIN_CAN_ID_FC_PID_ACK: msg.data = [telem.erpm, telem.duty, 0.0, telem.current]
gains = decode_pid_ack(data) self._pub_vesc_right.publish(msg)
self.get_logger().debug(
f"FC PID ACK: Kp={gains.kp:.2f} Ki={gains.ki:.2f} Kd={gains.kd:.2f}"
)
except Exception as exc: except Exception as exc:
self.get_logger().warning( self.get_logger().warning(
@ -331,51 +292,38 @@ class CanBridgeNode(Node):
# ── Frame handlers ──────────────────────────────────────────────────── # ── Frame handlers ────────────────────────────────────────────────────
def _handle_imu(self, data: bytes, timestamp: float) -> None: def _handle_fc_status(self, data: bytes) -> None:
telem = decode_imu_telem(data) """FC_STATUS (0x400): pitch, motor_cmd, vbat_mv, state, flags."""
telem = decode_fc_status(data)
msg = Imu() # Publish pitch as IMU (orientation only — yaw/roll unknown from FC_STATUS)
msg.header.stamp = self.get_clock().now().to_msg() imu_msg = Imu()
msg.header.frame_id = "imu_link" imu_msg.header.stamp = self.get_clock().now().to_msg()
imu_msg.header.frame_id = "imu_link"
# Only pitch is available; publish as angular velocity placeholder
imu_msg.angular_velocity.y = telem.pitch_deg # degrees, not rad/s
imu_msg.orientation_covariance[0] = -1.0 # covariance unknown
self._pub_imu.publish(imu_msg)
msg.linear_acceleration.x = telem.accel_x # Publish battery (vbat_mv → volts)
msg.linear_acceleration.y = telem.accel_y bat_msg = BatteryState()
msg.linear_acceleration.z = telem.accel_z bat_msg.header.stamp = imu_msg.header.stamp
bat_msg.voltage = telem.vbat_mv / 1000.0
bat_msg.present = True
bat_msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
self._pub_battery.publish(bat_msg)
msg.angular_velocity.x = telem.gyro_x def _handle_fc_vesc(self, data: bytes) -> None:
msg.angular_velocity.y = telem.gyro_y """FC_VESC (0x401): left/right RPM and current aggregated by BALANCE."""
msg.angular_velocity.z = telem.gyro_z telem = decode_fc_vesc(data)
# Covariance unknown; mark as -1 per REP-145 left_msg = Float32MultiArray()
msg.orientation_covariance[0] = -1.0 left_msg.data = [telem.left_rpm, 0.0, 0.0, telem.left_cur]
self._pub_vesc_left.publish(left_msg)
self._pub_imu.publish(msg) right_msg = Float32MultiArray()
right_msg.data = [telem.right_rpm, 0.0, 0.0, telem.right_cur]
def _handle_battery(self, data: bytes, timestamp: float) -> None: self._pub_vesc_right.publish(right_msg)
telem = decode_battery_telem(data)
msg = BatteryState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.voltage = telem.voltage
msg.current = telem.current
msg.present = True
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
self._pub_battery.publish(msg)
def _handle_vesc_state(
self, data: bytes, timestamp: float, side: str
) -> None:
telem = decode_vesc_state(data)
msg = Float32MultiArray()
# Layout: [erpm, duty, voltage, current]
msg.data = [telem.erpm, telem.duty, telem.voltage, telem.current]
if side == "left":
self._pub_vesc_left.publish(msg)
else:
self._pub_vesc_right.publish(msg)
# ── Status helper ───────────────────────────────────────────────────── # ── Status helper ─────────────────────────────────────────────────────
@ -390,14 +338,8 @@ class CanBridgeNode(Node):
"""Send zero velocity and shut down the CAN bus cleanly.""" """Send zero velocity and shut down the CAN bus cleanly."""
if self._connected and self._bus is not None: if self._connected and self._bus is not None:
try: try:
self._send_can( self._send_can(ORIN_CMD_DRIVE, encode_drive_cmd(0.0, 0.0), "shutdown")
MAMBA_CMD_VELOCITY, self._send_can(ORIN_CMD_MODE, encode_mode_cmd(MODE_IDLE), "shutdown")
encode_velocity_cmd(0.0, 0.0),
"shutdown",
)
self._send_can(
MAMBA_CMD_MODE, encode_mode_cmd(MODE_IDLE), "shutdown"
)
except Exception: except Exception:
pass pass
try: try:

View File

@ -1,87 +1,115 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """mamba_protocol.py — CAN frame codec for SAUL-TEE ESP32 BALANCE + VESC.
mamba_protocol.py CAN message encoding/decoding for the Mamba motor controller
and VESC telemetry.
CAN message layout Spec source: docs/SAUL-TEE-SYSTEM-REFERENCE.md §6 & §8 (2026-04-04)
------------------
Command frames (Orin Mamba / VESC):
MAMBA_CMD_VELOCITY 0x100 8 bytes left_speed (f32, m/s) | right_speed (f32, m/s)
MAMBA_CMD_MODE 0x101 1 byte mode (0=idle, 1=drive, 2=estop)
MAMBA_CMD_ESTOP 0x102 1 byte 0x01 = stop
Telemetry frames (Mamba Orin): File name retained for import compatibility.
MAMBA_TELEM_IMU 0x200 24 bytes accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z (f32 each)
MAMBA_TELEM_BATTERY 0x201 8 bytes voltage (f32, V) | current (f32, A)
VESC telemetry frame (VESC Orin): CAN bus: 500 kbps, CANable 2.0 (slcan0) on Orin.
VESC_TELEM_STATE 0x300 16 bytes erpm (f32) | duty (f32) | voltage (f32) | current (f32)
All multi-byte fields are big-endian. Orin ESP32 BALANCE (commands)
0x300 DRIVE 8 B [left_mps f32 LE][right_mps f32 LE] velocity setpoint
0x301 MODE 1 B [mode u8] 0=idle 1=drive 2=estop
0x302 ESTOP 1 B [flags u8] bit0=stop bit1=clear
0x303 LED 4 B [pattern u8][r u8][g u8][b u8]
Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/674 ESP32 BALANCE Orin (telemetry)
0x400 FC_STATUS 8 B [pitch_x10 i16][motor_cmd i16][vbat_mv u16][state u8][flags u8] 10 Hz
0x401 FC_VESC 8 B [l_rpm_x10 i16][r_rpm_x10 i16][l_cur_x10 i16][r_cur_x10 i16] 10 Hz
VESC standard CAN frames (29-bit extended IDs)
VESC node IDs: Left = 56, Right = 68
ID = (packet_type << 8) | node_id
SET_RPM cmd=3 payload: rpm i32 big-endian
STATUS cmd=9 payload: erpm i32, current i16, duty i16
STATUS_4 cmd=16 payload: temp_fet×10 i16, temp_motor×10 i16,
current_in×10 i16, pid_pos×50 i16
STATUS_5 cmd=27 payload: tacho i32, vbat×10 i16
""" """
import struct import struct
from dataclasses import dataclass from dataclasses import dataclass
from typing import Tuple
# --------------------------------------------------------------------------- # ── CAN IDs ───────────────────────────────────────────────────────────────────
# CAN message IDs
# ---------------------------------------------------------------------------
MAMBA_CMD_VELOCITY: int = 0x100 # Orin → ESP32 BALANCE
MAMBA_CMD_MODE: int = 0x101 ORIN_CMD_DRIVE: int = 0x300
MAMBA_CMD_ESTOP: int = 0x102 ORIN_CMD_MODE: int = 0x301
ORIN_CMD_ESTOP: int = 0x302
ORIN_CMD_LED: int = 0x303
MAMBA_TELEM_IMU: int = 0x200 # ESP32 BALANCE → Orin
MAMBA_TELEM_BATTERY: int = 0x201 FC_STATUS: int = 0x400
FC_VESC: int = 0x401
VESC_TELEM_STATE: int = 0x300 # VESC node IDs (per docs/SAUL-TEE-SYSTEM-REFERENCE.md §8)
ORIN_CAN_ID_PID_SET: int = 0x305 VESC_LEFT_ID: int = 56
ORIN_CAN_ID_FC_PID_ACK: int = 0x405 VESC_RIGHT_ID: int = 68
# --------------------------------------------------------------------------- # VESC CAN packet types (standard VESC protocol)
# Mode constants VESC_CMD_SET_RPM: int = 3
# --------------------------------------------------------------------------- VESC_STATUS_1: int = 9
VESC_STATUS_4: int = 16
VESC_STATUS_5: int = 27
# ── Mode constants ────────────────────────────────────────────────────────────
MODE_IDLE: int = 0 MODE_IDLE: int = 0
MODE_DRIVE: int = 1 MODE_DRIVE: int = 1
MODE_ESTOP: int = 2 MODE_ESTOP: int = 2
# --------------------------------------------------------------------------- # ── Data classes ──────────────────────────────────────────────────────────────
# Data classes for decoded telemetry
# --------------------------------------------------------------------------- @dataclass
class DriveCmd:
left_mps: float = 0.0 # m/s, positive = forward
right_mps: float = 0.0
@dataclass @dataclass
class ImuTelemetry: class FcStatus:
"""Decoded IMU telemetry from Mamba (MAMBA_TELEM_IMU).""" """Decoded FC_STATUS (0x400) telemetry from ESP32 BALANCE."""
pitch_deg: float = 0.0 # pitch_x10 / 10.0
accel_x: float = 0.0 # m/s² motor_cmd: int = 0 # raw motor command output
accel_y: float = 0.0 vbat_mv: int = 0 # battery voltage in mV
accel_z: float = 0.0 state: int = 0 # 0=idle 1=running 2=fault
gyro_x: float = 0.0 # rad/s flags: int = 0 # error bitmask
gyro_y: float = 0.0
gyro_z: float = 0.0
@dataclass @dataclass
class BatteryTelemetry: class FcVesc:
"""Decoded battery telemetry from Mamba (MAMBA_TELEM_BATTERY).""" """Decoded FC_VESC (0x401) VESC aggregate telemetry."""
left_rpm: float = 0.0 # left_rpm_x10 / 10.0
right_rpm: float = 0.0
left_cur: float = 0.0 # left_cur_x10 / 10.0 (amps)
right_cur: float = 0.0
voltage: float = 0.0 # V
@dataclass
class VescStatus1:
"""Decoded VESC STATUS (cmd=9) frame."""
node_id: int = 0
erpm: float = 0.0
current: float = 0.0 # A current: float = 0.0 # A
duty: float = 0.0 # -1.0..+1.0
@dataclass @dataclass
class VescStateTelemetry: class VescStatus4:
"""Decoded VESC state telemetry (VESC_TELEM_STATE).""" """Decoded VESC STATUS_4 (cmd=16) frame."""
node_id: int = 0
temp_fet_c: float = 0.0
temp_motor_c: float = 0.0
current_in: float = 0.0
erpm: float = 0.0 # electrical RPM
duty: float = 0.0 # duty cycle [-1.0, 1.0] @dataclass
voltage: float = 0.0 # bus voltage, V class VescStatus5:
current: float = 0.0 # phase current, A """Decoded VESC STATUS_5 (cmd=27) frame."""
node_id: int = 0
tacho: int = 0
vbat_v: float = 0.0
@dataclass @dataclass
@ -92,133 +120,98 @@ class PidGains:
kd: float = 0.0 kd: float = 0.0
# --------------------------------------------------------------------------- # ── Orin → BALANCE encoders ───────────────────────────────────────────────────
# Encode helpers
# ---------------------------------------------------------------------------
_FMT_VEL = ">ff" # 2 × float32, big-endian def encode_drive_cmd(left_mps: float, right_mps: float) -> bytes:
_FMT_MODE = ">B" # 1 × uint8 """Encode ORIN_CMD_DRIVE (0x300) — 8 bytes, float32 LE."""
_FMT_ESTOP = ">B" # 1 × uint8 return struct.pack("<ff", float(left_mps), float(right_mps))
_FMT_IMU = ">ffffff" # 6 × float32
_FMT_BAT = ">ff" # 2 × float32
_FMT_VESC = ">ffff" # 4 × float32
def encode_velocity_cmd(left_mps: float, right_mps: float) -> bytes:
"""
Encode a MAMBA_CMD_VELOCITY payload.
Parameters
----------
left_mps: target left wheel speed in m/s (positive = forward)
right_mps: target right wheel speed in m/s (positive = forward)
Returns
-------
8-byte big-endian payload suitable for a CAN frame.
"""
return struct.pack(_FMT_VEL, float(left_mps), float(right_mps))
def encode_mode_cmd(mode: int) -> bytes: def encode_mode_cmd(mode: int) -> bytes:
""" """Encode ORIN_CMD_MODE (0x301) — 1 byte."""
Encode a MAMBA_CMD_MODE payload.
Parameters
----------
mode: one of MODE_IDLE (0), MODE_DRIVE (1), MODE_ESTOP (2)
Returns
-------
1-byte payload.
"""
if mode not in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP): if mode not in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP):
raise ValueError(f"Invalid mode {mode!r}; expected 0, 1, or 2") raise ValueError(f"Invalid mode {mode!r}")
return struct.pack(_FMT_MODE, mode) return struct.pack("B", mode)
def encode_estop_cmd(stop: bool = True) -> bytes: def encode_estop_cmd(stop: bool = True, clear: bool = False) -> bytes:
""" """Encode ORIN_CMD_ESTOP (0x302) — 1 byte flags (bit0=stop, bit1=clear)."""
Encode a MAMBA_CMD_ESTOP payload. flags = (0x01 if stop else 0) | (0x02 if clear else 0)
return struct.pack("B", flags)
Parameters
----------
stop: True to assert e-stop, False to clear.
Returns
-------
1-byte payload (0x01 = stop, 0x00 = clear).
"""
return struct.pack(_FMT_ESTOP, 0x01 if stop else 0x00)
def encode_pid_set_cmd(kp: float, ki: float, kd: float) -> bytes: def encode_led_cmd(pattern: int, r: int, g: int, b: int) -> bytes:
"""Encode ORIN_CAN_ID_PID_SET (6 bytes, uint16 BE x3). Issue #693.""" """Encode ORIN_CMD_LED (0x303) — 4 bytes."""
if kp < 0.0 or ki < 0.0 or kd < 0.0: return struct.pack("BBBB", pattern & 0xFF, r & 0xFF, g & 0xFF, b & 0xFF)
raise ValueError("PID gains must be non-negative")
return struct.pack(_FMT_PID, round(min(kp,_PID_KP_MAX)*100), round(min(ki,_PID_KI_MAX)*100), round(min(kd,_PID_KD_MAX)*100))
# --------------------------------------------------------------------------- # ── VESC encoders ─────────────────────────────────────────────────────────────
# Decode helpers
# ---------------------------------------------------------------------------
def decode_imu_telem(data: bytes) -> ImuTelemetry: def encode_vesc_set_rpm(node_id: int, rpm: int) -> tuple[int, bytes]:
""" """Encode VESC SET_RPM command. Returns (can_id, payload)."""
Decode a MAMBA_TELEM_IMU payload. can_id = (VESC_CMD_SET_RPM << 8) | (node_id & 0xFF)
return can_id, struct.pack(">i", int(rpm))
Parameters
----------
data: exactly 24 bytes (6 × float32, big-endian).
Returns # ── BALANCE → Orin decoders ───────────────────────────────────────────────────
-------
ImuTelemetry dataclass instance.
Raises def decode_fc_status(data: bytes) -> FcStatus:
------ """Decode FC_STATUS (0x400) — 8 bytes."""
struct.error if data is the wrong length. if len(data) < 8:
""" raise ValueError(f"FC_STATUS expects 8 bytes, got {len(data)}")
ax, ay, az, gx, gy, gz = struct.unpack(_FMT_IMU, data) pitch_x10, motor_cmd, vbat_mv, state, flags = struct.unpack_from(">hHHBB", data)
return ImuTelemetry( return FcStatus(
accel_x=ax, accel_y=ay, accel_z=az, pitch_deg=pitch_x10 / 10.0,
gyro_x=gx, gyro_y=gy, gyro_z=gz, motor_cmd=motor_cmd,
vbat_mv=vbat_mv,
state=state,
flags=flags,
) )
def decode_battery_telem(data: bytes) -> BatteryTelemetry: def decode_fc_vesc(data: bytes) -> FcVesc:
""" """Decode FC_VESC (0x401) — 8 bytes."""
Decode a MAMBA_TELEM_BATTERY payload. if len(data) < 8:
raise ValueError(f"FC_VESC expects 8 bytes, got {len(data)}")
Parameters l_rpm, r_rpm, l_cur, r_cur = struct.unpack_from(">hhhh", data)
---------- return FcVesc(
data: exactly 8 bytes (2 × float32, big-endian). left_rpm=l_rpm / 10.0,
right_rpm=r_rpm / 10.0,
Returns left_cur=l_cur / 10.0,
------- right_cur=r_cur / 10.0,
BatteryTelemetry dataclass instance. )
"""
voltage, current = struct.unpack(_FMT_BAT, data)
return BatteryTelemetry(voltage=voltage, current=current)
def decode_vesc_state(data: bytes) -> VescStateTelemetry: # ── VESC telemetry decoders ───────────────────────────────────────────────────
"""
Decode a VESC_TELEM_STATE payload.
Parameters def decode_vesc_can_id(can_id: int) -> tuple[int, int]:
---------- """Split a VESC extended CAN ID into (packet_type, node_id)."""
data: exactly 16 bytes (4 × float32, big-endian). return (can_id >> 8) & 0xFF, can_id & 0xFF
Returns
-------
VescStateTelemetry dataclass instance.
"""
erpm, duty, voltage, current = struct.unpack(_FMT_VESC, data)
return VescStateTelemetry(erpm=erpm, duty=duty, voltage=voltage, current=current)
def decode_pid_ack(data: bytes) -> PidGains: def decode_vesc_status1(node_id: int, data: bytes) -> VescStatus1:
"""Decode ORIN_CAN_ID_FC_PID_ACK (6 bytes). Issue #693.""" """Decode VESC STATUS (cmd=9) — erpm i32, current i16, duty i16."""
kp_x100, ki_x100, kd_x100 = struct.unpack(_FMT_PID, data) erpm, cur_x10, duty_x1000 = struct.unpack_from(">ihh", data[:8])
return PidGains(kp=kp_x100/100.0, ki=ki_x100/100.0, kd=kd_x100/100.0) return VescStatus1(
node_id=node_id,
erpm=float(erpm),
current=cur_x10 / 10.0,
duty=duty_x1000 / 1000.0,
)
def decode_vesc_status4(node_id: int, data: bytes) -> VescStatus4:
"""Decode VESC STATUS_4 (cmd=16) — temp_fet×10, temp_motor×10, current_in×10."""
tfet, tmot, cur_in, _ = struct.unpack_from(">hhhh", data[:8])
return VescStatus4(
node_id=node_id,
temp_fet_c=tfet / 10.0,
temp_motor_c=tmot / 10.0,
current_in=cur_in / 10.0,
)
def decode_vesc_status5(node_id: int, data: bytes) -> VescStatus5:
"""Decode VESC STATUS_5 (cmd=27) — tacho i32, vbat×10 i16."""
tacho, vbat_x10 = struct.unpack_from(">ih", data[:6])
return VescStatus5(node_id=node_id, tacho=tacho, vbat_v=vbat_x10 / 10.0)