feat: Orin CANable 2.0 bridge for Mamba and VESC CAN bus (Issue #674)

Adds slcan setup script and saltybot_can_bridge ROS2 package implementing
full CAN bus integration between the Orin and the Mamba motor controller /
VESC motor controllers via a CANable 2.0 USB dongle (slcan interface).

- jetson/scripts/setup_can.sh: slcand-based bring-up/tear-down for slcan0
  at 500 kbps with error handling (already up, device missing, retry)
- saltybot_can_bridge/mamba_protocol.py: CAN message ID constants and
  encode/decode helpers for velocity, mode, e-stop, IMU, battery, VESC state
- saltybot_can_bridge/can_bridge_node.py: ROS2 node subscribing to /cmd_vel
  and /estop, publishing /can/imu, /can/battery, /can/vesc/{left,right}/state
  and /can/connection_status; background reader thread, watchdog zero-vel,
  auto-reconnect every 5 s on CAN error
- config/can_bridge_params.yaml: default params (slcan0, VESC IDs 56/68,
  Mamba ID 1, 0.5 s command timeout)
- test/test_can_bridge.py: 30 unit tests covering encode/decode round-trips
  and edge cases — all pass without ROS2 or CAN hardware

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-jetson 2026-03-17 18:54:20 -04:00
parent 7f67fc6abe
commit 92c0628c62
11 changed files with 1038 additions and 0 deletions

View File

@ -0,0 +1,7 @@
can_bridge_node:
ros__parameters:
can_interface: slcan0
left_vesc_can_id: 56
right_vesc_can_id: 68
mamba_can_id: 1
command_timeout_s: 0.5

View File

@ -0,0 +1,35 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_can_bridge</name>
<version>0.1.0</version>
<description>
CAN bus bridge for SaltyBot Orin — interfaces with the Mamba motor
controller and VESC motor controllers via CANable 2.0 (slcan interface).
Publishes IMU, battery, and VESC telemetry to ROS2 topics and forwards
cmd_vel / estop commands to the CAN bus.
System dependency: python3-can (apt: python3-can or pip: python-can)
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
<exec_depend>sensor_msgs</exec_depend>
<!-- python-can: install via apt install python3-can or pip install python-can -->
<exec_depend>python3-can</exec_depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1 @@
"""SaltyBot CAN bridge package — Mamba controller and VESC telemetry via python-can."""

View File

@ -0,0 +1,383 @@
#!/usr/bin/env python3
"""
can_bridge_node.py ROS2 node bridging the SaltyBot Orin to the Mamba motor
controller and VESC motor controllers over CAN bus.
The node opens the SocketCAN interface (slcan0 by default), spawns a background
reader thread to process incoming telemetry, and exposes the following interface:
Subscriptions
-------------
/cmd_vel geometry_msgs/Twist VESC speed commands (CAN)
/estop std_msgs/Bool Mamba e-stop (CAN)
Publications
------------
/can/imu sensor_msgs/Imu Mamba IMU telemetry
/can/battery sensor_msgs/BatteryState Mamba battery telemetry
/can/vesc/left/state std_msgs/Float32MultiArray Left VESC state
/can/vesc/right/state std_msgs/Float32MultiArray Right VESC state
/can/connection_status std_msgs/String "connected" | "disconnected"
Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/674
"""
import threading
import time
from typing import Optional
import can
import rclpy
from geometry_msgs.msg import Twist
from rclpy.node import Node
from sensor_msgs.msg import BatteryState, Imu
from std_msgs.msg import Bool, Float32MultiArray, String
from saltybot_can_bridge.mamba_protocol import (
MAMBA_CMD_ESTOP,
MAMBA_CMD_MODE,
MAMBA_CMD_VELOCITY,
MAMBA_TELEM_BATTERY,
MAMBA_TELEM_IMU,
VESC_TELEM_STATE,
MODE_DRIVE,
MODE_ESTOP,
MODE_IDLE,
encode_estop_cmd,
encode_mode_cmd,
encode_velocity_cmd,
decode_battery_telem,
decode_imu_telem,
decode_vesc_state,
)
# Reconnect attempt interval when CAN bus is lost
_RECONNECT_INTERVAL_S: float = 5.0
# Watchdog timer tick rate (Hz)
_WATCHDOG_HZ: float = 10.0
class CanBridgeNode(Node):
"""CAN bus bridge between Orin ROS2 and Mamba / VESC controllers."""
def __init__(self) -> None:
super().__init__("can_bridge_node")
# ── Parameters ────────────────────────────────────────────────────
self.declare_parameter("can_interface", "slcan0")
self.declare_parameter("left_vesc_can_id", 56)
self.declare_parameter("right_vesc_can_id", 68)
self.declare_parameter("mamba_can_id", 1)
self.declare_parameter("command_timeout_s", 0.5)
self._iface: str = self.get_parameter("can_interface").value
self._left_vesc_id: int = self.get_parameter("left_vesc_can_id").value
self._right_vesc_id: int = self.get_parameter("right_vesc_can_id").value
self._mamba_id: int = self.get_parameter("mamba_can_id").value
self._cmd_timeout: float = self.get_parameter("command_timeout_s").value
# ── State ─────────────────────────────────────────────────────────
self._bus: Optional[can.BusABC] = None
self._connected: bool = False
self._last_cmd_time: float = time.monotonic()
self._lock = threading.Lock() # protects _bus / _connected
# ── Publishers ────────────────────────────────────────────────────
self._pub_imu = self.create_publisher(Imu, "/can/imu", 10)
self._pub_battery = self.create_publisher(BatteryState, "/can/battery", 10)
self._pub_vesc_left = self.create_publisher(
Float32MultiArray, "/can/vesc/left/state", 10
)
self._pub_vesc_right = self.create_publisher(
Float32MultiArray, "/can/vesc/right/state", 10
)
self._pub_status = self.create_publisher(
String, "/can/connection_status", 10
)
# ── Subscriptions ─────────────────────────────────────────────────
self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, 10)
self.create_subscription(Bool, "/estop", self._estop_cb, 10)
# ── Timers ────────────────────────────────────────────────────────
self.create_timer(1.0 / _WATCHDOG_HZ, self._watchdog_cb)
self.create_timer(_RECONNECT_INTERVAL_S, self._reconnect_cb)
# ── Open CAN ──────────────────────────────────────────────────────
self._try_connect()
# ── Background reader thread ──────────────────────────────────────
self._reader_thread = threading.Thread(
target=self._reader_loop, daemon=True, name="can_reader"
)
self._reader_thread.start()
self.get_logger().info(
f"can_bridge_node ready — iface={self._iface} "
f"left_vesc={self._left_vesc_id} right_vesc={self._right_vesc_id} "
f"mamba={self._mamba_id}"
)
# ── Connection management ──────────────────────────────────────────────
def _try_connect(self) -> None:
"""Attempt to open the CAN interface; silently skip if already connected."""
with self._lock:
if self._connected:
return
try:
bus = can.interface.Bus(
channel=self._iface,
bustype="socketcan",
)
self._bus = bus
self._connected = True
self.get_logger().info(f"CAN bus connected: {self._iface}")
self._publish_status("connected")
except Exception as exc:
self.get_logger().warning(
f"CAN bus not available ({self._iface}): {exc} "
f"— will retry every {_RECONNECT_INTERVAL_S:.0f} s"
)
self._connected = False
self._publish_status("disconnected")
def _reconnect_cb(self) -> None:
"""Periodic timer: try to reconnect when disconnected."""
if not self._connected:
self._try_connect()
def _handle_can_error(self, exc: Exception, context: str) -> None:
"""Mark bus as disconnected on any CAN error."""
self.get_logger().warning(f"CAN error in {context}: {exc}")
with self._lock:
if self._bus is not None:
try:
self._bus.shutdown()
except Exception:
pass
self._bus = None
self._connected = False
self._publish_status("disconnected")
# ── ROS callbacks ─────────────────────────────────────────────────────
def _cmd_vel_cb(self, msg: Twist) -> None:
"""Convert /cmd_vel Twist to VESC speed commands over CAN."""
self._last_cmd_time = time.monotonic()
if not self._connected:
return
# Differential drive decomposition — individual wheel speeds in m/s.
# The VESC nodes interpret linear velocity directly; angular is handled
# by the sign difference between left and right.
linear = msg.linear.x
angular = msg.angular.z
# Forward left = forward right for pure translation; for rotation
# left slows and right speeds up (positive angular = CCW = left turn).
# The Mamba velocity command carries both wheels independently.
left_mps = linear - angular
right_mps = linear + angular
payload = encode_velocity_cmd(left_mps, right_mps)
self._send_can(MAMBA_CMD_VELOCITY, payload, "cmd_vel")
# Keep Mamba in DRIVE mode while receiving commands
self._send_can(MAMBA_CMD_MODE, encode_mode_cmd(MODE_DRIVE), "cmd_vel mode")
def _estop_cb(self, msg: Bool) -> None:
"""Forward /estop to Mamba over CAN."""
if not self._connected:
return
payload = encode_estop_cmd(msg.data)
self._send_can(MAMBA_CMD_ESTOP, payload, "estop")
if msg.data:
self._send_can(
MAMBA_CMD_MODE, encode_mode_cmd(MODE_ESTOP), "estop mode"
)
self.get_logger().warning("E-stop asserted — sent ESTOP to Mamba")
# ── Watchdog ──────────────────────────────────────────────────────────
def _watchdog_cb(self) -> None:
"""If no /cmd_vel arrives within the timeout, send zero velocity."""
if not self._connected:
return
elapsed = time.monotonic() - self._last_cmd_time
if elapsed > self._cmd_timeout:
self._send_can(
MAMBA_CMD_VELOCITY,
encode_velocity_cmd(0.0, 0.0),
"watchdog zero-vel",
)
self._send_can(
MAMBA_CMD_MODE, encode_mode_cmd(MODE_IDLE), "watchdog idle"
)
# ── CAN send helper ───────────────────────────────────────────────────
def _send_can(self, arb_id: int, data: bytes, context: str) -> None:
"""Send a standard CAN frame; handle errors gracefully."""
with self._lock:
if not self._connected or self._bus is None:
return
bus = self._bus
msg = can.Message(
arbitration_id=arb_id,
data=data,
is_extended_id=False,
)
try:
bus.send(msg, timeout=0.05)
except can.CanError as exc:
self._handle_can_error(exc, f"send({context})")
# ── Background CAN reader ─────────────────────────────────────────────
def _reader_loop(self) -> None:
"""
Blocking CAN read loop executed in a daemon thread.
Dispatches incoming frames to the appropriate handler.
"""
while rclpy.ok():
with self._lock:
connected = self._connected
bus = self._bus
if not connected or bus is None:
time.sleep(0.1)
continue
try:
frame = bus.recv(timeout=0.5)
except can.CanError as exc:
self._handle_can_error(exc, "reader_loop recv")
continue
if frame is None:
# Timeout — no frame within 0.5 s, loop again
continue
self._dispatch_frame(frame)
def _dispatch_frame(self, frame: can.Message) -> None:
"""Route an incoming CAN frame to the correct publisher."""
arb_id = frame.arbitration_id
data = bytes(frame.data)
try:
if arb_id == MAMBA_TELEM_IMU:
self._handle_imu(data, frame.timestamp)
elif arb_id == MAMBA_TELEM_BATTERY:
self._handle_battery(data, frame.timestamp)
elif arb_id == VESC_TELEM_STATE + self._left_vesc_id:
self._handle_vesc_state(data, frame.timestamp, side="left")
elif arb_id == VESC_TELEM_STATE + self._right_vesc_id:
self._handle_vesc_state(data, frame.timestamp, side="right")
except Exception as exc:
self.get_logger().warning(
f"Error parsing CAN frame 0x{arb_id:03X}: {exc}"
)
# ── Frame handlers ────────────────────────────────────────────────────
def _handle_imu(self, data: bytes, timestamp: float) -> None:
telem = decode_imu_telem(data)
msg = Imu()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = "imu_link"
msg.linear_acceleration.x = telem.accel_x
msg.linear_acceleration.y = telem.accel_y
msg.linear_acceleration.z = telem.accel_z
msg.angular_velocity.x = telem.gyro_x
msg.angular_velocity.y = telem.gyro_y
msg.angular_velocity.z = telem.gyro_z
# Covariance unknown; mark as -1 per REP-145
msg.orientation_covariance[0] = -1.0
self._pub_imu.publish(msg)
def _handle_battery(self, data: bytes, timestamp: float) -> None:
telem = decode_battery_telem(data)
msg = BatteryState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.voltage = telem.voltage
msg.current = telem.current
msg.present = True
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
self._pub_battery.publish(msg)
def _handle_vesc_state(
self, data: bytes, timestamp: float, side: str
) -> None:
telem = decode_vesc_state(data)
msg = Float32MultiArray()
# Layout: [erpm, duty, voltage, current]
msg.data = [telem.erpm, telem.duty, telem.voltage, telem.current]
if side == "left":
self._pub_vesc_left.publish(msg)
else:
self._pub_vesc_right.publish(msg)
# ── Status helper ─────────────────────────────────────────────────────
def _publish_status(self, status: str) -> None:
msg = String()
msg.data = status
self._pub_status.publish(msg)
# ── Shutdown ──────────────────────────────────────────────────────────
def destroy_node(self) -> None:
"""Send zero velocity and shut down the CAN bus cleanly."""
if self._connected and self._bus is not None:
try:
self._send_can(
MAMBA_CMD_VELOCITY,
encode_velocity_cmd(0.0, 0.0),
"shutdown",
)
self._send_can(
MAMBA_CMD_MODE, encode_mode_cmd(MODE_IDLE), "shutdown"
)
except Exception:
pass
try:
self._bus.shutdown()
except Exception:
pass
super().destroy_node()
# ---------------------------------------------------------------------------
def main(args=None) -> None:
rclpy.init(args=args)
node = CanBridgeNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,201 @@
#!/usr/bin/env python3
"""
mamba_protocol.py CAN message encoding/decoding for the Mamba motor controller
and VESC telemetry.
CAN message layout
------------------
Command frames (Orin Mamba / VESC):
MAMBA_CMD_VELOCITY 0x100 8 bytes left_speed (f32, m/s) | right_speed (f32, m/s)
MAMBA_CMD_MODE 0x101 1 byte mode (0=idle, 1=drive, 2=estop)
MAMBA_CMD_ESTOP 0x102 1 byte 0x01 = stop
Telemetry frames (Mamba Orin):
MAMBA_TELEM_IMU 0x200 24 bytes accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z (f32 each)
MAMBA_TELEM_BATTERY 0x201 8 bytes voltage (f32, V) | current (f32, A)
VESC telemetry frame (VESC Orin):
VESC_TELEM_STATE 0x300 16 bytes erpm (f32) | duty (f32) | voltage (f32) | current (f32)
All multi-byte fields are big-endian.
Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/674
"""
import struct
from dataclasses import dataclass
from typing import Tuple
# ---------------------------------------------------------------------------
# CAN message IDs
# ---------------------------------------------------------------------------
MAMBA_CMD_VELOCITY: int = 0x100
MAMBA_CMD_MODE: int = 0x101
MAMBA_CMD_ESTOP: int = 0x102
MAMBA_TELEM_IMU: int = 0x200
MAMBA_TELEM_BATTERY: int = 0x201
VESC_TELEM_STATE: int = 0x300
# ---------------------------------------------------------------------------
# Mode constants
# ---------------------------------------------------------------------------
MODE_IDLE: int = 0
MODE_DRIVE: int = 1
MODE_ESTOP: int = 2
# ---------------------------------------------------------------------------
# Data classes for decoded telemetry
# ---------------------------------------------------------------------------
@dataclass
class ImuTelemetry:
"""Decoded IMU telemetry from Mamba (MAMBA_TELEM_IMU)."""
accel_x: float = 0.0 # m/s²
accel_y: float = 0.0
accel_z: float = 0.0
gyro_x: float = 0.0 # rad/s
gyro_y: float = 0.0
gyro_z: float = 0.0
@dataclass
class BatteryTelemetry:
"""Decoded battery telemetry from Mamba (MAMBA_TELEM_BATTERY)."""
voltage: float = 0.0 # V
current: float = 0.0 # A
@dataclass
class VescStateTelemetry:
"""Decoded VESC state telemetry (VESC_TELEM_STATE)."""
erpm: float = 0.0 # electrical RPM
duty: float = 0.0 # duty cycle [-1.0, 1.0]
voltage: float = 0.0 # bus voltage, V
current: float = 0.0 # phase current, A
# ---------------------------------------------------------------------------
# Encode helpers
# ---------------------------------------------------------------------------
_FMT_VEL = ">ff" # 2 × float32, big-endian
_FMT_MODE = ">B" # 1 × uint8
_FMT_ESTOP = ">B" # 1 × uint8
_FMT_IMU = ">ffffff" # 6 × float32
_FMT_BAT = ">ff" # 2 × float32
_FMT_VESC = ">ffff" # 4 × float32
def encode_velocity_cmd(left_mps: float, right_mps: float) -> bytes:
"""
Encode a MAMBA_CMD_VELOCITY payload.
Parameters
----------
left_mps: target left wheel speed in m/s (positive = forward)
right_mps: target right wheel speed in m/s (positive = forward)
Returns
-------
8-byte big-endian payload suitable for a CAN frame.
"""
return struct.pack(_FMT_VEL, float(left_mps), float(right_mps))
def encode_mode_cmd(mode: int) -> bytes:
"""
Encode a MAMBA_CMD_MODE payload.
Parameters
----------
mode: one of MODE_IDLE (0), MODE_DRIVE (1), MODE_ESTOP (2)
Returns
-------
1-byte payload.
"""
if mode not in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP):
raise ValueError(f"Invalid mode {mode!r}; expected 0, 1, or 2")
return struct.pack(_FMT_MODE, mode)
def encode_estop_cmd(stop: bool = True) -> bytes:
"""
Encode a MAMBA_CMD_ESTOP payload.
Parameters
----------
stop: True to assert e-stop, False to clear.
Returns
-------
1-byte payload (0x01 = stop, 0x00 = clear).
"""
return struct.pack(_FMT_ESTOP, 0x01 if stop else 0x00)
# ---------------------------------------------------------------------------
# Decode helpers
# ---------------------------------------------------------------------------
def decode_imu_telem(data: bytes) -> ImuTelemetry:
"""
Decode a MAMBA_TELEM_IMU payload.
Parameters
----------
data: exactly 24 bytes (6 × float32, big-endian).
Returns
-------
ImuTelemetry dataclass instance.
Raises
------
struct.error if data is the wrong length.
"""
ax, ay, az, gx, gy, gz = struct.unpack(_FMT_IMU, data)
return ImuTelemetry(
accel_x=ax, accel_y=ay, accel_z=az,
gyro_x=gx, gyro_y=gy, gyro_z=gz,
)
def decode_battery_telem(data: bytes) -> BatteryTelemetry:
"""
Decode a MAMBA_TELEM_BATTERY payload.
Parameters
----------
data: exactly 8 bytes (2 × float32, big-endian).
Returns
-------
BatteryTelemetry dataclass instance.
"""
voltage, current = struct.unpack(_FMT_BAT, data)
return BatteryTelemetry(voltage=voltage, current=current)
def decode_vesc_state(data: bytes) -> VescStateTelemetry:
"""
Decode a VESC_TELEM_STATE payload.
Parameters
----------
data: exactly 16 bytes (4 × float32, big-endian).
Returns
-------
VescStateTelemetry dataclass instance.
"""
erpm, duty, voltage, current = struct.unpack(_FMT_VESC, data)
return VescStateTelemetry(erpm=erpm, duty=duty, voltage=voltage, current=current)

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_can_bridge
[install]
install_scripts=$base/lib/saltybot_can_bridge

