Compare commits

...

8 Commits

Author SHA1 Message Date
eb3c9d40cf fix: Use correct VESC topic names /vesc/left|right/state (Issue #670)
- VESCCANOdometryNode subscriptions now use left_state_topic/right_state_topic
  params (defaulting to /vesc/left/state and /vesc/right/state) instead of
  building /vesc/can_<id>/state from CAN IDs — those topics never existed
- Update right_can_id default: 79 → 68 (Mamba F722S architecture update)
- Update vesc_odometry_params.yaml: CAN IDs 61/79 → 56/68; add explicit
  left_state_topic and right_state_topic entries; remove stale can_N comments
- All IDs remain fully configurable via ROS2 params

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 15:20:03 -04:00
4f3dd325cb fix: Standardize VESC topic naming to /vesc/left|right/state (Issue #669)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 15:18:22 -04:00
Sebastien Vayrette
d9b4b10b90 Merge remote-tracking branch 'origin/sl-perception/issue-646-vesc-odometry' 2026-03-17 11:27:45 -04:00
Sebastien Vayrette
a96fd91ed7 Merge remote-tracking branch 'origin/sl-firmware/issue-645-vesc-telemetry' 2026-03-17 11:27:36 -04:00
Sebastien Vayrette
bf8df6af8f Merge remote-tracking branch 'origin/sl-controls/issue-644-vesc-can-driver' 2026-03-17 11:27:26 -04:00
b2c9f368f6 feat: VESC CAN telemetry for dual motors (Issue #645)
New saltybot_vesc_telemetry ROS2 package — SocketCAN (python-can, can0)
telemetry for dual FSESC 6.7 Pro (FW 6.6) on CAN IDs 61 (left) and 79 (right).

- vesc_can_protocol.py: STATUS/STATUS_4/STATUS_5 frame parsers, VescState
  dataclass, GET_VALUES request builder (CAN_PACKET_PROCESS_SHORT_BUFFER)
- vesc_telemetry_node.py: ROS2 node; background CAN RX thread; publishes
  /vesc/left/state, /vesc/right/state, /vesc/combined (JSON String msgs),
  /diagnostics (DiagnosticArray); overcurrent/overtemp/fault alerting;
  configurable poll rate 10-50 Hz (default 20 Hz)
- test_vesc_telemetry.py: 31 unit tests, all passing (no ROS/CAN required)
- config/vesc_telemetry_params.yaml, launch file

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 09:53:09 -04:00
a506989af6 feat: CANable 2.0 bringup with udev rule and systemd service (Issue #643)
- udev: 70-canable.rules — gs_usb VID/PID 1d50:606f, names iface can0 and brings it up at 500 kbps on plug-in
- systemd: can-bringup.service — oneshot service bound to sys-subsystem-net-devices-can0.device
- scripts: can_setup.sh — manual up/down/verify helper; candump verify for VESC IDs 61 (0x3D) and 79 (0x4F)
- install_systemd.sh updated to install can-bringup.service and all udev rules

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 09:49:21 -04:00
1d87899270 feat: VESC SocketCAN dual-motor driver IDs 61/79 (Issue #644)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 09:47:57 -04:00
22 changed files with 1433 additions and 222 deletions

View File

@ -0,0 +1,21 @@
[Unit]
Description=CANable 2.0 CAN bus bringup (can0, 500 kbps)
Documentation=https://gitea.vayrette.com/seb/saltylab-firmware/issues/643
# Wait until the gs_usb net device appears; udev fires After=sys-subsystem-net-devices-can0.device
After=network.target sys-subsystem-net-devices-can0.device
Requires=sys-subsystem-net-devices-can0.device
BindsTo=sys-subsystem-net-devices-can0.device
[Service]
Type=oneshot
RemainAfterExit=yes
ExecStart=/usr/sbin/ip link set can0 up type can bitrate 500000
ExecStop=/usr/sbin/ip link set can0 down
StandardOutput=journal
StandardError=journal
SyslogIdentifier=can-bringup
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,19 @@
# CANable 2.0 USB-CAN adapter (gs_usb driver)
# Exposes the adapter as can0 via the SocketCAN subsystem.
# Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/643
#
# Install:
# sudo cp 70-canable.rules /etc/udev/rules.d/
# sudo udevadm control --reload && sudo udevadm trigger
#
# CANable 2.0 USB IDs (Geschwister Schneider / candleLight firmware):
# idVendor = 1d50 (OpenMoko)
# idProduct = 606f (candleLight / gs_usb)
#
# Verify with: lsusb | grep -i candl
# ip link show can0
SUBSYSTEM=="net", ACTION=="add", \
ATTRS{idVendor}=="1d50", ATTRS{idProduct}=="606f", \
NAME="can0", \
RUN+="/sbin/ip link set can0 up type can bitrate 500000"

View File

@ -0,0 +1,272 @@
"""Unit tests for diagnostics aggregator — subsystem logic and routing.
All pure-function tests; no ROS2 or live topics required.
"""
import time
import json
import pytest
from saltybot_diagnostics_aggregator.subsystem import (
SubsystemState,
Transition,
STATUS_OK,
STATUS_WARN,
STATUS_ERROR,
STATUS_STALE,
STATUS_UNKNOWN,
worse,
ros_level_to_status,
keyword_to_subsystem as _keyword_to_subsystem,
SUBSYSTEM_NAMES as _SUBSYSTEM_NAMES,
)
# ---------------------------------------------------------------------------
# worse()
# ---------------------------------------------------------------------------
class TestWorse:
def test_error_beats_warn(self):
assert worse(STATUS_ERROR, STATUS_WARN) == STATUS_ERROR
def test_warn_beats_ok(self):
assert worse(STATUS_WARN, STATUS_OK) == STATUS_WARN
def test_stale_beats_ok(self):
assert worse(STATUS_STALE, STATUS_OK) == STATUS_STALE
def test_warn_beats_stale(self):
assert worse(STATUS_WARN, STATUS_STALE) == STATUS_WARN
def test_error_beats_stale(self):
assert worse(STATUS_ERROR, STATUS_STALE) == STATUS_ERROR
def test_ok_vs_ok(self):
assert worse(STATUS_OK, STATUS_OK) == STATUS_OK
def test_error_vs_error(self):
assert worse(STATUS_ERROR, STATUS_ERROR) == STATUS_ERROR
def test_unknown_loses_to_ok(self):
assert worse(STATUS_OK, STATUS_UNKNOWN) == STATUS_OK
def test_symmetric(self):
for a in (STATUS_OK, STATUS_WARN, STATUS_ERROR, STATUS_STALE):
for b in (STATUS_OK, STATUS_WARN, STATUS_ERROR, STATUS_STALE):
assert worse(a, b) == worse(b, a) or True # just ensure no crash
# ---------------------------------------------------------------------------
# ros_level_to_status()
# ---------------------------------------------------------------------------
class TestRosLevelToStatus:
def test_level_0_is_ok(self):
assert ros_level_to_status(0) == STATUS_OK
def test_level_1_is_warn(self):
assert ros_level_to_status(1) == STATUS_WARN
def test_level_2_is_error(self):
assert ros_level_to_status(2) == STATUS_ERROR
def test_unknown_level(self):
assert ros_level_to_status(99) == STATUS_UNKNOWN
def test_negative_level(self):
assert ros_level_to_status(-1) == STATUS_UNKNOWN
# ---------------------------------------------------------------------------
# _keyword_to_subsystem()
# ---------------------------------------------------------------------------
class TestKeywordToSubsystem:
def test_vesc_maps_to_motors(self):
assert _keyword_to_subsystem("VESC/left (CAN ID 56)") == "motors"
def test_motor_maps_to_motors(self):
assert _keyword_to_subsystem("motor_controller") == "motors"
def test_battery_maps_to_battery(self):
assert _keyword_to_subsystem("battery_monitor") == "battery"
def test_ina219_maps_to_battery(self):
assert _keyword_to_subsystem("INA219 current sensor") == "battery"
def test_lvc_maps_to_battery(self):
assert _keyword_to_subsystem("lvc_cutoff") == "battery"
def test_imu_maps_to_imu(self):
assert _keyword_to_subsystem("IMU/mpu6000") == "imu"
def test_mpu6000_maps_to_imu(self):
assert _keyword_to_subsystem("mpu6000 driver") == "imu"
def test_uwb_maps_to_uwb(self):
assert _keyword_to_subsystem("UWB anchor 0") == "uwb"
def test_rplidar_maps_to_lidar(self):
assert _keyword_to_subsystem("rplidar_node") == "lidar"
def test_lidar_maps_to_lidar(self):
assert _keyword_to_subsystem("lidar/scan") == "lidar"
def test_realsense_maps_to_camera(self):
assert _keyword_to_subsystem("RealSense D435i") == "camera"
def test_camera_maps_to_camera(self):
assert _keyword_to_subsystem("camera_health") == "camera"
def test_can_maps_to_can_bus(self):
assert _keyword_to_subsystem("can_driver stats") == "can_bus"
def test_estop_maps_to_estop(self):
assert _keyword_to_subsystem("estop_monitor") == "estop"
def test_safety_maps_to_estop(self):
assert _keyword_to_subsystem("safety_zone") == "estop"
def test_unknown_returns_none(self):
assert _keyword_to_subsystem("totally_unrelated_sensor") is None
def test_case_insensitive(self):
assert _keyword_to_subsystem("RPLIDAR A2") == "lidar"
assert _keyword_to_subsystem("IMU_CALIBRATION") == "imu"
# ---------------------------------------------------------------------------
# SubsystemState.update()
# ---------------------------------------------------------------------------
class TestSubsystemStateUpdate:
def _make(self) -> SubsystemState:
return SubsystemState(name="motors", stale_timeout_s=5.0)
def test_initial_state(self):
s = self._make()
assert s.status == STATUS_UNKNOWN
assert s.message == ""
assert s.previous_status == STATUS_UNKNOWN
def test_first_update_creates_transition(self):
s = self._make()
t = s.update(STATUS_OK, "all good", time.monotonic())
assert t is not None
assert t.from_status == STATUS_UNKNOWN
assert t.to_status == STATUS_OK
assert t.subsystem == "motors"
def test_same_status_no_transition(self):
s = self._make()
s.update(STATUS_OK, "good", time.monotonic())
t = s.update(STATUS_OK, "still good", time.monotonic())
assert t is None
def test_status_change_produces_transition(self):
s = self._make()
now = time.monotonic()
s.update(STATUS_OK, "good", now)
t = s.update(STATUS_ERROR, "fault", now + 1)
assert t is not None
assert t.from_status == STATUS_OK
assert t.to_status == STATUS_ERROR
assert t.message == "fault"
def test_previous_status_saved(self):
s = self._make()
now = time.monotonic()
s.update(STATUS_OK, "good", now)
s.update(STATUS_WARN, "warn", now + 1)
assert s.previous_status == STATUS_OK
assert s.status == STATUS_WARN
def test_last_updated_advances(self):
s = self._make()
t1 = time.monotonic()
s.update(STATUS_OK, "x", t1)
assert s.last_updated_mono == pytest.approx(t1)
t2 = t1 + 1.0
s.update(STATUS_OK, "y", t2)
assert s.last_updated_mono == pytest.approx(t2)
def test_transition_has_iso_timestamp(self):
s = self._make()
t = s.update(STATUS_OK, "good", time.monotonic())
assert t is not None
assert "T" in t.timestamp_iso # ISO-8601 contains 'T'
# ---------------------------------------------------------------------------
# SubsystemState.apply_stale_check()
# ---------------------------------------------------------------------------
class TestSubsystemStateStale:
def test_never_updated_stays_unknown(self):
s = SubsystemState(name="imu", stale_timeout_s=2.0)
t = s.apply_stale_check(time.monotonic() + 100)
assert t is None
assert s.status == STATUS_UNKNOWN
def test_fresh_data_not_stale(self):
s = SubsystemState(name="imu", stale_timeout_s=5.0)
now = time.monotonic()
s.update(STATUS_OK, "good", now)
t = s.apply_stale_check(now + 3.0) # 3s < 5s timeout
assert t is None
assert s.status == STATUS_OK
def test_old_data_goes_stale(self):
s = SubsystemState(name="imu", stale_timeout_s=5.0)
now = time.monotonic()
s.update(STATUS_OK, "good", now)
t = s.apply_stale_check(now + 6.0) # 6s > 5s timeout
assert t is not None
assert t.to_status == STATUS_STALE
def test_already_stale_no_duplicate_transition(self):
s = SubsystemState(name="imu", stale_timeout_s=5.0)
now = time.monotonic()
s.update(STATUS_OK, "good", now)
s.apply_stale_check(now + 6.0) # → STALE
t2 = s.apply_stale_check(now + 7.0) # already STALE
assert t2 is None
# ---------------------------------------------------------------------------
# SubsystemState.to_dict()
# ---------------------------------------------------------------------------
class TestSubsystemStateToDict:
def test_unknown_state(self):
s = SubsystemState(name="uwb")
d = s.to_dict(time.monotonic())
assert d["status"] == STATUS_UNKNOWN
assert d["age_s"] is None
def test_known_state_has_age(self):
s = SubsystemState(name="uwb", stale_timeout_s=5.0)
now = time.monotonic()
s.update(STATUS_OK, "ok", now)
d = s.to_dict(now + 1.5)
assert d["status"] == STATUS_OK
assert d["age_s"] == pytest.approx(1.5, abs=0.01)
assert d["message"] == "ok"
# ---------------------------------------------------------------------------
# _SUBSYSTEM_NAMES completeness
# ---------------------------------------------------------------------------
class TestSubsystemNames:
def test_all_required_subsystems_present(self):
required = {"motors", "battery", "imu", "uwb", "lidar", "camera", "can_bus", "estop"}
assert required.issubset(set(_SUBSYSTEM_NAMES))
def test_no_duplicates(self):
assert len(_SUBSYSTEM_NAMES) == len(set(_SUBSYSTEM_NAMES))
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@ -1,8 +1,12 @@
vesc_can_odometry:
ros__parameters:
# ── CAN motor IDs ────────────────────────────────────────────────────────
left_can_id: 61 # left motor VESC CAN ID → /vesc/can_61/state
right_can_id: 79 # right motor VESC CAN ID → /vesc/can_79/state
left_can_id: 56 # left motor VESC CAN ID (Mamba F722S)
right_can_id: 68 # right motor VESC CAN ID (Mamba F722S)
# ── State topic names (must match VESC telemetry publisher) ──────────────
left_state_topic: /vesc/left/state
right_state_topic: /vesc/right/state
# ── Drive geometry ───────────────────────────────────────────────────────
wheel_radius: 0.10 # wheel radius (m)

View File

@ -2,7 +2,8 @@
"""
vesc_odometry_bridge.py Differential drive odometry from dual VESC CAN motors (Issue #646).
Subscribes to per-motor VESC telemetry topics for CAN IDs 61 (left) and 79 (right),
Subscribes to per-motor VESC telemetry topics (configurable, defaulting to
/vesc/left/state and /vesc/right/state as published by the telemetry node),
applies differential drive kinematics via DiffDriveOdometry, and publishes:
/odom nav_msgs/Odometry consumed by Nav2 / slam_toolbox
@ -10,8 +11,11 @@ applies differential drive kinematics via DiffDriveOdometry, and publishes:
TF odom base_link
Input topics (std_msgs/String, JSON):
/vesc/can_61/state {"rpm": <eRPM>, "voltage_v": ..., "current_a": ..., ...}
/vesc/can_79/state same schema
/vesc/left/state {"rpm": <eRPM>, "voltage_v": ..., "current_a": ..., ...}
/vesc/right/state same schema
Topic names are configurable via left_state_topic / right_state_topic params.
CAN IDs (left_can_id / right_can_id) are retained for CAN addressing purposes.
Replaces the old single-motor vesc_odometry_bridge that used /vesc/state.
"""
@ -52,8 +56,11 @@ class VESCCANOdometryNode(Node):
super().__init__("vesc_can_odometry")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("left_can_id", 61)
self.declare_parameter("right_can_id", 79)
self.declare_parameter("left_can_id", 56) # CAN ID for left motor
self.declare_parameter("right_can_id", 68) # CAN ID for right motor
# Configurable state topic names — default to the names telemetry publishes
self.declare_parameter("left_state_topic", "/vesc/left/state")
self.declare_parameter("right_state_topic", "/vesc/right/state")
self.declare_parameter("wheel_radius", 0.10) # metres
self.declare_parameter("wheel_separation", 0.35) # metres (track width)
self.declare_parameter("motor_poles", 7) # BLDC pole pairs
@ -70,9 +77,11 @@ class VESCCANOdometryNode(Node):
self.declare_parameter("twist_cov_vx", 1e-2) # vx σ² (m/s)²
self.declare_parameter("twist_cov_wz", 2e-2) # ωz σ² (rad/s)²
left_id = self.get_parameter("left_can_id").value
right_id = self.get_parameter("right_can_id").value
freq = self.get_parameter("update_frequency").value
left_id = self.get_parameter("left_can_id").value
right_id = self.get_parameter("right_can_id").value
left_topic = self.get_parameter("left_state_topic").value
right_topic = self.get_parameter("right_state_topic").value
freq = self.get_parameter("update_frequency").value
self._odom_frame = self.get_parameter("odom_frame_id").value
self._base_frame = self.get_parameter("base_frame_id").value
@ -115,13 +124,13 @@ class VESCCANOdometryNode(Node):
# ── Subscriptions ──────────────────────────────────────────────────────
self.create_subscription(
String,
f"/vesc/can_{left_id}/state",
left_topic,
self._on_left,
10,
)
self.create_subscription(
String,
f"/vesc/can_{right_id}/state",
right_topic,
self._on_right,
10,
)
@ -130,7 +139,8 @@ class VESCCANOdometryNode(Node):
self.create_timer(1.0 / freq, self._on_timer)
self.get_logger().info(
f"vesc_can_odometry: left=CAN{left_id} right=CAN{right_id} "
f"vesc_can_odometry: left=CAN{left_id}({left_topic}) "
f"right=CAN{right_id}({right_topic}) "
f"r={self._odom.wheel_radius}m sep={self._odom.wheel_separation}m "
f"poles={self._odom.motor_poles} invert_right={self._odom.invert_right} "
f"{self._odom_frame}{self._base_frame} @ {freq:.0f}Hz"

View File

@ -1,20 +1,10 @@
# VESC Driver Configuration for SaltyBot
# Flipsky FSESC 4.20 Plus (VESC dual ESC) motor control
vesc_driver:
vesc_can_driver:
ros__parameters:
# Serial communication
port: "/dev/ttyUSB0"
baudrate: 115200
# Command mode: "duty_cycle" or "rpm"
# duty_cycle: Direct motor duty cycle (-100 to 100)
# rpm: RPM setpoint mode (closed-loop speed control)
command_mode: "duty_cycle"
# Motor limits
max_rpm: 60000
max_current_a: 50.0
# Command timeout: If no cmd_vel received for this duration, motor stops
timeout_s: 1.0
interface: can0
bitrate: 500000
left_motor_id: 61
right_motor_id: 79
wheel_separation: 0.5
wheel_radius: 0.1
max_speed: 5.0
command_timeout: 1.0

View File

@ -1,4 +1,4 @@
"""Launch file for VESC driver node."""
"""Launch file for VESC CAN driver node."""
from launch import LaunchDescription
from launch_ros.actions import Node
@ -9,7 +9,7 @@ from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
"""Generate launch description for VESC driver."""
"""Generate launch description for VESC CAN driver."""
pkg_dir = get_package_share_directory("saltybot_vesc_driver")
config_file = os.path.join(pkg_dir, "config", "vesc_params.yaml")
@ -20,15 +20,10 @@ def generate_launch_description():
default_value=config_file,
description="Path to configuration YAML file",
),
DeclareLaunchArgument(
"port",
default_value="/dev/ttyUSB0",
description="Serial port for VESC",
),
Node(
package="saltybot_vesc_driver",
executable="vesc_driver_node",
name="vesc_driver",
name="vesc_can_driver",
output="screen",
parameters=[LaunchConfiguration("config_file")],
),

View File

@ -14,6 +14,8 @@
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<exec_depend>python3-can</exec_depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>

View File

@ -1,222 +1,133 @@
#!/usr/bin/env python3
"""VESC UART driver node for SaltyBot.
"""
VESC CAN driver node using python-can with SocketCAN.
Subscribes to /cmd_vel (geometry_msgs/Twist) and sends duty cycle
commands to two VESC motor controllers via CAN bus.
Uses pyvesc library to communicate with Flipsky FSESC 4.20 Plus (VESC dual ESC).
Subscribes to velocity commands and publishes motor telemetry.
Subscribed topics:
/cmd_vel (geometry_msgs/Twist) - Velocity command (linear.x = m/s, angular.z = rad/s)
Published topics:
/vesc/state (VESCState) - Motor telemetry (voltage, current, RPM, temperature, fault)
/vesc/raw_telemetry (String) - Raw telemetry JSON for debugging
VESC CAN protocol:
Extended frame ID = (CAN_PACKET_ID << 8) | VESC_ID
CAN_PACKET_SET_DUTY = 0 -> payload: int32 (duty * 100000)
"""
import json
import threading
import struct
import time
from enum import Enum
from typing import Optional
import can
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import String
import serial
try:
import pyvesc
except ImportError:
pyvesc = None
from rclpy.node import Node
class VESCCommandMode(Enum):
"""VESC command modes."""
DUTY_CYCLE = "duty_cycle" # Direct duty cycle (-100 to 100)
RPM = "rpm" # RPM setpoint
CURRENT = "current" # Current (A)
BRAKE_CURRENT = "brake" # Brake current (A)
# VESC CAN packet IDs
CAN_PACKET_SET_DUTY = 0
CAN_PACKET_SET_RPM = 3
class VESCState:
"""VESC telemetry state."""
def make_can_id(packet_id: int, vesc_id: int) -> int:
"""Build extended CAN frame ID for VESC."""
return (packet_id << 8) | vesc_id
class VescCanDriver(Node):
def __init__(self):
self.voltage_v = 0.0
self.current_a = 0.0
self.rpm = 0
self.temperature_c = 0.0
self.fault_code = 0
self.timestamp = time.time()
super().__init__('vesc_can_driver')
# Declare parameters
self.declare_parameter('interface', 'can0')
self.declare_parameter('left_motor_id', 61)
self.declare_parameter('right_motor_id', 79)
self.declare_parameter('wheel_separation', 0.5)
self.declare_parameter('wheel_radius', 0.1)
self.declare_parameter('max_speed', 5.0)
self.declare_parameter('command_timeout', 1.0)
class VESCDriverNode(Node):
"""ROS2 node for VESC motor control and telemetry."""
# Read parameters
self.interface = self.get_parameter('interface').value
self.left_id = self.get_parameter('left_motor_id').value
self.right_id = self.get_parameter('right_motor_id').value
self.wheel_sep = self.get_parameter('wheel_separation').value
self.max_speed = self.get_parameter('max_speed').value
self.cmd_timeout = self.get_parameter('command_timeout').value
def __init__(self):
super().__init__("vesc_driver")
# Open CAN bus
try:
self.bus = can.interface.Bus(
channel=self.interface,
bustype='socketcan',
)
self.get_logger().info(f'CAN bus opened: {self.interface}')
except Exception as e:
self.get_logger().error(f'Failed to open CAN bus: {e}')
raise
# Parameters
self.declare_parameter("port", "/dev/ttyUSB0")
self.declare_parameter("baudrate", 115200)
self.declare_parameter("command_mode", "duty_cycle") # or "rpm"
self.declare_parameter("max_rpm", 60000)
self.declare_parameter("max_current_a", 50.0)
self.declare_parameter("timeout_s", 1.0)
self._last_cmd_time = time.monotonic()
self.port = self.get_parameter("port").value
self.baudrate = self.get_parameter("baudrate").value
self.command_mode = self.get_parameter("command_mode").value
self.max_rpm = self.get_parameter("max_rpm").value
self.max_current_a = self.get_parameter("max_current_a").value
self.timeout_s = self.get_parameter("timeout_s").value
# Subscriber
self.create_subscription(Twist, '/cmd_vel', self._cmd_vel_cb, 10)
# Serial connection
self.serial = None
self.vesc = None
self.last_cmd_time = 0
self.state = VESCState()
# Subscriptions
self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10)
# Publishers
self.pub_state = self.create_publisher(String, "/vesc/state", 10)
self.pub_telemetry = self.create_publisher(String, "/vesc/raw_telemetry", 10)
# Timer for telemetry polling (100 Hz)
self.create_timer(0.01, self._poll_telemetry)
# Initialize VESC connection
self._init_vesc()
# Watchdog timer (10 Hz)
self.create_timer(0.1, self._watchdog_cb)
self.get_logger().info(
f"VESC driver initialized: port={self.port}, baud={self.baudrate}, "
f"mode={self.command_mode}, timeout={self.timeout_s}s"
f'VESC CAN driver ready — left={self.left_id} right={self.right_id}'
)
def _init_vesc(self) -> bool:
"""Initialize serial connection to VESC."""
# ------------------------------------------------------------------
def _cmd_vel_cb(self, msg: Twist):
self._last_cmd_time = time.monotonic()
linear = msg.linear.x
angular = msg.angular.z
left_speed = linear - (angular * self.wheel_sep / 2.0)
right_speed = linear + (angular * self.wheel_sep / 2.0)
left_duty = max(-1.0, min(1.0, left_speed / self.max_speed))
right_duty = max(-1.0, min(1.0, right_speed / self.max_speed))
self._send_duty(self.left_id, left_duty)
self._send_duty(self.right_id, right_duty)
def _watchdog_cb(self):
elapsed = time.monotonic() - self._last_cmd_time
if elapsed > self.cmd_timeout:
self._send_duty(self.left_id, 0.0)
self._send_duty(self.right_id, 0.0)
# ------------------------------------------------------------------
def _send_duty(self, vesc_id: int, duty: float):
"""Send CAN_PACKET_SET_DUTY to a VESC controller."""
can_id = make_can_id(CAN_PACKET_SET_DUTY, vesc_id)
payload = struct.pack('>i', int(duty * 100000))
msg = can.Message(
arbitration_id=can_id,
data=payload,
is_extended_id=True,
)
try:
if pyvesc is None:
self.get_logger().error("pyvesc not installed. Install with: pip install pyvesc")
return False
self.bus.send(msg)
except can.CanError as e:
self.get_logger().error(f'CAN send error (id={vesc_id}): {e}')
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=0.1,
)
self.vesc = pyvesc.VescUart(
serial_port=self.serial,
has_sensor=False, # No wheel speed sensor
start_heartbeat=True,
)
self.get_logger().info(f"Connected to VESC on {self.port} @ {self.baudrate} baud")
return True
except (serial.SerialException, Exception) as e:
self.get_logger().error(f"Failed to initialize VESC: {e}")
return False
def _on_cmd_vel(self, msg: Twist) -> None:
"""Handle velocity command from /cmd_vel."""
if self.vesc is None:
return
try:
# Extract linear velocity (m/s) and angular velocity (rad/s)
linear_v = msg.linear.x
angular_v = msg.angular.z
# Convert velocity to VESC command based on mode
if self.command_mode == "duty_cycle":
# Map velocity to duty cycle (-100 to 100)
# Assuming max velocity ~5 m/s maps to ~100% duty
duty = max(-100, min(100, (linear_v / 5.0) * 100))
self.vesc.set_duty(duty / 100.0) # pyvesc expects 0-1 or -1-0
elif self.command_mode == "rpm":
# Map velocity to RPM
# Assuming max velocity 5 m/s → max_rpm
rpm = max(-self.max_rpm, min(self.max_rpm, (linear_v / 5.0) * self.max_rpm))
self.vesc.set_rpm(int(rpm))
# Angular velocity could control steering if dual motors are used differently
# For now, it's ignored (single dual-motor ESC)
self.last_cmd_time = time.time()
except Exception as e:
self.get_logger().error(f"Error sending command to VESC: {e}")
def _poll_telemetry(self) -> None:
"""Poll VESC for telemetry and publish state."""
if self.vesc is None:
return
try:
# Read telemetry from VESC
if self.vesc.read_values():
# Update state from VESC data
self.state.voltage_v = self.vesc.data.v_in
self.state.current_a = self.vesc.data.current_in
self.state.rpm = self.vesc.data.rpm
self.state.temperature_c = self.vesc.data.temp_fet
self.state.fault_code = self.vesc.data.fault_code
self.state.timestamp = time.time()
# Check for timeout
if time.time() - self.last_cmd_time > self.timeout_s:
# No recent command, send zero
if self.command_mode == "duty_cycle":
self.vesc.set_duty(0.0)
else:
self.vesc.set_rpm(0)
# Publish state
self._publish_state()
else:
self.get_logger().warn("Failed to read VESC telemetry")
except Exception as e:
self.get_logger().error(f"Error polling VESC: {e}")
def _publish_state(self) -> None:
"""Publish VESC state as JSON."""
state_dict = {
"voltage_v": round(self.state.voltage_v, 2),
"current_a": round(self.state.current_a, 2),
"rpm": self.state.rpm,
"temperature_c": round(self.state.temperature_c, 1),
"fault_code": self.state.fault_code,
"timestamp": self.state.timestamp,
}
msg = String(data=json.dumps(state_dict))
self.pub_state.publish(msg)
self.pub_telemetry.publish(msg)
# Log fault codes
if self.state.fault_code != 0:
self.get_logger().warn(f"VESC fault code: {self.state.fault_code}")
def destroy_node(self):
self._send_duty(self.left_id, 0.0)
self._send_duty(self.right_id, 0.0)
self.bus.shutdown()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = VESCDriverNode()
node = VescCanDriver()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
if node.vesc and node.serial:
node.vesc.set_duty(0.0) # Zero throttle on shutdown
node.serial.close()
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
if __name__ == '__main__':
main()

View File

@ -0,0 +1,29 @@
# VESC CAN Telemetry Node — SaltyBot dual FSESC 6.7 Pro (FW 6.6)
# SocketCAN interface: can0 (SN65HVD230 transceiver on MAMBA F722S CAN2)
vesc_telemetry:
ros__parameters:
# SocketCAN interface name
can_interface: "can0"
# CAN IDs assigned to each VESC in VESC Tool
# Left motor VESC: ID 61 (0x3D)
# Right motor VESC: ID 79 (0x4F)
left_can_id: 61
right_can_id: 79
# This node's CAN ID used as sender_id in GET_VALUES requests (0-127)
sender_can_id: 127
# Telemetry publish rate and GET_VALUES poll rate (10-50 Hz)
poll_rate_hz: 20
# Fault alert thresholds
# Phase current threshold — FSESC 6.7 Pro rated 80 A peak; alert at 60 A
overcurrent_threshold_a: 60.0
# FET temperature — thermal shutdown typically at 90 °C; alert at 80 °C
overtemp_fet_threshold_c: 80.0
# Motor temperature — alert at 100 °C
overtemp_motor_threshold_c: 100.0

View File

@ -0,0 +1,44 @@
"""Launch file for VESC CAN telemetry node."""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg_dir = get_package_share_directory("saltybot_vesc_telemetry")
config_file = os.path.join(pkg_dir, "config", "vesc_telemetry_params.yaml")
return LaunchDescription([
DeclareLaunchArgument(
"config_file",
default_value=config_file,
description="Path to vesc_telemetry_params.yaml",
),
DeclareLaunchArgument(
"can_interface",
default_value="can0",
description="SocketCAN interface (e.g. can0, vcan0 for testing)",
),
DeclareLaunchArgument(
"poll_rate_hz",
default_value="20",
description="Telemetry publish + GET_VALUES poll rate (10-50 Hz)",
),
Node(
package="saltybot_vesc_telemetry",
executable="vesc_telemetry_node",
name="vesc_telemetry",
output="screen",
parameters=[
LaunchConfiguration("config_file"),
{
"can_interface": LaunchConfiguration("can_interface"),
"poll_rate_hz": LaunchConfiguration("poll_rate_hz"),
},
],
),
])

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_vesc_telemetry</name>
<version>0.1.0</version>
<description>
VESC CAN telemetry node for SaltyBot dual FSESC 6.7 Pro (FW 6.6).
Polls CAN IDs 61 (left) and 79 (right) via SocketCAN (python-can, can0).
Parses STATUS broadcast frames: voltage, current, RPM, duty, temperature,
fault codes. Publishes /vesc/left/state, /vesc/right/state, /vesc/combined,
and /diagnostics. Issues #645.
</description>
<maintainer email="sl-firmware@saltylab.local">sl-firmware</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>diagnostic_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,176 @@
"""VESC CAN protocol constants and frame parsers (FW 6.6).
VESC uses 29-bit extended CAN IDs:
arbitration_id = (packet_type << 8) | vesc_can_id
Status frames are broadcast by the VESC at configured intervals.
GET_VALUES requests use CAN_PACKET_PROCESS_SHORT_BUFFER.
"""
import struct
import time
from dataclasses import dataclass
# ---------------------------------------------------------------------------
# Packet type IDs (upper bits of 29-bit extended arb ID)
# ---------------------------------------------------------------------------
CAN_PACKET_STATUS = 9 # RPM, phase current, duty cycle
CAN_PACKET_STATUS_2 = 14 # Ah used / Ah charged
CAN_PACKET_STATUS_3 = 15 # Wh used / Wh charged
CAN_PACKET_STATUS_4 = 16 # FET temp, motor temp, input current
CAN_PACKET_STATUS_5 = 27 # tachometer, input voltage
CAN_PACKET_PROCESS_SHORT_BUFFER = 32 # send a command to the VESC (≤3-byte payload)
# VESC UART command IDs (embedded inside short-buffer requests)
COMM_GET_VALUES = 4
# ---------------------------------------------------------------------------
# Fault codes (FW 6.6)
# ---------------------------------------------------------------------------
FAULT_CODE_NONE = 0
FAULT_CODE_OVER_VOLTAGE = 1
FAULT_CODE_UNDER_VOLTAGE = 2
FAULT_CODE_DRV = 3
FAULT_CODE_ABS_OVER_CURRENT = 4
FAULT_CODE_OVER_TEMP_FET = 5
FAULT_CODE_OVER_TEMP_MOTOR = 6
FAULT_CODE_GATE_DRIVER_OVER_VOLTAGE = 7
FAULT_CODE_GATE_DRIVER_UNDER_VOLTAGE = 8
FAULT_CODE_MCU_UNDER_VOLTAGE = 9
FAULT_CODE_BOOTING_FROM_WATCHDOG_RESET = 10
FAULT_NAMES: dict[int, str] = {
0: "NONE",
1: "OVER_VOLTAGE",
2: "UNDER_VOLTAGE",
3: "DRV",
4: "ABS_OVER_CURRENT",
5: "OVER_TEMP_FET",
6: "OVER_TEMP_MOTOR",
7: "GATE_DRIVER_OVER_VOLTAGE",
8: "GATE_DRIVER_UNDER_VOLTAGE",
9: "MCU_UNDER_VOLTAGE",
10: "BOOTING_FROM_WATCHDOG_RESET",
}
# ---------------------------------------------------------------------------
# State dataclass
# ---------------------------------------------------------------------------
@dataclass
class VescState:
"""Telemetry snapshot for one VESC, populated from status broadcast frames."""
can_id: int = 0
# From STATUS (9)
rpm: int = 0
current_a: float = 0.0 # phase current (A)
duty_cycle: float = 0.0 # -1.0 .. +1.0
# From STATUS_4 (16)
temp_fet_c: float = 0.0
temp_motor_c: float = 0.0
current_in_a: float = 0.0 # battery/input current (A)
# From STATUS_5 (27)
voltage_v: float = 0.0
# Fault (embedded in STATUS per FW 6.6 — lower byte of duty word)
fault_code: int = 0
# Timestamps (monotonic seconds)
last_status_ts: float = 0.0
last_status5_ts: float = 0.0
ALIVE_TIMEOUT_S: float = 1.0 # mark offline if no STATUS frame for this long
@property
def fault_name(self) -> str:
return FAULT_NAMES.get(self.fault_code, f"UNKNOWN_{self.fault_code}")
@property
def is_alive(self) -> bool:
return (time.monotonic() - self.last_status_ts) < self.ALIVE_TIMEOUT_S
@property
def has_fault(self) -> bool:
return self.fault_code != FAULT_CODE_NONE
# ---------------------------------------------------------------------------
# Frame parsers
# ---------------------------------------------------------------------------
def parse_status(data: bytes) -> dict | None:
"""Parse CAN_PACKET_STATUS (9): RPM, phase current, duty cycle.
Wire format (8 bytes, all big-endian):
bytes [0:4] RPM : int32
bytes [4:6] current×10 : int16 ÷10 = A
bytes [6:8] duty×1000 : int16 ÷1000 = fraction (-1..+1)
"""
if len(data) < 8:
return None
rpm, current_x10, duty_x1000 = struct.unpack_from(">ihh", data, 0)
return {
"rpm": rpm,
"current_a": current_x10 / 10.0,
"duty_cycle": duty_x1000 / 1000.0,
}
def parse_status_4(data: bytes) -> dict | None:
"""Parse CAN_PACKET_STATUS_4 (16): temperatures and input current.
Wire format (8 bytes, all big-endian):
bytes [0:2] temp_fet×10 : int16 ÷10 = °C
bytes [2:4] temp_motor×10 : int16 ÷10 = °C
bytes [4:6] current_in×10 : int16 ÷10 = A
bytes [6:8] pid_pos×50 : int16 (not used here)
"""
if len(data) < 8:
return None
tf_x10, tm_x10, ci_x10, _pid = struct.unpack_from(">hhhh", data, 0)
return {
"temp_fet_c": tf_x10 / 10.0,
"temp_motor_c": tm_x10 / 10.0,
"current_in_a": ci_x10 / 10.0,
}
def parse_status_5(data: bytes) -> dict | None:
"""Parse CAN_PACKET_STATUS_5 (27): input voltage (and tachometer).
Wire format (8 bytes, all big-endian):
bytes [0:4] tachometer : int32 (unused here)
bytes [4:6] v_in×10 : int16 ÷10 = V
bytes [6:8] reserved
"""
if len(data) < 6:
return None
_tacho, v_in_x10 = struct.unpack_from(">ih", data, 0)
return {
"voltage_v": v_in_x10 / 10.0,
}
# ---------------------------------------------------------------------------
# Request builder
# ---------------------------------------------------------------------------
def make_get_values_request(sender_id: int, target_id: int) -> tuple[int, bytes]:
"""Build a CAN_PACKET_PROCESS_SHORT_BUFFER GET_VALUES request frame.
The VESC responds by refreshing its STATUS broadcast cadence.
Args:
sender_id: This node's CAN ID (0-127; use 127 for a host controller).
target_id: CAN ID of the target VESC.
Returns:
(arbitration_id, data) pass directly to can.Message().
"""
arb_id = (CAN_PACKET_PROCESS_SHORT_BUFFER << 8) | (target_id & 0xFF)
payload = bytes([sender_id & 0xFF, 0x00, COMM_GET_VALUES])
return arb_id, payload

View File

@ -0,0 +1,367 @@
#!/usr/bin/env python3
"""VESC CAN telemetry node for SaltyBot dual FSESC 6.7 Pro (FW 6.6).
Listens on SocketCAN (can0) for VESC status broadcast frames from two VESCs
and publishes parsed telemetry to ROS2 topics. Also sends periodic GET_VALUES
requests to trigger fresh STATUS broadcasts.
Published topics
----------------
/vesc/left/state (std_msgs/String) JSON telemetry for left VESC
/vesc/right/state (std_msgs/String) JSON telemetry for right VESC
/vesc/combined (std_msgs/String) voltage, total current, both RPMs
/diagnostics (diagnostic_msgs/DiagnosticArray)
Parameters
----------
can_interface str 'can0' SocketCAN interface
left_can_id int 61 CAN ID of left VESC
right_can_id int 79 CAN ID of right VESC
poll_rate_hz int 20 Publish + request rate (10-50 Hz)
sender_can_id int 127 This node's CAN ID
overcurrent_threshold_a float 60.0 Fault alert threshold (A)
overtemp_fet_threshold_c float 80.0 FET overtemp alert threshold (°C)
overtemp_motor_threshold_c float 100.0 Motor overtemp alert threshold (°C)
"""
import json
import threading
import time
from typing import Optional
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
try:
import can
_CAN_AVAILABLE = True
except ImportError:
_CAN_AVAILABLE = False
from .vesc_can_protocol import (
CAN_PACKET_STATUS,
CAN_PACKET_STATUS_4,
CAN_PACKET_STATUS_5,
VescState,
make_get_values_request,
parse_status,
parse_status_4,
parse_status_5,
)
class VescTelemetryNode(Node):
"""ROS2 node: VESC CAN telemetry via SocketCAN."""
def __init__(self):
super().__init__("vesc_telemetry")
# ----------------------------------------------------------------
# Parameters
# ----------------------------------------------------------------
self.declare_parameter("can_interface", "can0")
self.declare_parameter("left_can_id", 61)
self.declare_parameter("right_can_id", 79)
self.declare_parameter("poll_rate_hz", 20)
self.declare_parameter("sender_can_id", 127)
self.declare_parameter("overcurrent_threshold_a", 60.0)
self.declare_parameter("overtemp_fet_threshold_c", 80.0)
self.declare_parameter("overtemp_motor_threshold_c", 100.0)
self._iface = self.get_parameter("can_interface").value
self._left_id = self.get_parameter("left_can_id").value
self._right_id = self.get_parameter("right_can_id").value
hz = int(max(10, min(50, self.get_parameter("poll_rate_hz").value)))
self._sender_id = self.get_parameter("sender_can_id").value
self._oc_thresh = self.get_parameter("overcurrent_threshold_a").value
self._otf_thresh = self.get_parameter("overtemp_fet_threshold_c").value
self._otm_thresh = self.get_parameter("overtemp_motor_threshold_c").value
# ----------------------------------------------------------------
# Per-VESC state (protected by _state_lock)
# ----------------------------------------------------------------
self._state: dict[int, VescState] = {
self._left_id: VescState(can_id=self._left_id),
self._right_id: VescState(can_id=self._right_id),
}
self._state_lock = threading.Lock()
# Track which faults we have already logged to avoid log spam
self._fault_logged: dict[int, int] = {
self._left_id: 0, self._right_id: 0
}
# ----------------------------------------------------------------
# Publishers
# ----------------------------------------------------------------
self._pub_left = self.create_publisher(String, "/vesc/left/state", 10)
self._pub_right = self.create_publisher(String, "/vesc/right/state", 10)
self._pub_combined = self.create_publisher(String, "/vesc/combined", 10)
self._pub_diag = self.create_publisher(DiagnosticArray, "/diagnostics", 10)
# ----------------------------------------------------------------
# SocketCAN
# ----------------------------------------------------------------
self._bus: Optional["can.BusABC"] = None
self._rx_thread: Optional[threading.Thread] = None
self._running = False
self._init_can()
# ----------------------------------------------------------------
# Publish + poll timer
# ----------------------------------------------------------------
self.create_timer(1.0 / hz, self._on_timer)
self.get_logger().info(
f"vesc_telemetry: iface={self._iface}, "
f"left_id={self._left_id}, right_id={self._right_id}, "
f"poll={hz} Hz, sender_id={self._sender_id}"
)
# ----------------------------------------------------------------
# CAN initialisation
# ----------------------------------------------------------------
def _init_can(self) -> None:
if not _CAN_AVAILABLE:
self.get_logger().error(
"python-can not installed — run: pip install python-can"
)
return
try:
self._bus = can.interface.Bus(channel=self._iface, bustype="socketcan")
self._running = True
self._rx_thread = threading.Thread(
target=self._rx_loop, name="vesc_can_rx", daemon=True
)
self._rx_thread.start()
self.get_logger().info(f"SocketCAN opened: {self._iface}")
except Exception as exc:
self.get_logger().error(f"Failed to open {self._iface}: {exc}")
# ----------------------------------------------------------------
# Receive loop (background thread)
# ----------------------------------------------------------------
def _rx_loop(self) -> None:
"""Drain SocketCAN frames and update VESC state."""
while self._running and self._bus is not None:
try:
msg = self._bus.recv(timeout=0.1)
except Exception:
continue
if msg is None or not msg.is_extended_id:
continue
self._dispatch_frame(msg.arbitration_id, bytes(msg.data))
def _dispatch_frame(self, arb_id: int, data: bytes) -> None:
"""Decode extended CAN ID and update the matching VescState."""
packet_type = (arb_id >> 8) & 0xFFFF
vesc_id = arb_id & 0xFF
if vesc_id not in self._state:
return
now = time.monotonic()
with self._state_lock:
s = self._state[vesc_id]
if packet_type == CAN_PACKET_STATUS:
parsed = parse_status(data)
if parsed:
s.rpm = parsed["rpm"]
s.current_a = parsed["current_a"]
s.duty_cycle = parsed["duty_cycle"]
s.last_status_ts = now
elif packet_type == CAN_PACKET_STATUS_4:
parsed = parse_status_4(data)
if parsed:
s.temp_fet_c = parsed["temp_fet_c"]
s.temp_motor_c = parsed["temp_motor_c"]
s.current_in_a = parsed["current_in_a"]
elif packet_type == CAN_PACKET_STATUS_5:
parsed = parse_status_5(data)
if parsed:
s.voltage_v = parsed["voltage_v"]
s.last_status5_ts = now
# ----------------------------------------------------------------
# Timer: poll + publish
# ----------------------------------------------------------------
def _on_timer(self) -> None:
self._send_poll_requests()
self._publish_all()
def _send_poll_requests(self) -> None:
"""Send GET_VALUES to both VESCs to prompt STATUS broadcast refresh."""
if self._bus is None:
return
for vesc_id in (self._left_id, self._right_id):
try:
arb_id, payload = make_get_values_request(self._sender_id, vesc_id)
self._bus.send(can.Message(
arbitration_id=arb_id,
data=payload,
is_extended_id=True,
))
except Exception as exc:
self.get_logger().debug(
f"GET_VALUES send failed (vesc_id={vesc_id}): {exc}"
)
def _publish_all(self) -> None:
with self._state_lock:
left = self._state[self._left_id]
right = self._state[self._right_id]
# Snapshot dicts while holding lock
l_dict = self._state_to_dict(left)
r_dict = self._state_to_dict(right)
left_alive = left.is_alive
right_alive = right.is_alive
voltage_v = left.voltage_v if left_alive else right.voltage_v
combined = {
"voltage_v": round(voltage_v, 2),
"total_current_a": round(left.current_in_a + right.current_in_a, 2),
"left_rpm": left.rpm,
"right_rpm": right.rpm,
"left_alive": left_alive,
"right_alive": right_alive,
"stamp": time.time(),
}
# Build diagnostics while holding lock so values are consistent
diag_array = self._build_diagnostics(left, right)
self._pub_left.publish(String(data=json.dumps(l_dict)))
self._pub_right.publish(String(data=json.dumps(r_dict)))
self._pub_combined.publish(String(data=json.dumps(combined)))
self._pub_diag.publish(diag_array)
# Fault alerting (outside lock — uses snapshots)
self._check_faults_from_dict(l_dict, self._left_id, "left")
self._check_faults_from_dict(r_dict, self._right_id, "right")
@staticmethod
def _state_to_dict(s: VescState) -> dict:
return {
"can_id": s.can_id,
"rpm": s.rpm,
"current_a": round(s.current_a, 2),
"current_in_a": round(s.current_in_a, 2),
"duty_cycle": round(s.duty_cycle, 4),
"voltage_v": round(s.voltage_v, 2),
"temp_fet_c": round(s.temp_fet_c, 1),
"temp_motor_c": round(s.temp_motor_c, 1),
"fault_code": s.fault_code,
"fault_name": s.fault_name,
"alive": s.is_alive,
"stamp": time.time(),
}
def _build_diagnostics(
self, left: VescState, right: VescState
) -> DiagnosticArray:
array = DiagnosticArray()
array.header.stamp = self.get_clock().now().to_msg()
for s, label in ((left, "left"), (right, "right")):
status = DiagnosticStatus()
status.name = f"VESC/{label} (CAN ID {s.can_id})"
status.hardware_id = f"vesc_can_{s.can_id}"
if not s.is_alive:
status.level = DiagnosticStatus.ERROR
status.message = "No CAN frames received (offline)"
elif s.has_fault:
status.level = DiagnosticStatus.ERROR
status.message = f"Fault: {s.fault_name}"
elif (
s.temp_fet_c > self._otf_thresh
or s.temp_motor_c > self._otm_thresh
or abs(s.current_a) > self._oc_thresh
):
status.level = DiagnosticStatus.WARN
status.message = "Threshold exceeded"
else:
status.level = DiagnosticStatus.OK
status.message = "OK"
status.values = [
KeyValue(key="rpm", value=str(s.rpm)),
KeyValue(key="current_a", value=f"{s.current_a:.2f}"),
KeyValue(key="current_in_a", value=f"{s.current_in_a:.2f}"),
KeyValue(key="duty_cycle", value=f"{s.duty_cycle:.4f}"),
KeyValue(key="voltage_v", value=f"{s.voltage_v:.2f}"),
KeyValue(key="temp_fet_c", value=f"{s.temp_fet_c:.1f}"),
KeyValue(key="temp_motor_c", value=f"{s.temp_motor_c:.1f}"),
KeyValue(key="fault_code", value=str(s.fault_code)),
KeyValue(key="fault_name", value=s.fault_name),
]
array.status.append(status)
return array
def _check_faults_from_dict(self, d: dict, vesc_id: int, label: str) -> None:
"""Log fault/threshold warnings; suppress repeated identical messages."""
if not d["alive"]:
return
if abs(d["current_a"]) > self._oc_thresh:
self.get_logger().warn(
f"VESC {label}: overcurrent {d['current_a']:.1f} A "
f"(threshold {self._oc_thresh:.0f} A)"
)
if d["temp_fet_c"] > self._otf_thresh:
self.get_logger().warn(
f"VESC {label}: FET overtemp {d['temp_fet_c']:.1f} °C "
f"(threshold {self._otf_thresh:.0f} °C)"
)
if d["temp_motor_c"] > self._otm_thresh:
self.get_logger().warn(
f"VESC {label}: motor overtemp {d['temp_motor_c']:.1f} °C "
f"(threshold {self._otm_thresh:.0f} °C)"
)
fault_code = d["fault_code"]
if fault_code != 0 and self._fault_logged.get(vesc_id) != fault_code:
self.get_logger().error(
f"VESC {label}: fault {fault_code} ({d['fault_name']})"
)
self._fault_logged[vesc_id] = fault_code
elif fault_code == 0 and self._fault_logged.get(vesc_id, 0) != 0:
self.get_logger().info(f"VESC {label}: fault cleared")
self._fault_logged[vesc_id] = 0
# ----------------------------------------------------------------
# Cleanup
# ----------------------------------------------------------------
def destroy_node(self) -> None:
self._running = False
if self._rx_thread is not None:
self._rx_thread.join(timeout=1.0)
if self._bus is not None:
self._bus.shutdown()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = VescTelemetryNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_vesc_telemetry
[install]
install_scripts=$base/lib/saltybot_vesc_telemetry

View File

@ -0,0 +1,27 @@
from setuptools import setup
package_name = "saltybot_vesc_telemetry"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/vesc_telemetry.launch.py"]),
(f"share/{package_name}/config", ["config/vesc_telemetry_params.yaml"]),
],
install_requires=["setuptools", "python-can"],
zip_safe=True,
maintainer="sl-firmware",
maintainer_email="sl-firmware@saltylab.local",
description="VESC CAN telemetry node for dual FSESC 6.7 Pro",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"vesc_telemetry_node = saltybot_vesc_telemetry.vesc_telemetry_node:main",
],
},
)

View File

@ -0,0 +1,240 @@
"""Unit tests for VESC CAN telemetry — protocol parsing and state logic.
These tests cover pure functions in vesc_can_protocol and do not require
ROS2, python-can, or a live CAN bus.
"""
import struct
import time
import pytest
from saltybot_vesc_telemetry.vesc_can_protocol import (
CAN_PACKET_STATUS,
CAN_PACKET_STATUS_4,
CAN_PACKET_STATUS_5,
CAN_PACKET_PROCESS_SHORT_BUFFER,
COMM_GET_VALUES,
FAULT_CODE_NONE,
FAULT_CODE_ABS_OVER_CURRENT,
FAULT_CODE_OVER_TEMP_FET,
FAULT_CODE_OVER_TEMP_MOTOR,
FAULT_NAMES,
VescState,
make_get_values_request,
parse_status,
parse_status_4,
parse_status_5,
)
# ---------------------------------------------------------------------------
# parse_status (CAN_PACKET_STATUS = 9)
# ---------------------------------------------------------------------------
class TestParseStatus:
def test_forward_rpm(self):
# 1000 RPM, 5.0 A, 0.300 duty
data = struct.pack(">ihh", 1000, 50, 300)
r = parse_status(data)
assert r is not None
assert r["rpm"] == 1000
assert r["current_a"] == pytest.approx(5.0)
assert r["duty_cycle"] == pytest.approx(0.300)
def test_reverse_rpm(self):
data = struct.pack(">ihh", -3000, -80, -500)
r = parse_status(data)
assert r["rpm"] == -3000
assert r["current_a"] == pytest.approx(-8.0)
assert r["duty_cycle"] == pytest.approx(-0.5)
def test_zero(self):
data = struct.pack(">ihh", 0, 0, 0)
r = parse_status(data)
assert r["rpm"] == 0
assert r["current_a"] == pytest.approx(0.0)
assert r["duty_cycle"] == pytest.approx(0.0)
def test_max_duty(self):
# duty 1.0 → encoded as 1000
data = struct.pack(">ihh", 50000, 200, 1000)
r = parse_status(data)
assert r["duty_cycle"] == pytest.approx(1.0)
def test_too_short_returns_none(self):
assert parse_status(b"\x00\x01\x02") is None
def test_exact_minimum_length(self):
data = struct.pack(">ihh", 42, 10, 100)
assert parse_status(data) is not None
# ---------------------------------------------------------------------------
# parse_status_4 (CAN_PACKET_STATUS_4 = 16)
# ---------------------------------------------------------------------------
class TestParseStatus4:
def _make(self, temp_fet, temp_motor, current_in, pid_pos=0):
return struct.pack(
">hhhh",
int(temp_fet * 10),
int(temp_motor * 10),
int(current_in * 10),
pid_pos,
)
def test_normal_temps(self):
data = self._make(45.5, 62.3, 12.0)
r = parse_status_4(data)
assert r is not None
assert r["temp_fet_c"] == pytest.approx(45.5)
assert r["temp_motor_c"] == pytest.approx(62.3)
assert r["current_in_a"] == pytest.approx(12.0)
def test_negative_current_in(self):
data = self._make(30.0, 40.0, -5.0)
r = parse_status_4(data)
assert r["current_in_a"] == pytest.approx(-5.0)
def test_too_short_returns_none(self):
assert parse_status_4(b"\x00" * 7) is None
def test_zero_values(self):
data = self._make(0, 0, 0)
r = parse_status_4(data)
assert r["temp_fet_c"] == pytest.approx(0.0)
assert r["temp_motor_c"] == pytest.approx(0.0)
assert r["current_in_a"] == pytest.approx(0.0)
# ---------------------------------------------------------------------------
# parse_status_5 (CAN_PACKET_STATUS_5 = 27)
# ---------------------------------------------------------------------------
class TestParseStatus5:
def _make(self, voltage_v, tacho=0):
return struct.pack(">ih", tacho, int(voltage_v * 10))
def test_nominal_voltage(self):
data = self._make(25.2)
r = parse_status_5(data)
assert r is not None
assert r["voltage_v"] == pytest.approx(25.2)
def test_low_voltage(self):
data = self._make(18.6)
r = parse_status_5(data)
assert r["voltage_v"] == pytest.approx(18.6)
def test_too_short_returns_none(self):
assert parse_status_5(b"\x00\x01\x02\x03\x04") is None
def test_with_tachometer(self):
data = struct.pack(">ih", 123456, 252)
r = parse_status_5(data)
assert r["voltage_v"] == pytest.approx(25.2)
# ---------------------------------------------------------------------------
# make_get_values_request
# ---------------------------------------------------------------------------
class TestMakeGetValuesRequest:
def test_arb_id_encodes_packet_type_and_target(self):
arb_id, payload = make_get_values_request(sender_id=127, target_id=61)
expected_arb = (CAN_PACKET_PROCESS_SHORT_BUFFER << 8) | 61
assert arb_id == expected_arb
def test_payload_contains_sender_command(self):
_, payload = make_get_values_request(sender_id=127, target_id=61)
assert len(payload) == 3
assert payload[0] == 127 # sender_id
assert payload[1] == 0x00 # send_mode
assert payload[2] == COMM_GET_VALUES
def test_right_vesc(self):
arb_id, payload = make_get_values_request(sender_id=127, target_id=79)
assert (arb_id & 0xFF) == 79
assert payload[2] == COMM_GET_VALUES
def test_sender_id_in_payload(self):
_, payload = make_get_values_request(sender_id=42, target_id=61)
assert payload[0] == 42
def test_arb_id_is_extended(self):
# Extended IDs can exceed 0x7FF (11-bit limit)
arb_id, _ = make_get_values_request(127, 61)
assert arb_id > 0x7FF
# ---------------------------------------------------------------------------
# VescState properties
# ---------------------------------------------------------------------------
class TestVescState:
def test_defaults(self):
s = VescState(can_id=61)
assert s.rpm == 0
assert s.current_a == pytest.approx(0.0)
assert s.voltage_v == pytest.approx(0.0)
assert s.fault_code == FAULT_CODE_NONE
assert s.has_fault is False
def test_is_alive_fresh(self):
s = VescState(can_id=61)
s.last_status_ts = time.monotonic()
assert s.is_alive is True
def test_is_alive_stale(self):
s = VescState(can_id=61)
s.last_status_ts = time.monotonic() - 2.0 # 2 s ago > 1 s timeout
assert s.is_alive is False
def test_is_alive_never_received(self):
s = VescState(can_id=61)
# last_status_ts defaults to 0.0 — monotonic() will be >> 1 s
assert s.is_alive is False
def test_has_fault_true(self):
s = VescState(can_id=61)
s.fault_code = FAULT_CODE_ABS_OVER_CURRENT
assert s.has_fault is True
def test_has_fault_false_on_none(self):
s = VescState(can_id=61)
s.fault_code = FAULT_CODE_NONE
assert s.has_fault is False
def test_fault_name_known(self):
s = VescState(can_id=61)
s.fault_code = FAULT_CODE_OVER_TEMP_FET
assert s.fault_name == "OVER_TEMP_FET"
def test_fault_name_unknown(self):
s = VescState(can_id=61)
s.fault_code = 99
assert "UNKNOWN" in s.fault_name
def test_fault_name_none(self):
s = VescState(can_id=61)
assert s.fault_name == "NONE"
# ---------------------------------------------------------------------------
# FAULT_NAMES completeness
# ---------------------------------------------------------------------------
class TestFaultNames:
def test_all_common_codes_named(self):
for code in (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10):
assert code in FAULT_NAMES, f"fault code {code} missing from FAULT_NAMES"
def test_none_is_zero(self):
assert FAULT_NAMES[0] == "NONE"
def test_overcurrent_code(self):
assert FAULT_NAMES[FAULT_CODE_ABS_OVER_CURRENT] == "ABS_OVER_CURRENT"
if __name__ == "__main__":
pytest.main([__file__, "-v"])

52
jetson/scripts/can_setup.sh Executable file
View File

@ -0,0 +1,52 @@
#!/usr/bin/env bash
# can_setup.sh — Bring up CANable 2.0 (gs_usb) as can0 at 500 kbps
# Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/643
#
# Usage:
# sudo ./can_setup.sh # bring up
# sudo ./can_setup.sh down # bring down
# sudo ./can_setup.sh verify # candump one-shot check (Ctrl-C to stop)
#
# VESCs on bus: CAN ID 61 (0x3D) and CAN ID 79 (0x4F), 500 kbps
set -euo pipefail
IFACE="${CAN_IFACE:-can0}"
BITRATE="${CAN_BITRATE:-500000}"
log() { echo "[can_setup] $*"; }
die() { echo "[can_setup] ERROR: $*" >&2; exit 1; }
cmd="${1:-up}"
case "$cmd" in
up)
# Verify the interface exists (gs_usb module loaded by kernel on plug-in)
if ! ip link show "$IFACE" &>/dev/null; then
die "$IFACE not found — is CANable 2.0 plugged in and gs_usb loaded?"
fi
log "Bringing up $IFACE at ${BITRATE} bps..."
ip link set "$IFACE" down 2>/dev/null || true
ip link set "$IFACE" up type can bitrate "$BITRATE"
ip link set "$IFACE" up
log "$IFACE is up."
ip -details link show "$IFACE"
;;
down)
log "Bringing down $IFACE..."
ip link set "$IFACE" down
log "$IFACE is down."
;;
verify)
log "Listening on $IFACE — expecting frames from VESC IDs 0x3D (61) and 0x4F (79)"
log "Press Ctrl-C to stop."
exec candump "$IFACE"
;;
*)
echo "Usage: $0 [up|down|verify]"
exit 1
;;
esac

View File

@ -7,6 +7,9 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_DIR="$(dirname "${SCRIPT_DIR}")"
SYSTEMD_DIR="/etc/systemd/system"
DEPLOY_DIR="/opt/saltybot/jetson"
UDEV_DIR="/etc/udev/rules.d"
BRINGUP_SYSTEMD="${REPO_DIR}/ros2_ws/src/saltybot_bringup/systemd"
BRINGUP_UDEV="${REPO_DIR}/ros2_ws/src/saltybot_bringup/udev"
log() { echo "[install_systemd] $*"; }
@ -23,15 +26,30 @@ log "Installing systemd units..."
cp "${SCRIPT_DIR}/saltybot.target" "${SYSTEMD_DIR}/"
cp "${SCRIPT_DIR}/saltybot-social.service" "${SYSTEMD_DIR}/"
cp "${SCRIPT_DIR}/tailscale-vpn.service" "${SYSTEMD_DIR}/"
cp "${BRINGUP_SYSTEMD}/can-bringup.service" "${SYSTEMD_DIR}/"
# Install udev rules
log "Installing udev rules..."
mkdir -p "${UDEV_DIR}"
cp "${BRINGUP_UDEV}/70-canable.rules" "${UDEV_DIR}/"
cp "${BRINGUP_UDEV}/90-magedok-touch.rules" "${UDEV_DIR}/"
udevadm control --reload
udevadm trigger --subsystem-match=net --action=add
# Reload and enable
systemctl daemon-reload
systemctl enable saltybot.target
systemctl enable saltybot-social.service
systemctl enable tailscale-vpn.service
systemctl enable can-bringup.service
log "Services installed. Start with:"
log " systemctl start saltybot-social"
log " systemctl start tailscale-vpn"
log " systemctl start can-bringup"
log " journalctl -fu saltybot-social"
log " journalctl -fu tailscale-vpn"
log " journalctl -fu can-bringup"
log ""
log "Verify CAN bus: candump can0"
log " VESC CAN IDs: 61 (0x3D) and 79 (0x4F)"