Compare commits

..

7 Commits

Author SHA1 Message Date
aec9c472cf feat: Orin serial OTA_CHECK + OTA_UPDATE commands, version reporting (bd-1s1s)
Extends the bd-66hx serial protocol with two new Orin→ESP32 commands:
  CMD_OTA_CHECK  (0x10): triggers gitea_ota_check_now(), responds with
    TELEM_VERSION_INFO (0x84) for Balance and IO (current + available ver).
  CMD_OTA_UPDATE (0x11): uint8 target (0=balance, 1=io, 2=both) — triggers
    uart_ota_trigger() for IO or ota_self_trigger() for Balance.
    NACK with ERR_OTA_BUSY or ERR_OTA_NO_UPDATE on failure.
New telemetry: TELEM_OTA_STATUS (0x83, target+state+progress+err),
  TELEM_VERSION_INFO (0x84, target+current[16]+available[16]).
Wires OTA stack into app_main: ota_self_health_check on boot,
  gitea_ota_init + ota_display_init after peripherals ready.
CMakeLists updated with all OTA component dependencies.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 22:18:57 -04:00
d6b15957db feat: GC9A01 OTA notification badge + progress ring UI (bd-1yr8)
Adds ota_display_task (5 Hz) on GC9A01 240×240 round LCD:
- Idle: orange dot badge at top-right when update available, version text
- Progress: arc sweeping 0→360° around display perimeter with phase label
- States: Downloading/Verifying/Applying/Rebooting (Balance) and
  Downloading/Sending/Done (IO via UART)
- Error: red arc + "FAILED RETRY?" prompt
Display primitives (fill_rect, draw_string, draw_arc) are stubs called
from the GC9A01 SPI driver layer (separate driver bead).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 22:17:07 -04:00
7ef6220e15 feat: UART OTA protocol Balance→IO board, 1 KB chunk + ACK (bd-21hv)
Balance side (uart_ota.c): downloads io-firmware.bin from Gitea to RAM,
computes SHA256, then streams to IO over UART1 (GPIO17/18, 460800 baud)
as OTA_BEGIN/OTA_DATA/OTA_END frames with CRC8 + per-chunk ACK/retry (×3).
IO side (uart_ota_recv.c): receives frames, writes to inactive OTA partition
via esp_ota_write, verifies SHA256 on OTA_END, sets boot partition, reboots.
IO board main.c + CMakeLists.txt scaffold included.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 22:16:11 -04:00
87b715f22a feat: Balance self-OTA download, SHA256 verify, rollback (bd-18nb)
Downloads balance-firmware.bin from Gitea release URL to inactive OTA
partition, streams SHA256 verification via mbedTLS, sets boot partition
and reboots. Auto-rollback via CONFIG_BOOTLOADER_APP_ROLLBACK_ENABLE if
ota_self_health_check() not called within 30 s of boot. Progress 0-100%
in g_ota_self_progress for display task.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 22:14:29 -04:00
105384e85a feat: Gitea release version checker with WiFi (bd-3hte)
Adds gitea_ota_check_task on Balance board: fetches Gitea releases API
every 30 min and on boot, filters by esp32-balance/ and esp32-io/ tag
prefixes, compares semver against embedded FW version, stores update info
(version string, download URL, SHA256) in g_balance_update / g_io_update.
WiFi credentials read from NVS namespace "wifi"; falls back to compile-time
DEFAULT_WIFI_SSID/PASS if NVS is empty.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 22:13:47 -04:00
ada7972122 feat: partition tables + OTA setup for Balance and IO boards (bd-3gwo)
Add dual OTA partitions (ota_0/ota_1 × 1.75 MB each) and otadata to
both esp32s3/balance/ and esp32s3/io/ on 4 MB flash layouts.
Enable CONFIG_BOOTLOADER_APP_ROLLBACK_ENABLE and OTA HTTP on Balance.
Create esp32s3/io/ project scaffold with config.h pin assignments.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 22:12:47 -04:00
3c895ae37b feat: add UART/USB serial protocol for Orin comms, proxy VESC CAN (bd-66hx)
Replaces Orin↔ESP32-S3 BALANCE CAN comms (0x300-0x303 / 0x400-0x401)
with binary serial framing over CH343 USB-CDC at 460800 baud.

Protocol matches bd-wim1 (sl-perception) exactly:
  Frame: [0xAA][LEN][TYPE][PAYLOAD][CRC8-SMBUS]
  CRC covers LEN+TYPE+PAYLOAD, big-endian multi-byte fields.

Commands (Orin→ESP32): HEARTBEAT/DRIVE/ESTOP/ARM/PID
Telemetry (ESP32→Orin): TELEM_STATUS, TELEM_VESC_LEFT (ID 56),
                         TELEM_VESC_RIGHT (ID 68), ACK/NACK

VESC CAN TWAI kept for motor control; drive commands from Orin
forwarded to VESCs via SET_RPM. Hardware note: SN65HVD230
rewired from IO43/44 to IO2/IO1 to free IO43/44 for CH343.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-17 22:12:09 -04:00
23 changed files with 0 additions and 2796 deletions

View File