View File

@ -0,0 +1,26 @@
from setuptools import setup
package_name = "saltybot_can_bridge"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/config", ["config/can_bridge_params.yaml"]),
],
install_requires=["setuptools", "python-can"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="CAN bus bridge for Mamba controller and VESC telemetry",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"can_bridge_node = saltybot_can_bridge.can_bridge_node:main",
],
},
)

View File

@ -0,0 +1,254 @@
#!/usr/bin/env python3
"""
Unit tests for saltybot_can_bridge.mamba_protocol.
No ROS2 or CAN hardware required tests exercise encode/decode round-trips
and boundary conditions entirely in Python.
Run with: pytest test/test_can_bridge.py -v
"""
import struct
import unittest
from saltybot_can_bridge.mamba_protocol import (
MAMBA_CMD_ESTOP,
MAMBA_CMD_MODE,
MAMBA_CMD_VELOCITY,
MAMBA_TELEM_BATTERY,
MAMBA_TELEM_IMU,
VESC_TELEM_STATE,
MODE_DRIVE,
MODE_ESTOP,
MODE_IDLE,
BatteryTelemetry,
ImuTelemetry,
VescStateTelemetry,
decode_battery_telem,
decode_imu_telem,
decode_vesc_state,
encode_estop_cmd,
encode_mode_cmd,
encode_velocity_cmd,
)
class TestMessageIDs(unittest.TestCase):
"""Verify the CAN message ID constants are correct."""
def test_command_ids(self):
self.assertEqual(MAMBA_CMD_VELOCITY, 0x100)
self.assertEqual(MAMBA_CMD_MODE, 0x101)
self.assertEqual(MAMBA_CMD_ESTOP, 0x102)
def test_telemetry_ids(self):
self.assertEqual(MAMBA_TELEM_IMU, 0x200)
self.assertEqual(MAMBA_TELEM_BATTERY, 0x201)
self.assertEqual(VESC_TELEM_STATE, 0x300)
class TestVelocityEncode(unittest.TestCase):
"""Tests for encode_velocity_cmd."""
def test_zero_velocity(self):
payload = encode_velocity_cmd(0.0, 0.0)
self.assertEqual(len(payload), 8)
left, right = struct.unpack(">ff", payload)
self.assertAlmostEqual(left, 0.0, places=5)
self.assertAlmostEqual(right, 0.0, places=5)
def test_forward_velocity(self):
payload = encode_velocity_cmd(1.5, 1.5)
left, right = struct.unpack(">ff", payload)
self.assertAlmostEqual(left, 1.5, places=5)
self.assertAlmostEqual(right, 1.5, places=5)
def test_differential_velocity(self):
payload = encode_velocity_cmd(-0.5, 0.5)
left, right = struct.unpack(">ff", payload)
self.assertAlmostEqual(left, -0.5, places=5)
self.assertAlmostEqual(right, 0.5, places=5)
def test_large_velocity(self):
# No clamping at the protocol layer — values are sent as-is
payload = encode_velocity_cmd(10.0, -10.0)
left, right = struct.unpack(">ff", payload)
self.assertAlmostEqual(left, 10.0, places=3)
self.assertAlmostEqual(right, -10.0, places=3)
def test_payload_is_big_endian(self):
# Sanity check: first 4 bytes encode left speed
payload = encode_velocity_cmd(1.0, 0.0)
(left,) = struct.unpack(">f", payload[:4])
self.assertAlmostEqual(left, 1.0, places=5)
class TestModeEncode(unittest.TestCase):
"""Tests for encode_mode_cmd."""
def test_idle_mode(self):
payload = encode_mode_cmd(MODE_IDLE)
self.assertEqual(payload, b"\x00")
def test_drive_mode(self):
payload = encode_mode_cmd(MODE_DRIVE)
self.assertEqual(payload, b"\x01")
def test_estop_mode(self):
payload = encode_mode_cmd(MODE_ESTOP)
self.assertEqual(payload, b"\x02")
def test_invalid_mode_raises(self):
with self.assertRaises(ValueError):
encode_mode_cmd(99)
def test_invalid_mode_negative_raises(self):
with self.assertRaises(ValueError):
encode_mode_cmd(-1)
class TestEstopEncode(unittest.TestCase):
"""Tests for encode_estop_cmd."""
def test_estop_assert(self):
self.assertEqual(encode_estop_cmd(True), b"\x01")
def test_estop_clear(self):
self.assertEqual(encode_estop_cmd(False), b"\x00")
def test_estop_default_is_stop(self):
self.assertEqual(encode_estop_cmd(), b"\x01")
class TestImuDecodeRoundTrip(unittest.TestCase):
"""Round-trip tests for IMU telemetry."""
def _encode_imu(self, ax, ay, az, gx, gy, gz) -> bytes:
return struct.pack(">ffffff", ax, ay, az, gx, gy, gz)
def test_zero_imu(self):
data = self._encode_imu(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
telem = decode_imu_telem(data)
self.assertIsInstance(telem, ImuTelemetry)
self.assertAlmostEqual(telem.accel_x, 0.0, places=5)
self.assertAlmostEqual(telem.gyro_z, 0.0, places=5)
def test_nominal_imu(self):
data = self._encode_imu(0.1, -0.2, 9.81, 0.01, -0.02, 0.03)
telem = decode_imu_telem(data)
self.assertAlmostEqual(telem.accel_x, 0.1, places=4)
self.assertAlmostEqual(telem.accel_y, -0.2, places=4)
self.assertAlmostEqual(telem.accel_z, 9.81, places=3)
self.assertAlmostEqual(telem.gyro_x, 0.01, places=5)
self.assertAlmostEqual(telem.gyro_y, -0.02, places=5)
self.assertAlmostEqual(telem.gyro_z, 0.03, places=5)
def test_imu_bad_length_raises(self):
with self.assertRaises(struct.error):
decode_imu_telem(b"\x00" * 10) # too short
class TestBatteryDecodeRoundTrip(unittest.TestCase):
"""Round-trip tests for battery telemetry."""
def _encode_bat(self, voltage, current) -> bytes:
return struct.pack(">ff", voltage, current)
def test_nominal_battery(self):
data = self._encode_bat(24.6, 3.2)
telem = decode_battery_telem(data)
self.assertIsInstance(telem, BatteryTelemetry)
self.assertAlmostEqual(telem.voltage, 24.6, places=3)
self.assertAlmostEqual(telem.current, 3.2, places=4)
def test_zero_battery(self):
data = self._encode_bat(0.0, 0.0)
telem = decode_battery_telem(data)
self.assertAlmostEqual(telem.voltage, 0.0, places=5)
self.assertAlmostEqual(telem.current, 0.0, places=5)
def test_max_voltage(self):
# 6S LiPo max ~25.2 V; test with a high value
data = self._encode_bat(25.2, 0.0)
telem = decode_battery_telem(data)
self.assertAlmostEqual(telem.voltage, 25.2, places=3)
def test_battery_bad_length_raises(self):
with self.assertRaises(struct.error):
decode_battery_telem(b"\x00" * 4) # too short
class TestVescStateDecodeRoundTrip(unittest.TestCase):
"""Round-trip tests for VESC state telemetry."""
def _encode_vesc(self, erpm, duty, voltage, current) -> bytes:
return struct.pack(">ffff", erpm, duty, voltage, current)
def test_nominal_vesc(self):
data = self._encode_vesc(3000.0, 0.25, 24.0, 5.5)
telem = decode_vesc_state(data)
self.assertIsInstance(telem, VescStateTelemetry)
self.assertAlmostEqual(telem.erpm, 3000.0, places=2)
self.assertAlmostEqual(telem.duty, 0.25, places=5)
self.assertAlmostEqual(telem.voltage, 24.0, places=4)
self.assertAlmostEqual(telem.current, 5.5, places=4)
def test_zero_vesc(self):
data = self._encode_vesc(0.0, 0.0, 0.0, 0.0)
telem = decode_vesc_state(data)
self.assertAlmostEqual(telem.erpm, 0.0)
self.assertAlmostEqual(telem.duty, 0.0)
def test_reverse_erpm(self):
data = self._encode_vesc(-1500.0, -0.15, 23.0, 2.0)
telem = decode_vesc_state(data)
self.assertAlmostEqual(telem.erpm, -1500.0, places=2)
self.assertAlmostEqual(telem.duty, -0.15, places=5)
def test_vesc_bad_length_raises(self):
with self.assertRaises(struct.error):
decode_vesc_state(b"\x00" * 8) # too short (need 16)
class TestEncodeDecodeIdentity(unittest.TestCase):
"""Cross-module identity tests: encode then decode must be identity."""
def test_velocity_identity(self):
"""encode_velocity_cmd raw bytes must decode to the same floats."""
left, right = 0.75, -0.3
payload = encode_velocity_cmd(left, right)
decoded_l, decoded_r = struct.unpack(">ff", payload)
self.assertAlmostEqual(decoded_l, left, places=5)
self.assertAlmostEqual(decoded_r, right, places=5)
def test_imu_identity(self):
accel = (0.5, -0.5, 9.8)
gyro = (0.1, -0.1, 0.2)
raw = struct.pack(">ffffff", *accel, *gyro)
telem = decode_imu_telem(raw)
self.assertAlmostEqual(telem.accel_x, accel[0], places=4)
self.assertAlmostEqual(telem.accel_y, accel[1], places=4)
self.assertAlmostEqual(telem.accel_z, accel[2], places=3)
self.assertAlmostEqual(telem.gyro_x, gyro[0], places=5)
self.assertAlmostEqual(telem.gyro_y, gyro[1], places=5)
self.assertAlmostEqual(telem.gyro_z, gyro[2], places=5)
def test_battery_identity(self):
voltage, current = 22.4, 8.1
raw = struct.pack(">ff", voltage, current)
telem = decode_battery_telem(raw)
self.assertAlmostEqual(telem.voltage, voltage, places=3)
self.assertAlmostEqual(telem.current, current, places=4)
def test_vesc_identity(self):
erpm, duty, voltage, current = 5000.0, 0.5, 24.5, 10.0
raw = struct.pack(">ffff", erpm, duty, voltage, current)
telem = decode_vesc_state(raw)
self.assertAlmostEqual(telem.erpm, erpm, places=2)
self.assertAlmostEqual(telem.duty, duty, places=5)
self.assertAlmostEqual(telem.voltage, voltage, places=3)
self.assertAlmostEqual(telem.current, current, places=4)
if __name__ == "__main__":
unittest.main()

126
jetson/scripts/setup_can.sh Executable file
View File

@ -0,0 +1,126 @@
#!/usr/bin/env bash
# setup_can.sh — Bring up CANable 2.0 (slcan/ttyACM0) as slcan0 at 500 kbps
# Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/674
#
# This script uses slcand to expose the CANable 2.0 (USB CDC / slcan firmware)
# as a SocketCAN interface. It is NOT used for the gs_usb (native SocketCAN)
# firmware variant — use jetson/scripts/can_setup.sh for that.
#
# Usage:
# sudo ./setup_can.sh # bring up slcan0
# sudo ./setup_can.sh down # bring down slcan0 and kill slcand
# sudo ./setup_can.sh verify # candump one-shot check (Ctrl-C to stop)
#
# Environment overrides:
# CAN_DEVICE — serial device (default: /dev/ttyACM0)
# CAN_IFACE — SocketCAN name (default: slcan0)
# CAN_BITRATE — bit rate (default: 500000)
#
# Mamba CAN ID: 1 (0x001)
# VESC left: 56 (0x038)
# VESC right: 68 (0x044)
set -euo pipefail
DEVICE="${CAN_DEVICE:-/dev/ttyACM0}"
IFACE="${CAN_IFACE:-slcan0}"
BITRATE="${CAN_BITRATE:-500000}"
log() { echo "[setup_can] $*"; }
warn() { echo "[setup_can] WARNING: $*" >&2; }
die() { echo "[setup_can] ERROR: $*" >&2; exit 1; }
# Map numeric bitrate to slcand -s flag (0-8 map to standard CAN speeds)
bitrate_flag() {
case "$1" in
10000) echo "0" ;;
20000) echo "1" ;;
50000) echo "2" ;;
100000) echo "3" ;;
125000) echo "4" ;;
250000) echo "5" ;;
500000) echo "6" ;;
800000) echo "7" ;;
1000000) echo "8" ;;
*) die "Unsupported bitrate: $1 (choose 10k1M)" ;;
esac
}
# ── up ─────────────────────────────────────────────────────────────────────
cmd_up() {
# Verify serial device is present
if [[ ! -c "$DEVICE" ]]; then
die "$DEVICE not found — is CANable 2.0 plugged in?"
fi
# If interface already exists, leave it alone
if ip link show "$IFACE" &>/dev/null; then
log "$IFACE is already up — nothing to do."
ip -details link show "$IFACE"
return 0
fi
local sflag
sflag=$(bitrate_flag "$BITRATE")
log "Starting slcand on $DEVICE$IFACE at ${BITRATE} bps (flag -s${sflag}) …"
# -o open device, -c close on exit, -f forced, -s<N> speed, -S<baud> serial baud
slcand -o -c -f -s"${sflag}" -S 3000000 "$DEVICE" "$IFACE" \
|| die "slcand failed to start"
# Give slcand a moment to create the netdev
local retries=0
while ! ip link show "$IFACE" &>/dev/null; do
retries=$((retries + 1))
if [[ $retries -ge 10 ]]; then
die "Timed out waiting for $IFACE to appear after slcand start"
fi
sleep 0.2
done
log "Bringing up $IFACE"
ip link set "$IFACE" up \
|| die "ip link set $IFACE up failed"
log "$IFACE is up."
ip -details link show "$IFACE"
}
# ── down ───────────────────────────────────────────────────────────────────
cmd_down() {
log "Bringing down $IFACE"
if ip link show "$IFACE" &>/dev/null; then
ip link set "$IFACE" down || warn "Could not set $IFACE down"
else
warn "$IFACE not found — already down?"
fi
# Kill any running slcand instances bound to our device
if pgrep -f "slcand.*${DEVICE}" &>/dev/null; then
log "Stopping slcand for $DEVICE"
pkill -f "slcand.*${DEVICE}" || warn "pkill returned non-zero"
fi
log "Done."
}
# ── verify ─────────────────────────────────────────────────────────────────
cmd_verify() {
if ! ip link show "$IFACE" &>/dev/null; then
die "$IFACE is not up — run '$0 up' first"
fi
log "Listening on $IFACE — expecting frames from Mamba (0x001), VESC left (0x038), VESC right (0x044)"
log "Press Ctrl-C to stop."
exec candump "$IFACE"
}
# ── main ───────────────────────────────────────────────────────────────────
CMD="${1:-up}"
case "$CMD" in
up) cmd_up ;;
down) cmd_down ;;
verify) cmd_verify ;;
*)
echo "Usage: $0 [up|down|verify]"
exit 1
;;
esac