feat: Orin CANable 2.0 bridge for Mamba and VESC CAN bus (Issue #674)
Adds slcan setup script and saltybot_can_bridge ROS2 package implementing
full CAN bus integration between the Orin and the Mamba motor controller /
VESC motor controllers via a CANable 2.0 USB dongle (slcan interface).
- jetson/scripts/setup_can.sh: slcand-based bring-up/tear-down for slcan0
at 500 kbps with error handling (already up, device missing, retry)
- saltybot_can_bridge/mamba_protocol.py: CAN message ID constants and
encode/decode helpers for velocity, mode, e-stop, IMU, battery, VESC state
- saltybot_can_bridge/can_bridge_node.py: ROS2 node subscribing to /cmd_vel
and /estop, publishing /can/imu, /can/battery, /can/vesc/{left,right}/state
and /can/connection_status; background reader thread, watchdog zero-vel,
auto-reconnect every 5 s on CAN error
- config/can_bridge_params.yaml: default params (slcan0, VESC IDs 56/68,
Mamba ID 1, 0.5 s command timeout)
- test/test_can_bridge.py: 30 unit tests covering encode/decode round-trips
and edge cases — all pass without ROS2 or CAN hardware
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
d9b4b10b90
commit
99344fae8a
@ -0,0 +1,7 @@
|
|||||||
|
can_bridge_node:
|
||||||
|
ros__parameters:
|
||||||
|
can_interface: slcan0
|
||||||
|
left_vesc_can_id: 56
|
||||||
|
right_vesc_can_id: 68
|
||||||
|
mamba_can_id: 1
|
||||||
|
command_timeout_s: 0.5
|
||||||
35
jetson/ros2_ws/src/saltybot_can_bridge/package.xml
Normal file
35
jetson/ros2_ws/src/saltybot_can_bridge/package.xml
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_can_bridge</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
CAN bus bridge for SaltyBot Orin — interfaces with the Mamba motor
|
||||||
|
controller and VESC motor controllers via CANable 2.0 (slcan interface).
|
||||||
|
Publishes IMU, battery, and VESC telemetry to ROS2 topics and forwards
|
||||||
|
cmd_vel / estop commands to the CAN bus.
|
||||||
|
|
||||||
|
System dependency: python3-can (apt: python3-can or pip: python-can)
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<exec_depend>rclpy</exec_depend>
|
||||||
|
<exec_depend>std_msgs</exec_depend>
|
||||||
|
<exec_depend>geometry_msgs</exec_depend>
|
||||||
|
<exec_depend>sensor_msgs</exec_depend>
|
||||||
|
|
||||||
|
<!-- python-can: install via apt install python3-can or pip install python-can -->
|
||||||
|
<exec_depend>python3-can</exec_depend>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1 @@
|
|||||||
|
"""SaltyBot CAN bridge package — Mamba controller and VESC telemetry via python-can."""
|
||||||
@ -0,0 +1,383 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
can_bridge_node.py — ROS2 node bridging the SaltyBot Orin to the Mamba motor
|
||||||
|
controller and VESC motor controllers over CAN bus.
|
||||||
|
|
||||||
|
The node opens the SocketCAN interface (slcan0 by default), spawns a background
|
||||||
|
reader thread to process incoming telemetry, and exposes the following interface:
|
||||||
|
|
||||||
|
Subscriptions
|
||||||
|
-------------
|
||||||
|
/cmd_vel geometry_msgs/Twist → VESC speed commands (CAN)
|
||||||
|
/estop std_msgs/Bool → Mamba e-stop (CAN)
|
||||||
|
|
||||||
|
Publications
|
||||||
|
------------
|
||||||
|
/can/imu sensor_msgs/Imu Mamba IMU telemetry
|
||||||
|
/can/battery sensor_msgs/BatteryState Mamba battery telemetry
|
||||||
|
/can/vesc/left/state std_msgs/Float32MultiArray Left VESC state
|
||||||
|
/can/vesc/right/state std_msgs/Float32MultiArray Right VESC state
|
||||||
|
/can/connection_status std_msgs/String "connected" | "disconnected"
|
||||||
|
|
||||||
|
Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/674
|
||||||
|
"""
|
||||||
|
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import can
|
||||||
|
import rclpy
|
||||||
|
from geometry_msgs.msg import Twist
|
||||||
|
from rclpy.node import Node
|
||||||
|
from sensor_msgs.msg import BatteryState, Imu
|
||||||
|
from std_msgs.msg import Bool, Float32MultiArray, String
|
||||||
|
|
||||||
|
from saltybot_can_bridge.mamba_protocol import (
|
||||||
|
MAMBA_CMD_ESTOP,
|
||||||
|
MAMBA_CMD_MODE,
|
||||||
|
MAMBA_CMD_VELOCITY,
|
||||||
|
MAMBA_TELEM_BATTERY,
|
||||||
|
MAMBA_TELEM_IMU,
|
||||||
|
VESC_TELEM_STATE,
|
||||||
|
MODE_DRIVE,
|
||||||
|
MODE_ESTOP,
|
||||||
|
MODE_IDLE,
|
||||||
|
encode_estop_cmd,
|
||||||
|
encode_mode_cmd,
|
||||||
|
encode_velocity_cmd,
|
||||||
|
decode_battery_telem,
|
||||||
|
decode_imu_telem,
|
||||||
|
decode_vesc_state,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Reconnect attempt interval when CAN bus is lost
|
||||||
|
_RECONNECT_INTERVAL_S: float = 5.0
|
||||||
|
|
||||||
|
# Watchdog timer tick rate (Hz)
|
||||||
|
_WATCHDOG_HZ: float = 10.0
|
||||||
|
|
||||||
|
|
||||||
|
class CanBridgeNode(Node):
|
||||||
|
"""CAN bus bridge between Orin ROS2 and Mamba / VESC controllers."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__("can_bridge_node")
|
||||||
|
|
||||||
|
# ── Parameters ────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter("can_interface", "slcan0")
|
||||||
|
self.declare_parameter("left_vesc_can_id", 56)
|
||||||
|
self.declare_parameter("right_vesc_can_id", 68)
|
||||||
|
self.declare_parameter("mamba_can_id", 1)
|
||||||
|
self.declare_parameter("command_timeout_s", 0.5)
|
||||||
|
|
||||||
|
self._iface: str = self.get_parameter("can_interface").value
|
||||||
|
self._left_vesc_id: int = self.get_parameter("left_vesc_can_id").value
|
||||||
|
self._right_vesc_id: int = self.get_parameter("right_vesc_can_id").value
|
||||||
|
self._mamba_id: int = self.get_parameter("mamba_can_id").value
|
||||||
|
self._cmd_timeout: float = self.get_parameter("command_timeout_s").value
|
||||||
|
|
||||||
|
# ── State ─────────────────────────────────────────────────────────
|
||||||
|
self._bus: Optional[can.BusABC] = None
|
||||||
|
self._connected: bool = False
|
||||||
|
self._last_cmd_time: float = time.monotonic()
|
||||||
|
self._lock = threading.Lock() # protects _bus / _connected
|
||||||
|
|
||||||
|
# ── Publishers ────────────────────────────────────────────────────
|
||||||
|
self._pub_imu = self.create_publisher(Imu, "/can/imu", 10)
|
||||||
|
self._pub_battery = self.create_publisher(BatteryState, "/can/battery", 10)
|
||||||
|
self._pub_vesc_left = self.create_publisher(
|
||||||
|
Float32MultiArray, "/can/vesc/left/state", 10
|
||||||
|
)
|
||||||
|
self._pub_vesc_right = self.create_publisher(
|
||||||
|
Float32MultiArray, "/can/vesc/right/state", 10
|
||||||
|
)
|
||||||
|
self._pub_status = self.create_publisher(
|
||||||
|
String, "/can/connection_status", 10
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Subscriptions ─────────────────────────────────────────────────
|
||||||
|
self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, 10)
|
||||||
|
self.create_subscription(Bool, "/estop", self._estop_cb, 10)
|
||||||
|
|
||||||
|
# ── Timers ────────────────────────────────────────────────────────
|
||||||
|
self.create_timer(1.0 / _WATCHDOG_HZ, self._watchdog_cb)
|
||||||
|
self.create_timer(_RECONNECT_INTERVAL_S, self._reconnect_cb)
|
||||||
|
|
||||||
|
# ── Open CAN ──────────────────────────────────────────────────────
|
||||||
|
self._try_connect()
|
||||||
|
|
||||||
|
# ── Background reader thread ──────────────────────────────────────
|
||||||
|
self._reader_thread = threading.Thread(
|
||||||
|
target=self._reader_loop, daemon=True, name="can_reader"
|
||||||
|
)
|
||||||
|
self._reader_thread.start()
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"can_bridge_node ready — iface={self._iface} "
|
||||||
|
f"left_vesc={self._left_vesc_id} right_vesc={self._right_vesc_id} "
|
||||||
|
f"mamba={self._mamba_id}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Connection management ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _try_connect(self) -> None:
|
||||||
|
"""Attempt to open the CAN interface; silently skip if already connected."""
|
||||||
|
with self._lock:
|
||||||
|
if self._connected:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
bus = can.interface.Bus(
|
||||||
|
channel=self._iface,
|
||||||
|
bustype="socketcan",
|
||||||
|
)
|
||||||
|
self._bus = bus
|
||||||
|
self._connected = True
|
||||||
|
self.get_logger().info(f"CAN bus connected: {self._iface}")
|
||||||
|
self._publish_status("connected")
|
||||||
|
except Exception as exc:
|
||||||
|
self.get_logger().warning(
|
||||||
|
f"CAN bus not available ({self._iface}): {exc} "
|
||||||
|
f"— will retry every {_RECONNECT_INTERVAL_S:.0f} s"
|
||||||
|
)
|
||||||
|
self._connected = False
|
||||||
|
self._publish_status("disconnected")
|
||||||
|
|
||||||
|
def _reconnect_cb(self) -> None:
|
||||||
|
"""Periodic timer: try to reconnect when disconnected."""
|
||||||
|
if not self._connected:
|
||||||
|
self._try_connect()
|
||||||
|
|
||||||
|
def _handle_can_error(self, exc: Exception, context: str) -> None:
|
||||||
|
"""Mark bus as disconnected on any CAN error."""
|
||||||
|
self.get_logger().warning(f"CAN error in {context}: {exc}")
|
||||||
|
with self._lock:
|
||||||
|
if self._bus is not None:
|
||||||
|
try:
|
||||||
|
self._bus.shutdown()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self._bus = None
|
||||||
|
self._connected = False
|
||||||
|
self._publish_status("disconnected")
|
||||||
|
|
||||||
|
# ── ROS callbacks ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _cmd_vel_cb(self, msg: Twist) -> None:
|
||||||
|
"""Convert /cmd_vel Twist to VESC speed commands over CAN."""
|
||||||
|
self._last_cmd_time = time.monotonic()
|
||||||
|
|
||||||
|
if not self._connected:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Differential drive decomposition — individual wheel speeds in m/s.
|
||||||
|
# The VESC nodes interpret linear velocity directly; angular is handled
|
||||||
|
# by the sign difference between left and right.
|
||||||
|
linear = msg.linear.x
|
||||||
|
angular = msg.angular.z
|
||||||
|
|
||||||
|
# Forward left = forward right for pure translation; for rotation
|
||||||
|
# left slows and right speeds up (positive angular = CCW = left turn).
|
||||||
|
# The Mamba velocity command carries both wheels independently.
|
||||||
|
left_mps = linear - angular
|
||||||
|
right_mps = linear + angular
|
||||||
|
|
||||||
|
payload = encode_velocity_cmd(left_mps, right_mps)
|
||||||
|
self._send_can(MAMBA_CMD_VELOCITY, payload, "cmd_vel")
|
||||||
|
|
||||||
|
# Keep Mamba in DRIVE mode while receiving commands
|
||||||
|
self._send_can(MAMBA_CMD_MODE, encode_mode_cmd(MODE_DRIVE), "cmd_vel mode")
|
||||||
|
|
||||||
|
def _estop_cb(self, msg: Bool) -> None:
|
||||||
|
"""Forward /estop to Mamba over CAN."""
|
||||||
|
if not self._connected:
|
||||||
|
return
|
||||||
|
payload = encode_estop_cmd(msg.data)
|
||||||
|
self._send_can(MAMBA_CMD_ESTOP, payload, "estop")
|
||||||
|
if msg.data:
|
||||||
|
self._send_can(
|
||||||
|
MAMBA_CMD_MODE, encode_mode_cmd(MODE_ESTOP), "estop mode"
|
||||||
|
)
|
||||||
|
self.get_logger().warning("E-stop asserted — sent ESTOP to Mamba")
|
||||||
|
|
||||||
|
# ── Watchdog ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _watchdog_cb(self) -> None:
|
||||||
|
"""If no /cmd_vel arrives within the timeout, send zero velocity."""
|
||||||
|
if not self._connected:
|
||||||
|
return
|
||||||
|
elapsed = time.monotonic() - self._last_cmd_time
|
||||||
|
if elapsed > self._cmd_timeout:
|
||||||
|
self._send_can(
|
||||||
|
MAMBA_CMD_VELOCITY,
|
||||||
|
encode_velocity_cmd(0.0, 0.0),
|
||||||
|
"watchdog zero-vel",
|
||||||
|
)
|
||||||
|
self._send_can(
|
||||||
|
MAMBA_CMD_MODE, encode_mode_cmd(MODE_IDLE), "watchdog idle"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── CAN send helper ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _send_can(self, arb_id: int, data: bytes, context: str) -> None:
|
||||||
|
"""Send a standard CAN frame; handle errors gracefully."""
|
||||||
|
with self._lock:
|
||||||
|
if not self._connected or self._bus is None:
|
||||||
|
return
|
||||||
|
bus = self._bus
|
||||||
|
|
||||||
|
msg = can.Message(
|
||||||
|
arbitration_id=arb_id,
|
||||||
|
data=data,
|
||||||
|
is_extended_id=False,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
bus.send(msg, timeout=0.05)
|
||||||
|
except can.CanError as exc:
|
||||||
|
self._handle_can_error(exc, f"send({context})")
|
||||||
|
|
||||||
|
# ── Background CAN reader ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _reader_loop(self) -> None:
|
||||||
|
"""
|
||||||
|
Blocking CAN read loop executed in a daemon thread.
|
||||||
|
Dispatches incoming frames to the appropriate handler.
|
||||||
|
"""
|
||||||
|
while rclpy.ok():
|
||||||
|
with self._lock:
|
||||||
|
connected = self._connected
|
||||||
|
bus = self._bus
|
||||||
|
|
||||||
|
if not connected or bus is None:
|
||||||
|
time.sleep(0.1)
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
frame = bus.recv(timeout=0.5)
|
||||||
|
except can.CanError as exc:
|
||||||
|
self._handle_can_error(exc, "reader_loop recv")
|
||||||
|
continue
|
||||||
|
|
||||||
|
if frame is None:
|
||||||
|
# Timeout — no frame within 0.5 s, loop again
|
||||||
|
continue
|
||||||
|
|
||||||
|
self._dispatch_frame(frame)
|
||||||
|
|
||||||
|
def _dispatch_frame(self, frame: can.Message) -> None:
|
||||||
|
"""Route an incoming CAN frame to the correct publisher."""
|
||||||
|
arb_id = frame.arbitration_id
|
||||||
|
data = bytes(frame.data)
|
||||||
|
|
||||||
|
try:
|
||||||
|
if arb_id == MAMBA_TELEM_IMU:
|
||||||
|
self._handle_imu(data, frame.timestamp)
|
||||||
|
|
||||||
|
elif arb_id == MAMBA_TELEM_BATTERY:
|
||||||
|
self._handle_battery(data, frame.timestamp)
|
||||||
|
|
||||||
|
elif arb_id == VESC_TELEM_STATE + self._left_vesc_id:
|
||||||
|
self._handle_vesc_state(data, frame.timestamp, side="left")
|
||||||
|
|
||||||
|
elif arb_id == VESC_TELEM_STATE + self._right_vesc_id:
|
||||||
|
self._handle_vesc_state(data, frame.timestamp, side="right")
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
self.get_logger().warning(
|
||||||
|
f"Error parsing CAN frame 0x{arb_id:03X}: {exc}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Frame handlers ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _handle_imu(self, data: bytes, timestamp: float) -> None:
|
||||||
|
telem = decode_imu_telem(data)
|
||||||
|
|
||||||
|
msg = Imu()
|
||||||
|
msg.header.stamp = self.get_clock().now().to_msg()
|
||||||
|
msg.header.frame_id = "imu_link"
|
||||||
|
|
||||||
|
msg.linear_acceleration.x = telem.accel_x
|
||||||
|
msg.linear_acceleration.y = telem.accel_y
|
||||||
|
msg.linear_acceleration.z = telem.accel_z
|
||||||
|
|
||||||
|
msg.angular_velocity.x = telem.gyro_x
|
||||||
|
msg.angular_velocity.y = telem.gyro_y
|
||||||
|
msg.angular_velocity.z = telem.gyro_z
|
||||||
|
|
||||||
|
# Covariance unknown; mark as -1 per REP-145
|
||||||
|
msg.orientation_covariance[0] = -1.0
|
||||||
|
|
||||||
|
self._pub_imu.publish(msg)
|
||||||
|
|
||||||
|
def _handle_battery(self, data: bytes, timestamp: float) -> None:
|
||||||
|
telem = decode_battery_telem(data)
|
||||||
|
|
||||||
|
msg = BatteryState()
|
||||||
|
msg.header.stamp = self.get_clock().now().to_msg()
|
||||||
|
msg.voltage = telem.voltage
|
||||||
|
msg.current = telem.current
|
||||||
|
msg.present = True
|
||||||
|
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
|
||||||
|
|
||||||
|
self._pub_battery.publish(msg)
|
||||||
|
|
||||||
|
def _handle_vesc_state(
|
||||||
|
self, data: bytes, timestamp: float, side: str
|
||||||
|
) -> None:
|
||||||
|
telem = decode_vesc_state(data)
|
||||||
|
|
||||||
|
msg = Float32MultiArray()
|
||||||
|
# Layout: [erpm, duty, voltage, current]
|
||||||
|
msg.data = [telem.erpm, telem.duty, telem.voltage, telem.current]
|
||||||
|
|
||||||
|
if side == "left":
|
||||||
|
self._pub_vesc_left.publish(msg)
|
||||||
|
else:
|
||||||
|
self._pub_vesc_right.publish(msg)
|
||||||
|
|
||||||
|
# ── Status helper ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _publish_status(self, status: str) -> None:
|
||||||
|
msg = String()
|
||||||
|
msg.data = status
|
||||||
|
self._pub_status.publish(msg)
|
||||||
|
|
||||||
|
# ── Shutdown ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def destroy_node(self) -> None:
|
||||||
|
"""Send zero velocity and shut down the CAN bus cleanly."""
|
||||||
|
if self._connected and self._bus is not None:
|
||||||
|
try:
|
||||||
|
self._send_can(
|
||||||
|
MAMBA_CMD_VELOCITY,
|
||||||
|
encode_velocity_cmd(0.0, 0.0),
|
||||||
|
"shutdown",
|
||||||
|
)
|
||||||
|
self._send_can(
|
||||||
|
MAMBA_CMD_MODE, encode_mode_cmd(MODE_IDLE), "shutdown"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
self._bus.shutdown()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def main(args=None) -> None:
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = CanBridgeNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -0,0 +1,201 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
mamba_protocol.py — CAN message encoding/decoding for the Mamba motor controller
|
||||||
|
and VESC telemetry.
|
||||||
|
|
||||||
|
CAN message layout
|
||||||
|
------------------
|
||||||
|
Command frames (Orin → Mamba / VESC):
|
||||||
|
MAMBA_CMD_VELOCITY 0x100 8 bytes left_speed (f32, m/s) | right_speed (f32, m/s)
|
||||||
|
MAMBA_CMD_MODE 0x101 1 byte mode (0=idle, 1=drive, 2=estop)
|
||||||
|
MAMBA_CMD_ESTOP 0x102 1 byte 0x01 = stop
|
||||||
|
|
||||||
|
Telemetry frames (Mamba → Orin):
|
||||||
|
MAMBA_TELEM_IMU 0x200 24 bytes accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z (f32 each)
|
||||||
|
MAMBA_TELEM_BATTERY 0x201 8 bytes voltage (f32, V) | current (f32, A)
|
||||||
|
|
||||||
|
VESC telemetry frame (VESC → Orin):
|
||||||
|
VESC_TELEM_STATE 0x300 16 bytes erpm (f32) | duty (f32) | voltage (f32) | current (f32)
|
||||||
|
|
||||||
|
All multi-byte fields are big-endian.
|
||||||
|
|
||||||
|
Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/674
|
||||||
|
"""
|
||||||
|
|
||||||
|
import struct
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Tuple
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# CAN message IDs
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
MAMBA_CMD_VELOCITY: int = 0x100
|
||||||
|
MAMBA_CMD_MODE: int = 0x101
|
||||||
|
MAMBA_CMD_ESTOP: int = 0x102
|
||||||
|
|
||||||
|
MAMBA_TELEM_IMU: int = 0x200
|
||||||
|
MAMBA_TELEM_BATTERY: int = 0x201
|
||||||
|
|
||||||
|
VESC_TELEM_STATE: int = 0x300
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Mode constants
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
MODE_IDLE: int = 0
|
||||||
|
MODE_DRIVE: int = 1
|
||||||
|
MODE_ESTOP: int = 2
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Data classes for decoded telemetry
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ImuTelemetry:
|
||||||
|
"""Decoded IMU telemetry from Mamba (MAMBA_TELEM_IMU)."""
|
||||||
|
|
||||||
|
accel_x: float = 0.0 # m/s²
|
||||||
|
accel_y: float = 0.0
|
||||||
|
accel_z: float = 0.0
|
||||||
|
gyro_x: float = 0.0 # rad/s
|
||||||
|
gyro_y: float = 0.0
|
||||||
|
gyro_z: float = 0.0
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class BatteryTelemetry:
|
||||||
|
"""Decoded battery telemetry from Mamba (MAMBA_TELEM_BATTERY)."""
|
||||||
|
|
||||||
|
voltage: float = 0.0 # V
|
||||||
|
current: float = 0.0 # A
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class VescStateTelemetry:
|
||||||
|
"""Decoded VESC state telemetry (VESC_TELEM_STATE)."""
|
||||||
|
|
||||||
|
erpm: float = 0.0 # electrical RPM
|
||||||
|
duty: float = 0.0 # duty cycle [-1.0, 1.0]
|
||||||
|
voltage: float = 0.0 # bus voltage, V
|
||||||
|
current: float = 0.0 # phase current, A
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Encode helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
_FMT_VEL = ">ff" # 2 × float32, big-endian
|
||||||
|
_FMT_MODE = ">B" # 1 × uint8
|
||||||
|
_FMT_ESTOP = ">B" # 1 × uint8
|
||||||
|
_FMT_IMU = ">ffffff" # 6 × float32
|
||||||
|
_FMT_BAT = ">ff" # 2 × float32
|
||||||
|
_FMT_VESC = ">ffff" # 4 × float32
|
||||||
|
|
||||||
|
|
||||||
|
def encode_velocity_cmd(left_mps: float, right_mps: float) -> bytes:
|
||||||
|
"""
|
||||||
|
Encode a MAMBA_CMD_VELOCITY payload.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
left_mps: target left wheel speed in m/s (positive = forward)
|
||||||
|
right_mps: target right wheel speed in m/s (positive = forward)
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
8-byte big-endian payload suitable for a CAN frame.
|
||||||
|
"""
|
||||||
|
return struct.pack(_FMT_VEL, float(left_mps), float(right_mps))
|
||||||
|
|
||||||
|
|
||||||
|
def encode_mode_cmd(mode: int) -> bytes:
|
||||||
|
"""
|
||||||
|
Encode a MAMBA_CMD_MODE payload.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
mode: one of MODE_IDLE (0), MODE_DRIVE (1), MODE_ESTOP (2)
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
1-byte payload.
|
||||||
|
"""
|
||||||
|
if mode not in (MODE_IDLE, MODE_DRIVE, MODE_ESTOP):
|
||||||
|
raise ValueError(f"Invalid mode {mode!r}; expected 0, 1, or 2")
|
||||||
|
return struct.pack(_FMT_MODE, mode)
|
||||||
|
|
||||||
|
|
||||||
|
def encode_estop_cmd(stop: bool = True) -> bytes:
|
||||||
|
"""
|
||||||
|
Encode a MAMBA_CMD_ESTOP payload.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
stop: True to assert e-stop, False to clear.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
1-byte payload (0x01 = stop, 0x00 = clear).
|
||||||
|
"""
|
||||||
|
return struct.pack(_FMT_ESTOP, 0x01 if stop else 0x00)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Decode helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def decode_imu_telem(data: bytes) -> ImuTelemetry:
|
||||||
|
"""
|
||||||
|
Decode a MAMBA_TELEM_IMU payload.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
data: exactly 24 bytes (6 × float32, big-endian).
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
ImuTelemetry dataclass instance.
|
||||||
|
|
||||||
|
Raises
|
||||||
|
------
|
||||||
|
struct.error if data is the wrong length.
|
||||||
|
"""
|
||||||
|
ax, ay, az, gx, gy, gz = struct.unpack(_FMT_IMU, data)
|
||||||
|
return ImuTelemetry(
|
||||||
|
accel_x=ax, accel_y=ay, accel_z=az,
|
||||||
|
gyro_x=gx, gyro_y=gy, gyro_z=gz,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def decode_battery_telem(data: bytes) -> BatteryTelemetry:
|
||||||
|
"""
|
||||||
|
Decode a MAMBA_TELEM_BATTERY payload.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
data: exactly 8 bytes (2 × float32, big-endian).
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
BatteryTelemetry dataclass instance.
|
||||||
|
"""
|
||||||
|
voltage, current = struct.unpack(_FMT_BAT, data)
|
||||||
|
return BatteryTelemetry(voltage=voltage, current=current)
|
||||||
|
|
||||||
|
|
||||||
|
def decode_vesc_state(data: bytes) -> VescStateTelemetry:
|
||||||
|
"""
|
||||||
|
Decode a VESC_TELEM_STATE payload.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
data: exactly 16 bytes (4 × float32, big-endian).
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
VescStateTelemetry dataclass instance.
|
||||||
|
"""
|
||||||
|
erpm, duty, voltage, current = struct.unpack(_FMT_VESC, data)
|
||||||
|
return VescStateTelemetry(erpm=erpm, duty=duty, voltage=voltage, current=current)
|
||||||
5
jetson/ros2_ws/src/saltybot_can_bridge/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_can_bridge/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_can_bridge
|
||||||
|
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_can_bridge
|
||||||
26
jetson/ros2_ws/src/saltybot_can_bridge/setup.py
Normal file
26
jetson/ros2_ws/src/saltybot_can_bridge/setup.py
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
package_name = "saltybot_can_bridge"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(f"share/{package_name}/config", ["config/can_bridge_params.yaml"]),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools", "python-can"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-controls",
|
||||||
|
maintainer_email="sl-controls@saltylab.local",
|
||||||
|
description="CAN bus bridge for Mamba controller and VESC telemetry",
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"can_bridge_node = saltybot_can_bridge.can_bridge_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
254
jetson/ros2_ws/src/saltybot_can_bridge/test/test_can_bridge.py
Normal file
254
jetson/ros2_ws/src/saltybot_can_bridge/test/test_can_bridge.py
Normal file
@ -0,0 +1,254 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Unit tests for saltybot_can_bridge.mamba_protocol.
|
||||||
|
|
||||||
|
No ROS2 or CAN hardware required — tests exercise encode/decode round-trips
|
||||||
|
and boundary conditions entirely in Python.
|
||||||
|
|
||||||
|
Run with: pytest test/test_can_bridge.py -v
|
||||||
|
"""
|
||||||
|
|
||||||
|
import struct
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from saltybot_can_bridge.mamba_protocol import (
|
||||||
|
MAMBA_CMD_ESTOP,
|
||||||
|
MAMBA_CMD_MODE,
|
||||||
|
MAMBA_CMD_VELOCITY,
|
||||||
|
MAMBA_TELEM_BATTERY,
|
||||||
|
MAMBA_TELEM_IMU,
|
||||||
|
VESC_TELEM_STATE,
|
||||||
|
MODE_DRIVE,
|
||||||
|
MODE_ESTOP,
|
||||||
|
MODE_IDLE,
|
||||||
|
BatteryTelemetry,
|
||||||
|
ImuTelemetry,
|
||||||
|
VescStateTelemetry,
|
||||||
|
decode_battery_telem,
|
||||||
|
decode_imu_telem,
|
||||||
|
decode_vesc_state,
|
||||||
|
encode_estop_cmd,
|
||||||
|
encode_mode_cmd,
|
||||||
|
encode_velocity_cmd,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestMessageIDs(unittest.TestCase):
|
||||||
|
"""Verify the CAN message ID constants are correct."""
|
||||||
|
|
||||||
|
def test_command_ids(self):
|
||||||
|
self.assertEqual(MAMBA_CMD_VELOCITY, 0x100)
|
||||||
|
self.assertEqual(MAMBA_CMD_MODE, 0x101)
|
||||||
|
self.assertEqual(MAMBA_CMD_ESTOP, 0x102)
|
||||||
|
|
||||||
|
def test_telemetry_ids(self):
|
||||||
|
self.assertEqual(MAMBA_TELEM_IMU, 0x200)
|
||||||
|
self.assertEqual(MAMBA_TELEM_BATTERY, 0x201)
|
||||||
|
self.assertEqual(VESC_TELEM_STATE, 0x300)
|
||||||
|
|
||||||
|
|
||||||
|
class TestVelocityEncode(unittest.TestCase):
|
||||||
|
"""Tests for encode_velocity_cmd."""
|
||||||
|
|
||||||
|
def test_zero_velocity(self):
|
||||||
|
payload = encode_velocity_cmd(0.0, 0.0)
|
||||||
|
self.assertEqual(len(payload), 8)
|
||||||
|
left, right = struct.unpack(">ff", payload)
|
||||||
|
self.assertAlmostEqual(left, 0.0, places=5)
|
||||||
|
self.assertAlmostEqual(right, 0.0, places=5)
|
||||||
|
|
||||||
|
def test_forward_velocity(self):
|
||||||
|
payload = encode_velocity_cmd(1.5, 1.5)
|
||||||
|
left, right = struct.unpack(">ff", payload)
|
||||||
|
self.assertAlmostEqual(left, 1.5, places=5)
|
||||||
|
self.assertAlmostEqual(right, 1.5, places=5)
|
||||||
|
|
||||||
|
def test_differential_velocity(self):
|
||||||
|
payload = encode_velocity_cmd(-0.5, 0.5)
|
||||||
|
left, right = struct.unpack(">ff", payload)
|
||||||
|
self.assertAlmostEqual(left, -0.5, places=5)
|
||||||
|
self.assertAlmostEqual(right, 0.5, places=5)
|
||||||
|
|
||||||
|
def test_large_velocity(self):
|
||||||
|
# No clamping at the protocol layer — values are sent as-is
|
||||||
|
payload = encode_velocity_cmd(10.0, -10.0)
|
||||||
|
left, right = struct.unpack(">ff", payload)
|
||||||
|
self.assertAlmostEqual(left, 10.0, places=3)
|
||||||
|
self.assertAlmostEqual(right, -10.0, places=3)
|
||||||
|
|
||||||
|
def test_payload_is_big_endian(self):
|
||||||
|
# Sanity check: first 4 bytes encode left speed
|
||||||
|
payload = encode_velocity_cmd(1.0, 0.0)
|
||||||
|
(left,) = struct.unpack(">f", payload[:4])
|
||||||
|
self.assertAlmostEqual(left, 1.0, places=5)
|
||||||
|
|
||||||
|
|
||||||
|
class TestModeEncode(unittest.TestCase):
|
||||||
|
"""Tests for encode_mode_cmd."""
|
||||||
|
|
||||||
|
def test_idle_mode(self):
|
||||||
|
payload = encode_mode_cmd(MODE_IDLE)
|
||||||
|
self.assertEqual(payload, b"\x00")
|
||||||
|
|
||||||
|
def test_drive_mode(self):
|
||||||
|
payload = encode_mode_cmd(MODE_DRIVE)
|
||||||
|
self.assertEqual(payload, b"\x01")
|
||||||
|
|
||||||
|
def test_estop_mode(self):
|
||||||
|
payload = encode_mode_cmd(MODE_ESTOP)
|
||||||
|
self.assertEqual(payload, b"\x02")
|
||||||
|
|
||||||
|
def test_invalid_mode_raises(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
encode_mode_cmd(99)
|
||||||
|
|
||||||
|
def test_invalid_mode_negative_raises(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
encode_mode_cmd(-1)
|
||||||
|
|
||||||
|
|
||||||
|
class TestEstopEncode(unittest.TestCase):
|
||||||
|
"""Tests for encode_estop_cmd."""
|
||||||
|
|
||||||
|
def test_estop_assert(self):
|
||||||
|
self.assertEqual(encode_estop_cmd(True), b"\x01")
|
||||||
|
|
||||||
|
def test_estop_clear(self):
|
||||||
|
self.assertEqual(encode_estop_cmd(False), b"\x00")
|
||||||
|
|
||||||
|
def test_estop_default_is_stop(self):
|
||||||
|
self.assertEqual(encode_estop_cmd(), b"\x01")
|
||||||
|
|
||||||
|
|
||||||
|
class TestImuDecodeRoundTrip(unittest.TestCase):
|
||||||
|
"""Round-trip tests for IMU telemetry."""
|
||||||
|
|
||||||
|
def _encode_imu(self, ax, ay, az, gx, gy, gz) -> bytes:
|
||||||
|
return struct.pack(">ffffff", ax, ay, az, gx, gy, gz)
|
||||||
|
|
||||||
|
def test_zero_imu(self):
|
||||||
|
data = self._encode_imu(0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
|
||||||
|
telem = decode_imu_telem(data)
|
||||||
|
self.assertIsInstance(telem, ImuTelemetry)
|
||||||
|
self.assertAlmostEqual(telem.accel_x, 0.0, places=5)
|
||||||
|
self.assertAlmostEqual(telem.gyro_z, 0.0, places=5)
|
||||||
|
|
||||||
|
def test_nominal_imu(self):
|
||||||
|
data = self._encode_imu(0.1, -0.2, 9.81, 0.01, -0.02, 0.03)
|
||||||
|
telem = decode_imu_telem(data)
|
||||||
|
self.assertAlmostEqual(telem.accel_x, 0.1, places=4)
|
||||||
|
self.assertAlmostEqual(telem.accel_y, -0.2, places=4)
|
||||||
|
self.assertAlmostEqual(telem.accel_z, 9.81, places=3)
|
||||||
|
self.assertAlmostEqual(telem.gyro_x, 0.01, places=5)
|
||||||
|
self.assertAlmostEqual(telem.gyro_y, -0.02, places=5)
|
||||||
|
self.assertAlmostEqual(telem.gyro_z, 0.03, places=5)
|
||||||
|
|
||||||
|
def test_imu_bad_length_raises(self):
|
||||||
|
with self.assertRaises(struct.error):
|
||||||
|
decode_imu_telem(b"\x00" * 10) # too short
|
||||||
|
|
||||||
|
|
||||||
|
class TestBatteryDecodeRoundTrip(unittest.TestCase):
|
||||||
|
"""Round-trip tests for battery telemetry."""
|
||||||
|
|
||||||
|
def _encode_bat(self, voltage, current) -> bytes:
|
||||||
|
return struct.pack(">ff", voltage, current)
|
||||||
|
|
||||||
|
def test_nominal_battery(self):
|
||||||
|
data = self._encode_bat(24.6, 3.2)
|
||||||
|
telem = decode_battery_telem(data)
|
||||||
|
self.assertIsInstance(telem, BatteryTelemetry)
|
||||||
|
self.assertAlmostEqual(telem.voltage, 24.6, places=3)
|
||||||
|
self.assertAlmostEqual(telem.current, 3.2, places=4)
|
||||||
|
|
||||||
|
def test_zero_battery(self):
|
||||||
|
data = self._encode_bat(0.0, 0.0)
|
||||||
|
telem = decode_battery_telem(data)
|
||||||
|
self.assertAlmostEqual(telem.voltage, 0.0, places=5)
|
||||||
|
self.assertAlmostEqual(telem.current, 0.0, places=5)
|
||||||
|
|
||||||
|
def test_max_voltage(self):
|
||||||
|
# 6S LiPo max ~25.2 V; test with a high value
|
||||||
|
data = self._encode_bat(25.2, 0.0)
|
||||||
|
telem = decode_battery_telem(data)
|
||||||
|
self.assertAlmostEqual(telem.voltage, 25.2, places=3)
|
||||||
|
|
||||||
|
def test_battery_bad_length_raises(self):
|
||||||
|
with self.assertRaises(struct.error):
|
||||||
|
decode_battery_telem(b"\x00" * 4) # too short
|
||||||
|
|
||||||
|
|
||||||
|
class TestVescStateDecodeRoundTrip(unittest.TestCase):
|
||||||
|
"""Round-trip tests for VESC state telemetry."""
|
||||||
|
|
||||||
|
def _encode_vesc(self, erpm, duty, voltage, current) -> bytes:
|
||||||
|
return struct.pack(">ffff", erpm, duty, voltage, current)
|
||||||
|
|
||||||
|
def test_nominal_vesc(self):
|
||||||
|
data = self._encode_vesc(3000.0, 0.25, 24.0, 5.5)
|
||||||
|
telem = decode_vesc_state(data)
|
||||||
|
self.assertIsInstance(telem, VescStateTelemetry)
|
||||||
|
self.assertAlmostEqual(telem.erpm, 3000.0, places=2)
|
||||||
|
self.assertAlmostEqual(telem.duty, 0.25, places=5)
|
||||||
|
self.assertAlmostEqual(telem.voltage, 24.0, places=4)
|
||||||
|
self.assertAlmostEqual(telem.current, 5.5, places=4)
|
||||||
|
|
||||||
|
def test_zero_vesc(self):
|
||||||
|
data = self._encode_vesc(0.0, 0.0, 0.0, 0.0)
|
||||||
|
telem = decode_vesc_state(data)
|
||||||
|
self.assertAlmostEqual(telem.erpm, 0.0)
|
||||||
|
self.assertAlmostEqual(telem.duty, 0.0)
|
||||||
|
|
||||||
|
def test_reverse_erpm(self):
|
||||||
|
data = self._encode_vesc(-1500.0, -0.15, 23.0, 2.0)
|
||||||
|
telem = decode_vesc_state(data)
|
||||||
|
self.assertAlmostEqual(telem.erpm, -1500.0, places=2)
|
||||||
|
self.assertAlmostEqual(telem.duty, -0.15, places=5)
|
||||||
|
|
||||||
|
def test_vesc_bad_length_raises(self):
|
||||||
|
with self.assertRaises(struct.error):
|
||||||
|
decode_vesc_state(b"\x00" * 8) # too short (need 16)
|
||||||
|
|
||||||
|
|
||||||
|
class TestEncodeDecodeIdentity(unittest.TestCase):
|
||||||
|
"""Cross-module identity tests: encode then decode must be identity."""
|
||||||
|
|
||||||
|
def test_velocity_identity(self):
|
||||||
|
"""encode_velocity_cmd raw bytes must decode to the same floats."""
|
||||||
|
left, right = 0.75, -0.3
|
||||||
|
payload = encode_velocity_cmd(left, right)
|
||||||
|
decoded_l, decoded_r = struct.unpack(">ff", payload)
|
||||||
|
self.assertAlmostEqual(decoded_l, left, places=5)
|
||||||
|
self.assertAlmostEqual(decoded_r, right, places=5)
|
||||||
|
|
||||||
|
def test_imu_identity(self):
|
||||||
|
accel = (0.5, -0.5, 9.8)
|
||||||
|
gyro = (0.1, -0.1, 0.2)
|
||||||
|
raw = struct.pack(">ffffff", *accel, *gyro)
|
||||||
|
telem = decode_imu_telem(raw)
|
||||||
|
self.assertAlmostEqual(telem.accel_x, accel[0], places=4)
|
||||||
|
self.assertAlmostEqual(telem.accel_y, accel[1], places=4)
|
||||||
|
self.assertAlmostEqual(telem.accel_z, accel[2], places=3)
|
||||||
|
self.assertAlmostEqual(telem.gyro_x, gyro[0], places=5)
|
||||||
|
self.assertAlmostEqual(telem.gyro_y, gyro[1], places=5)
|
||||||
|
self.assertAlmostEqual(telem.gyro_z, gyro[2], places=5)
|
||||||
|
|
||||||
|
def test_battery_identity(self):
|
||||||
|
voltage, current = 22.4, 8.1
|
||||||
|
raw = struct.pack(">ff", voltage, current)
|
||||||
|
telem = decode_battery_telem(raw)
|
||||||
|
self.assertAlmostEqual(telem.voltage, voltage, places=3)
|
||||||
|
self.assertAlmostEqual(telem.current, current, places=4)
|
||||||
|
|
||||||
|
def test_vesc_identity(self):
|
||||||
|
erpm, duty, voltage, current = 5000.0, 0.5, 24.5, 10.0
|
||||||
|
raw = struct.pack(">ffff", erpm, duty, voltage, current)
|
||||||
|
telem = decode_vesc_state(raw)
|
||||||
|
self.assertAlmostEqual(telem.erpm, erpm, places=2)
|
||||||
|
self.assertAlmostEqual(telem.duty, duty, places=5)
|
||||||
|
self.assertAlmostEqual(telem.voltage, voltage, places=3)
|
||||||
|
self.assertAlmostEqual(telem.current, current, places=4)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
126
jetson/scripts/setup_can.sh
Executable file
126
jetson/scripts/setup_can.sh
Executable file
@ -0,0 +1,126 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
# setup_can.sh — Bring up CANable 2.0 (slcan/ttyACM0) as slcan0 at 500 kbps
|
||||||
|
# Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/674
|
||||||
|
#
|
||||||
|
# This script uses slcand to expose the CANable 2.0 (USB CDC / slcan firmware)
|
||||||
|
# as a SocketCAN interface. It is NOT used for the gs_usb (native SocketCAN)
|
||||||
|
# firmware variant — use jetson/scripts/can_setup.sh for that.
|
||||||
|
#
|
||||||
|
# Usage:
|
||||||
|
# sudo ./setup_can.sh # bring up slcan0
|
||||||
|
# sudo ./setup_can.sh down # bring down slcan0 and kill slcand
|
||||||
|
# sudo ./setup_can.sh verify # candump one-shot check (Ctrl-C to stop)
|
||||||
|
#
|
||||||
|
# Environment overrides:
|
||||||
|
# CAN_DEVICE — serial device (default: /dev/ttyACM0)
|
||||||
|
# CAN_IFACE — SocketCAN name (default: slcan0)
|
||||||
|
# CAN_BITRATE — bit rate (default: 500000)
|
||||||
|
#
|
||||||
|
# Mamba CAN ID: 1 (0x001)
|
||||||
|
# VESC left: 56 (0x038)
|
||||||
|
# VESC right: 68 (0x044)
|
||||||
|
|
||||||
|
set -euo pipefail
|
||||||
|
|
||||||
|
DEVICE="${CAN_DEVICE:-/dev/ttyACM0}"
|
||||||
|
IFACE="${CAN_IFACE:-slcan0}"
|
||||||
|
BITRATE="${CAN_BITRATE:-500000}"
|
||||||
|
|
||||||
|
log() { echo "[setup_can] $*"; }
|
||||||
|
warn() { echo "[setup_can] WARNING: $*" >&2; }
|
||||||
|
die() { echo "[setup_can] ERROR: $*" >&2; exit 1; }
|
||||||
|
|
||||||
|
# Map numeric bitrate to slcand -s flag (0-8 map to standard CAN speeds)
|
||||||
|
bitrate_flag() {
|
||||||
|
case "$1" in
|
||||||
|
10000) echo "0" ;;
|
||||||
|
20000) echo "1" ;;
|
||||||
|
50000) echo "2" ;;
|
||||||
|
100000) echo "3" ;;
|
||||||
|
125000) echo "4" ;;
|
||||||
|
250000) echo "5" ;;
|
||||||
|
500000) echo "6" ;;
|
||||||
|
800000) echo "7" ;;
|
||||||
|
1000000) echo "8" ;;
|
||||||
|
*) die "Unsupported bitrate: $1 (choose 10k–1M)" ;;
|
||||||
|
esac
|
||||||
|
}
|
||||||
|
|
||||||
|
# ── up ─────────────────────────────────────────────────────────────────────
|
||||||
|
cmd_up() {
|
||||||
|
# Verify serial device is present
|
||||||
|
if [[ ! -c "$DEVICE" ]]; then
|
||||||
|
die "$DEVICE not found — is CANable 2.0 plugged in?"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# If interface already exists, leave it alone
|
||||||
|
if ip link show "$IFACE" &>/dev/null; then
|
||||||
|
log "$IFACE is already up — nothing to do."
|
||||||
|
ip -details link show "$IFACE"
|
||||||
|
return 0
|
||||||
|
fi
|
||||||
|
|
||||||
|
local sflag
|
||||||
|
sflag=$(bitrate_flag "$BITRATE")
|
||||||
|
|
||||||
|
log "Starting slcand on $DEVICE → $IFACE at ${BITRATE} bps (flag -s${sflag}) …"
|
||||||
|
# -o open device, -c close on exit, -f forced, -s<N> speed, -S<baud> serial baud
|
||||||
|
slcand -o -c -f -s"${sflag}" -S 3000000 "$DEVICE" "$IFACE" \
|
||||||
|
|| die "slcand failed to start"
|
||||||
|
|
||||||
|
# Give slcand a moment to create the netdev
|
||||||
|
local retries=0
|
||||||
|
while ! ip link show "$IFACE" &>/dev/null; do
|
||||||
|
retries=$((retries + 1))
|
||||||
|
if [[ $retries -ge 10 ]]; then
|
||||||
|
die "Timed out waiting for $IFACE to appear after slcand start"
|
||||||
|
fi
|
||||||
|
sleep 0.2
|
||||||
|
done
|
||||||
|
|
||||||
|
log "Bringing up $IFACE …"
|
||||||
|
ip link set "$IFACE" up \
|
||||||
|
|| die "ip link set $IFACE up failed"
|
||||||
|
|
||||||
|
log "$IFACE is up."
|
||||||
|
ip -details link show "$IFACE"
|
||||||
|
}
|
||||||
|
|
||||||
|
# ── down ───────────────────────────────────────────────────────────────────
|
||||||
|
cmd_down() {
|
||||||
|
log "Bringing down $IFACE …"
|
||||||
|
if ip link show "$IFACE" &>/dev/null; then
|
||||||
|
ip link set "$IFACE" down || warn "Could not set $IFACE down"
|
||||||
|
else
|
||||||
|
warn "$IFACE not found — already down?"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Kill any running slcand instances bound to our device
|
||||||
|
if pgrep -f "slcand.*${DEVICE}" &>/dev/null; then
|
||||||
|
log "Stopping slcand for $DEVICE …"
|
||||||
|
pkill -f "slcand.*${DEVICE}" || warn "pkill returned non-zero"
|
||||||
|
fi
|
||||||
|
log "Done."
|
||||||
|
}
|
||||||
|
|
||||||
|
# ── verify ─────────────────────────────────────────────────────────────────
|
||||||
|
cmd_verify() {
|
||||||
|
if ! ip link show "$IFACE" &>/dev/null; then
|
||||||
|
die "$IFACE is not up — run '$0 up' first"
|
||||||
|
fi
|
||||||
|
log "Listening on $IFACE — expecting frames from Mamba (0x001), VESC left (0x038), VESC right (0x044)"
|
||||||
|
log "Press Ctrl-C to stop."
|
||||||
|
exec candump "$IFACE"
|
||||||
|
}
|
||||||
|
|
||||||
|
# ── main ───────────────────────────────────────────────────────────────────
|
||||||
|
CMD="${1:-up}"
|
||||||
|
case "$CMD" in
|
||||||
|
up) cmd_up ;;
|
||||||
|
down) cmd_down ;;
|
||||||
|
verify) cmd_verify ;;
|
||||||
|
*)
|
||||||
|
echo "Usage: $0 [up|down|verify]"
|
||||||
|
exit 1
|
||||||
|
;;
|
||||||
|
esac
|
||||||
Loading…
x
Reference in New Issue
Block a user