@ -1,162 +0,0 @@
# .gitea/workflows/ota-release.yml
# Gitea Actions — ESP32 OTA firmware build & release (bd-9kod)
#
# Triggers on signed release tags:
# esp32-balance/vX.Y.Z → builds esp32s3/balance/ (ESP32-S3 Balance board)
# esp32-io/vX.Y.Z → builds esp32s3-io/ (ESP32-S3 IO board)
#
# Uses the official espressif/idf Docker image for reproducible builds.
# Attaches <app>_<version>.bin + <app>_<version>.sha256 to the Gitea release.
# The ESP32 Balance OTA system fetches the .bin from the release asset URL.
name: OTA release — build & attach firmware
on:
push:
tags:
- "esp32-balance/v*"
- "esp32-io/v*"
permissions:
contents: write
jobs:
build-and-release:
name: Build ${{ github.ref_name }}
runs-on: ubuntu-latest
container:
image: espressif/idf:v5.2.2
options: --user root
steps:
# ── 1. Checkout ───────────────────────────────────────────────────────────
- name: Checkout
uses: actions/checkout@v4
# ── 2. Resolve build target from tag ─────────────────────────────────────
# Tag format: esp32-balance/v1.2.3 or esp32-io/v1.2.3
- name: Resolve project from tag
id: proj
shell: bash
run: |
TAG="${GITHUB_REF_NAME}"
case "$TAG" in
esp32-balance/*)
DIR="esp32s3/balance"
APP="esp32s3_balance"
;;
esp32-io/*)
DIR="esp32s3-io"
APP="esp32s3_io"
;;
*)
echo "::error::Unrecognised tag prefix: ${TAG}"
exit 1
;;
esac
VERSION="${TAG#*/}"
echo "dir=${DIR}" >> "$GITHUB_OUTPUT"
echo "app=${APP}" >> "$GITHUB_OUTPUT"
echo "version=${VERSION}" >> "$GITHUB_OUTPUT"
echo "tag=${TAG}" >> "$GITHUB_OUTPUT"
echo "Build: ${APP} ${VERSION} from ${DIR}"
# ── 3. Build with ESP-IDF ─────────────────────────────────────────────────
- name: Build firmware (idf.py build)
shell: bash
run: |
. "${IDF_PATH}/export.sh"
cd "${{ steps.proj.outputs.dir }}"
idf.py build
# ── 4. Collect binary & generate checksum ────────────────────────────────
- name: Collect artifacts
id: art
shell: bash
run: |
APP="${{ steps.proj.outputs.app }}"
VER="${{ steps.proj.outputs.version }}"
BIN_SRC="${{ steps.proj.outputs.dir }}/build/${APP}.bin"
BIN_OUT="${APP}_${VER}.bin"
SHA_OUT="${APP}_${VER}.sha256"
cp "$BIN_SRC" "$BIN_OUT"
sha256sum "$BIN_OUT" > "$SHA_OUT"
echo "bin=${BIN_OUT}" >> "$GITHUB_OUTPUT"
echo "sha=${SHA_OUT}" >> "$GITHUB_OUTPUT"
echo "Binary: ${BIN_OUT} ($(wc -c < "$BIN_OUT") bytes)"
echo "Checksum: $(cat "$SHA_OUT")"
# ── 5. Archive artifacts in CI workspace ─────────────────────────────────
- name: Upload build artifacts
uses: actions/upload-artifact@v4
with:
name: firmware-${{ steps.proj.outputs.app }}-${{ steps.proj.outputs.version }}
path: |
${{ steps.art.outputs.bin }}
${{ steps.art.outputs.sha }}
# ── 6. Create Gitea release (if needed) & upload assets ──────────────────
# Uses GITHUB_TOKEN (auto-provided, contents:write from permissions block).
# URL-encodes the tag to handle the slash in esp32-balance/vX.Y.Z.
- name: Publish assets to Gitea release
shell: bash
env:
GITEA_URL: https://gitea.vayrette.com
TOKEN: ${{ secrets.GITHUB_TOKEN }}
REPO: ${{ github.repository }}
TAG: ${{ steps.proj.outputs.tag }}
BIN: ${{ steps.art.outputs.bin }}
SHA: ${{ steps.art.outputs.sha }}
run: |
API="${GITEA_URL}/api/v1/repos/${REPO}"
# URL-encode the tag (slash in esp32-balance/vX.Y.Z must be escaped)
TAG_ENC=$(python3 -c "
import urllib.parse, sys
print(urllib.parse.quote(sys.argv[1], safe=''))
" "$TAG")
# Try to fetch an existing release for this tag
RELEASE=$(curl -sf \
-H "Authorization: token ${TOKEN}" \
"${API}/releases/tags/${TAG_ENC}") || true
# If no release yet, create it
if [ -z "$RELEASE" ]; then
echo "Creating release for tag: ${TAG}"
RELEASE=$(curl -sf \
-X POST \
-H "Authorization: token ${TOKEN}" \
-H "Content-Type: application/json" \
-d "$(python3 -c "
import json, sys
print(json.dumps({
'tag_name': sys.argv[1],
'name': sys.argv[1],
'draft': False,
'prerelease': False,
}))
" "$TAG")" \
"${API}/releases")
fi
RELEASE_ID=$(echo "$RELEASE" | python3 -c "
import sys, json; print(json.load(sys.stdin)['id'])
")
echo "Release ID: ${RELEASE_ID}"
# Upload binary and checksum
for FILE in "$BIN" "$SHA"; do
FNAME=$(basename "$FILE")
echo "Uploading: ${FNAME}"
curl -sf \
-X POST \
-H "Authorization: token ${TOKEN}" \
-F "attachment=@${FILE}" \
"${API}/releases/${RELEASE_ID}/assets?name=${FNAME}"
done
echo "Published: ${BIN} + ${SHA} → release ${TAG}"

View File

@ -1,22 +0,0 @@
here4_can_node:
ros__parameters:
# SocketCAN interface — freed from ESP32 BALANCE comms by bd-wim1
can_interface: can0
# DroneCAN bitrate — Here4 runs at 1 Mbps (not 500 kbps used previously)
bitrate: 1000000
# Local DroneCAN node ID for this Orin instance (1-127, must be unique on bus)
local_node_id: 126
# Set true to run: ip link set can0 type can bitrate 1000000 && ip link set can0 up
# Requires: sudo setcap cap_net_admin+eip $(which python3) or run as root
# Default false — manage interface externally (systemd-networkd or udev)
bring_up_can: false
# Here4 DroneCAN node ID — 0 means auto-detect from first Fix2 message
node_id_filter: 0
# TF frame IDs
fix_frame_id: gps
imu_frame_id: here4_imu

View File

@ -1,109 +0,0 @@
"""here4.launch.py — Launch the Here4 GPS DroneCAN bridge on CANable2.
bd-p47c: CANable2 freed by bd-wim1 (ESP32 moved to UART/USB) now used for
Here4 GPS at 1 Mbps DroneCAN/UAVCAN v0.
CAN setup (one-time, survives reboot via systemd-networkd or udev)
------------------------------------------------------------------
# Option A — manual (quick test):
sudo ip link set can0 down
sudo ip link set can0 type can bitrate 1000000
sudo ip link set can0 up
# Option B — let the node do it (requires CAP_NET_ADMIN):
ros2 launch saltybot_dronecan_gps here4.launch.py bring_up_can:=true
# Option C — systemd-networkd (/etc/systemd/network/80-can0.network):
[Match]
Name=can0
[CAN]
BitRate=1M
Published topics
----------------
/gps/fix sensor_msgs/NavSatFix navsat_transform_node (EKF)
/gps/velocity geometry_msgs/TwistWithCovarianceStamped
/here4/fix sensor_msgs/NavSatFix (alias)
/here4/imu sensor_msgs/Imu
/here4/mag sensor_msgs/MagneticField
/here4/baro sensor_msgs/FluidPressure
/here4/status std_msgs/String JSON
/here4/node_id std_msgs/Int32
Subscribed topics
-----------------
/rtcm std_msgs/ByteMultiArray RTCM corrections (NTRIP client)
/rtcm_hex std_msgs/String hex-encoded RTCM (fallback)
Usage
-----
# Default (interface already up):
ros2 launch saltybot_dronecan_gps here4.launch.py
# Bring up interface automatically (CAP_NET_ADMIN required):
ros2 launch saltybot_dronecan_gps here4.launch.py bring_up_can:=true
# Override interface (e.g. second CAN adapter):
ros2 launch saltybot_dronecan_gps here4.launch.py can_interface:=can1
# Pin to specific Here4 node ID (skip auto-detect):
ros2 launch saltybot_dronecan_gps here4.launch.py node_id_filter:=10
System dependency
-----------------
pip install dronecan # DroneCAN/UAVCAN v0 Python library
apt install can-utils # optional: candump, cansend for debugging
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description() -> LaunchDescription:
pkg_share = get_package_share_directory("saltybot_dronecan_gps")
params_file = os.path.join(pkg_share, "config", "here4_params.yaml")
args = [
DeclareLaunchArgument(
"can_interface",
default_value="can0",
description="SocketCAN interface (CANable2 @ 1 Mbps)",
),
DeclareLaunchArgument(
"bring_up_can",
default_value="false",
description="Bring up can_interface via ip link (requires CAP_NET_ADMIN)",
),
DeclareLaunchArgument(
"node_id_filter",
default_value="0",
description="DroneCAN node ID of Here4 (0=auto-detect from first Fix2)",
),
DeclareLaunchArgument(
"local_node_id",
default_value="126",
description="DroneCAN node ID for this Orin (must be unique on bus)",
),
]
node = Node(
package="saltybot_dronecan_gps",
executable="here4_node",
name="here4_can_node",
output="screen",
parameters=[
params_file,
{
"can_interface": LaunchConfiguration("can_interface"),
"bring_up_can": LaunchConfiguration("bring_up_can"),
"node_id_filter": LaunchConfiguration("node_id_filter"),
"local_node_id": LaunchConfiguration("local_node_id"),
},
],
)
return LaunchDescription([*args, node])

View File

@ -1,37 +0,0 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_dronecan_gps</name>
<version>0.1.0</version>
<description>
DroneCAN/UAVCAN v0 bridge for Here4 GPS on the CANable2 SocketCAN adapter.
Publishes GPS fix, IMU, magnetometer, and barometer data to ROS2.
Injects RTCM corrections for RTK.
bd-p47c: CANable2 freed from ESP32 BALANCE comms by bd-wim1.
Here4 operates at 1 Mbps on can0 (was 500 kbps for VESC comms).
System dependency: dronecan Python library (pip install dronecan)
</description>
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
<license>MIT</license>
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>sensor_msgs</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
<!-- dronecan: pip install dronecan (DroneCAN/UAVCAN v0 Python library) -->
<!-- python3-can is a transitive dependency of dronecan -->
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -1 +0,0 @@
# saltybot_dronecan_gps — Here4 GPS DroneCAN bridge for CANable2 (bd-p47c)

View File

@ -1,333 +0,0 @@
"""dronecan_parser.py — Pure-Python conversion of DroneCAN/UAVCAN v0 message
objects to plain Python dicts, independent of ROS2.
Keeps the dronecan library dependency isolated so that unit tests can run
without the dronecan package or CAN hardware.
DroneCAN message types handled
-------------------------------
uavcan.equipment.gnss.Fix2 DTID 1063
uavcan.equipment.gnss.Auxiliary DTID 1060
uavcan.equipment.ahrs.RawIMU DTID 1003
uavcan.equipment.ahrs.MagneticFieldStrength2 DTID 1001
uavcan.equipment.air_data.StaticPressure DTID 1028
uavcan.equipment.air_data.StaticTemperature DTID 1029
uavcan.protocol.NodeStatus DTID 341
uavcan.equipment.gnss.RTCMStream DTID 1062 (TX: RTCM injection)
Fix2 status values
------------------
0 NO_FIX
1 TIME_ONLY
2 2D_FIX
3 3D_FIX
NodeStatus health / mode
-------------------------
health 0=OK 1=WARNING 2=ERROR 3=CRITICAL
mode 0=OPERATIONAL 1=INITIALIZATION 2=MAINTENANCE 3=SOFTWARE_UPDATE 7=OFFLINE
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
# ── DSDL constants ────────────────────────────────────────────────────────────
DTID_FIX2 = 1063
DTID_AUXILIARY = 1060
DTID_RAW_IMU = 1003
DTID_MAG_FIELD = 1001
DTID_STATIC_PRESSURE = 1028
DTID_STATIC_TEMPERATURE = 1029
DTID_NODE_STATUS = 341
DTID_RTCM_STREAM = 1062
# Fix2 status
FIX_NO_FIX = 0
FIX_TIME_ONLY = 1
FIX_2D = 2
FIX_3D = 3
FIX_LABEL = {FIX_NO_FIX: "NO_FIX", FIX_TIME_ONLY: "TIME_ONLY",
FIX_2D: "2D_FIX", FIX_3D: "3D_FIX"}
# NodeStatus health
HEALTH_OK = 0
HEALTH_WARNING = 1
HEALTH_ERROR = 2
HEALTH_CRITICAL = 3
HEALTH_LABEL = {0: "OK", 1: "WARNING", 2: "ERROR", 3: "CRITICAL"}
MODE_LABEL = {0: "OPERATIONAL", 1: "INITIALIZATION", 2: "MAINTENANCE",
3: "SOFTWARE_UPDATE", 7: "OFFLINE"}
# RTCM stream: max bytes per DroneCAN frame
RTCM_MAX_CHUNK = 128
# ── Helpers ───────────────────────────────────────────────────────────────────
def _getf(obj: Any, *attrs: str, default: Any = None) -> Any:
"""Get nested attribute with fallback, tolerant of dronecan proxy objects."""
for attr in attrs:
try:
obj = getattr(obj, attr)
except AttributeError:
return default
return obj
def _list3(vec: Any, default: float = 0.0) -> List[float]:
"""Convert a dronecan 3-element array to a plain list of floats."""
try:
return [float(vec[0]), float(vec[1]), float(vec[2])]
except (TypeError, IndexError):
return [default, default, default]
def _list9(mat: Any, default: float = 0.0) -> List[float]:
"""Convert a dronecan 9-element array (row-major 3×3) to a plain list."""
try:
return [float(mat[i]) for i in range(9)]
except (TypeError, IndexError):
return [default] * 9
# ── Fix2 ─────────────────────────────────────────────────────────────────────
def parse_fix2(msg: Any) -> Optional[Dict[str, Any]]:
"""Parse a uavcan.equipment.gnss.Fix2 message.
Parameters
----------
msg : dronecan message object (event.message / event.transfer.payload)
Returns
-------
dict with keys:
lat_deg float degrees (WGS84)
lon_deg float degrees (WGS84)
alt_msl_m float metres above MSL
alt_ellipsoid_m float metres above WGS84 ellipsoid
ned_vel list[3] [north, east, down] m/s
fix_type int 0..3
fix_label str
sats_used int
pdop float
hdop float
vdop float
pos_cov list[9] row-major 3×3 position covariance,
undulation_m float WGS84 geoid undulation (m)
gnss_timestamp_us int microseconds (GPS epoch)
"""
if msg is None:
return None
try:
lat = float(_getf(msg, "latitude_deg_1e8", default=0)) * 1e-8
lon = float(_getf(msg, "longitude_deg_1e8", default=0)) * 1e-8
alt_msl = float(_getf(msg, "height_msl_mm", default=0)) * 1e-3
alt_ell = float(_getf(msg, "height_ellipsoid_mm", default=0)) * 1e-3
ned = _list3(_getf(msg, "ned_velocity"))
fix_type = int(_getf(msg, "status", default=FIX_NO_FIX))
# sats_used is uint7, may be named differently across DSDL revisions
sats = int(_getf(msg, "sats_used", default=0))
pdop = float(_getf(msg, "pdop", default=99.0))
hdop = float(_getf(msg, "hdop", default=99.0))
vdop = float(_getf(msg, "vdop", default=99.0))
undulation = float(_getf(msg, "undulation", default=0.0))
# position_covariance: optional, may be absent in older DSDL
pos_cov_raw = _getf(msg, "position_covariance")
pos_cov = _list9(pos_cov_raw) if pos_cov_raw is not None else [0.0] * 9
gnss_ts = int(_getf(msg, "gnss_timestamp", default=0))
return {
"lat_deg": lat,
"lon_deg": lon,
"alt_msl_m": alt_msl,
"alt_ellipsoid_m": alt_ell,
"ned_vel": ned,
"fix_type": fix_type,
"fix_label": FIX_LABEL.get(fix_type, f"UNKNOWN({fix_type})"),
"sats_used": sats,
"pdop": pdop,
"hdop": hdop,
"vdop": vdop,
"pos_cov": pos_cov,
"undulation_m": undulation,
"gnss_timestamp_us": gnss_ts,
}
except Exception:
return None
# ── RawIMU ────────────────────────────────────────────────────────────────────
def parse_raw_imu(msg: Any) -> Optional[Dict[str, Any]]:
"""Parse a uavcan.equipment.ahrs.RawIMU message.
Returns
-------
dict with keys:
gyro_xyz list[3] rad/s
accel_xyz list[3] m/
dt_s float integration interval (seconds)
gyro_cov list[9] row-major 3×3 covariance
accel_cov list[9]
"""
if msg is None:
return None
try:
gyro = _list3(_getf(msg, "rate_gyro_latest"))
accel = _list3(_getf(msg, "accelerometer_latest"))
dt = float(_getf(msg, "integration_interval", default=0.0))
gyro_cov = _list9(_getf(msg, "rate_gyro_covariance"))
accel_cov = _list9(_getf(msg, "accelerometer_covariance"))
return {
"gyro_xyz": gyro,
"accel_xyz": accel,
"dt_s": dt,
"gyro_cov": gyro_cov,
"accel_cov": accel_cov,
}
except Exception:
return None
# ── MagneticFieldStrength2 ────────────────────────────────────────────────────
def parse_mag(msg: Any) -> Optional[Dict[str, Any]]:
"""Parse a uavcan.equipment.ahrs.MagneticFieldStrength2 message.
Returns
-------
dict with keys:
sensor_id int
field_ga list[3] [x, y, z] Gauss
field_cov list[9] row-major 3×3 covariance (Gauss²)
"""
if msg is None:
return None
try:
sensor_id = int(_getf(msg, "sensor_id", default=0))
field_ga = _list3(_getf(msg, "magnetic_field_ga"))
field_cov = _list9(_getf(msg, "magnetic_field_covariance"))
return {
"sensor_id": sensor_id,
"field_ga": field_ga,
"field_cov": field_cov,
}
except Exception:
return None
# ── StaticPressure / StaticTemperature ───────────────────────────────────────
def parse_static_pressure(msg: Any) -> Optional[Dict[str, Any]]:
"""Parse a uavcan.equipment.air_data.StaticPressure message.
Returns
-------
dict with keys:
pressure_pa float Pascals
pressure_variance float Pa²
"""
if msg is None:
return None
try:
return {
"pressure_pa": float(_getf(msg, "static_pressure", default=0.0)),
"pressure_variance": float(_getf(msg, "static_pressure_variance", default=0.0)),
}
except Exception:
return None
def parse_static_temperature(msg: Any) -> Optional[Dict[str, Any]]:
"""Parse a uavcan.equipment.air_data.StaticTemperature message.
Returns
-------
dict with keys:
temperature_k float Kelvin
"""
if msg is None:
return None
try:
return {
"temperature_k": float(_getf(msg, "static_temperature", default=273.15)),
}
except Exception:
return None
# ── NodeStatus ────────────────────────────────────────────────────────────────
def parse_node_status(msg: Any, source_node_id: int) -> Optional[Dict[str, Any]]:
"""Parse a uavcan.protocol.NodeStatus message.
Returns
-------
dict with keys:
node_id int
uptime_sec int
health int 0=OK .. 3=CRITICAL
health_label str
mode int
mode_label str
vendor_code int
"""
if msg is None:
return None
try:
health = int(_getf(msg, "health", default=0))
mode = int(_getf(msg, "mode", default=0))
uptime = int(_getf(msg, "uptime_sec", default=0))
vendor_code = int(_getf(msg, "vendor_specific_status_code", default=0))
return {
"node_id": source_node_id,
"uptime_sec": uptime,
"health": health,
"health_label": HEALTH_LABEL.get(health, f"UNKNOWN({health})"),
"mode": mode,
"mode_label": MODE_LABEL.get(mode, f"UNKNOWN({mode})"),
"vendor_code": vendor_code,
}
except Exception:
return None
# ── RTCM stream chunking ──────────────────────────────────────────────────────
def chunk_rtcm(data: bytes, sequence_id: int) -> List[Dict[str, Any]]:
"""Split a raw RTCM byte string into DroneCAN RTCMStream message dicts.
Each chunk carries up to RTCM_MAX_CHUNK bytes. The caller must convert
these dicts into actual dronecan.uavcan.equipment.gnss.RTCMStream objects
and broadcast them on the bus.
Parameters
----------
data : raw RTCM correction bytes
sequence_id : monotonically increasing counter (uint16, wraps at 65535)
Returns
-------
List of dicts: {"sequence_id": int, "data": bytes}
"""
chunks = []
for i in range(0, len(data), RTCM_MAX_CHUNK):
chunks.append({
"sequence_id": sequence_id & 0xFFFF,
"data": data[i:i + RTCM_MAX_CHUNK],
})
sequence_id += 1
return chunks

View File

@ -1,532 +0,0 @@
"""here4_node.py — ROS2 node for Here4 GPS via DroneCAN/UAVCAN v0 on CANable2.
bd-p47c: CANable2 (freed from ESP32 BALANCE comms by bd-wim1) is now used
for Here4 GPS at 1 Mbps DroneCAN.
CAN setup (run once before launching, or use bring_up_can:=true arg)
-----------------------------------------------------------------------
sudo ip link set can0 down
sudo ip link set can0 type can bitrate 1000000
sudo ip link set can0 up
DroneCAN messages received from Here4
--------------------------------------
uavcan.equipment.gnss.Fix2 DTID 1063 ~5 Hz
uavcan.equipment.gnss.Auxiliary DTID 1060 ~5 Hz
uavcan.equipment.ahrs.RawIMU DTID 1003 ~50 Hz
uavcan.equipment.ahrs.MagneticFieldStrength2 DTID 1001 ~10 Hz
uavcan.equipment.air_data.StaticPressure DTID 1028 ~10 Hz
uavcan.equipment.air_data.StaticTemperature DTID 1029 ~10 Hz
uavcan.protocol.NodeStatus DTID 341 ~1 Hz
DroneCAN messages transmitted to Here4
----------------------------------------
uavcan.equipment.gnss.RTCMStream DTID 1062 on /rtcm input
ROS2 topics published
----------------------
/gps/fix sensor_msgs/NavSatFix 5 Hz feeds navsat_transform_node
/gps/velocity geometry_msgs/TwistWithCovarianceStamped 5 Hz
/here4/fix sensor_msgs/NavSatFix alias of /gps/fix
/here4/imu sensor_msgs/Imu 50 Hz
/here4/mag sensor_msgs/MagneticField 10 Hz
/here4/baro sensor_msgs/FluidPressure 10 Hz
/here4/status std_msgs/String JSON 1 Hz (node ID, health, fix quality)
/here4/node_id std_msgs/Int32 node ID once discovered
ROS2 topics subscribed
-----------------------
/rtcm std_msgs/ByteMultiArray RTCM corrections (from NTRIP client)
or std_msgs/String (hex-encoded, fallback)
Parameters
----------
can_interface str SocketCAN interface name default: can0
local_node_id int This node's DroneCAN ID (1-127) default: 126
bitrate int CAN bitrate, bps default: 1000000
bring_up_can bool Bring up can_interface via ip default: false
node_id_filter int Specific Here4 node ID (0=auto) default: 0
fix_frame_id str Frame ID for NavSatFix default: gps
imu_frame_id str Frame ID for Imu default: here4_imu
"""
from __future__ import annotations
import json
import math
import subprocess
import threading
import time
from typing import Any, Optional
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import TwistWithCovarianceStamped
from sensor_msgs.msg import FluidPressure, Imu, MagneticField, NavSatFix, NavSatStatus
from std_msgs.msg import ByteMultiArray, Int32, String
from .dronecan_parser import (
FIX_3D,
FIX_NO_FIX,
chunk_rtcm,
parse_fix2,
parse_mag,
parse_raw_imu,
parse_static_pressure,
parse_static_temperature,
parse_node_status,
)
# Gauss → Tesla conversion
_GA_TO_T = 1e-4
# NavSatStatus mapping from DroneCAN fix type
_FIX_TO_NAV_STATUS = {
0: NavSatStatus.STATUS_NO_FIX, # NO_FIX
1: NavSatStatus.STATUS_NO_FIX, # TIME_ONLY
2: NavSatStatus.STATUS_FIX, # 2D_FIX
3: NavSatStatus.STATUS_FIX, # 3D_FIX (upgraded to SBAS/RTK when applicable)
}
# Here4 supports GPS+GLONASS+BeiDou+Galileo
_SERVICE_FLAGS = (NavSatStatus.SERVICE_GPS | NavSatStatus.SERVICE_GLONASS |
NavSatStatus.SERVICE_GALILEO | NavSatStatus.SERVICE_COMPASS)
# RTCM injection sequence counter (per-session, wraps at 65535)
_rtcm_seq = 0
_rtcm_lock = threading.Lock()
def _bring_up_can(iface: str, bitrate: int, logger) -> bool:
"""Bring up the SocketCAN interface at the requested bitrate."""
try:
subprocess.run(["ip", "link", "set", iface, "down"], check=False)
subprocess.run(["ip", "link", "set", iface, "type", "can",
"bitrate", str(bitrate)], check=True)
subprocess.run(["ip", "link", "set", iface, "up"], check=True)
logger.info(f"SocketCAN {iface} up at {bitrate} bps")
return True
except subprocess.CalledProcessError as exc:
logger.error(f"Failed to bring up {iface}: {exc} — check sudo/udev rules")
return False
class Here4CanNode(Node):
"""DroneCAN bridge publishing Here4 GPS, IMU, mag, and baro to ROS2."""
def __init__(self) -> None:
super().__init__("here4_can_node")
# ── Parameters ────────────────────────────────────────────────────
self.declare_parameter("can_interface", "can0")
self.declare_parameter("local_node_id", 126)
self.declare_parameter("bitrate", 1_000_000)
self.declare_parameter("bring_up_can", False)
self.declare_parameter("node_id_filter", 0) # 0 = auto-detect
self.declare_parameter("fix_frame_id", "gps")
self.declare_parameter("imu_frame_id", "here4_imu")
self._iface = self.get_parameter("can_interface").value
self._local_nid = self.get_parameter("local_node_id").value
self._bitrate = self.get_parameter("bitrate").value
self._bring_up = self.get_parameter("bring_up_can").value
self._nid_filter = self.get_parameter("node_id_filter").value
self._fix_frame = self.get_parameter("fix_frame_id").value
self._imu_frame = self.get_parameter("imu_frame_id").value
# ── State ─────────────────────────────────────────────────────────
self._here4_nid: Optional[int] = self._nid_filter if self._nid_filter else None
self._last_fix: Optional[dict] = None
self._last_temp_k: float = 293.15 # 20 °C default until first reading
self._dc_node = None # dronecan node, set up after CAN is ready
# ── Publishers ────────────────────────────────────────────────────
self._pub_fix = self.create_publisher(NavSatFix,
"/gps/fix", 10)
self._pub_fix_h4 = self.create_publisher(NavSatFix,
"/here4/fix", 10)
self._pub_vel = self.create_publisher(TwistWithCovarianceStamped,
"/gps/velocity", 10)
self._pub_imu = self.create_publisher(Imu,
"/here4/imu", 10)
self._pub_mag = self.create_publisher(MagneticField,
"/here4/mag", 10)
self._pub_baro = self.create_publisher(FluidPressure,
"/here4/baro", 10)
self._pub_status = self.create_publisher(String,
"/here4/status", 10)
self._pub_node_id = self.create_publisher(Int32,
"/here4/node_id", 10)
# ── Subscriptions ─────────────────────────────────────────────────
self.create_subscription(ByteMultiArray, "/rtcm",
self._on_rtcm_bytes, 10)
self.create_subscription(String, "/rtcm_hex",
self._on_rtcm_hex, 10)
# ── Bring up CAN if requested ─────────────────────────────────────
if self._bring_up:
_bring_up_can(self._iface, self._bitrate, self.get_logger())
# ── Initialise DroneCAN node ──────────────────────────────────────
self._init_dronecan()
# ── Timers ────────────────────────────────────────────────────────
# Poll DroneCAN at 200 Hz (5 ms) to drain the RX queue in real-time
self.create_timer(0.005, self._spin_dronecan)
# Status publish at 1 Hz
self.create_timer(1.0, self._publish_status)
# ── DroneCAN initialisation ───────────────────────────────────────────
def _init_dronecan(self) -> None:
try:
import dronecan # type: ignore[import]
except ImportError:
self.get_logger().error(
"dronecan package not found — install with: pip install dronecan\n"
"Here4 node will not receive GPS data until dronecan is installed."
)
return
try:
self._dc_node = dronecan.make_node(
f"socketcan:{self._iface}",
node_id=self._local_nid,
bitrate=self._bitrate,
)
except Exception as exc:
self.get_logger().error(
f"Failed to open DroneCAN on {self._iface}: {exc}\n"
f"Verify: ip link show {self._iface} and try bring_up_can:=true"
)
return
# ── Register message handlers ──────────────────────────────────────
dc = dronecan
self._dc_node.add_handler(dc.uavcan.equipment.gnss.Fix2,
self._on_fix2)
self._dc_node.add_handler(dc.uavcan.equipment.ahrs.RawIMU,
self._on_raw_imu)
self._dc_node.add_handler(dc.uavcan.equipment.ahrs.MagneticFieldStrength2,
self._on_mag)
self._dc_node.add_handler(dc.uavcan.equipment.air_data.StaticPressure,
self._on_pressure)
self._dc_node.add_handler(dc.uavcan.equipment.air_data.StaticTemperature,
self._on_temperature)
self._dc_node.add_handler(dc.uavcan.protocol.NodeStatus,
self._on_node_status)
self.get_logger().info(
f"DroneCAN node ready: {self._iface} @ {self._bitrate} bps "
f"local_id={self._local_nid}"
)
# ── DroneCAN spin (called from 5 ms timer) ────────────────────────────
def _spin_dronecan(self) -> None:
if self._dc_node is None:
return
try:
self._dc_node.spin(timeout=0)
except Exception as exc:
self.get_logger().warning(
f"DroneCAN spin error: {exc}",
throttle_duration_sec=5.0,
)
# ── Source node filter ────────────────────────────────────────────────
def _accept(self, event: Any) -> bool: # type: ignore[valid-type]
"""Return True if this event comes from the tracked Here4 node."""
src = getattr(getattr(event, "transfer", None), "source_node_id", None)
if src is None:
return True # unknown source — accept
if self._here4_nid is None:
return True # auto-detect mode — accept any
return src == self._here4_nid
# ── DroneCAN message handlers ─────────────────────────────────────────
def _on_fix2(self, event) -> None:
src = getattr(getattr(event, "transfer", None), "source_node_id", None)
# Auto-latch: first node that sends Fix2 becomes our Here4
if self._here4_nid is None and src is not None:
self._here4_nid = src
self.get_logger().info(
f"Here4 auto-discovered: DroneCAN node ID {src}"
)
nid_msg = Int32()
nid_msg.data = src
self._pub_node_id.publish(nid_msg)
if not self._accept(event):
return
d = parse_fix2(event.message)
if d is None:
return
self._last_fix = d
stamp = self.get_clock().now().to_msg()
self._publish_navsatfix(d, stamp)
self._publish_velocity(d, stamp)
def _on_raw_imu(self, event) -> None:
if not self._accept(event):
return
d = parse_raw_imu(event.message)
if d is None:
return
stamp = self.get_clock().now().to_msg()
msg = Imu()
msg.header.stamp = stamp
msg.header.frame_id = self._imu_frame
# Orientation: not provided by DroneCAN RawIMU — signal unknown
msg.orientation_covariance[0] = -1.0
gx, gy, gz = d["gyro_xyz"]
msg.angular_velocity.x = gx
msg.angular_velocity.y = gy
msg.angular_velocity.z = gz
_fill_cov9(msg.angular_velocity_covariance, d["gyro_cov"])
ax, ay, az = d["accel_xyz"]
msg.linear_acceleration.x = ax
msg.linear_acceleration.y = ay
msg.linear_acceleration.z = az
_fill_cov9(msg.linear_acceleration_covariance, d["accel_cov"])
self._pub_imu.publish(msg)
def _on_mag(self, event) -> None:
if not self._accept(event):
return
d = parse_mag(event.message)
if d is None:
return
stamp = self.get_clock().now().to_msg()
msg = MagneticField()
msg.header.stamp = stamp
msg.header.frame_id = self._imu_frame
gx, gy, gz = d["field_ga"]
msg.magnetic_field.x = gx * _GA_TO_T
msg.magnetic_field.y = gy * _GA_TO_T
msg.magnetic_field.z = gz * _GA_TO_T
_fill_cov9(msg.magnetic_field_covariance,
[v * _GA_TO_T * _GA_TO_T for v in d["field_cov"]])
self._pub_mag.publish(msg)
def _on_pressure(self, event) -> None:
if not self._accept(event):
return
d = parse_static_pressure(event.message)
if d is None:
return
stamp = self.get_clock().now().to_msg()
msg = FluidPressure()
msg.header.stamp = stamp
msg.header.frame_id = self._imu_frame
msg.fluid_pressure = d["pressure_pa"]
msg.variance = d["pressure_variance"]
self._pub_baro.publish(msg)
def _on_temperature(self, event) -> None:
if not self._accept(event):
return
d = parse_static_temperature(event.message)
if d is None:
return
self._last_temp_k = d["temperature_k"]
def _on_node_status(self, event) -> None:
src = getattr(getattr(event, "transfer", None), "source_node_id", None)
if src is None:
return
d = parse_node_status(event.message, src)
if d is None:
return
# Auto-discovery: latch first node that sends GPS-relevant data
# NodeStatus alone is not enough — we track whichever source_node_id
# first delivered a Fix2 message; if none yet, log all nodes seen.
if self._here4_nid is None:
# Any node could be Here4 — we'll latch on first Fix2 source
self.get_logger().debug(
f"DroneCAN node {src} alive: health={d['health_label']} "
f"mode={d['mode_label']} uptime={d['uptime_sec']}s"
)
elif src == self._here4_nid:
self._last_node_status = d
# ── NavSatFix publishing ─────────────────────────────────────────────
def _publish_navsatfix(self, d: dict, stamp) -> None:
fix_type = d["fix_type"]
status = NavSatStatus()
status.service = _SERVICE_FLAGS
# Upgrade status for RTK: hdop < 0.5 and 3D fix → treat as SBAS/RTK
if fix_type == FIX_3D:
if d["hdop"] < 0.5:
status.status = NavSatStatus.STATUS_GBAS_FIX # RTK fixed
elif d["hdop"] < 1.5:
status.status = NavSatStatus.STATUS_SBAS_FIX # SBAS/DGPS
else:
status.status = NavSatStatus.STATUS_FIX
else:
status.status = _FIX_TO_NAV_STATUS.get(fix_type, NavSatStatus.STATUS_NO_FIX)
msg = NavSatFix()
msg.header.stamp = stamp
msg.header.frame_id = self._fix_frame
msg.status = status
msg.latitude = d["lat_deg"]
msg.longitude = d["lon_deg"]
msg.altitude = d["alt_msl_m"]
# Position covariance (diagonal from DroneCAN; full matrix if available)
pos_cov = d["pos_cov"]
msg.position_covariance_type = (
NavSatFix.COVARIANCE_TYPE_FULL if any(pos_cov) else
NavSatFix.COVARIANCE_TYPE_APPROXIMATED
)
if any(pos_cov):
for i, v in enumerate(pos_cov):
msg.position_covariance[i] = v
else:
# Approximate from HDOP/VDOP: σ_h ≈ hdop × 3m, σ_v ≈ vdop × 5m
sh2 = (d["hdop"] * 3.0) ** 2
sv2 = (d["vdop"] * 5.0) ** 2
msg.position_covariance[0] = sh2 # east
msg.position_covariance[4] = sh2 # north
msg.position_covariance[8] = sv2 # up
self._pub_fix.publish(msg)
self._pub_fix_h4.publish(msg)
def _publish_velocity(self, d: dict, stamp) -> None:
vn, ve, vd = d["ned_vel"]
msg = TwistWithCovarianceStamped()
msg.header.stamp = stamp
msg.header.frame_id = self._fix_frame
# NED → ENU: x=east=ve, y=north=vn, z=up=-vd
msg.twist.twist.linear.x = ve
msg.twist.twist.linear.y = vn
msg.twist.twist.linear.z = -vd
# Velocity covariance: approximate from HDOP
vel_var = (d["hdop"] * 0.2) ** 2
msg.twist.covariance[0] = vel_var # xx (east)
msg.twist.covariance[7] = vel_var # yy (north)
msg.twist.covariance[14] = (d["vdop"] * 0.3) ** 2 # zz (up)
self._pub_vel.publish(msg)
# ── Status publish (1 Hz) ────────────────────────────────────────────
def _publish_status(self) -> None:
fix = self._last_fix
ns = getattr(self, "_last_node_status", None)
payload: dict = {
"here4_node_id": self._here4_nid,
"can_interface": self._iface,
"dc_node_ready": self._dc_node is not None,
"fix_type": fix["fix_type"] if fix else FIX_NO_FIX,
"fix_label": fix["fix_label"] if fix else "NO_FIX",
"sats_used": fix["sats_used"] if fix else 0,
"hdop": round(fix["hdop"], 2) if fix else 99.0,
"lat_deg": round(fix["lat_deg"], 7) if fix else 0.0,
"lon_deg": round(fix["lon_deg"], 7) if fix else 0.0,
"alt_msl_m": round(fix["alt_msl_m"], 2) if fix else 0.0,
"temp_c": round(self._last_temp_k - 273.15, 1),
"node_health": ns["health_label"] if ns else "UNKNOWN",
"node_uptime_s": ns["uptime_sec"] if ns else 0,
}
msg = String()
msg.data = json.dumps(payload)
self._pub_status.publish(msg)
# Also publish discovered node ID
if self._here4_nid is not None:
nid_msg = Int32()
nid_msg.data = self._here4_nid
self._pub_node_id.publish(nid_msg)
# ── RTCM injection ────────────────────────────────────────────────────
def _on_rtcm_bytes(self, msg: ByteMultiArray) -> None:
"""Receive RTCM bytes from /rtcm and inject via DroneCAN RTCMStream."""
raw = bytes(msg.data)
self._inject_rtcm(raw)
def _on_rtcm_hex(self, msg: String) -> None:
"""Receive hex-encoded RTCM from /rtcm_hex (fallback topic)."""
try:
raw = bytes.fromhex(msg.data.strip())
except ValueError as exc:
self.get_logger().error(f"Bad /rtcm_hex: {exc}")
return
self._inject_rtcm(raw)
def _inject_rtcm(self, raw: bytes) -> None:
if self._dc_node is None or not raw:
return
global _rtcm_seq
try:
import dronecan # type: ignore[import]
except ImportError:
return
with _rtcm_lock:
chunks = chunk_rtcm(raw, _rtcm_seq)
_rtcm_seq = (_rtcm_seq + len(chunks)) & 0xFFFF
for chunk in chunks:
try:
rtcm_msg = dronecan.uavcan.equipment.gnss.RTCMStream()
rtcm_msg.sequence_id = chunk["sequence_id"]
rtcm_msg.data = list(chunk["data"])
self._dc_node.broadcast(rtcm_msg)
except Exception as exc:
self.get_logger().warning(
f"RTCM inject error (seq {chunk['sequence_id']}): {exc}",
throttle_duration_sec=2.0,
)
# ── Shutdown ──────────────────────────────────────────────────────────
def destroy_node(self) -> None:
if self._dc_node is not None:
try:
self._dc_node.close()
except Exception:
pass
super().destroy_node()
# ── Covariance helper ─────────────────────────────────────────────────────────
def _fill_cov9(ros_cov, values) -> None:
"""Fill a 9-element ROS2 covariance array from a list."""
for i, v in enumerate(values[:9]):
ros_cov[i] = float(v)
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None) -> None:
rclpy.init(args=args)
node = Here4CanNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -1,5 +0,0 @@
[develop]
script_dir=$base/lib/saltybot_dronecan_gps
[install]
install_scripts=$base/lib/saltybot_dronecan_gps

View File

@ -1,27 +0,0 @@
from setuptools import setup
package_name = "saltybot_dronecan_gps"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/config", ["config/here4_params.yaml"]),
(f"share/{package_name}/launch", ["launch/here4.launch.py"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-perception",
maintainer_email="sl-perception@saltylab.local",
description="DroneCAN/UAVCAN v0 bridge for Here4 GPS via CANable2 (bd-p47c)",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"here4_node = saltybot_dronecan_gps.here4_node:main",
],
},
)

