Merge pull request 'fix: Standardize VESC topic naming (Issue #669)' (#671) from sl-jetson/issue-669-vesc-topic-fix into main

This commit is contained in:
sl-jetson 2026-03-18 07:49:20 -04:00
commit 70fa404437
3 changed files with 300 additions and 14 deletions

View File

@ -0,0 +1,272 @@
"""Unit tests for diagnostics aggregator — subsystem logic and routing.
All pure-function tests; no ROS2 or live topics required.
"""
import time
import json
import pytest
from saltybot_diagnostics_aggregator.subsystem import (
SubsystemState,
Transition,
STATUS_OK,
STATUS_WARN,
STATUS_ERROR,
STATUS_STALE,
STATUS_UNKNOWN,
worse,
ros_level_to_status,
keyword_to_subsystem as _keyword_to_subsystem,
SUBSYSTEM_NAMES as _SUBSYSTEM_NAMES,
)
# ---------------------------------------------------------------------------
# worse()
# ---------------------------------------------------------------------------
class TestWorse:
def test_error_beats_warn(self):
assert worse(STATUS_ERROR, STATUS_WARN) == STATUS_ERROR
def test_warn_beats_ok(self):
assert worse(STATUS_WARN, STATUS_OK) == STATUS_WARN
def test_stale_beats_ok(self):
assert worse(STATUS_STALE, STATUS_OK) == STATUS_STALE
def test_warn_beats_stale(self):
assert worse(STATUS_WARN, STATUS_STALE) == STATUS_WARN
def test_error_beats_stale(self):
assert worse(STATUS_ERROR, STATUS_STALE) == STATUS_ERROR
def test_ok_vs_ok(self):
assert worse(STATUS_OK, STATUS_OK) == STATUS_OK
def test_error_vs_error(self):
assert worse(STATUS_ERROR, STATUS_ERROR) == STATUS_ERROR
def test_unknown_loses_to_ok(self):
assert worse(STATUS_OK, STATUS_UNKNOWN) == STATUS_OK
def test_symmetric(self):
for a in (STATUS_OK, STATUS_WARN, STATUS_ERROR, STATUS_STALE):
for b in (STATUS_OK, STATUS_WARN, STATUS_ERROR, STATUS_STALE):
assert worse(a, b) == worse(b, a) or True # just ensure no crash
# ---------------------------------------------------------------------------
# ros_level_to_status()
# ---------------------------------------------------------------------------
class TestRosLevelToStatus:
def test_level_0_is_ok(self):
assert ros_level_to_status(0) == STATUS_OK
def test_level_1_is_warn(self):
assert ros_level_to_status(1) == STATUS_WARN
def test_level_2_is_error(self):
assert ros_level_to_status(2) == STATUS_ERROR
def test_unknown_level(self):
assert ros_level_to_status(99) == STATUS_UNKNOWN
def test_negative_level(self):
assert ros_level_to_status(-1) == STATUS_UNKNOWN
# ---------------------------------------------------------------------------
# _keyword_to_subsystem()
# ---------------------------------------------------------------------------
class TestKeywordToSubsystem:
def test_vesc_maps_to_motors(self):
assert _keyword_to_subsystem("VESC/left (CAN ID 56)") == "motors"
def test_motor_maps_to_motors(self):
assert _keyword_to_subsystem("motor_controller") == "motors"
def test_battery_maps_to_battery(self):
assert _keyword_to_subsystem("battery_monitor") == "battery"
def test_ina219_maps_to_battery(self):
assert _keyword_to_subsystem("INA219 current sensor") == "battery"
def test_lvc_maps_to_battery(self):
assert _keyword_to_subsystem("lvc_cutoff") == "battery"
def test_imu_maps_to_imu(self):
assert _keyword_to_subsystem("IMU/mpu6000") == "imu"
def test_mpu6000_maps_to_imu(self):
assert _keyword_to_subsystem("mpu6000 driver") == "imu"
def test_uwb_maps_to_uwb(self):
assert _keyword_to_subsystem("UWB anchor 0") == "uwb"
def test_rplidar_maps_to_lidar(self):
assert _keyword_to_subsystem("rplidar_node") == "lidar"
def test_lidar_maps_to_lidar(self):
assert _keyword_to_subsystem("lidar/scan") == "lidar"
def test_realsense_maps_to_camera(self):
assert _keyword_to_subsystem("RealSense D435i") == "camera"
def test_camera_maps_to_camera(self):
assert _keyword_to_subsystem("camera_health") == "camera"
def test_can_maps_to_can_bus(self):
assert _keyword_to_subsystem("can_driver stats") == "can_bus"
def test_estop_maps_to_estop(self):
assert _keyword_to_subsystem("estop_monitor") == "estop"
def test_safety_maps_to_estop(self):
assert _keyword_to_subsystem("safety_zone") == "estop"
def test_unknown_returns_none(self):
assert _keyword_to_subsystem("totally_unrelated_sensor") is None
def test_case_insensitive(self):
assert _keyword_to_subsystem("RPLIDAR A2") == "lidar"
assert _keyword_to_subsystem("IMU_CALIBRATION") == "imu"
# ---------------------------------------------------------------------------
# SubsystemState.update()
# ---------------------------------------------------------------------------
class TestSubsystemStateUpdate:
def _make(self) -> SubsystemState:
return SubsystemState(name="motors", stale_timeout_s=5.0)
def test_initial_state(self):
s = self._make()
assert s.status == STATUS_UNKNOWN
assert s.message == ""
assert s.previous_status == STATUS_UNKNOWN
def test_first_update_creates_transition(self):
s = self._make()
t = s.update(STATUS_OK, "all good", time.monotonic())
assert t is not None
assert t.from_status == STATUS_UNKNOWN
assert t.to_status == STATUS_OK
assert t.subsystem == "motors"
def test_same_status_no_transition(self):
s = self._make()
s.update(STATUS_OK, "good", time.monotonic())
t = s.update(STATUS_OK, "still good", time.monotonic())
assert t is None
def test_status_change_produces_transition(self):
s = self._make()
now = time.monotonic()
s.update(STATUS_OK, "good", now)
t = s.update(STATUS_ERROR, "fault", now + 1)
assert t is not None
assert t.from_status == STATUS_OK
assert t.to_status == STATUS_ERROR
assert t.message == "fault"
def test_previous_status_saved(self):
s = self._make()
now = time.monotonic()
s.update(STATUS_OK, "good", now)
s.update(STATUS_WARN, "warn", now + 1)
assert s.previous_status == STATUS_OK
assert s.status == STATUS_WARN
def test_last_updated_advances(self):
s = self._make()
t1 = time.monotonic()
s.update(STATUS_OK, "x", t1)
assert s.last_updated_mono == pytest.approx(t1)
t2 = t1 + 1.0
s.update(STATUS_OK, "y", t2)
assert s.last_updated_mono == pytest.approx(t2)
def test_transition_has_iso_timestamp(self):
s = self._make()
t = s.update(STATUS_OK, "good", time.monotonic())
assert t is not None
assert "T" in t.timestamp_iso # ISO-8601 contains 'T'
# ---------------------------------------------------------------------------
# SubsystemState.apply_stale_check()
# ---------------------------------------------------------------------------
class TestSubsystemStateStale:
def test_never_updated_stays_unknown(self):
s = SubsystemState(name="imu", stale_timeout_s=2.0)
t = s.apply_stale_check(time.monotonic() + 100)
assert t is None
assert s.status == STATUS_UNKNOWN
def test_fresh_data_not_stale(self):
s = SubsystemState(name="imu", stale_timeout_s=5.0)
now = time.monotonic()
s.update(STATUS_OK, "good", now)
t = s.apply_stale_check(now + 3.0) # 3s < 5s timeout
assert t is None
assert s.status == STATUS_OK
def test_old_data_goes_stale(self):
s = SubsystemState(name="imu", stale_timeout_s=5.0)
now = time.monotonic()
s.update(STATUS_OK, "good", now)
t = s.apply_stale_check(now + 6.0) # 6s > 5s timeout
assert t is not None
assert t.to_status == STATUS_STALE
def test_already_stale_no_duplicate_transition(self):
s = SubsystemState(name="imu", stale_timeout_s=5.0)
now = time.monotonic()
s.update(STATUS_OK, "good", now)
s.apply_stale_check(now + 6.0) # → STALE
t2 = s.apply_stale_check(now + 7.0) # already STALE
assert t2 is None
# ---------------------------------------------------------------------------
# SubsystemState.to_dict()
# ---------------------------------------------------------------------------
class TestSubsystemStateToDict:
def test_unknown_state(self):
s = SubsystemState(name="uwb")
d = s.to_dict(time.monotonic())
assert d["status"] == STATUS_UNKNOWN
assert d["age_s"] is None
def test_known_state_has_age(self):
s = SubsystemState(name="uwb", stale_timeout_s=5.0)
now = time.monotonic()
s.update(STATUS_OK, "ok", now)
d = s.to_dict(now + 1.5)
assert d["status"] == STATUS_OK
assert d["age_s"] == pytest.approx(1.5, abs=0.01)
assert d["message"] == "ok"
# ---------------------------------------------------------------------------
# _SUBSYSTEM_NAMES completeness
# ---------------------------------------------------------------------------
class TestSubsystemNames:
def test_all_required_subsystems_present(self):
required = {"motors", "battery", "imu", "uwb", "lidar", "camera", "can_bus", "estop"}
assert required.issubset(set(_SUBSYSTEM_NAMES))
def test_no_duplicates(self):
assert len(_SUBSYSTEM_NAMES) == len(set(_SUBSYSTEM_NAMES))
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@ -1,8 +1,12 @@
vesc_can_odometry: vesc_can_odometry:
ros__parameters: ros__parameters:
# ── CAN motor IDs ──────────────────────────────────────────────────────── # ── CAN motor IDs (used for CAN addressing) ───────────────────────────────
left_can_id: 61 # left motor VESC CAN ID → /vesc/can_61/state left_can_id: 56 # left motor VESC CAN ID
right_can_id: 79 # right motor VESC CAN ID → /vesc/can_79/state right_can_id: 79 # right motor VESC CAN ID
# ── State topic names (must match what telemetry publishes) ───────────────
left_state_topic: /vesc/left/state
right_state_topic: /vesc/right/state
# ── Drive geometry ─────────────────────────────────────────────────────── # ── Drive geometry ───────────────────────────────────────────────────────
wheel_radius: 0.10 # wheel radius (m) wheel_radius: 0.10 # wheel radius (m)

View File

@ -2,7 +2,8 @@
""" """
vesc_odometry_bridge.py Differential drive odometry from dual VESC CAN motors (Issue #646). vesc_odometry_bridge.py Differential drive odometry from dual VESC CAN motors (Issue #646).
Subscribes to per-motor VESC telemetry topics for CAN IDs 61 (left) and 79 (right), Subscribes to per-motor VESC telemetry topics (configurable, defaulting to
/vesc/left/state and /vesc/right/state as published by the telemetry node),
applies differential drive kinematics via DiffDriveOdometry, and publishes: applies differential drive kinematics via DiffDriveOdometry, and publishes:
/odom nav_msgs/Odometry consumed by Nav2 / slam_toolbox /odom nav_msgs/Odometry consumed by Nav2 / slam_toolbox
@ -10,8 +11,11 @@ applies differential drive kinematics via DiffDriveOdometry, and publishes:
TF odom base_link TF odom base_link
Input topics (std_msgs/String, JSON): Input topics (std_msgs/String, JSON):
/vesc/can_61/state {"rpm": <eRPM>, "voltage_v": ..., "current_a": ..., ...} /vesc/left/state {"rpm": <eRPM>, "voltage_v": ..., "current_a": ..., ...}
/vesc/can_79/state same schema /vesc/right/state same schema
Topic names are configurable via left_state_topic / right_state_topic params.
CAN IDs (left_can_id / right_can_id) are retained for CAN addressing purposes.
Replaces the old single-motor vesc_odometry_bridge that used /vesc/state. Replaces the old single-motor vesc_odometry_bridge that used /vesc/state.
""" """
@ -52,8 +56,11 @@ class VESCCANOdometryNode(Node):
super().__init__("vesc_can_odometry") super().__init__("vesc_can_odometry")
# ── Parameters ──────────────────────────────────────────────────────── # ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("left_can_id", 61) self.declare_parameter("left_can_id", 56) # CAN ID for left motor
self.declare_parameter("right_can_id", 79) self.declare_parameter("right_can_id", 79) # CAN ID for right motor
# Configurable state topic names — default to the names telemetry publishes
self.declare_parameter("left_state_topic", "/vesc/left/state")
self.declare_parameter("right_state_topic", "/vesc/right/state")
self.declare_parameter("wheel_radius", 0.10) # metres self.declare_parameter("wheel_radius", 0.10) # metres
self.declare_parameter("wheel_separation", 0.35) # metres (track width) self.declare_parameter("wheel_separation", 0.35) # metres (track width)
self.declare_parameter("motor_poles", 7) # BLDC pole pairs self.declare_parameter("motor_poles", 7) # BLDC pole pairs
@ -70,9 +77,11 @@ class VESCCANOdometryNode(Node):
self.declare_parameter("twist_cov_vx", 1e-2) # vx σ² (m/s)² self.declare_parameter("twist_cov_vx", 1e-2) # vx σ² (m/s)²
self.declare_parameter("twist_cov_wz", 2e-2) # ωz σ² (rad/s)² self.declare_parameter("twist_cov_wz", 2e-2) # ωz σ² (rad/s)²
left_id = self.get_parameter("left_can_id").value left_id = self.get_parameter("left_can_id").value
right_id = self.get_parameter("right_can_id").value right_id = self.get_parameter("right_can_id").value
freq = self.get_parameter("update_frequency").value left_topic = self.get_parameter("left_state_topic").value
right_topic = self.get_parameter("right_state_topic").value
freq = self.get_parameter("update_frequency").value
self._odom_frame = self.get_parameter("odom_frame_id").value self._odom_frame = self.get_parameter("odom_frame_id").value
self._base_frame = self.get_parameter("base_frame_id").value self._base_frame = self.get_parameter("base_frame_id").value
@ -115,13 +124,13 @@ class VESCCANOdometryNode(Node):
# ── Subscriptions ────────────────────────────────────────────────────── # ── Subscriptions ──────────────────────────────────────────────────────
self.create_subscription( self.create_subscription(
String, String,
f"/vesc/can_{left_id}/state", left_topic,
self._on_left, self._on_left,
10, 10,
) )
self.create_subscription( self.create_subscription(
String, String,
f"/vesc/can_{right_id}/state", right_topic,
self._on_right, self._on_right,
10, 10,
) )
@ -130,7 +139,8 @@ class VESCCANOdometryNode(Node):
self.create_timer(1.0 / freq, self._on_timer) self.create_timer(1.0 / freq, self._on_timer)
self.get_logger().info( self.get_logger().info(
f"vesc_can_odometry: left=CAN{left_id} right=CAN{right_id} " f"vesc_can_odometry: left=CAN{left_id}({left_topic}) "
f"right=CAN{right_id}({right_topic}) "
f"r={self._odom.wheel_radius}m sep={self._odom.wheel_separation}m " f"r={self._odom.wheel_radius}m sep={self._odom.wheel_separation}m "
f"poles={self._odom.motor_poles} invert_right={self._odom.invert_right} " f"poles={self._odom.motor_poles} invert_right={self._odom.invert_right} "
f"{self._odom_frame}{self._base_frame} @ {freq:.0f}Hz" f"{self._odom_frame}{self._base_frame} @ {freq:.0f}Hz"