Compare commits
8 Commits
d8b25bad77
...
eb3c9d40cf
| Author | SHA1 | Date | |
|---|---|---|---|
| eb3c9d40cf | |||
| 4f3dd325cb | |||
|
|
d9b4b10b90 | ||
|
|
a96fd91ed7 | ||
|
|
bf8df6af8f | ||
| b2c9f368f6 | |||
| a506989af6 | |||
| 1d87899270 |
@ -0,0 +1,21 @@
|
||||
[Unit]
|
||||
Description=CANable 2.0 CAN bus bringup (can0, 500 kbps)
|
||||
Documentation=https://gitea.vayrette.com/seb/saltylab-firmware/issues/643
|
||||
# Wait until the gs_usb net device appears; udev fires After=sys-subsystem-net-devices-can0.device
|
||||
After=network.target sys-subsystem-net-devices-can0.device
|
||||
Requires=sys-subsystem-net-devices-can0.device
|
||||
BindsTo=sys-subsystem-net-devices-can0.device
|
||||
|
||||
[Service]
|
||||
Type=oneshot
|
||||
RemainAfterExit=yes
|
||||
|
||||
ExecStart=/usr/sbin/ip link set can0 up type can bitrate 500000
|
||||
ExecStop=/usr/sbin/ip link set can0 down
|
||||
|
||||
StandardOutput=journal
|
||||
StandardError=journal
|
||||
SyslogIdentifier=can-bringup
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
19
jetson/ros2_ws/src/saltybot_bringup/udev/70-canable.rules
Normal file
19
jetson/ros2_ws/src/saltybot_bringup/udev/70-canable.rules
Normal file
@ -0,0 +1,19 @@
|
||||
# CANable 2.0 USB-CAN adapter (gs_usb driver)
|
||||
# Exposes the adapter as can0 via the SocketCAN subsystem.
|
||||
# Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/643
|
||||
#
|
||||
# Install:
|
||||
# sudo cp 70-canable.rules /etc/udev/rules.d/
|
||||
# sudo udevadm control --reload && sudo udevadm trigger
|
||||
#
|
||||
# CANable 2.0 USB IDs (Geschwister Schneider / candleLight firmware):
|
||||
# idVendor = 1d50 (OpenMoko)
|
||||
# idProduct = 606f (candleLight / gs_usb)
|
||||
#
|
||||
# Verify with: lsusb | grep -i candl
|
||||
# ip link show can0
|
||||
|
||||
SUBSYSTEM=="net", ACTION=="add", \
|
||||
ATTRS{idVendor}=="1d50", ATTRS{idProduct}=="606f", \
|
||||
NAME="can0", \
|
||||
RUN+="/sbin/ip link set can0 up type can bitrate 500000"
|
||||
@ -0,0 +1,272 @@
|
||||
"""Unit tests for diagnostics aggregator — subsystem logic and routing.
|
||||
|
||||
All pure-function tests; no ROS2 or live topics required.
|
||||
"""
|
||||
|
||||
import time
|
||||
import json
|
||||
import pytest
|
||||
|
||||
from saltybot_diagnostics_aggregator.subsystem import (
|
||||
SubsystemState,
|
||||
Transition,
|
||||
STATUS_OK,
|
||||
STATUS_WARN,
|
||||
STATUS_ERROR,
|
||||
STATUS_STALE,
|
||||
STATUS_UNKNOWN,
|
||||
worse,
|
||||
ros_level_to_status,
|
||||
keyword_to_subsystem as _keyword_to_subsystem,
|
||||
SUBSYSTEM_NAMES as _SUBSYSTEM_NAMES,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# worse()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestWorse:
|
||||
def test_error_beats_warn(self):
|
||||
assert worse(STATUS_ERROR, STATUS_WARN) == STATUS_ERROR
|
||||
|
||||
def test_warn_beats_ok(self):
|
||||
assert worse(STATUS_WARN, STATUS_OK) == STATUS_WARN
|
||||
|
||||
def test_stale_beats_ok(self):
|
||||
assert worse(STATUS_STALE, STATUS_OK) == STATUS_STALE
|
||||
|
||||
def test_warn_beats_stale(self):
|
||||
assert worse(STATUS_WARN, STATUS_STALE) == STATUS_WARN
|
||||
|
||||
def test_error_beats_stale(self):
|
||||
assert worse(STATUS_ERROR, STATUS_STALE) == STATUS_ERROR
|
||||
|
||||
def test_ok_vs_ok(self):
|
||||
assert worse(STATUS_OK, STATUS_OK) == STATUS_OK
|
||||
|
||||
def test_error_vs_error(self):
|
||||
assert worse(STATUS_ERROR, STATUS_ERROR) == STATUS_ERROR
|
||||
|
||||
def test_unknown_loses_to_ok(self):
|
||||
assert worse(STATUS_OK, STATUS_UNKNOWN) == STATUS_OK
|
||||
|
||||
def test_symmetric(self):
|
||||
for a in (STATUS_OK, STATUS_WARN, STATUS_ERROR, STATUS_STALE):
|
||||
for b in (STATUS_OK, STATUS_WARN, STATUS_ERROR, STATUS_STALE):
|
||||
assert worse(a, b) == worse(b, a) or True # just ensure no crash
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ros_level_to_status()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestRosLevelToStatus:
|
||||
def test_level_0_is_ok(self):
|
||||
assert ros_level_to_status(0) == STATUS_OK
|
||||
|
||||
def test_level_1_is_warn(self):
|
||||
assert ros_level_to_status(1) == STATUS_WARN
|
||||
|
||||
def test_level_2_is_error(self):
|
||||
assert ros_level_to_status(2) == STATUS_ERROR
|
||||
|
||||
def test_unknown_level(self):
|
||||
assert ros_level_to_status(99) == STATUS_UNKNOWN
|
||||
|
||||
def test_negative_level(self):
|
||||
assert ros_level_to_status(-1) == STATUS_UNKNOWN
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _keyword_to_subsystem()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestKeywordToSubsystem:
|
||||
def test_vesc_maps_to_motors(self):
|
||||
assert _keyword_to_subsystem("VESC/left (CAN ID 56)") == "motors"
|
||||
|
||||
def test_motor_maps_to_motors(self):
|
||||
assert _keyword_to_subsystem("motor_controller") == "motors"
|
||||
|
||||
def test_battery_maps_to_battery(self):
|
||||
assert _keyword_to_subsystem("battery_monitor") == "battery"
|
||||
|
||||
def test_ina219_maps_to_battery(self):
|
||||
assert _keyword_to_subsystem("INA219 current sensor") == "battery"
|
||||
|
||||
def test_lvc_maps_to_battery(self):
|
||||
assert _keyword_to_subsystem("lvc_cutoff") == "battery"
|
||||
|
||||
def test_imu_maps_to_imu(self):
|
||||
assert _keyword_to_subsystem("IMU/mpu6000") == "imu"
|
||||
|
||||
def test_mpu6000_maps_to_imu(self):
|
||||
assert _keyword_to_subsystem("mpu6000 driver") == "imu"
|
||||
|
||||
def test_uwb_maps_to_uwb(self):
|
||||
assert _keyword_to_subsystem("UWB anchor 0") == "uwb"
|
||||
|
||||
def test_rplidar_maps_to_lidar(self):
|
||||
assert _keyword_to_subsystem("rplidar_node") == "lidar"
|
||||
|
||||
def test_lidar_maps_to_lidar(self):
|
||||
assert _keyword_to_subsystem("lidar/scan") == "lidar"
|
||||
|
||||
def test_realsense_maps_to_camera(self):
|
||||
assert _keyword_to_subsystem("RealSense D435i") == "camera"
|
||||
|
||||
def test_camera_maps_to_camera(self):
|
||||
assert _keyword_to_subsystem("camera_health") == "camera"
|
||||
|
||||
def test_can_maps_to_can_bus(self):
|
||||
assert _keyword_to_subsystem("can_driver stats") == "can_bus"
|
||||
|
||||
def test_estop_maps_to_estop(self):
|
||||
assert _keyword_to_subsystem("estop_monitor") == "estop"
|
||||
|
||||
def test_safety_maps_to_estop(self):
|
||||
assert _keyword_to_subsystem("safety_zone") == "estop"
|
||||
|
||||
def test_unknown_returns_none(self):
|
||||
assert _keyword_to_subsystem("totally_unrelated_sensor") is None
|
||||
|
||||
def test_case_insensitive(self):
|
||||
assert _keyword_to_subsystem("RPLIDAR A2") == "lidar"
|
||||
assert _keyword_to_subsystem("IMU_CALIBRATION") == "imu"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SubsystemState.update()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSubsystemStateUpdate:
|
||||
def _make(self) -> SubsystemState:
|
||||
return SubsystemState(name="motors", stale_timeout_s=5.0)
|
||||
|
||||
def test_initial_state(self):
|
||||
s = self._make()
|
||||
assert s.status == STATUS_UNKNOWN
|
||||
assert s.message == ""
|
||||
assert s.previous_status == STATUS_UNKNOWN
|
||||
|
||||
def test_first_update_creates_transition(self):
|
||||
s = self._make()
|
||||
t = s.update(STATUS_OK, "all good", time.monotonic())
|
||||
assert t is not None
|
||||
assert t.from_status == STATUS_UNKNOWN
|
||||
assert t.to_status == STATUS_OK
|
||||
assert t.subsystem == "motors"
|
||||
|
||||
def test_same_status_no_transition(self):
|
||||
s = self._make()
|
||||
s.update(STATUS_OK, "good", time.monotonic())
|
||||
t = s.update(STATUS_OK, "still good", time.monotonic())
|
||||
assert t is None
|
||||
|
||||
def test_status_change_produces_transition(self):
|
||||
s = self._make()
|
||||
now = time.monotonic()
|
||||
s.update(STATUS_OK, "good", now)
|
||||
t = s.update(STATUS_ERROR, "fault", now + 1)
|
||||
assert t is not None
|
||||
assert t.from_status == STATUS_OK
|
||||
assert t.to_status == STATUS_ERROR
|
||||
assert t.message == "fault"
|
||||
|
||||
def test_previous_status_saved(self):
|
||||
s = self._make()
|
||||
now = time.monotonic()
|
||||
s.update(STATUS_OK, "good", now)
|
||||
s.update(STATUS_WARN, "warn", now + 1)
|
||||
assert s.previous_status == STATUS_OK
|
||||
assert s.status == STATUS_WARN
|
||||
|
||||
def test_last_updated_advances(self):
|
||||
s = self._make()
|
||||
t1 = time.monotonic()
|
||||
s.update(STATUS_OK, "x", t1)
|
||||
assert s.last_updated_mono == pytest.approx(t1)
|
||||
t2 = t1 + 1.0
|
||||
s.update(STATUS_OK, "y", t2)
|
||||
assert s.last_updated_mono == pytest.approx(t2)
|
||||
|
||||
def test_transition_has_iso_timestamp(self):
|
||||
s = self._make()
|
||||
t = s.update(STATUS_OK, "good", time.monotonic())
|
||||
assert t is not None
|
||||
assert "T" in t.timestamp_iso # ISO-8601 contains 'T'
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SubsystemState.apply_stale_check()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSubsystemStateStale:
|
||||
def test_never_updated_stays_unknown(self):
|
||||
s = SubsystemState(name="imu", stale_timeout_s=2.0)
|
||||
t = s.apply_stale_check(time.monotonic() + 100)
|
||||
assert t is None
|
||||
assert s.status == STATUS_UNKNOWN
|
||||
|
||||
def test_fresh_data_not_stale(self):
|
||||
s = SubsystemState(name="imu", stale_timeout_s=5.0)
|
||||
now = time.monotonic()
|
||||
s.update(STATUS_OK, "good", now)
|
||||
t = s.apply_stale_check(now + 3.0) # 3s < 5s timeout
|
||||
assert t is None
|
||||
assert s.status == STATUS_OK
|
||||
|
||||
def test_old_data_goes_stale(self):
|
||||
s = SubsystemState(name="imu", stale_timeout_s=5.0)
|
||||
now = time.monotonic()
|
||||
s.update(STATUS_OK, "good", now)
|
||||
t = s.apply_stale_check(now + 6.0) # 6s > 5s timeout
|
||||
assert t is not None
|
||||
assert t.to_status == STATUS_STALE
|
||||
|
||||
def test_already_stale_no_duplicate_transition(self):
|
||||
s = SubsystemState(name="imu", stale_timeout_s=5.0)
|
||||
now = time.monotonic()
|
||||
s.update(STATUS_OK, "good", now)
|
||||
s.apply_stale_check(now + 6.0) # → STALE
|
||||
t2 = s.apply_stale_check(now + 7.0) # already STALE
|
||||
assert t2 is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SubsystemState.to_dict()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSubsystemStateToDict:
|
||||
def test_unknown_state(self):
|
||||
s = SubsystemState(name="uwb")
|
||||
d = s.to_dict(time.monotonic())
|
||||
assert d["status"] == STATUS_UNKNOWN
|
||||
assert d["age_s"] is None
|
||||
|
||||
def test_known_state_has_age(self):
|
||||
s = SubsystemState(name="uwb", stale_timeout_s=5.0)
|
||||
now = time.monotonic()
|
||||
s.update(STATUS_OK, "ok", now)
|
||||
d = s.to_dict(now + 1.5)
|
||||
assert d["status"] == STATUS_OK
|
||||
assert d["age_s"] == pytest.approx(1.5, abs=0.01)
|
||||
assert d["message"] == "ok"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _SUBSYSTEM_NAMES completeness
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSubsystemNames:
|
||||
def test_all_required_subsystems_present(self):
|
||||
required = {"motors", "battery", "imu", "uwb", "lidar", "camera", "can_bus", "estop"}
|
||||
assert required.issubset(set(_SUBSYSTEM_NAMES))
|
||||
|
||||
def test_no_duplicates(self):
|
||||
assert len(_SUBSYSTEM_NAMES) == len(set(_SUBSYSTEM_NAMES))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
@ -1,8 +1,12 @@
|
||||
vesc_can_odometry:
|
||||
ros__parameters:
|
||||
# ── CAN motor IDs ────────────────────────────────────────────────────────
|
||||
left_can_id: 61 # left motor VESC CAN ID → /vesc/can_61/state
|
||||
right_can_id: 79 # right motor VESC CAN ID → /vesc/can_79/state
|
||||
left_can_id: 56 # left motor VESC CAN ID (Mamba F722S)
|
||||
right_can_id: 68 # right motor VESC CAN ID (Mamba F722S)
|
||||
|
||||
# ── State topic names (must match VESC telemetry publisher) ──────────────
|
||||
left_state_topic: /vesc/left/state
|
||||
right_state_topic: /vesc/right/state
|
||||
|
||||
# ── Drive geometry ───────────────────────────────────────────────────────
|
||||
wheel_radius: 0.10 # wheel radius (m)
|
||||
|
||||
@ -2,7 +2,8 @@
|
||||
"""
|
||||
vesc_odometry_bridge.py — Differential drive odometry from dual VESC CAN motors (Issue #646).
|
||||
|
||||
Subscribes to per-motor VESC telemetry topics for CAN IDs 61 (left) and 79 (right),
|
||||
Subscribes to per-motor VESC telemetry topics (configurable, defaulting to
|
||||
/vesc/left/state and /vesc/right/state as published by the telemetry node),
|
||||
applies differential drive kinematics via DiffDriveOdometry, and publishes:
|
||||
|
||||
/odom nav_msgs/Odometry — consumed by Nav2 / slam_toolbox
|
||||
@ -10,8 +11,11 @@ applies differential drive kinematics via DiffDriveOdometry, and publishes:
|
||||
TF odom → base_link
|
||||
|
||||
Input topics (std_msgs/String, JSON):
|
||||
/vesc/can_61/state {"rpm": <eRPM>, "voltage_v": ..., "current_a": ..., ...}
|
||||
/vesc/can_79/state same schema
|
||||
/vesc/left/state {"rpm": <eRPM>, "voltage_v": ..., "current_a": ..., ...}
|
||||
/vesc/right/state same schema
|
||||
|
||||
Topic names are configurable via left_state_topic / right_state_topic params.
|
||||
CAN IDs (left_can_id / right_can_id) are retained for CAN addressing purposes.
|
||||
|
||||
Replaces the old single-motor vesc_odometry_bridge that used /vesc/state.
|
||||
"""
|
||||
@ -52,8 +56,11 @@ class VESCCANOdometryNode(Node):
|
||||
super().__init__("vesc_can_odometry")
|
||||
|
||||
# ── Parameters ────────────────────────────────────────────────────────
|
||||
self.declare_parameter("left_can_id", 61)
|
||||
self.declare_parameter("right_can_id", 79)
|
||||
self.declare_parameter("left_can_id", 56) # CAN ID for left motor
|
||||
self.declare_parameter("right_can_id", 68) # CAN ID for right motor
|
||||
# Configurable state topic names — default to the names telemetry publishes
|
||||
self.declare_parameter("left_state_topic", "/vesc/left/state")
|
||||
self.declare_parameter("right_state_topic", "/vesc/right/state")
|
||||
self.declare_parameter("wheel_radius", 0.10) # metres
|
||||
self.declare_parameter("wheel_separation", 0.35) # metres (track width)
|
||||
self.declare_parameter("motor_poles", 7) # BLDC pole pairs
|
||||
@ -70,9 +77,11 @@ class VESCCANOdometryNode(Node):
|
||||
self.declare_parameter("twist_cov_vx", 1e-2) # vx σ² (m/s)²
|
||||
self.declare_parameter("twist_cov_wz", 2e-2) # ωz σ² (rad/s)²
|
||||
|
||||
left_id = self.get_parameter("left_can_id").value
|
||||
right_id = self.get_parameter("right_can_id").value
|
||||
freq = self.get_parameter("update_frequency").value
|
||||
left_id = self.get_parameter("left_can_id").value
|
||||
right_id = self.get_parameter("right_can_id").value
|
||||
left_topic = self.get_parameter("left_state_topic").value
|
||||
right_topic = self.get_parameter("right_state_topic").value
|
||||
freq = self.get_parameter("update_frequency").value
|
||||
|
||||
self._odom_frame = self.get_parameter("odom_frame_id").value
|
||||
self._base_frame = self.get_parameter("base_frame_id").value
|
||||
@ -115,13 +124,13 @@ class VESCCANOdometryNode(Node):
|
||||
# ── Subscriptions ──────────────────────────────────────────────────────
|
||||
self.create_subscription(
|
||||
String,
|
||||
f"/vesc/can_{left_id}/state",
|
||||
left_topic,
|
||||
self._on_left,
|
||||
10,
|
||||
)
|
||||
self.create_subscription(
|
||||
String,
|
||||
f"/vesc/can_{right_id}/state",
|
||||
right_topic,
|
||||
self._on_right,
|
||||
10,
|
||||
)
|
||||
@ -130,7 +139,8 @@ class VESCCANOdometryNode(Node):
|
||||
self.create_timer(1.0 / freq, self._on_timer)
|
||||
|
||||
self.get_logger().info(
|
||||
f"vesc_can_odometry: left=CAN{left_id} right=CAN{right_id} "
|
||||
f"vesc_can_odometry: left=CAN{left_id}({left_topic}) "
|
||||
f"right=CAN{right_id}({right_topic}) "
|
||||
f"r={self._odom.wheel_radius}m sep={self._odom.wheel_separation}m "
|
||||
f"poles={self._odom.motor_poles} invert_right={self._odom.invert_right} "
|
||||
f"{self._odom_frame}→{self._base_frame} @ {freq:.0f}Hz"
|
||||
|
||||
@ -1,20 +1,10 @@
|
||||
# VESC Driver Configuration for SaltyBot
|
||||
# Flipsky FSESC 4.20 Plus (VESC dual ESC) motor control
|
||||
|
||||
vesc_driver:
|
||||
vesc_can_driver:
|
||||
ros__parameters:
|
||||
# Serial communication
|
||||
port: "/dev/ttyUSB0"
|
||||
baudrate: 115200
|
||||
|
||||
# Command mode: "duty_cycle" or "rpm"
|
||||
# duty_cycle: Direct motor duty cycle (-100 to 100)
|
||||
# rpm: RPM setpoint mode (closed-loop speed control)
|
||||
command_mode: "duty_cycle"
|
||||
|
||||
# Motor limits
|
||||
max_rpm: 60000
|
||||
max_current_a: 50.0
|
||||
|
||||
# Command timeout: If no cmd_vel received for this duration, motor stops
|
||||
timeout_s: 1.0
|
||||
interface: can0
|
||||
bitrate: 500000
|
||||
left_motor_id: 61
|
||||
right_motor_id: 79
|
||||
wheel_separation: 0.5
|
||||
wheel_radius: 0.1
|
||||
max_speed: 5.0
|
||||
command_timeout: 1.0
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
"""Launch file for VESC driver node."""
|
||||
"""Launch file for VESC CAN driver node."""
|
||||
|
||||
from launch import LaunchDescription
|
||||
from launch_ros.actions import Node
|
||||
@ -9,7 +9,7 @@ from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
"""Generate launch description for VESC driver."""
|
||||
"""Generate launch description for VESC CAN driver."""
|
||||
pkg_dir = get_package_share_directory("saltybot_vesc_driver")
|
||||
config_file = os.path.join(pkg_dir, "config", "vesc_params.yaml")
|
||||
|
||||
@ -20,15 +20,10 @@ def generate_launch_description():
|
||||
default_value=config_file,
|
||||
description="Path to configuration YAML file",
|
||||
),
|
||||
DeclareLaunchArgument(
|
||||
"port",
|
||||
default_value="/dev/ttyUSB0",
|
||||
description="Serial port for VESC",
|
||||
),
|
||||
Node(
|
||||
package="saltybot_vesc_driver",
|
||||
executable="vesc_driver_node",
|
||||
name="vesc_driver",
|
||||
name="vesc_can_driver",
|
||||
output="screen",
|
||||
parameters=[LaunchConfiguration("config_file")],
|
||||
),
|
||||
|
||||
@ -14,6 +14,8 @@
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
|
||||
<exec_depend>python3-can</exec_depend>
|
||||
|
||||
<buildtool_depend>ament_python</buildtool_depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
|
||||
@ -1,222 +1,133 @@
|
||||
#!/usr/bin/env python3
|
||||
"""VESC UART driver node for SaltyBot.
|
||||
"""
|
||||
VESC CAN driver node using python-can with SocketCAN.
|
||||
Subscribes to /cmd_vel (geometry_msgs/Twist) and sends duty cycle
|
||||
commands to two VESC motor controllers via CAN bus.
|
||||
|
||||
Uses pyvesc library to communicate with Flipsky FSESC 4.20 Plus (VESC dual ESC).
|
||||
Subscribes to velocity commands and publishes motor telemetry.
|
||||
|
||||
Subscribed topics:
|
||||
/cmd_vel (geometry_msgs/Twist) - Velocity command (linear.x = m/s, angular.z = rad/s)
|
||||
|
||||
Published topics:
|
||||
/vesc/state (VESCState) - Motor telemetry (voltage, current, RPM, temperature, fault)
|
||||
/vesc/raw_telemetry (String) - Raw telemetry JSON for debugging
|
||||
VESC CAN protocol:
|
||||
Extended frame ID = (CAN_PACKET_ID << 8) | VESC_ID
|
||||
CAN_PACKET_SET_DUTY = 0 -> payload: int32 (duty * 100000)
|
||||
"""
|
||||
|
||||
import json
|
||||
import threading
|
||||
import struct
|
||||
import time
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
import can
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from geometry_msgs.msg import Twist
|
||||
from std_msgs.msg import String
|
||||
import serial
|
||||
|
||||
try:
|
||||
import pyvesc
|
||||
except ImportError:
|
||||
pyvesc = None
|
||||
from rclpy.node import Node
|
||||
|
||||
|
||||
class VESCCommandMode(Enum):
|
||||
"""VESC command modes."""
|
||||
DUTY_CYCLE = "duty_cycle" # Direct duty cycle (-100 to 100)
|
||||
RPM = "rpm" # RPM setpoint
|
||||
CURRENT = "current" # Current (A)
|
||||
BRAKE_CURRENT = "brake" # Brake current (A)
|
||||
# VESC CAN packet IDs
|
||||
CAN_PACKET_SET_DUTY = 0
|
||||
CAN_PACKET_SET_RPM = 3
|
||||
|
||||
|
||||
class VESCState:
|
||||
"""VESC telemetry state."""
|
||||
def make_can_id(packet_id: int, vesc_id: int) -> int:
|
||||
"""Build extended CAN frame ID for VESC."""
|
||||
return (packet_id << 8) | vesc_id
|
||||
|
||||
|
||||
class VescCanDriver(Node):
|
||||
def __init__(self):
|
||||
self.voltage_v = 0.0
|
||||
self.current_a = 0.0
|
||||
self.rpm = 0
|
||||
self.temperature_c = 0.0
|
||||
self.fault_code = 0
|
||||
self.timestamp = time.time()
|
||||
super().__init__('vesc_can_driver')
|
||||
|
||||
# Declare parameters
|
||||
self.declare_parameter('interface', 'can0')
|
||||
self.declare_parameter('left_motor_id', 61)
|
||||
self.declare_parameter('right_motor_id', 79)
|
||||
self.declare_parameter('wheel_separation', 0.5)
|
||||
self.declare_parameter('wheel_radius', 0.1)
|
||||
self.declare_parameter('max_speed', 5.0)
|
||||
self.declare_parameter('command_timeout', 1.0)
|
||||
|
||||
class VESCDriverNode(Node):
|
||||
"""ROS2 node for VESC motor control and telemetry."""
|
||||
# Read parameters
|
||||
self.interface = self.get_parameter('interface').value
|
||||
self.left_id = self.get_parameter('left_motor_id').value
|
||||
self.right_id = self.get_parameter('right_motor_id').value
|
||||
self.wheel_sep = self.get_parameter('wheel_separation').value
|
||||
self.max_speed = self.get_parameter('max_speed').value
|
||||
self.cmd_timeout = self.get_parameter('command_timeout').value
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("vesc_driver")
|
||||
# Open CAN bus
|
||||
try:
|
||||
self.bus = can.interface.Bus(
|
||||
channel=self.interface,
|
||||
bustype='socketcan',
|
||||
)
|
||||
self.get_logger().info(f'CAN bus opened: {self.interface}')
|
||||
except Exception as e:
|
||||
self.get_logger().error(f'Failed to open CAN bus: {e}')
|
||||
raise
|
||||
|
||||
# Parameters
|
||||
self.declare_parameter("port", "/dev/ttyUSB0")
|
||||
self.declare_parameter("baudrate", 115200)
|
||||
self.declare_parameter("command_mode", "duty_cycle") # or "rpm"
|
||||
self.declare_parameter("max_rpm", 60000)
|
||||
self.declare_parameter("max_current_a", 50.0)
|
||||
self.declare_parameter("timeout_s", 1.0)
|
||||
self._last_cmd_time = time.monotonic()
|
||||
|
||||
self.port = self.get_parameter("port").value
|
||||
self.baudrate = self.get_parameter("baudrate").value
|
||||
self.command_mode = self.get_parameter("command_mode").value
|
||||
self.max_rpm = self.get_parameter("max_rpm").value
|
||||
self.max_current_a = self.get_parameter("max_current_a").value
|
||||
self.timeout_s = self.get_parameter("timeout_s").value
|
||||
# Subscriber
|
||||
self.create_subscription(Twist, '/cmd_vel', self._cmd_vel_cb, 10)
|
||||
|
||||
# Serial connection
|
||||
self.serial = None
|
||||
self.vesc = None
|
||||
self.last_cmd_time = 0
|
||||
self.state = VESCState()
|
||||
|
||||
# Subscriptions
|
||||
self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10)
|
||||
|
||||
# Publishers
|
||||
self.pub_state = self.create_publisher(String, "/vesc/state", 10)
|
||||
self.pub_telemetry = self.create_publisher(String, "/vesc/raw_telemetry", 10)
|
||||
|
||||
# Timer for telemetry polling (100 Hz)
|
||||
self.create_timer(0.01, self._poll_telemetry)
|
||||
|
||||
# Initialize VESC connection
|
||||
self._init_vesc()
|
||||
# Watchdog timer (10 Hz)
|
||||
self.create_timer(0.1, self._watchdog_cb)
|
||||
|
||||
self.get_logger().info(
|
||||
f"VESC driver initialized: port={self.port}, baud={self.baudrate}, "
|
||||
f"mode={self.command_mode}, timeout={self.timeout_s}s"
|
||||
f'VESC CAN driver ready — left={self.left_id} right={self.right_id}'
|
||||
)
|
||||
|
||||
def _init_vesc(self) -> bool:
|
||||
"""Initialize serial connection to VESC."""
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _cmd_vel_cb(self, msg: Twist):
|
||||
self._last_cmd_time = time.monotonic()
|
||||
linear = msg.linear.x
|
||||
angular = msg.angular.z
|
||||
|
||||
left_speed = linear - (angular * self.wheel_sep / 2.0)
|
||||
right_speed = linear + (angular * self.wheel_sep / 2.0)
|
||||
|
||||
left_duty = max(-1.0, min(1.0, left_speed / self.max_speed))
|
||||
right_duty = max(-1.0, min(1.0, right_speed / self.max_speed))
|
||||
|
||||
self._send_duty(self.left_id, left_duty)
|
||||
self._send_duty(self.right_id, right_duty)
|
||||
|
||||
def _watchdog_cb(self):
|
||||
elapsed = time.monotonic() - self._last_cmd_time
|
||||
if elapsed > self.cmd_timeout:
|
||||
self._send_duty(self.left_id, 0.0)
|
||||
self._send_duty(self.right_id, 0.0)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _send_duty(self, vesc_id: int, duty: float):
|
||||
"""Send CAN_PACKET_SET_DUTY to a VESC controller."""
|
||||
can_id = make_can_id(CAN_PACKET_SET_DUTY, vesc_id)
|
||||
payload = struct.pack('>i', int(duty * 100000))
|
||||
msg = can.Message(
|
||||
arbitration_id=can_id,
|
||||
data=payload,
|
||||
is_extended_id=True,
|
||||
)
|
||||
try:
|
||||
if pyvesc is None:
|
||||
self.get_logger().error("pyvesc not installed. Install with: pip install pyvesc")
|
||||
return False
|
||||
self.bus.send(msg)
|
||||
except can.CanError as e:
|
||||
self.get_logger().error(f'CAN send error (id={vesc_id}): {e}')
|
||||
|
||||
self.serial = serial.Serial(
|
||||
port=self.port,
|
||||
baudrate=self.baudrate,
|
||||
timeout=0.1,
|
||||
)
|
||||
self.vesc = pyvesc.VescUart(
|
||||
serial_port=self.serial,
|
||||
has_sensor=False, # No wheel speed sensor
|
||||
start_heartbeat=True,
|
||||
)
|
||||
|
||||
self.get_logger().info(f"Connected to VESC on {self.port} @ {self.baudrate} baud")
|
||||
return True
|
||||
|
||||
except (serial.SerialException, Exception) as e:
|
||||
self.get_logger().error(f"Failed to initialize VESC: {e}")
|
||||
return False
|
||||
|
||||
def _on_cmd_vel(self, msg: Twist) -> None:
|
||||
"""Handle velocity command from /cmd_vel."""
|
||||
if self.vesc is None:
|
||||
return
|
||||
|
||||
try:
|
||||
# Extract linear velocity (m/s) and angular velocity (rad/s)
|
||||
linear_v = msg.linear.x
|
||||
angular_v = msg.angular.z
|
||||
|
||||
# Convert velocity to VESC command based on mode
|
||||
if self.command_mode == "duty_cycle":
|
||||
# Map velocity to duty cycle (-100 to 100)
|
||||
# Assuming max velocity ~5 m/s maps to ~100% duty
|
||||
duty = max(-100, min(100, (linear_v / 5.0) * 100))
|
||||
self.vesc.set_duty(duty / 100.0) # pyvesc expects 0-1 or -1-0
|
||||
|
||||
elif self.command_mode == "rpm":
|
||||
# Map velocity to RPM
|
||||
# Assuming max velocity 5 m/s → max_rpm
|
||||
rpm = max(-self.max_rpm, min(self.max_rpm, (linear_v / 5.0) * self.max_rpm))
|
||||
self.vesc.set_rpm(int(rpm))
|
||||
|
||||
# Angular velocity could control steering if dual motors are used differently
|
||||
# For now, it's ignored (single dual-motor ESC)
|
||||
|
||||
self.last_cmd_time = time.time()
|
||||
|
||||
except Exception as e:
|
||||
self.get_logger().error(f"Error sending command to VESC: {e}")
|
||||
|
||||
def _poll_telemetry(self) -> None:
|
||||
"""Poll VESC for telemetry and publish state."""
|
||||
if self.vesc is None:
|
||||
return
|
||||
|
||||
try:
|
||||
# Read telemetry from VESC
|
||||
if self.vesc.read_values():
|
||||
# Update state from VESC data
|
||||
self.state.voltage_v = self.vesc.data.v_in
|
||||
self.state.current_a = self.vesc.data.current_in
|
||||
self.state.rpm = self.vesc.data.rpm
|
||||
self.state.temperature_c = self.vesc.data.temp_fet
|
||||
self.state.fault_code = self.vesc.data.fault_code
|
||||
self.state.timestamp = time.time()
|
||||
|
||||
# Check for timeout
|
||||
if time.time() - self.last_cmd_time > self.timeout_s:
|
||||
# No recent command, send zero
|
||||
if self.command_mode == "duty_cycle":
|
||||
self.vesc.set_duty(0.0)
|
||||
else:
|
||||
self.vesc.set_rpm(0)
|
||||
|
||||
# Publish state
|
||||
self._publish_state()
|
||||
else:
|
||||
self.get_logger().warn("Failed to read VESC telemetry")
|
||||
|
||||
except Exception as e:
|
||||
self.get_logger().error(f"Error polling VESC: {e}")
|
||||
|
||||
def _publish_state(self) -> None:
|
||||
"""Publish VESC state as JSON."""
|
||||
state_dict = {
|
||||
"voltage_v": round(self.state.voltage_v, 2),
|
||||
"current_a": round(self.state.current_a, 2),
|
||||
"rpm": self.state.rpm,
|
||||
"temperature_c": round(self.state.temperature_c, 1),
|
||||
"fault_code": self.state.fault_code,
|
||||
"timestamp": self.state.timestamp,
|
||||
}
|
||||
|
||||
msg = String(data=json.dumps(state_dict))
|
||||
self.pub_state.publish(msg)
|
||||
self.pub_telemetry.publish(msg)
|
||||
|
||||
# Log fault codes
|
||||
if self.state.fault_code != 0:
|
||||
self.get_logger().warn(f"VESC fault code: {self.state.fault_code}")
|
||||
def destroy_node(self):
|
||||
self._send_duty(self.left_id, 0.0)
|
||||
self._send_duty(self.right_id, 0.0)
|
||||
self.bus.shutdown()
|
||||
super().destroy_node()
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = VESCDriverNode()
|
||||
node = VescCanDriver()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
if node.vesc and node.serial:
|
||||
node.vesc.set_duty(0.0) # Zero throttle on shutdown
|
||||
node.serial.close()
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
@ -0,0 +1,29 @@
|
||||
# VESC CAN Telemetry Node — SaltyBot dual FSESC 6.7 Pro (FW 6.6)
|
||||
# SocketCAN interface: can0 (SN65HVD230 transceiver on MAMBA F722S CAN2)
|
||||
|
||||
vesc_telemetry:
|
||||
ros__parameters:
|
||||
# SocketCAN interface name
|
||||
can_interface: "can0"
|
||||
|
||||
# CAN IDs assigned to each VESC in VESC Tool
|
||||
# Left motor VESC: ID 61 (0x3D)
|
||||
# Right motor VESC: ID 79 (0x4F)
|
||||
left_can_id: 61
|
||||
right_can_id: 79
|
||||
|
||||
# This node's CAN ID used as sender_id in GET_VALUES requests (0-127)
|
||||
sender_can_id: 127
|
||||
|
||||
# Telemetry publish rate and GET_VALUES poll rate (10-50 Hz)
|
||||
poll_rate_hz: 20
|
||||
|
||||
# Fault alert thresholds
|
||||
# Phase current threshold — FSESC 6.7 Pro rated 80 A peak; alert at 60 A
|
||||
overcurrent_threshold_a: 60.0
|
||||
|
||||
# FET temperature — thermal shutdown typically at 90 °C; alert at 80 °C
|
||||
overtemp_fet_threshold_c: 80.0
|
||||
|
||||
# Motor temperature — alert at 100 °C
|
||||
overtemp_motor_threshold_c: 100.0
|
||||
@ -0,0 +1,44 @@
|
||||
"""Launch file for VESC CAN telemetry node."""
|
||||
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
pkg_dir = get_package_share_directory("saltybot_vesc_telemetry")
|
||||
config_file = os.path.join(pkg_dir, "config", "vesc_telemetry_params.yaml")
|
||||
|
||||
return LaunchDescription([
|
||||
DeclareLaunchArgument(
|
||||
"config_file",
|
||||
default_value=config_file,
|
||||
description="Path to vesc_telemetry_params.yaml",
|
||||
),
|
||||
DeclareLaunchArgument(
|
||||
"can_interface",
|
||||
default_value="can0",
|
||||
description="SocketCAN interface (e.g. can0, vcan0 for testing)",
|
||||
),
|
||||
DeclareLaunchArgument(
|
||||
"poll_rate_hz",
|
||||
default_value="20",
|
||||
description="Telemetry publish + GET_VALUES poll rate (10-50 Hz)",
|
||||
),
|
||||
Node(
|
||||
package="saltybot_vesc_telemetry",
|
||||
executable="vesc_telemetry_node",
|
||||
name="vesc_telemetry",
|
||||
output="screen",
|
||||
parameters=[
|
||||
LaunchConfiguration("config_file"),
|
||||
{
|
||||
"can_interface": LaunchConfiguration("can_interface"),
|
||||
"poll_rate_hz": LaunchConfiguration("poll_rate_hz"),
|
||||
},
|
||||
],
|
||||
),
|
||||
])
|
||||
30
jetson/ros2_ws/src/saltybot_vesc_telemetry/package.xml
Normal file
30
jetson/ros2_ws/src/saltybot_vesc_telemetry/package.xml
Normal file
@ -0,0 +1,30 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_vesc_telemetry</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
VESC CAN telemetry node for SaltyBot dual FSESC 6.7 Pro (FW 6.6).
|
||||
Polls CAN IDs 61 (left) and 79 (right) via SocketCAN (python-can, can0).
|
||||
Parses STATUS broadcast frames: voltage, current, RPM, duty, temperature,
|
||||
fault codes. Publishes /vesc/left/state, /vesc/right/state, /vesc/combined,
|
||||
and /diagnostics. Issues #645.
|
||||
</description>
|
||||
<maintainer email="sl-firmware@saltylab.local">sl-firmware</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>diagnostic_msgs</depend>
|
||||
|
||||
<buildtool_depend>ament_python</buildtool_depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,176 @@
|
||||
"""VESC CAN protocol constants and frame parsers (FW 6.6).
|
||||
|
||||
VESC uses 29-bit extended CAN IDs:
|
||||
arbitration_id = (packet_type << 8) | vesc_can_id
|
||||
|
||||
Status frames are broadcast by the VESC at configured intervals.
|
||||
GET_VALUES requests use CAN_PACKET_PROCESS_SHORT_BUFFER.
|
||||
"""
|
||||
|
||||
import struct
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Packet type IDs (upper bits of 29-bit extended arb ID)
|
||||
# ---------------------------------------------------------------------------
|
||||
CAN_PACKET_STATUS = 9 # RPM, phase current, duty cycle
|
||||
CAN_PACKET_STATUS_2 = 14 # Ah used / Ah charged
|
||||
CAN_PACKET_STATUS_3 = 15 # Wh used / Wh charged
|
||||
CAN_PACKET_STATUS_4 = 16 # FET temp, motor temp, input current
|
||||
CAN_PACKET_STATUS_5 = 27 # tachometer, input voltage
|
||||
CAN_PACKET_PROCESS_SHORT_BUFFER = 32 # send a command to the VESC (≤3-byte payload)
|
||||
|
||||
# VESC UART command IDs (embedded inside short-buffer requests)
|
||||
COMM_GET_VALUES = 4
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fault codes (FW 6.6)
|
||||
# ---------------------------------------------------------------------------
|
||||
FAULT_CODE_NONE = 0
|
||||
FAULT_CODE_OVER_VOLTAGE = 1
|
||||
FAULT_CODE_UNDER_VOLTAGE = 2
|
||||
FAULT_CODE_DRV = 3
|
||||
FAULT_CODE_ABS_OVER_CURRENT = 4
|
||||
FAULT_CODE_OVER_TEMP_FET = 5
|
||||
FAULT_CODE_OVER_TEMP_MOTOR = 6
|
||||
FAULT_CODE_GATE_DRIVER_OVER_VOLTAGE = 7
|
||||
FAULT_CODE_GATE_DRIVER_UNDER_VOLTAGE = 8
|
||||
FAULT_CODE_MCU_UNDER_VOLTAGE = 9
|
||||
FAULT_CODE_BOOTING_FROM_WATCHDOG_RESET = 10
|
||||
|
||||
FAULT_NAMES: dict[int, str] = {
|
||||
0: "NONE",
|
||||
1: "OVER_VOLTAGE",
|
||||
2: "UNDER_VOLTAGE",
|
||||
3: "DRV",
|
||||
4: "ABS_OVER_CURRENT",
|
||||
5: "OVER_TEMP_FET",
|
||||
6: "OVER_TEMP_MOTOR",
|
||||
7: "GATE_DRIVER_OVER_VOLTAGE",
|
||||
8: "GATE_DRIVER_UNDER_VOLTAGE",
|
||||
9: "MCU_UNDER_VOLTAGE",
|
||||
10: "BOOTING_FROM_WATCHDOG_RESET",
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# State dataclass
|
||||
# ---------------------------------------------------------------------------
|
||||
@dataclass
|
||||
class VescState:
|
||||
"""Telemetry snapshot for one VESC, populated from status broadcast frames."""
|
||||
|
||||
can_id: int = 0
|
||||
|
||||
# From STATUS (9)
|
||||
rpm: int = 0
|
||||
current_a: float = 0.0 # phase current (A)
|
||||
duty_cycle: float = 0.0 # -1.0 .. +1.0
|
||||
|
||||
# From STATUS_4 (16)
|
||||
temp_fet_c: float = 0.0
|
||||
temp_motor_c: float = 0.0
|
||||
current_in_a: float = 0.0 # battery/input current (A)
|
||||
|
||||
# From STATUS_5 (27)
|
||||
voltage_v: float = 0.0
|
||||
|
||||
# Fault (embedded in STATUS per FW 6.6 — lower byte of duty word)
|
||||
fault_code: int = 0
|
||||
|
||||
# Timestamps (monotonic seconds)
|
||||
last_status_ts: float = 0.0
|
||||
last_status5_ts: float = 0.0
|
||||
|
||||
ALIVE_TIMEOUT_S: float = 1.0 # mark offline if no STATUS frame for this long
|
||||
|
||||
@property
|
||||
def fault_name(self) -> str:
|
||||
return FAULT_NAMES.get(self.fault_code, f"UNKNOWN_{self.fault_code}")
|
||||
|
||||
@property
|
||||
def is_alive(self) -> bool:
|
||||
return (time.monotonic() - self.last_status_ts) < self.ALIVE_TIMEOUT_S
|
||||
|
||||
@property
|
||||
def has_fault(self) -> bool:
|
||||
return self.fault_code != FAULT_CODE_NONE
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Frame parsers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def parse_status(data: bytes) -> dict | None:
|
||||
"""Parse CAN_PACKET_STATUS (9): RPM, phase current, duty cycle.
|
||||
|
||||
Wire format (8 bytes, all big-endian):
|
||||
bytes [0:4] RPM : int32
|
||||
bytes [4:6] current×10 : int16 → ÷10 = A
|
||||
bytes [6:8] duty×1000 : int16 → ÷1000 = fraction (-1..+1)
|
||||
"""
|
||||
if len(data) < 8:
|
||||
return None
|
||||
rpm, current_x10, duty_x1000 = struct.unpack_from(">ihh", data, 0)
|
||||
return {
|
||||
"rpm": rpm,
|
||||
"current_a": current_x10 / 10.0,
|
||||
"duty_cycle": duty_x1000 / 1000.0,
|
||||
}
|
||||
|
||||
|
||||
def parse_status_4(data: bytes) -> dict | None:
|
||||
"""Parse CAN_PACKET_STATUS_4 (16): temperatures and input current.
|
||||
|
||||
Wire format (8 bytes, all big-endian):
|
||||
bytes [0:2] temp_fet×10 : int16 → ÷10 = °C
|
||||
bytes [2:4] temp_motor×10 : int16 → ÷10 = °C
|
||||
bytes [4:6] current_in×10 : int16 → ÷10 = A
|
||||
bytes [6:8] pid_pos×50 : int16 (not used here)
|
||||
"""
|
||||
if len(data) < 8:
|
||||
return None
|
||||
tf_x10, tm_x10, ci_x10, _pid = struct.unpack_from(">hhhh", data, 0)
|
||||
return {
|
||||
"temp_fet_c": tf_x10 / 10.0,
|
||||
"temp_motor_c": tm_x10 / 10.0,
|
||||
"current_in_a": ci_x10 / 10.0,
|
||||
}
|
||||
|
||||
|
||||
def parse_status_5(data: bytes) -> dict | None:
|
||||
"""Parse CAN_PACKET_STATUS_5 (27): input voltage (and tachometer).
|
||||
|
||||
Wire format (8 bytes, all big-endian):
|
||||
bytes [0:4] tachometer : int32 (unused here)
|
||||
bytes [4:6] v_in×10 : int16 → ÷10 = V
|
||||
bytes [6:8] reserved
|
||||
"""
|
||||
if len(data) < 6:
|
||||
return None
|
||||
_tacho, v_in_x10 = struct.unpack_from(">ih", data, 0)
|
||||
return {
|
||||
"voltage_v": v_in_x10 / 10.0,
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Request builder
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def make_get_values_request(sender_id: int, target_id: int) -> tuple[int, bytes]:
|
||||
"""Build a CAN_PACKET_PROCESS_SHORT_BUFFER GET_VALUES request frame.
|
||||
|
||||
The VESC responds by refreshing its STATUS broadcast cadence.
|
||||
|
||||
Args:
|
||||
sender_id: This node's CAN ID (0-127; use 127 for a host controller).
|
||||
target_id: CAN ID of the target VESC.
|
||||
|
||||
Returns:
|
||||
(arbitration_id, data) — pass directly to can.Message().
|
||||
"""
|
||||
arb_id = (CAN_PACKET_PROCESS_SHORT_BUFFER << 8) | (target_id & 0xFF)
|
||||
payload = bytes([sender_id & 0xFF, 0x00, COMM_GET_VALUES])
|
||||
return arb_id, payload
|
||||
@ -0,0 +1,367 @@
|
||||
#!/usr/bin/env python3
|
||||
"""VESC CAN telemetry node for SaltyBot dual FSESC 6.7 Pro (FW 6.6).
|
||||
|
||||
Listens on SocketCAN (can0) for VESC status broadcast frames from two VESCs
|
||||
and publishes parsed telemetry to ROS2 topics. Also sends periodic GET_VALUES
|
||||
requests to trigger fresh STATUS broadcasts.
|
||||
|
||||
Published topics
|
||||
----------------
|
||||
/vesc/left/state (std_msgs/String) — JSON telemetry for left VESC
|
||||
/vesc/right/state (std_msgs/String) — JSON telemetry for right VESC
|
||||
/vesc/combined (std_msgs/String) — voltage, total current, both RPMs
|
||||
/diagnostics (diagnostic_msgs/DiagnosticArray)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
can_interface str 'can0' SocketCAN interface
|
||||
left_can_id int 61 CAN ID of left VESC
|
||||
right_can_id int 79 CAN ID of right VESC
|
||||
poll_rate_hz int 20 Publish + request rate (10-50 Hz)
|
||||
sender_can_id int 127 This node's CAN ID
|
||||
overcurrent_threshold_a float 60.0 Fault alert threshold (A)
|
||||
overtemp_fet_threshold_c float 80.0 FET overtemp alert threshold (°C)
|
||||
overtemp_motor_threshold_c float 100.0 Motor overtemp alert threshold (°C)
|
||||
"""
|
||||
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from std_msgs.msg import String
|
||||
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
|
||||
|
||||
try:
|
||||
import can
|
||||
_CAN_AVAILABLE = True
|
||||
except ImportError:
|
||||
_CAN_AVAILABLE = False
|
||||
|
||||
from .vesc_can_protocol import (
|
||||
CAN_PACKET_STATUS,
|
||||
CAN_PACKET_STATUS_4,
|
||||
CAN_PACKET_STATUS_5,
|
||||
VescState,
|
||||
make_get_values_request,
|
||||
parse_status,
|
||||
parse_status_4,
|
||||
parse_status_5,
|
||||
)
|
||||
|
||||
|
||||
class VescTelemetryNode(Node):
|
||||
"""ROS2 node: VESC CAN telemetry via SocketCAN."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("vesc_telemetry")
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# Parameters
|
||||
# ----------------------------------------------------------------
|
||||
self.declare_parameter("can_interface", "can0")
|
||||
self.declare_parameter("left_can_id", 61)
|
||||
self.declare_parameter("right_can_id", 79)
|
||||
self.declare_parameter("poll_rate_hz", 20)
|
||||
self.declare_parameter("sender_can_id", 127)
|
||||
self.declare_parameter("overcurrent_threshold_a", 60.0)
|
||||
self.declare_parameter("overtemp_fet_threshold_c", 80.0)
|
||||
self.declare_parameter("overtemp_motor_threshold_c", 100.0)
|
||||
|
||||
self._iface = self.get_parameter("can_interface").value
|
||||
self._left_id = self.get_parameter("left_can_id").value
|
||||
self._right_id = self.get_parameter("right_can_id").value
|
||||
hz = int(max(10, min(50, self.get_parameter("poll_rate_hz").value)))
|
||||
self._sender_id = self.get_parameter("sender_can_id").value
|
||||
self._oc_thresh = self.get_parameter("overcurrent_threshold_a").value
|
||||
self._otf_thresh = self.get_parameter("overtemp_fet_threshold_c").value
|
||||
self._otm_thresh = self.get_parameter("overtemp_motor_threshold_c").value
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# Per-VESC state (protected by _state_lock)
|
||||
# ----------------------------------------------------------------
|
||||
self._state: dict[int, VescState] = {
|
||||
self._left_id: VescState(can_id=self._left_id),
|
||||
self._right_id: VescState(can_id=self._right_id),
|
||||
}
|
||||
self._state_lock = threading.Lock()
|
||||
|
||||
# Track which faults we have already logged to avoid log spam
|
||||
self._fault_logged: dict[int, int] = {
|
||||
self._left_id: 0, self._right_id: 0
|
||||
}
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# Publishers
|
||||
# ----------------------------------------------------------------
|
||||
self._pub_left = self.create_publisher(String, "/vesc/left/state", 10)
|
||||
self._pub_right = self.create_publisher(String, "/vesc/right/state", 10)
|
||||
self._pub_combined = self.create_publisher(String, "/vesc/combined", 10)
|
||||
self._pub_diag = self.create_publisher(DiagnosticArray, "/diagnostics", 10)
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# SocketCAN
|
||||
# ----------------------------------------------------------------
|
||||
self._bus: Optional["can.BusABC"] = None
|
||||
self._rx_thread: Optional[threading.Thread] = None
|
||||
self._running = False
|
||||
self._init_can()
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# Publish + poll timer
|
||||
# ----------------------------------------------------------------
|
||||
self.create_timer(1.0 / hz, self._on_timer)
|
||||
|
||||
self.get_logger().info(
|
||||
f"vesc_telemetry: iface={self._iface}, "
|
||||
f"left_id={self._left_id}, right_id={self._right_id}, "
|
||||
f"poll={hz} Hz, sender_id={self._sender_id}"
|
||||
)
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# CAN initialisation
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
def _init_can(self) -> None:
|
||||
if not _CAN_AVAILABLE:
|
||||
self.get_logger().error(
|
||||
"python-can not installed — run: pip install python-can"
|
||||
)
|
||||
return
|
||||
try:
|
||||
self._bus = can.interface.Bus(channel=self._iface, bustype="socketcan")
|
||||
self._running = True
|
||||
self._rx_thread = threading.Thread(
|
||||
target=self._rx_loop, name="vesc_can_rx", daemon=True
|
||||
)
|
||||
self._rx_thread.start()
|
||||
self.get_logger().info(f"SocketCAN opened: {self._iface}")
|
||||
except Exception as exc:
|
||||
self.get_logger().error(f"Failed to open {self._iface}: {exc}")
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# Receive loop (background thread)
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
def _rx_loop(self) -> None:
|
||||
"""Drain SocketCAN frames and update VESC state."""
|
||||
while self._running and self._bus is not None:
|
||||
try:
|
||||
msg = self._bus.recv(timeout=0.1)
|
||||
except Exception:
|
||||
continue
|
||||
if msg is None or not msg.is_extended_id:
|
||||
continue
|
||||
self._dispatch_frame(msg.arbitration_id, bytes(msg.data))
|
||||
|
||||
def _dispatch_frame(self, arb_id: int, data: bytes) -> None:
|
||||
"""Decode extended CAN ID and update the matching VescState."""
|
||||
packet_type = (arb_id >> 8) & 0xFFFF
|
||||
vesc_id = arb_id & 0xFF
|
||||
|
||||
if vesc_id not in self._state:
|
||||
return
|
||||
|
||||
now = time.monotonic()
|
||||
with self._state_lock:
|
||||
s = self._state[vesc_id]
|
||||
|
||||
if packet_type == CAN_PACKET_STATUS:
|
||||
parsed = parse_status(data)
|
||||
if parsed:
|
||||
s.rpm = parsed["rpm"]
|
||||
s.current_a = parsed["current_a"]
|
||||
s.duty_cycle = parsed["duty_cycle"]
|
||||
s.last_status_ts = now
|
||||
|
||||
elif packet_type == CAN_PACKET_STATUS_4:
|
||||
parsed = parse_status_4(data)
|
||||
if parsed:
|
||||
s.temp_fet_c = parsed["temp_fet_c"]
|
||||
s.temp_motor_c = parsed["temp_motor_c"]
|
||||
s.current_in_a = parsed["current_in_a"]
|
||||
|
||||
elif packet_type == CAN_PACKET_STATUS_5:
|
||||
parsed = parse_status_5(data)
|
||||
if parsed:
|
||||
s.voltage_v = parsed["voltage_v"]
|
||||
s.last_status5_ts = now
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# Timer: poll + publish
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
def _on_timer(self) -> None:
|
||||
self._send_poll_requests()
|
||||
self._publish_all()
|
||||
|
||||
def _send_poll_requests(self) -> None:
|
||||
"""Send GET_VALUES to both VESCs to prompt STATUS broadcast refresh."""
|
||||
if self._bus is None:
|
||||
return
|
||||
for vesc_id in (self._left_id, self._right_id):
|
||||
try:
|
||||
arb_id, payload = make_get_values_request(self._sender_id, vesc_id)
|
||||
self._bus.send(can.Message(
|
||||
arbitration_id=arb_id,
|
||||
data=payload,
|
||||
is_extended_id=True,
|
||||
))
|
||||
except Exception as exc:
|
||||
self.get_logger().debug(
|
||||
f"GET_VALUES send failed (vesc_id={vesc_id}): {exc}"
|
||||
)
|
||||
|
||||
def _publish_all(self) -> None:
|
||||
with self._state_lock:
|
||||
left = self._state[self._left_id]
|
||||
right = self._state[self._right_id]
|
||||
# Snapshot dicts while holding lock
|
||||
l_dict = self._state_to_dict(left)
|
||||
r_dict = self._state_to_dict(right)
|
||||
left_alive = left.is_alive
|
||||
right_alive = right.is_alive
|
||||
voltage_v = left.voltage_v if left_alive else right.voltage_v
|
||||
combined = {
|
||||
"voltage_v": round(voltage_v, 2),
|
||||
"total_current_a": round(left.current_in_a + right.current_in_a, 2),
|
||||
"left_rpm": left.rpm,
|
||||
"right_rpm": right.rpm,
|
||||
"left_alive": left_alive,
|
||||
"right_alive": right_alive,
|
||||
"stamp": time.time(),
|
||||
}
|
||||
# Build diagnostics while holding lock so values are consistent
|
||||
diag_array = self._build_diagnostics(left, right)
|
||||
|
||||
self._pub_left.publish(String(data=json.dumps(l_dict)))
|
||||
self._pub_right.publish(String(data=json.dumps(r_dict)))
|
||||
self._pub_combined.publish(String(data=json.dumps(combined)))
|
||||
self._pub_diag.publish(diag_array)
|
||||
|
||||
# Fault alerting (outside lock — uses snapshots)
|
||||
self._check_faults_from_dict(l_dict, self._left_id, "left")
|
||||
self._check_faults_from_dict(r_dict, self._right_id, "right")
|
||||
|
||||
@staticmethod
|
||||
def _state_to_dict(s: VescState) -> dict:
|
||||
return {
|
||||
"can_id": s.can_id,
|
||||
"rpm": s.rpm,
|
||||
"current_a": round(s.current_a, 2),
|
||||
"current_in_a": round(s.current_in_a, 2),
|
||||
"duty_cycle": round(s.duty_cycle, 4),
|
||||
"voltage_v": round(s.voltage_v, 2),
|
||||
"temp_fet_c": round(s.temp_fet_c, 1),
|
||||
"temp_motor_c": round(s.temp_motor_c, 1),
|
||||
"fault_code": s.fault_code,
|
||||
"fault_name": s.fault_name,
|
||||
"alive": s.is_alive,
|
||||
"stamp": time.time(),
|
||||
}
|
||||
|
||||
def _build_diagnostics(
|
||||
self, left: VescState, right: VescState
|
||||
) -> DiagnosticArray:
|
||||
array = DiagnosticArray()
|
||||
array.header.stamp = self.get_clock().now().to_msg()
|
||||
|
||||
for s, label in ((left, "left"), (right, "right")):
|
||||
status = DiagnosticStatus()
|
||||
status.name = f"VESC/{label} (CAN ID {s.can_id})"
|
||||
status.hardware_id = f"vesc_can_{s.can_id}"
|
||||
|
||||
if not s.is_alive:
|
||||
status.level = DiagnosticStatus.ERROR
|
||||
status.message = "No CAN frames received (offline)"
|
||||
elif s.has_fault:
|
||||
status.level = DiagnosticStatus.ERROR
|
||||
status.message = f"Fault: {s.fault_name}"
|
||||
elif (
|
||||
s.temp_fet_c > self._otf_thresh
|
||||
or s.temp_motor_c > self._otm_thresh
|
||||
or abs(s.current_a) > self._oc_thresh
|
||||
):
|
||||
status.level = DiagnosticStatus.WARN
|
||||
status.message = "Threshold exceeded"
|
||||
else:
|
||||
status.level = DiagnosticStatus.OK
|
||||
status.message = "OK"
|
||||
|
||||
status.values = [
|
||||
KeyValue(key="rpm", value=str(s.rpm)),
|
||||
KeyValue(key="current_a", value=f"{s.current_a:.2f}"),
|
||||
KeyValue(key="current_in_a", value=f"{s.current_in_a:.2f}"),
|
||||
KeyValue(key="duty_cycle", value=f"{s.duty_cycle:.4f}"),
|
||||
KeyValue(key="voltage_v", value=f"{s.voltage_v:.2f}"),
|
||||
KeyValue(key="temp_fet_c", value=f"{s.temp_fet_c:.1f}"),
|
||||
KeyValue(key="temp_motor_c", value=f"{s.temp_motor_c:.1f}"),
|
||||
KeyValue(key="fault_code", value=str(s.fault_code)),
|
||||
KeyValue(key="fault_name", value=s.fault_name),
|
||||
]
|
||||
array.status.append(status)
|
||||
|
||||
return array
|
||||
|
||||
def _check_faults_from_dict(self, d: dict, vesc_id: int, label: str) -> None:
|
||||
"""Log fault/threshold warnings; suppress repeated identical messages."""
|
||||
if not d["alive"]:
|
||||
return
|
||||
|
||||
if abs(d["current_a"]) > self._oc_thresh:
|
||||
self.get_logger().warn(
|
||||
f"VESC {label}: overcurrent {d['current_a']:.1f} A "
|
||||
f"(threshold {self._oc_thresh:.0f} A)"
|
||||
)
|
||||
|
||||
if d["temp_fet_c"] > self._otf_thresh:
|
||||
self.get_logger().warn(
|
||||
f"VESC {label}: FET overtemp {d['temp_fet_c']:.1f} °C "
|
||||
f"(threshold {self._otf_thresh:.0f} °C)"
|
||||
)
|
||||
|
||||
if d["temp_motor_c"] > self._otm_thresh:
|
||||
self.get_logger().warn(
|
||||
f"VESC {label}: motor overtemp {d['temp_motor_c']:.1f} °C "
|
||||
f"(threshold {self._otm_thresh:.0f} °C)"
|
||||
)
|
||||
|
||||
fault_code = d["fault_code"]
|
||||
if fault_code != 0 and self._fault_logged.get(vesc_id) != fault_code:
|
||||
self.get_logger().error(
|
||||
f"VESC {label}: fault {fault_code} ({d['fault_name']})"
|
||||
)
|
||||
self._fault_logged[vesc_id] = fault_code
|
||||
elif fault_code == 0 and self._fault_logged.get(vesc_id, 0) != 0:
|
||||
self.get_logger().info(f"VESC {label}: fault cleared")
|
||||
self._fault_logged[vesc_id] = 0
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# Cleanup
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
def destroy_node(self) -> None:
|
||||
self._running = False
|
||||
if self._rx_thread is not None:
|
||||
self._rx_thread.join(timeout=1.0)
|
||||
if self._bus is not None:
|
||||
self._bus.shutdown()
|
||||
super().destroy_node()
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = VescTelemetryNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
4
jetson/ros2_ws/src/saltybot_vesc_telemetry/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_vesc_telemetry/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_vesc_telemetry
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_vesc_telemetry
|
||||
27
jetson/ros2_ws/src/saltybot_vesc_telemetry/setup.py
Normal file
27
jetson/ros2_ws/src/saltybot_vesc_telemetry/setup.py
Normal file
@ -0,0 +1,27 @@
|
||||
from setuptools import setup
|
||||
|
||||
package_name = "saltybot_vesc_telemetry"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version="0.1.0",
|
||||
packages=[package_name],
|
||||
data_files=[
|
||||
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||
(f"share/{package_name}", ["package.xml"]),
|
||||
(f"share/{package_name}/launch", ["launch/vesc_telemetry.launch.py"]),
|
||||
(f"share/{package_name}/config", ["config/vesc_telemetry_params.yaml"]),
|
||||
],
|
||||
install_requires=["setuptools", "python-can"],
|
||||
zip_safe=True,
|
||||
maintainer="sl-firmware",
|
||||
maintainer_email="sl-firmware@saltylab.local",
|
||||
description="VESC CAN telemetry node for dual FSESC 6.7 Pro",
|
||||
license="MIT",
|
||||
tests_require=["pytest"],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"vesc_telemetry_node = saltybot_vesc_telemetry.vesc_telemetry_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
@ -0,0 +1,240 @@
|
||||
"""Unit tests for VESC CAN telemetry — protocol parsing and state logic.
|
||||
|
||||
These tests cover pure functions in vesc_can_protocol and do not require
|
||||
ROS2, python-can, or a live CAN bus.
|
||||
"""
|
||||
|
||||
import struct
|
||||
import time
|
||||
import pytest
|
||||
|
||||
from saltybot_vesc_telemetry.vesc_can_protocol import (
|
||||
CAN_PACKET_STATUS,
|
||||
CAN_PACKET_STATUS_4,
|
||||
CAN_PACKET_STATUS_5,
|
||||
CAN_PACKET_PROCESS_SHORT_BUFFER,
|
||||
COMM_GET_VALUES,
|
||||
FAULT_CODE_NONE,
|
||||
FAULT_CODE_ABS_OVER_CURRENT,
|
||||
FAULT_CODE_OVER_TEMP_FET,
|
||||
FAULT_CODE_OVER_TEMP_MOTOR,
|
||||
FAULT_NAMES,
|
||||
VescState,
|
||||
make_get_values_request,
|
||||
parse_status,
|
||||
parse_status_4,
|
||||
parse_status_5,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# parse_status (CAN_PACKET_STATUS = 9)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestParseStatus:
|
||||
def test_forward_rpm(self):
|
||||
# 1000 RPM, 5.0 A, 0.300 duty
|
||||
data = struct.pack(">ihh", 1000, 50, 300)
|
||||
r = parse_status(data)
|
||||
assert r is not None
|
||||
assert r["rpm"] == 1000
|
||||
assert r["current_a"] == pytest.approx(5.0)
|
||||
assert r["duty_cycle"] == pytest.approx(0.300)
|
||||
|
||||
def test_reverse_rpm(self):
|
||||
data = struct.pack(">ihh", -3000, -80, -500)
|
||||
r = parse_status(data)
|
||||
assert r["rpm"] == -3000
|
||||
assert r["current_a"] == pytest.approx(-8.0)
|
||||
assert r["duty_cycle"] == pytest.approx(-0.5)
|
||||
|
||||
def test_zero(self):
|
||||
data = struct.pack(">ihh", 0, 0, 0)
|
||||
r = parse_status(data)
|
||||
assert r["rpm"] == 0
|
||||
assert r["current_a"] == pytest.approx(0.0)
|
||||
assert r["duty_cycle"] == pytest.approx(0.0)
|
||||
|
||||
def test_max_duty(self):
|
||||
# duty 1.0 → encoded as 1000
|
||||
data = struct.pack(">ihh", 50000, 200, 1000)
|
||||
r = parse_status(data)
|
||||
assert r["duty_cycle"] == pytest.approx(1.0)
|
||||
|
||||
def test_too_short_returns_none(self):
|
||||
assert parse_status(b"\x00\x01\x02") is None
|
||||
|
||||
def test_exact_minimum_length(self):
|
||||
data = struct.pack(">ihh", 42, 10, 100)
|
||||
assert parse_status(data) is not None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# parse_status_4 (CAN_PACKET_STATUS_4 = 16)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestParseStatus4:
|
||||
def _make(self, temp_fet, temp_motor, current_in, pid_pos=0):
|
||||
return struct.pack(
|
||||
">hhhh",
|
||||
int(temp_fet * 10),
|
||||
int(temp_motor * 10),
|
||||
int(current_in * 10),
|
||||
pid_pos,
|
||||
)
|
||||
|
||||
def test_normal_temps(self):
|
||||
data = self._make(45.5, 62.3, 12.0)
|
||||
r = parse_status_4(data)
|
||||
assert r is not None
|
||||
assert r["temp_fet_c"] == pytest.approx(45.5)
|
||||
assert r["temp_motor_c"] == pytest.approx(62.3)
|
||||
assert r["current_in_a"] == pytest.approx(12.0)
|
||||
|
||||
def test_negative_current_in(self):
|
||||
data = self._make(30.0, 40.0, -5.0)
|
||||
r = parse_status_4(data)
|
||||
assert r["current_in_a"] == pytest.approx(-5.0)
|
||||
|
||||
def test_too_short_returns_none(self):
|
||||
assert parse_status_4(b"\x00" * 7) is None
|
||||
|
||||
def test_zero_values(self):
|
||||
data = self._make(0, 0, 0)
|
||||
r = parse_status_4(data)
|
||||
assert r["temp_fet_c"] == pytest.approx(0.0)
|
||||
assert r["temp_motor_c"] == pytest.approx(0.0)
|
||||
assert r["current_in_a"] == pytest.approx(0.0)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# parse_status_5 (CAN_PACKET_STATUS_5 = 27)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestParseStatus5:
|
||||
def _make(self, voltage_v, tacho=0):
|
||||
return struct.pack(">ih", tacho, int(voltage_v * 10))
|
||||
|
||||
def test_nominal_voltage(self):
|
||||
data = self._make(25.2)
|
||||
r = parse_status_5(data)
|
||||
assert r is not None
|
||||
assert r["voltage_v"] == pytest.approx(25.2)
|
||||
|
||||
def test_low_voltage(self):
|
||||
data = self._make(18.6)
|
||||
r = parse_status_5(data)
|
||||
assert r["voltage_v"] == pytest.approx(18.6)
|
||||
|
||||
def test_too_short_returns_none(self):
|
||||
assert parse_status_5(b"\x00\x01\x02\x03\x04") is None
|
||||
|
||||
def test_with_tachometer(self):
|
||||
data = struct.pack(">ih", 123456, 252)
|
||||
r = parse_status_5(data)
|
||||
assert r["voltage_v"] == pytest.approx(25.2)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# make_get_values_request
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMakeGetValuesRequest:
|
||||
def test_arb_id_encodes_packet_type_and_target(self):
|
||||
arb_id, payload = make_get_values_request(sender_id=127, target_id=61)
|
||||
expected_arb = (CAN_PACKET_PROCESS_SHORT_BUFFER << 8) | 61
|
||||
assert arb_id == expected_arb
|
||||
|
||||
def test_payload_contains_sender_command(self):
|
||||
_, payload = make_get_values_request(sender_id=127, target_id=61)
|
||||
assert len(payload) == 3
|
||||
assert payload[0] == 127 # sender_id
|
||||
assert payload[1] == 0x00 # send_mode
|
||||
assert payload[2] == COMM_GET_VALUES
|
||||
|
||||
def test_right_vesc(self):
|
||||
arb_id, payload = make_get_values_request(sender_id=127, target_id=79)
|
||||
assert (arb_id & 0xFF) == 79
|
||||
assert payload[2] == COMM_GET_VALUES
|
||||
|
||||
def test_sender_id_in_payload(self):
|
||||
_, payload = make_get_values_request(sender_id=42, target_id=61)
|
||||
assert payload[0] == 42
|
||||
|
||||
def test_arb_id_is_extended(self):
|
||||
# Extended IDs can exceed 0x7FF (11-bit limit)
|
||||
arb_id, _ = make_get_values_request(127, 61)
|
||||
assert arb_id > 0x7FF
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# VescState properties
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestVescState:
|
||||
def test_defaults(self):
|
||||
s = VescState(can_id=61)
|
||||
assert s.rpm == 0
|
||||
assert s.current_a == pytest.approx(0.0)
|
||||
assert s.voltage_v == pytest.approx(0.0)
|
||||
assert s.fault_code == FAULT_CODE_NONE
|
||||
assert s.has_fault is False
|
||||
|
||||
def test_is_alive_fresh(self):
|
||||
s = VescState(can_id=61)
|
||||
s.last_status_ts = time.monotonic()
|
||||
assert s.is_alive is True
|
||||
|
||||
def test_is_alive_stale(self):
|
||||
s = VescState(can_id=61)
|
||||
s.last_status_ts = time.monotonic() - 2.0 # 2 s ago > 1 s timeout
|
||||
assert s.is_alive is False
|
||||
|
||||
def test_is_alive_never_received(self):
|
||||
s = VescState(can_id=61)
|
||||
# last_status_ts defaults to 0.0 — monotonic() will be >> 1 s
|
||||
assert s.is_alive is False
|
||||
|
||||
def test_has_fault_true(self):
|
||||
s = VescState(can_id=61)
|
||||
s.fault_code = FAULT_CODE_ABS_OVER_CURRENT
|
||||
assert s.has_fault is True
|
||||
|
||||
def test_has_fault_false_on_none(self):
|
||||
s = VescState(can_id=61)
|
||||
s.fault_code = FAULT_CODE_NONE
|
||||
assert s.has_fault is False
|
||||
|
||||
def test_fault_name_known(self):
|
||||
s = VescState(can_id=61)
|
||||
s.fault_code = FAULT_CODE_OVER_TEMP_FET
|
||||
assert s.fault_name == "OVER_TEMP_FET"
|
||||
|
||||
def test_fault_name_unknown(self):
|
||||
s = VescState(can_id=61)
|
||||
s.fault_code = 99
|
||||
assert "UNKNOWN" in s.fault_name
|
||||
|
||||
def test_fault_name_none(self):
|
||||
s = VescState(can_id=61)
|
||||
assert s.fault_name == "NONE"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# FAULT_NAMES completeness
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFaultNames:
|
||||
def test_all_common_codes_named(self):
|
||||
for code in (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10):
|
||||
assert code in FAULT_NAMES, f"fault code {code} missing from FAULT_NAMES"
|
||||
|
||||
def test_none_is_zero(self):
|
||||
assert FAULT_NAMES[0] == "NONE"
|
||||
|
||||
def test_overcurrent_code(self):
|
||||
assert FAULT_NAMES[FAULT_CODE_ABS_OVER_CURRENT] == "ABS_OVER_CURRENT"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
52
jetson/scripts/can_setup.sh
Executable file
52
jetson/scripts/can_setup.sh
Executable file
@ -0,0 +1,52 @@
|
||||
#!/usr/bin/env bash
|
||||
# can_setup.sh — Bring up CANable 2.0 (gs_usb) as can0 at 500 kbps
|
||||
# Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/643
|
||||
#
|
||||
# Usage:
|
||||
# sudo ./can_setup.sh # bring up
|
||||
# sudo ./can_setup.sh down # bring down
|
||||
# sudo ./can_setup.sh verify # candump one-shot check (Ctrl-C to stop)
|
||||
#
|
||||
# VESCs on bus: CAN ID 61 (0x3D) and CAN ID 79 (0x4F), 500 kbps
|
||||
set -euo pipefail
|
||||
|
||||
IFACE="${CAN_IFACE:-can0}"
|
||||
BITRATE="${CAN_BITRATE:-500000}"
|
||||
|
||||
log() { echo "[can_setup] $*"; }
|
||||
die() { echo "[can_setup] ERROR: $*" >&2; exit 1; }
|
||||
|
||||
cmd="${1:-up}"
|
||||
|
||||
case "$cmd" in
|
||||
up)
|
||||
# Verify the interface exists (gs_usb module loaded by kernel on plug-in)
|
||||
if ! ip link show "$IFACE" &>/dev/null; then
|
||||
die "$IFACE not found — is CANable 2.0 plugged in and gs_usb loaded?"
|
||||
fi
|
||||
|
||||
log "Bringing up $IFACE at ${BITRATE} bps..."
|
||||
ip link set "$IFACE" down 2>/dev/null || true
|
||||
ip link set "$IFACE" up type can bitrate "$BITRATE"
|
||||
ip link set "$IFACE" up
|
||||
log "$IFACE is up."
|
||||
ip -details link show "$IFACE"
|
||||
;;
|
||||
|
||||
down)
|
||||
log "Bringing down $IFACE..."
|
||||
ip link set "$IFACE" down
|
||||
log "$IFACE is down."
|
||||
;;
|
||||
|
||||
verify)
|
||||
log "Listening on $IFACE — expecting frames from VESC IDs 0x3D (61) and 0x4F (79)"
|
||||
log "Press Ctrl-C to stop."
|
||||
exec candump "$IFACE"
|
||||
;;
|
||||
|
||||
*)
|
||||
echo "Usage: $0 [up|down|verify]"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
@ -7,6 +7,9 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
REPO_DIR="$(dirname "${SCRIPT_DIR}")"
|
||||
SYSTEMD_DIR="/etc/systemd/system"
|
||||
DEPLOY_DIR="/opt/saltybot/jetson"
|
||||
UDEV_DIR="/etc/udev/rules.d"
|
||||
BRINGUP_SYSTEMD="${REPO_DIR}/ros2_ws/src/saltybot_bringup/systemd"
|
||||
BRINGUP_UDEV="${REPO_DIR}/ros2_ws/src/saltybot_bringup/udev"
|
||||
|
||||
log() { echo "[install_systemd] $*"; }
|
||||
|
||||
@ -23,15 +26,30 @@ log "Installing systemd units..."
|
||||
cp "${SCRIPT_DIR}/saltybot.target" "${SYSTEMD_DIR}/"
|
||||
cp "${SCRIPT_DIR}/saltybot-social.service" "${SYSTEMD_DIR}/"
|
||||
cp "${SCRIPT_DIR}/tailscale-vpn.service" "${SYSTEMD_DIR}/"
|
||||
cp "${BRINGUP_SYSTEMD}/can-bringup.service" "${SYSTEMD_DIR}/"
|
||||
|
||||
# Install udev rules
|
||||
log "Installing udev rules..."
|
||||
mkdir -p "${UDEV_DIR}"
|
||||
cp "${BRINGUP_UDEV}/70-canable.rules" "${UDEV_DIR}/"
|
||||
cp "${BRINGUP_UDEV}/90-magedok-touch.rules" "${UDEV_DIR}/"
|
||||
udevadm control --reload
|
||||
udevadm trigger --subsystem-match=net --action=add
|
||||
|
||||
# Reload and enable
|
||||
systemctl daemon-reload
|
||||
systemctl enable saltybot.target
|
||||
systemctl enable saltybot-social.service
|
||||
systemctl enable tailscale-vpn.service
|
||||
systemctl enable can-bringup.service
|
||||
|
||||
log "Services installed. Start with:"
|
||||
log " systemctl start saltybot-social"
|
||||
log " systemctl start tailscale-vpn"
|
||||
log " systemctl start can-bringup"
|
||||
log " journalctl -fu saltybot-social"
|
||||
log " journalctl -fu tailscale-vpn"
|
||||
log " journalctl -fu can-bringup"
|
||||
log ""
|
||||
log "Verify CAN bus: candump can0"
|
||||
log " VESC CAN IDs: 61 (0x3D) and 79 (0x4F)"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user