View File

@ -1,367 +0,0 @@
#!/usr/bin/env python3
"""Unit tests for saltybot_dronecan_gps.dronecan_parser.
No dronecan library, ROS2, or CAN hardware required.
Uses simple Python objects (namespaces) to simulate dronecan message objects.
Run with: pytest test/ -v
"""
import math
import types
import unittest
from saltybot_dronecan_gps.dronecan_parser import (
DTID_FIX2,
DTID_RAW_IMU,
DTID_MAG_FIELD,
DTID_STATIC_PRESSURE,
DTID_STATIC_TEMPERATURE,
DTID_NODE_STATUS,
DTID_RTCM_STREAM,
FIX_NO_FIX,
FIX_TIME_ONLY,
FIX_2D,
FIX_3D,
RTCM_MAX_CHUNK,
chunk_rtcm,
parse_fix2,
parse_mag,
parse_node_status,
parse_raw_imu,
parse_static_pressure,
parse_static_temperature,
_getf,
_list3,
_list9,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _ns(**kwargs) -> types.SimpleNamespace:
"""Build a simple namespace to mimic a dronecan message object."""
return types.SimpleNamespace(**kwargs)
def _ns_vec(*values):
"""Return an object that is indexable like a dronecan vector field."""
return list(values)
# ── DTID constants ────────────────────────────────────────────────────────────
class TestDtidConstants(unittest.TestCase):
def test_known_values(self):
self.assertEqual(DTID_FIX2, 1063)
self.assertEqual(DTID_RAW_IMU, 1003)
self.assertEqual(DTID_MAG_FIELD, 1001)
self.assertEqual(DTID_STATIC_PRESSURE, 1028)
self.assertEqual(DTID_STATIC_TEMPERATURE, 1029)
self.assertEqual(DTID_NODE_STATUS, 341)
self.assertEqual(DTID_RTCM_STREAM, 1062)
def test_rtcm_chunk_size(self):
self.assertEqual(RTCM_MAX_CHUNK, 128)
# ── _getf / _list3 / _list9 ──────────────────────────────────────────────────
class TestHelpers(unittest.TestCase):
def test_getf_simple(self):
obj = _ns(x=42)
self.assertEqual(_getf(obj, "x"), 42)
def test_getf_missing(self):
obj = _ns(x=1)
self.assertIsNone(_getf(obj, "y"))
def test_getf_default(self):
obj = _ns()
self.assertEqual(_getf(obj, "missing", default=99), 99)
def test_list3_ok(self):
self.assertEqual(_list3([1.0, 2.0, 3.0]), [1.0, 2.0, 3.0])
def test_list3_bad(self):
self.assertEqual(_list3(None), [0.0, 0.0, 0.0])
def test_list9_ok(self):
vals = list(range(9))
self.assertEqual(_list9(vals), [float(v) for v in vals])
def test_list9_short(self):
self.assertEqual(_list9(None), [0.0] * 9)
# ── parse_fix2 ────────────────────────────────────────────────────────────────
class TestParseFix2(unittest.TestCase):
def _make_fix(self, lat_1e8=0, lon_1e8=0, alt_msl_mm=0, alt_ell_mm=0,
ned_vel=None, status=FIX_3D, sats=12,
pdop=1.2, hdop=0.9, vdop=1.5,
pos_cov=None, undulation=5.0, gnss_ts=0):
return _ns(
latitude_deg_1e8 = lat_1e8,
longitude_deg_1e8 = lon_1e8,
height_msl_mm = alt_msl_mm,
height_ellipsoid_mm = alt_ell_mm,
ned_velocity = ned_vel if ned_vel is not None else _ns_vec(0.0, 0.0, 0.0),
status = status,
sats_used = sats,
pdop = pdop,
hdop = hdop,
vdop = vdop,
position_covariance = pos_cov,
undulation = undulation,
gnss_timestamp = gnss_ts,
)
def test_basic_3d_fix(self):
msg = self._make_fix(
lat_1e8 = 4_576_543_210, # 45.7654321°
lon_1e8 = -7_345_678_901, # -73.45678901°
alt_msl_mm = 50_000, # 50 m
alt_ell_mm = 52_000, # 52 m
ned_vel = _ns_vec(1.5, -0.3, 0.0),
sats = 14,
hdop = 0.8,
)
d = parse_fix2(msg)
self.assertIsNotNone(d)
self.assertAlmostEqual(d["lat_deg"], 45.7654321, places=5)
self.assertAlmostEqual(d["lon_deg"], -73.45678901, places=5)
self.assertAlmostEqual(d["alt_msl_m"], 50.0, places=3)
self.assertAlmostEqual(d["alt_ellipsoid_m"], 52.0, places=3)
self.assertEqual(d["ned_vel"], [1.5, -0.3, 0.0])
self.assertEqual(d["fix_type"], FIX_3D)
self.assertEqual(d["fix_label"], "3D_FIX")
self.assertEqual(d["sats_used"], 14)
self.assertAlmostEqual(d["hdop"], 0.8, places=5)
def test_no_fix(self):
msg = self._make_fix(status=FIX_NO_FIX, sats=0)
d = parse_fix2(msg)
self.assertIsNotNone(d)
self.assertEqual(d["fix_type"], FIX_NO_FIX)
self.assertEqual(d["fix_label"], "NO_FIX")
self.assertEqual(d["sats_used"], 0)
def test_time_only(self):
msg = self._make_fix(status=FIX_TIME_ONLY)
d = parse_fix2(msg)
self.assertEqual(d["fix_label"], "TIME_ONLY")
def test_2d_fix(self):
msg = self._make_fix(status=FIX_2D)
d = parse_fix2(msg)
self.assertEqual(d["fix_label"], "2D_FIX")
def test_position_covariance(self):
cov = [float(i) for i in range(9)]
msg = self._make_fix(pos_cov=cov)
d = parse_fix2(msg)
self.assertEqual(d["pos_cov"], cov)
def test_no_covariance_returns_zeros(self):
msg = self._make_fix(pos_cov=None)
d = parse_fix2(msg)
self.assertEqual(d["pos_cov"], [0.0] * 9)
def test_zero_lat_lon(self):
msg = self._make_fix(lat_1e8=0, lon_1e8=0)
d = parse_fix2(msg)
self.assertAlmostEqual(d["lat_deg"], 0.0, places=8)
self.assertAlmostEqual(d["lon_deg"], 0.0, places=8)
def test_returns_none_on_bad_input(self):
self.assertIsNone(parse_fix2(None))
def test_gnss_timestamp(self):
msg = self._make_fix(gnss_ts=1_000_000_000)
d = parse_fix2(msg)
self.assertEqual(d["gnss_timestamp_us"], 1_000_000_000)
# ── parse_raw_imu ─────────────────────────────────────────────────────────────
class TestParseRawImu(unittest.TestCase):
def _make_imu(self, gyro=None, accel=None, dt=0.02,
gyro_cov=None, accel_cov=None):
return _ns(
rate_gyro_latest = gyro if gyro else _ns_vec(0.0, 0.0, 0.0),
accelerometer_latest = accel if accel else _ns_vec(0.0, 0.0, 9.81),
integration_interval = dt,
rate_gyro_covariance = gyro_cov if gyro_cov else [0.0] * 9,
accelerometer_covariance = accel_cov if accel_cov else [0.0] * 9,
)
def test_basic(self):
msg = self._make_imu(
gyro = _ns_vec(0.01, -0.02, 0.03),
accel = _ns_vec(0.1, -0.2, 9.8),
dt = 0.02,
)
d = parse_raw_imu(msg)
self.assertIsNotNone(d)
self.assertEqual(d["gyro_xyz"], [0.01, -0.02, 0.03])
self.assertEqual(d["accel_xyz"], [0.1, -0.2, 9.8])
self.assertAlmostEqual(d["dt_s"], 0.02)
def test_covariances_returned(self):
cov = [float(i * 0.001) for i in range(9)]
msg = self._make_imu(gyro_cov=cov, accel_cov=cov)
d = parse_raw_imu(msg)
self.assertEqual(d["gyro_cov"], cov)
self.assertEqual(d["accel_cov"], cov)
def test_none_input(self):
self.assertIsNone(parse_raw_imu(None))
# ── parse_mag ─────────────────────────────────────────────────────────────────
class TestParseMag(unittest.TestCase):
def _make_mag(self, sensor_id=0, field=None, cov=None):
return _ns(
sensor_id = sensor_id,
magnetic_field_ga = field if field else _ns_vec(0.1, 0.2, -0.5),
magnetic_field_covariance = cov if cov else [0.0] * 9,
)
def test_basic(self):
msg = self._make_mag(sensor_id=1, field=_ns_vec(0.1, 0.2, -0.5))
d = parse_mag(msg)
self.assertIsNotNone(d)
self.assertEqual(d["sensor_id"], 1)
self.assertEqual(d["field_ga"], [0.1, 0.2, -0.5])
def test_none_input(self):
self.assertIsNone(parse_mag(None))
# ── parse_static_pressure ────────────────────────────────────────────────────
class TestParseStaticPressure(unittest.TestCase):
def test_basic(self):
msg = _ns(static_pressure=101325.0, static_pressure_variance=100.0)
d = parse_static_pressure(msg)
self.assertIsNotNone(d)
self.assertAlmostEqual(d["pressure_pa"], 101325.0)
self.assertAlmostEqual(d["pressure_variance"], 100.0)
def test_none_input(self):
self.assertIsNone(parse_static_pressure(None))
# ── parse_static_temperature ─────────────────────────────────────────────────
class TestParseStaticTemperature(unittest.TestCase):
def test_celsius_to_kelvin(self):
msg = _ns(static_temperature=293.15) # 20 °C
d = parse_static_temperature(msg)
self.assertIsNotNone(d)
self.assertAlmostEqual(d["temperature_k"], 293.15)
def test_none_input(self):
self.assertIsNone(parse_static_temperature(None))
# ── parse_node_status ────────────────────────────────────────────────────────
class TestParseNodeStatus(unittest.TestCase):
def _make_status(self, health=0, mode=0, uptime=120, vendor_code=0):
return _ns(
health = health,
mode = mode,
uptime_sec = uptime,
vendor_specific_status_code = vendor_code,
)
def test_ok_operational(self):
msg = self._make_status(health=0, mode=0, uptime=300)
d = parse_node_status(msg, source_node_id=10)
self.assertIsNotNone(d)
self.assertEqual(d["node_id"], 10)
self.assertEqual(d["health"], 0)
self.assertEqual(d["health_label"], "OK")
self.assertEqual(d["mode_label"], "OPERATIONAL")
self.assertEqual(d["uptime_sec"], 300)
def test_error_state(self):
msg = self._make_status(health=2, mode=0)
d = parse_node_status(msg, source_node_id=5)
self.assertEqual(d["health_label"], "ERROR")
def test_critical_state(self):
msg = self._make_status(health=3)
d = parse_node_status(msg, 7)
self.assertEqual(d["health_label"], "CRITICAL")
def test_none_input(self):
self.assertIsNone(parse_node_status(None, 1))
# ── chunk_rtcm ────────────────────────────────────────────────────────────────
class TestChunkRtcm(unittest.TestCase):
def test_empty(self):
chunks = chunk_rtcm(b"", 0)
self.assertEqual(chunks, [])
def test_single_chunk(self):
data = bytes(range(64))
chunks = chunk_rtcm(data, 5)
self.assertEqual(len(chunks), 1)
self.assertEqual(chunks[0]["sequence_id"], 5)
self.assertEqual(chunks[0]["data"], data)
def test_exact_chunk_boundary(self):
data = bytes(RTCM_MAX_CHUNK)
chunks = chunk_rtcm(data, 0)
self.assertEqual(len(chunks), 1)
self.assertEqual(len(chunks[0]["data"]), RTCM_MAX_CHUNK)
def test_two_chunks(self):
data = bytes(RTCM_MAX_CHUNK + 1)
chunks = chunk_rtcm(data, 0)
self.assertEqual(len(chunks), 2)
self.assertEqual(len(chunks[0]["data"]), RTCM_MAX_CHUNK)
self.assertEqual(len(chunks[1]["data"]), 1)
def test_sequence_id_increments(self):
data = bytes(RTCM_MAX_CHUNK * 3)
chunks = chunk_rtcm(data, 10)
for i, chunk in enumerate(chunks):
self.assertEqual(chunk["sequence_id"], 10 + i)
def test_sequence_id_wraps_at_16bit(self):
data = bytes(RTCM_MAX_CHUNK * 2)
chunks = chunk_rtcm(data, 0xFFFF)
self.assertEqual(chunks[0]["sequence_id"], 0xFFFF)
self.assertEqual(chunks[1]["sequence_id"], 0x0000) # wraps
def test_data_round_trip(self):
original = bytes(range(200))
chunks = chunk_rtcm(original, 0)
reassembled = b"".join(c["data"] for c in chunks)
self.assertEqual(reassembled, original)
def test_large_message(self):
data = bytes(1024)
chunks = chunk_rtcm(data, 0)
total_bytes = sum(len(c["data"]) for c in chunks)
self.assertEqual(total_bytes, 1024)
for c in chunks:
self.assertLessEqual(len(c["data"]), RTCM_MAX_CHUNK)
if __name__ == "__main__":
unittest.main()

View File

@ -1,20 +0,0 @@
esp32_balance_node:
ros__parameters:
# USB-CDC serial port: CH343 chip, VID:PID 1a86:55d3
# Udev symlink set by jetson/docs/pinout.md udev rules
serial_port: /dev/esp32-balance
baud_rate: 460800
# /cmd_vel linear.x (m/s) → speed units sent to ESP32-S3 BALANCE
speed_scale: 1000.0
# /cmd_vel angular.z (rad/s) → steer units (negative = flip turn convention)
steer_scale: -500.0
# Safety: send CMD_DRIVE(0,0) if /cmd_vel silent for this many seconds
command_timeout_s: 0.5
# CMD_HEARTBEAT TX interval in seconds (ESP32-S3 watchdog fires at 500 ms)
heartbeat_period: 0.2
# Reconnect attempt interval when serial is lost
reconnect_delay: 2.0

View File

@ -1,73 +0,0 @@
"""esp32_balance.launch.py — Launch the Orin↔ESP32-S3 BALANCE USB-serial bridge.
bd-wim1: replaces can_bridge_node (CANable2/python-can) with USB-CDC serial.
The freed CANable2 slot is used by Here4 GPS (bd-p47c).
USB device
----------
CH343 USB-CDC chip on ESP32-S3 BALANCE board
VID:PID 1a86:55d3
Symlink /dev/esp32-balance (via udev rule in jetson/docs/pinout.md)
Baud 460800
Usage
-----
# Default:
ros2 launch saltybot_esp32_serial esp32_balance.launch.py
# Override port (e.g. direct ACM path):
ros2 launch saltybot_esp32_serial esp32_balance.launch.py serial_port:=/dev/ttyACM0
# Custom velocity scales:
ros2 launch saltybot_esp32_serial esp32_balance.launch.py speed_scale:=800.0 steer_scale:=-400.0
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description() -> LaunchDescription:
pkg_share = get_package_share_directory("saltybot_esp32_serial")
params_file = os.path.join(pkg_share, "config", "esp32_balance_params.yaml")
serial_port_arg = DeclareLaunchArgument(
"serial_port",
default_value="/dev/esp32-balance",
description="USB-CDC serial port for ESP32-S3 BALANCE (CH343, 1a86:55d3)",
)
speed_scale_arg = DeclareLaunchArgument(
"speed_scale",
default_value="1000.0",
description="linear.x (m/s) → motor speed units",
)
steer_scale_arg = DeclareLaunchArgument(
"steer_scale",
default_value="-500.0",
description="angular.z (rad/s) → steer units",
)
node = Node(
package="saltybot_esp32_serial",
executable="esp32_balance_node",
name="esp32_balance_node",
output="screen",
parameters=[
params_file,
{
"serial_port": LaunchConfiguration("serial_port"),
"speed_scale": LaunchConfiguration("speed_scale"),
"steer_scale": LaunchConfiguration("steer_scale"),
},
],
)
return LaunchDescription([
serial_port_arg,
speed_scale_arg,
steer_scale_arg,
node,
])

View File

@ -1,40 +0,0 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_esp32_serial</name>
<version>0.1.0</version>
<description>
UART/USB-CDC serial bridge for SaltyBot Orin — interfaces with the
ESP32-S3 BALANCE board (CH343, 1a86:55d3) using a binary framing protocol
[0xAA][LEN][TYPE][PAYLOAD][CRC8]. Replaces saltybot_can_bridge (CANable2)
to free the CAN bus for Here4 GPS (bd-p47c).
Publishes the same ROS2 topics as saltybot_can_bridge for backward compat:
/saltybot/attitude, /can/battery, /can/vesc/{left,right}/state,
/can/connection_status
System dependency: pyserial (apt: python3-serial or pip: pyserial)
Bead: bd-wim1 Counterpart firmware: bd-66hx (hal)
</description>
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
<license>MIT</license>
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
<exec_depend>sensor_msgs</exec_depend>
<!-- pyserial: apt install python3-serial or pip install pyserial -->
<exec_depend>python3-serial</exec_depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -1 +0,0 @@
# saltybot_esp32_serial — UART/USB-CDC bridge for ESP32-S3 BALANCE (bd-wim1)

View File

@ -1,369 +0,0 @@
"""esp32_balance_node.py — ROS2 bridge: Orin ↔ ESP32-S3 BALANCE via USB-CDC serial.
bd-wim1: replaces saltybot_can_bridge (CANable2/python-can) with USB serial.
The CANable2 is freed for Here4 GPS (bd-p47c).
Physical connection
-------------------
Device : /dev/esp32-balance (CH343 USB-CDC, VID:PID 1a86:55d3)
Baud : 460800, 8N1
Protocol: [0xAA][LEN][TYPE][PAYLOAD][CRC8] (see esp32_balance_protocol.py)
Subscriptions
-------------
/cmd_vel geometry_msgs/Twist CMD_DRIVE (at up to 50 Hz)
/estop std_msgs/Bool CMD_ESTOP
/saltybot/arm std_msgs/Bool CMD_ARM
/saltybot/pid_update std_msgs/String JSON CMD_PID {"kp":,"ki":,"kd":}
Publications
------------
/saltybot/attitude std_msgs/String JSON pitch, motor_cmd, state, etc.
/saltybot/balance_state std_msgs/String same as /saltybot/attitude (alias)
/can/battery sensor_msgs/BatteryState from TELEM_STATUS.vbat_mv
/can/vesc/left/state std_msgs/Float32MultiArray [erpm, duty0, voltage_mv/1000, current_ma/1000]
/can/vesc/right/state std_msgs/Float32MultiArray same layout
/can/connection_status std_msgs/String "connected" | "disconnected"
NOTE: /can/* topic names are preserved for backward compatibility with
nodes that subscribed to saltybot_can_bridge outputs.
Parameters
----------
serial_port str USB-CDC port default: /dev/esp32-balance
baud_rate int Baud rate default: 460800
speed_scale float linear.x (m/s) units default: 1000.0
steer_scale float angular.z (rad/s) units default: -500.0
command_timeout_s float /cmd_vel watchdog default: 0.5
heartbeat_period float CMD_HEARTBEAT interval default: 0.2
reconnect_delay float Retry interval, s default: 2.0
"""
from __future__ import annotations
import json
import threading
import time
from typing import Optional
import rclpy
import serial
from geometry_msgs.msg import Twist
from rclpy.node import Node
from sensor_msgs.msg import BatteryState
from std_msgs.msg import Bool, Float32MultiArray, String
from .esp32_balance_protocol import (
BAUD_RATE,
SERIAL_PORT_DEFAULT,
AckFrame,
BalanceState,
BALANCE_STATE_LABEL,
FrameParser,
NackFrame,
StatusFrame,
VescFrame,
encode_arm,
encode_drive,
encode_estop,
encode_heartbeat,
encode_pid,
)
_WATCHDOG_HZ: float = 20.0 # TX watchdog check rate
class Esp32BalanceNode(Node):
"""Serial bridge replacing CANable2 CAN comms with USB-CDC to ESP32-S3 BALANCE."""
def __init__(self) -> None:
super().__init__("esp32_balance_node")
# ── Parameters ────────────────────────────────────────────────────
self.declare_parameter("serial_port", SERIAL_PORT_DEFAULT)
self.declare_parameter("baud_rate", BAUD_RATE)
self.declare_parameter("speed_scale", 1000.0)
self.declare_parameter("steer_scale", -500.0)
self.declare_parameter("command_timeout_s", 0.5)
self.declare_parameter("heartbeat_period", 0.2)
self.declare_parameter("reconnect_delay", 2.0)
self._port_name = self.get_parameter("serial_port").value
self._baud = self.get_parameter("baud_rate").value
self._speed_scale = self.get_parameter("speed_scale").value
self._steer_scale = self.get_parameter("steer_scale").value
self._cmd_timeout = self.get_parameter("command_timeout_s").value
self._hb_period = self.get_parameter("heartbeat_period").value
self._reconnect_delay = self.get_parameter("reconnect_delay").value
# ── State ─────────────────────────────────────────────────────────
self._ser: Optional[serial.Serial] = None
self._ser_lock = threading.Lock()
self._parser = FrameParser()
self._last_cmd_time = time.monotonic()
self._watchdog_sent = False
# ── Publishers ────────────────────────────────────────────────────
self._pub_attitude = self.create_publisher(String, "/saltybot/attitude", 10)
self._pub_balance = self.create_publisher(String, "/saltybot/balance_state", 10)
self._pub_battery = self.create_publisher(BatteryState, "/can/battery", 10)
self._pub_vesc_left = self.create_publisher(Float32MultiArray, "/can/vesc/left/state", 10)
self._pub_vesc_right = self.create_publisher(Float32MultiArray, "/can/vesc/right/state", 10)
self._pub_status = self.create_publisher(String, "/can/connection_status", 10)
# ── Subscriptions ─────────────────────────────────────────────────
self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, 10)
self.create_subscription(Bool, "/estop", self._estop_cb, 10)
self.create_subscription(Bool, "/saltybot/arm", self._arm_cb, 10)
self.create_subscription(String, "/saltybot/pid_update", self._pid_cb, 10)
# ── Timers ────────────────────────────────────────────────────────
self.create_timer(self._hb_period, self._heartbeat_cb)
self.create_timer(1.0 / _WATCHDOG_HZ, self._watchdog_cb)
self.create_timer(self._reconnect_delay, self._reconnect_cb)
# ── Open port + start reader thread ───────────────────────────────
self._try_connect()
self._reader_thread = threading.Thread(
target=self._reader_loop, daemon=True, name="balance_serial_reader"
)
self._reader_thread.start()
self.get_logger().info(
f"esp32_balance_node — {self._port_name} @ {self._baud} baud | "
f"speed_scale={self._speed_scale} steer_scale={self._steer_scale}"
)
# ── Serial management ─────────────────────────────────────────────────
def _try_connect(self) -> None:
with self._ser_lock:
if self._ser is not None and self._ser.is_open:
return
try:
self._ser = serial.Serial(
port=self._port_name,
baudrate=self._baud,
timeout=0.05,
write_timeout=0.1,
)
self._ser.reset_input_buffer()
self._parser.reset()
self.get_logger().info(f"Serial open: {self._port_name}")
self._publish_status("connected")
except serial.SerialException as exc:
self.get_logger().warning(
f"Cannot open {self._port_name}: {exc} — retry in {self._reconnect_delay:.0f} s",
throttle_duration_sec=self._reconnect_delay,
)
self._ser = None
self._publish_status("disconnected")
def _reconnect_cb(self) -> None:
with self._ser_lock:
open_ok = self._ser is not None and self._ser.is_open
if not open_ok:
self._try_connect()
def _handle_serial_error(self, exc: Exception, context: str) -> None:
self.get_logger().warning(f"Serial error in {context}: {exc}")
with self._ser_lock:
if self._ser is not None:
try:
self._ser.close()
except Exception:
pass
self._ser = None
self._publish_status("disconnected")
# ── Serial write helper ───────────────────────────────────────────────
def _write(self, data: bytes, context: str = "") -> bool:
with self._ser_lock:
if self._ser is None or not self._ser.is_open:
return False
ser = self._ser
try:
ser.write(data)
return True
except serial.SerialException as exc:
self._handle_serial_error(exc, f"write({context})")
return False
# ── ROS subscriptions ─────────────────────────────────────────────────
def _cmd_vel_cb(self, msg: Twist) -> None:
"""Convert /cmd_vel to CMD_DRIVE at up to 50 Hz."""
self._last_cmd_time = time.monotonic()
self._watchdog_sent = False
speed = int(msg.linear.x * self._speed_scale)
steer = int(msg.angular.z * self._steer_scale)
self._write(encode_drive(speed, steer), "cmd_vel")
def _estop_cb(self, msg: Bool) -> None:
frame = encode_estop(msg.data)
self._write(frame, "estop")
if msg.data:
self.get_logger().warning("E-stop asserted → CMD_ESTOP sent to ESP32-S3 BALANCE")
def _arm_cb(self, msg: Bool) -> None:
self._write(encode_arm(msg.data), "arm")
def _pid_cb(self, msg: String) -> None:
"""Parse JSON {"kp":…,"ki":…,"kd":…} and send CMD_PID."""
try:
d = json.loads(msg.data)
kp = float(d["kp"])
ki = float(d["ki"])
kd = float(d["kd"])
except (KeyError, ValueError, json.JSONDecodeError) as exc:
self.get_logger().error(f"Bad /saltybot/pid_update JSON: {exc}")
return
self._write(encode_pid(kp, ki, kd), "pid_update")
# ── Timers ────────────────────────────────────────────────────────────
def _heartbeat_cb(self) -> None:
self._write(encode_heartbeat(), "heartbeat")
def _watchdog_cb(self) -> None:
"""Send CMD_DRIVE(0,0) if /cmd_vel has been silent for command_timeout_s."""
if self._watchdog_sent:
return
if time.monotonic() - self._last_cmd_time > self._cmd_timeout:
self._write(encode_drive(0, 0), "watchdog")
self._watchdog_sent = True
# ── Background reader ─────────────────────────────────────────────────
def _reader_loop(self) -> None:
while rclpy.ok():
with self._ser_lock:
ser = self._ser if (self._ser and self._ser.is_open) else None
if ser is None:
time.sleep(0.05)
continue
try:
n = ser.in_waiting
if n:
raw = ser.read(n)
else:
time.sleep(0.002)
continue
except serial.SerialException as exc:
self._handle_serial_error(exc, "reader_loop")
continue
for byte in raw:
frame = self._parser.feed(byte)
if frame is not None:
self._dispatch(frame)
def _dispatch(self, frame) -> None:
try:
if isinstance(frame, StatusFrame):
self._handle_status(frame)
elif isinstance(frame, VescFrame):
self._handle_vesc(frame)
elif isinstance(frame, AckFrame):
self.get_logger().debug(
f"ACK for cmd type 0x{frame.echoed_type:02X}"
)
elif isinstance(frame, NackFrame):
self.get_logger().warning(
f"NACK cmd=0x{frame.cmd_type:02X} err=0x{frame.error_code:02X}"
)
elif isinstance(frame, tuple):
type_code, _ = frame
self.get_logger().debug(f"Unknown telemetry type 0x{type_code:02X}")
except Exception as exc:
self.get_logger().warning(f"Error dispatching frame {type(frame).__name__}: {exc}")
# ── Frame handlers ────────────────────────────────────────────────────
def _handle_status(self, f: StatusFrame) -> None:
"""Publish balance controller status — replaces CAN 0x400 handler."""
now = self.get_clock().now().to_msg()
payload = {
"pitch_deg": round(f.pitch_deg, 2),
"motor_cmd": f.motor_cmd,
"vbat_mv": f.vbat_mv,
"balance_state": f.balance_state,
"state_label": BALANCE_STATE_LABEL.get(f.balance_state,
f"UNKNOWN({f.balance_state})"),
"flags": f.flags,
"estop_active": bool(f.flags & 0x01),
"hb_timeout": bool(f.flags & 0x02),
"ts": f"{now.sec}.{now.nanosec:09d}",
}
msg = String()
msg.data = json.dumps(payload)
self._pub_attitude.publish(msg)
self._pub_balance.publish(msg)
# Battery piggyback on STATUS frame for nodes that use /can/battery
bat = BatteryState()
bat.header.stamp = now
bat.voltage = f.vbat_mv / 1000.0
bat.present = True
bat.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
self._pub_battery.publish(bat)
def _handle_vesc(self, f: VescFrame) -> None:
"""Publish proxied VESC telemetry — replaces CAN VESC_STATUS_1 handler.
Float32MultiArray layout: [erpm, voltage_v, current_a, temp_c]
(duty not available via serial proxy; 0.0 placeholder)
"""
arr = Float32MultiArray()
arr.data = [
float(f.erpm),
float(f.voltage_mv) / 1000.0,
float(f.current_ma) / 1000.0,
f.temp_c,
]
if f.vesc_id == 56:
self._pub_vesc_left.publish(arr)
elif f.vesc_id == 68:
self._pub_vesc_right.publish(arr)
else:
self.get_logger().warning(f"Unknown VESC ID {f.vesc_id} in telemetry frame")
# ── Status helper ─────────────────────────────────────────────────────
def _publish_status(self, status: str) -> None:
msg = String()
msg.data = status
self._pub_status.publish(msg)
# ── Shutdown ──────────────────────────────────────────────────────────
def destroy_node(self) -> None:
# Send zero drive + estop before closing
self._write(encode_drive(0, 0), "shutdown")
self._write(encode_estop(True), "shutdown")
with self._ser_lock:
if self._ser is not None:
try:
self._ser.close()
except Exception:
pass
self._ser = None
super().destroy_node()
def main(args=None) -> None:
rclpy.init(args=args)
node = Esp32BalanceNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -1,355 +0,0 @@
"""esp32_balance_protocol.py — Binary frame codec for Orin↔ESP32-S3 BALANCE UART/USB comms.
bd-wim1: replaces CANable2/python-can ESP32 interface with USB-CDC serial.
Counterpart firmware bead: bd-66hx (hal's ESP32-S3 BALANCE side).
Frame layout:
SYNC LEN TYPE PAYLOAD CRC8
0xAA 1B 1B LEN bytes 1B
LEN = number of payload bytes (062)
CRC8 = CRC8-SMBUS (poly=0x07, init=0x00), computed over LEN+TYPE+PAYLOAD
Physical layer
Device : /dev/esp32-balance (CH343 USB-CDC, VID:PID 1a86:55d3)
Baud : 460800, 8N1
Command types (Orin ESP32-S3 BALANCE):
0x01 CMD_HEARTBEAT no payload len=0
0x02 CMD_DRIVE int16 speed_units, int16 steer_units len=4 range 1000..+1000
0x03 CMD_ESTOP uint8 (1=assert, 0=clear) len=1
0x04 CMD_ARM uint8 (1=arm, 0=disarm) len=1
0x05 CMD_PID float32 kp, float32 ki, float32 kd len=12
Telemetry types (ESP32-S3 BALANCE Orin):
0x80 TELEM_STATUS int16 pitch_x10, int16 motor_cmd,
uint16 vbat_mv, uint8 balance_state, uint8 flags len=8
0x81 TELEM_VESC_LEFT int32 erpm, uint16 voltage_mv,
int16 current_ma, uint16 temp_c_x10 len=10
0x82 TELEM_VESC_RIGHT same layout as TELEM_VESC_LEFT len=10
0xA0 RESP_ACK uint8 echoed cmd type len=1
0xA1 RESP_NACK uint8 cmd type, uint8 error_code len=2
Balance state values (TELEM_STATUS.balance_state):
0 DISARMED
1 ARMED
2 TILT_FAULT
3 ESTOP
NACK error codes:
0x01 ERR_BAD_CRC
0x02 ERR_BAD_LEN
0x03 ERR_ESTOP_ACTIVE
0x04 ERR_DISARMED
NOTE: Message type IDs coordinated with hal (bd-66hx) on 2026-04-17.
Confirm against bd-66hx PR before deploying on hardware.
"""
from __future__ import annotations
import struct
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional
# ── Physical layer constants ──────────────────────────────────────────────────
SERIAL_PORT_DEFAULT = "/dev/esp32-balance"
BAUD_RATE = 460800
MAX_PAYLOAD_LEN = 62 # hard cap; larger frames are considered corrupt
# ── Framing ───────────────────────────────────────────────────────────────────
SYNC_BYTE = 0xAA
# ── Command type codes (Orin → ESP32-S3 BALANCE) ─────────────────────────────
class CmdType(IntEnum):
HEARTBEAT = 0x01
DRIVE = 0x02
ESTOP = 0x03
ARM = 0x04
PID = 0x05
# ── Telemetry type codes (ESP32-S3 BALANCE → Orin) ───────────────────────────
class TelType(IntEnum):
STATUS = 0x80
VESC_LEFT = 0x81
VESC_RIGHT = 0x82
ACK = 0xA0
NACK = 0xA1
# ── Balance state enum ────────────────────────────────────────────────────────
class BalanceState(IntEnum):
DISARMED = 0
ARMED = 1
TILT_FAULT = 2
ESTOP = 3
BALANCE_STATE_LABEL = {
BalanceState.DISARMED: "DISARMED",
BalanceState.ARMED: "ARMED",
BalanceState.TILT_FAULT: "TILT_FAULT",
BalanceState.ESTOP: "ESTOP",
}
# ── Parsed telemetry dataclasses ─────────────────────────────────────────────
@dataclass
class StatusFrame:
"""TELEM_STATUS (0x80): balance controller status at ~10 Hz."""
pitch_deg: float # degrees (positive = forward tilt); raw pitch_x10 / 10.0
motor_cmd: int # ESC command, 1000..+1000
vbat_mv: int # battery voltage, millivolts
balance_state: int # BalanceState enum
flags: int # bitmask — bit 0: estop_active, bit 1: heartbeat_timeout
@dataclass
class VescFrame:
"""TELEM_VESC_LEFT (0x81) or TELEM_VESC_RIGHT (0x82): VESC proxied telemetry at ~10 Hz."""
vesc_id: int # 56 = left, 68 = right (maps to CAN IDs from hardware spec)
erpm: int # electrical RPM (signed)
voltage_mv: int # bus voltage, millivolts
current_ma: int # phase current, milliamps (signed)
temp_c: float # motor temperature, °C; raw temp_c_x10 / 10.0
@dataclass
class AckFrame:
"""RESP_ACK (0xA0): acknowledgement for a successfully parsed command."""
echoed_type: int
@dataclass
class NackFrame:
"""RESP_NACK (0xA1): rejection; contains original cmd type and error code."""
cmd_type: int
error_code: int
TelemetryFrame = StatusFrame | VescFrame | AckFrame | NackFrame
# ── CRC8-SMBUS ────────────────────────────────────────────────────────────────
def _crc8(data: bytes) -> int:
"""CRC8-SMBUS: polynomial 0x07, init 0x00, no final XOR.
Coverage: LEN + TYPE + PAYLOAD bytes.
"""
crc = 0
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x80:
crc = (crc << 1) ^ 0x07
else:
crc <<= 1
crc &= 0xFF
return crc
# ── Frame encoder ─────────────────────────────────────────────────────────────
def _build_frame(cmd_type: int, payload: bytes) -> bytes:
"""Build a complete binary frame: [0xAA][LEN][TYPE][PAYLOAD][CRC8]."""
if len(payload) > MAX_PAYLOAD_LEN:
raise ValueError(f"Payload length {len(payload)} exceeds max {MAX_PAYLOAD_LEN}")
length = len(payload)
crc_data = bytes([length, cmd_type]) + payload
crc = _crc8(crc_data)
return bytes([SYNC_BYTE, length, cmd_type]) + payload + bytes([crc])
def encode_heartbeat() -> bytes:
"""CMD_HEARTBEAT — no payload. Send every ≤500 ms to keep watchdog alive."""
return _build_frame(CmdType.HEARTBEAT, b"")
def encode_drive(speed: int, steer: int) -> bytes:
"""CMD_DRIVE — int16 speed_units + int16 steer_units (big-endian, 1000..+1000).
Parameters
----------
speed : target longitudinal speed in motor units (positive = forward)
steer : steering demand in motor units (positive = right turn)
"""
speed = max(-1000, min(1000, int(speed)))
steer = max(-1000, min(1000, int(steer)))
return _build_frame(CmdType.DRIVE, struct.pack(">hh", speed, steer))
def encode_estop(assert_stop: bool) -> bytes:
"""CMD_ESTOP — uint8, 1=assert, 0=clear."""
return _build_frame(CmdType.ESTOP, struct.pack("B", 1 if assert_stop else 0))
def encode_arm(arm: bool) -> bytes:
"""CMD_ARM — uint8, 1=arm, 0=disarm."""
return _build_frame(CmdType.ARM, struct.pack("B", 1 if arm else 0))
def encode_pid(kp: float, ki: float, kd: float) -> bytes:
"""CMD_PID — three float32 values, big-endian."""
if kp < 0.0 or ki < 0.0 or kd < 0.0:
raise ValueError("PID gains must be non-negative")
return _build_frame(CmdType.PID, struct.pack(">fff", kp, ki, kd))
# ── Frame decoder (streaming state-machine) ───────────────────────────────────
class _ParseState(IntEnum):
WAIT_SYNC = 0
WAIT_LEN = 1
WAIT_TYPE = 2
PAYLOAD = 3
WAIT_CRC = 4
class FrameParser:
"""Byte-by-byte streaming parser for ESP32-S3 BALANCE telemetry frames.
Feed individual bytes via ``feed()``; returns a decoded dataclass when a
complete, CRC-valid frame arrives, otherwise returns ``None``.
Not thread-safe guard with a lock if accessed from multiple threads.
Counters
--------
frames_ok : successfully decoded frames
frames_error : CRC failures or oversized payloads
"""
def __init__(self) -> None:
self.frames_ok = 0
self.frames_error = 0
self._reset()
def reset(self) -> None:
"""Reset to initial state — call after serial reconnect."""
self._reset()
def _reset(self) -> None:
self._state = _ParseState.WAIT_SYNC
self._length = 0
self._type = 0
self._payload = bytearray()
def feed(self, byte: int) -> Optional[TelemetryFrame | tuple]:
"""Process one byte.
Returns a decoded dataclass on success, ``None`` while waiting for more
bytes, or a ``(type_code, raw_payload)`` tuple for unrecognised types.
"""
s = self._state
if s == _ParseState.WAIT_SYNC:
if byte == SYNC_BYTE:
self._state = _ParseState.WAIT_LEN
return None
if s == _ParseState.WAIT_LEN:
if byte > MAX_PAYLOAD_LEN:
self.frames_error += 1
self._reset()
return None
self._length = byte
self._state = _ParseState.WAIT_TYPE
return None
if s == _ParseState.WAIT_TYPE:
self._type = byte
self._payload = bytearray()
if self._length == 0:
self._state = _ParseState.WAIT_CRC
else:
self._state = _ParseState.PAYLOAD
return None
if s == _ParseState.PAYLOAD:
self._payload.append(byte)
if len(self._payload) == self._length:
self._state = _ParseState.WAIT_CRC
return None
if s == _ParseState.WAIT_CRC:
# Verify CRC before resetting parser state
crc_data = bytes([self._length, self._type]) + self._payload
expected = _crc8(crc_data)
if byte != expected:
self.frames_error += 1
self._reset()
return None
self.frames_ok += 1
result = _decode(self._type, bytes(self._payload))
self._reset()
return result
self._reset()
return None
# ── Telemetry decoder ─────────────────────────────────────────────────────────
# Maps VESC_LEFT/RIGHT type codes to hardware CAN IDs (from memory)
_VESC_ID_MAP = {
TelType.VESC_LEFT: 56, # left VESC CAN ID
TelType.VESC_RIGHT: 68, # right VESC CAN ID
}
def _decode(type_code: int, payload: bytes) -> Optional[TelemetryFrame | tuple]:
"""Decode a validated telemetry payload into a typed dataclass."""
try:
if type_code == TelType.STATUS:
# int16 pitch_x10, int16 motor_cmd, uint16 vbat_mv, uint8 state, uint8 flags
if len(payload) < 8:
return None
pitch_x10, motor_cmd, vbat_mv, state, flags = struct.unpack_from(">hhHBB", payload)
return StatusFrame(
pitch_deg=pitch_x10 / 10.0,
motor_cmd=motor_cmd,
vbat_mv=vbat_mv,
balance_state=state,
flags=flags,
)
if type_code in (TelType.VESC_LEFT, TelType.VESC_RIGHT):
# int32 erpm, uint16 voltage_mv, int16 current_ma, uint16 temp_c_x10
if len(payload) < 10:
return None
erpm, voltage_mv, current_ma, temp_x10 = struct.unpack_from(">iHhH", payload)
return VescFrame(
vesc_id=_VESC_ID_MAP[type_code],
erpm=erpm,
voltage_mv=voltage_mv,
current_ma=current_ma,
temp_c=temp_x10 / 10.0,
)
if type_code == TelType.ACK:
if len(payload) < 1:
return None
return AckFrame(echoed_type=payload[0])
if type_code == TelType.NACK:
if len(payload) < 2:
return None
return NackFrame(cmd_type=payload[0], error_code=payload[1])
except struct.error:
return None
# Unknown type — return raw for forward-compatibility
return (type_code, payload)

View File

@ -1,5 +0,0 @@
[develop]
script_dir=$base/lib/saltybot_esp32_serial
[install]
install_scripts=$base/lib/saltybot_esp32_serial

View File

@ -1,27 +0,0 @@
from setuptools import setup
package_name = "saltybot_esp32_serial"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/config", ["config/esp32_balance_params.yaml"]),
(f"share/{package_name}/launch", ["launch/esp32_balance.launch.py"]),
],
install_requires=["setuptools", "pyserial"],
zip_safe=True,
maintainer="sl-perception",
maintainer_email="sl-perception@saltylab.local",
description="UART/USB-CDC serial bridge for ESP32-S3 BALANCE (bd-wim1)",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"esp32_balance_node = saltybot_esp32_serial.esp32_balance_node:main",
],
},
)

View File

@ -1,311 +0,0 @@
#!/usr/bin/env python3
"""Unit tests for saltybot_esp32_serial.esp32_balance_protocol.
No ROS2 or hardware required exercises encode/decode round-trips,
framing, CRC8, and parser state-machine in pure Python.
Run with: pytest test/ -v
"""
import struct
import unittest
from saltybot_esp32_serial.esp32_balance_protocol import (
SYNC_BYTE,
CmdType,
TelType,
BalanceState,
StatusFrame,
VescFrame,
AckFrame,
NackFrame,
FrameParser,
_crc8,
encode_heartbeat,
encode_drive,
encode_estop,
encode_arm,
encode_pid,
)
# ── CRC8 tests ────────────────────────────────────────────────────────────────
class TestCrc8(unittest.TestCase):
"""Verify CRC8-SMBUS implementation."""
def test_empty(self):
self.assertEqual(_crc8(b""), 0x00)
def test_single_byte_zero(self):
# CRC of 0x00: no XOR, no poly — stays 0
self.assertEqual(_crc8(b"\x00"), 0x00)
def test_single_byte_nonzero(self):
# Pre-computed expected value for 0xFF
result = _crc8(b"\xFF")
self.assertIsInstance(result, int)
self.assertGreaterEqual(result, 0)
self.assertLessEqual(result, 0xFF)
def test_deterministic(self):
data = b"\x01\x02\x04\x08"
self.assertEqual(_crc8(data), _crc8(data))
def test_different_data_different_crc(self):
self.assertNotEqual(_crc8(b"\x01"), _crc8(b"\x02"))
def test_byte_range(self):
for b in range(256):
r = _crc8(bytes([b]))
self.assertGreaterEqual(r, 0)
self.assertLessEqual(r, 255)
# ── Frame structure tests ─────────────────────────────────────────────────────
class TestFrameStructure(unittest.TestCase):
"""Verify encode functions produce correctly structured frames."""
def _check_frame(self, data: bytes, expected_type: int, expected_payload: bytes):
"""Assert frame matches [SYNC][LEN][TYPE][PAYLOAD][CRC8]."""
self.assertGreaterEqual(len(data), 4)
self.assertEqual(data[0], SYNC_BYTE, "SYNC byte mismatch")
length = data[1]
self.assertEqual(data[2], expected_type, "TYPE byte mismatch")
self.assertEqual(length, len(expected_payload), "LEN byte mismatch")
self.assertEqual(data[3:3 + length], expected_payload, "PAYLOAD mismatch")
# Verify CRC over [LEN][TYPE][PAYLOAD]
crc_data = bytes([length, expected_type]) + expected_payload
expected_crc = _crc8(crc_data)
self.assertEqual(data[-1], expected_crc, "CRC8 mismatch")
def test_heartbeat_frame(self):
frame = encode_heartbeat()
self._check_frame(frame, CmdType.HEARTBEAT, b"")
def test_drive_frame_zeros(self):
frame = encode_drive(0, 0)
payload = struct.pack(">hh", 0, 0)
self._check_frame(frame, CmdType.DRIVE, payload)
def test_drive_frame_values(self):
frame = encode_drive(500, -300)
payload = struct.pack(">hh", 500, -300)
self._check_frame(frame, CmdType.DRIVE, payload)
def test_drive_clamping(self):
# Values outside ±1000 must be clamped
frame = encode_drive(5000, -9999)
_, _, _, p0, p1, p2, p3, _ = frame
speed, steer = struct.unpack(">hh", bytes([p0, p1, p2, p3]))
self.assertEqual(speed, 1000)
self.assertEqual(steer, -1000)
def test_estop_assert(self):
frame = encode_estop(True)
self._check_frame(frame, CmdType.ESTOP, b"\x01")
def test_estop_clear(self):
frame = encode_estop(False)
self._check_frame(frame, CmdType.ESTOP, b"\x00")
def test_arm_frame(self):
for arm_val in (True, False):
frame = encode_arm(arm_val)
self._check_frame(frame, CmdType.ARM, bytes([1 if arm_val else 0]))
def test_pid_frame(self):
frame = encode_pid(1.0, 0.1, 0.01)
payload = struct.pack(">fff", 1.0, 0.1, 0.01)
self._check_frame(frame, CmdType.PID, payload)
def test_pid_negative_raises(self):
with self.assertRaises(ValueError):
encode_pid(-1.0, 0.1, 0.01)
# ── Parser round-trip tests ───────────────────────────────────────────────────
class TestFrameParser(unittest.TestCase):
"""Feed encoded frames back through the parser and verify decode."""
def _parse_all(self, data: bytes):
"""Feed all bytes; return list of decoded frames."""
parser = FrameParser()
results = []
for byte in data:
r = parser.feed(byte)
if r is not None:
results.append(r)
return parser, results
# ── Command echo round-trips (parser recognises unknown cmd types as raw tuples) ──
def test_heartbeat_roundtrip(self):
frame = encode_heartbeat()
parser, results = self._parse_all(frame)
self.assertEqual(len(results), 1)
# HEARTBEAT (0x01) is a command type — no dataclass decoder, returns raw tuple
self.assertIsInstance(results[0], tuple)
type_code, payload = results[0]
self.assertEqual(type_code, CmdType.HEARTBEAT)
self.assertEqual(payload, b"")
def test_drive_roundtrip(self):
frame = encode_drive(250, -100)
parser, results = self._parse_all(frame)
self.assertEqual(len(results), 1)
type_code, payload = results[0]
self.assertEqual(type_code, CmdType.DRIVE)
speed, steer = struct.unpack(">hh", payload)
self.assertEqual(speed, 250)
self.assertEqual(steer, -100)
# ── Telemetry decode round-trips ────────────────────────────────────────
def _build_telem_frame(self, type_code: int, payload: bytes) -> bytes:
length = len(payload)
crc = _crc8(bytes([length, type_code]) + payload)
return bytes([SYNC_BYTE, length, type_code]) + payload + bytes([crc])
def test_status_frame_decode(self):
raw = struct.pack(">hhHBB", 152, 300, 11400, BalanceState.ARMED, 0x00)
frame = self._build_telem_frame(TelType.STATUS, raw)
_, results = self._parse_all(frame)
self.assertEqual(len(results), 1)
f = results[0]
self.assertIsInstance(f, StatusFrame)
self.assertAlmostEqual(f.pitch_deg, 15.2, places=5)
self.assertEqual(f.motor_cmd, 300)
self.assertEqual(f.vbat_mv, 11400)
self.assertEqual(f.balance_state, BalanceState.ARMED)
self.assertEqual(f.flags, 0x00)
def test_status_estop_flag(self):
raw = struct.pack(">hhHBB", 0, 0, 10000, BalanceState.ESTOP, 0x01)
frame = self._build_telem_frame(TelType.STATUS, raw)
_, results = self._parse_all(frame)
f = results[0]
self.assertIsInstance(f, StatusFrame)
self.assertTrue(bool(f.flags & 0x01)) # estop_active flag set
def test_vesc_left_frame_decode(self):
# erpm=12000, voltage_mv=22400, current_ma=4500, temp_x10=352
raw = struct.pack(">iHhH", 12000, 22400, 4500, 352)
frame = self._build_telem_frame(TelType.VESC_LEFT, raw)
_, results = self._parse_all(frame)
self.assertEqual(len(results), 1)
f = results[0]
self.assertIsInstance(f, VescFrame)
self.assertEqual(f.vesc_id, 56)
self.assertEqual(f.erpm, 12000)
self.assertEqual(f.voltage_mv, 22400)
self.assertEqual(f.current_ma, 4500)
self.assertAlmostEqual(f.temp_c, 35.2, places=5)
def test_vesc_right_frame_decode(self):
raw = struct.pack(">iHhH", -6000, 22000, -200, 280)
frame = self._build_telem_frame(TelType.VESC_RIGHT, raw)
_, results = self._parse_all(frame)
f = results[0]
self.assertIsInstance(f, VescFrame)
self.assertEqual(f.vesc_id, 68)
self.assertEqual(f.erpm, -6000)
self.assertAlmostEqual(f.temp_c, 28.0, places=5)
def test_ack_frame_decode(self):
raw = bytes([CmdType.DRIVE])
frame = self._build_telem_frame(TelType.ACK, raw)
_, results = self._parse_all(frame)
f = results[0]
self.assertIsInstance(f, AckFrame)
self.assertEqual(f.echoed_type, CmdType.DRIVE)
def test_nack_frame_decode(self):
raw = bytes([CmdType.DRIVE, 0x03]) # ERR_ESTOP_ACTIVE
frame = self._build_telem_frame(TelType.NACK, raw)
_, results = self._parse_all(frame)
f = results[0]
self.assertIsInstance(f, NackFrame)
self.assertEqual(f.cmd_type, CmdType.DRIVE)
self.assertEqual(f.error_code, 0x03)
# ── Parser robustness ───────────────────────────────────────────────────
def test_bad_crc_discarded(self):
frame = bytearray(encode_heartbeat())
frame[-1] ^= 0xFF # corrupt CRC
parser, results = self._parse_all(bytes(frame))
self.assertEqual(results, [])
self.assertEqual(parser.frames_error, 1)
self.assertEqual(parser.frames_ok, 0)
def test_garbage_before_sync(self):
garbage = bytes([0x00, 0xFF, 0x01, 0x22])
frame = encode_heartbeat()
parser, results = self._parse_all(garbage + frame)
self.assertEqual(len(results), 1)
def test_two_frames_sequential(self):
frames = encode_heartbeat() + encode_drive(100, 50)
parser, results = self._parse_all(frames)
self.assertEqual(len(results), 2)
self.assertEqual(parser.frames_ok, 2)
def test_oversized_payload_rejected(self):
# Craft a frame claiming LEN=63 (> MAX_PAYLOAD_LEN=62)
bad = bytes([SYNC_BYTE, 63, CmdType.DRIVE])
parser, results = self._parse_all(bad)
self.assertEqual(results, [])
self.assertEqual(parser.frames_error, 1)
def test_parser_counter_increments(self):
parser = FrameParser()
frame = encode_drive(10, 10)
for b in frame:
parser.feed(b)
self.assertEqual(parser.frames_ok, 1)
self.assertEqual(parser.frames_error, 0)
def test_reset_clears_state(self):
parser = FrameParser()
# Feed partial frame then reset
partial = encode_drive(0, 0)[:-2]
for b in partial:
parser.feed(b)
parser.reset()
# Now a clean frame should decode correctly
for b in encode_heartbeat():
r = parser.feed(b)
self.assertEqual(parser.frames_ok, 1)
# ── Encode parameter edge cases ───────────────────────────────────────────────
class TestEncodeEdgeCases(unittest.TestCase):
def test_drive_clamp_positive(self):
frame = encode_drive(9999, 9999)
payload = frame[3:7]
speed, steer = struct.unpack(">hh", payload)
self.assertEqual(speed, 1000)
self.assertEqual(steer, 1000)
def test_drive_clamp_negative(self):
frame = encode_drive(-9999, -9999)
payload = frame[3:7]
speed, steer = struct.unpack(">hh", payload)
self.assertEqual(speed, -1000)
self.assertEqual(steer, -1000)
def test_pid_zero_gains_valid(self):
frame = encode_pid(0.0, 0.0, 0.0)
self.assertIsNotNone(frame)
def test_pid_large_gains_valid(self):
frame = encode_pid(100.0, 50.0, 25.0)
self.assertIsNotNone(frame)
if __name__ == "__main__":
unittest.main()