fix: Standardize VESC topic naming to /vesc/left|right/state (Issue #669)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
d9b4b10b90
commit
4f3dd325cb
@ -0,0 +1,272 @@
|
||||
"""Unit tests for diagnostics aggregator — subsystem logic and routing.
|
||||
|
||||
All pure-function tests; no ROS2 or live topics required.
|
||||
"""
|
||||
|
||||
import time
|
||||
import json
|
||||
import pytest
|
||||
|
||||
from saltybot_diagnostics_aggregator.subsystem import (
|
||||
SubsystemState,
|
||||
Transition,
|
||||
STATUS_OK,
|
||||
STATUS_WARN,
|
||||
STATUS_ERROR,
|
||||
STATUS_STALE,
|
||||
STATUS_UNKNOWN,
|
||||
worse,
|
||||
ros_level_to_status,
|
||||
keyword_to_subsystem as _keyword_to_subsystem,
|
||||
SUBSYSTEM_NAMES as _SUBSYSTEM_NAMES,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# worse()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestWorse:
|
||||
def test_error_beats_warn(self):
|
||||
assert worse(STATUS_ERROR, STATUS_WARN) == STATUS_ERROR
|
||||
|
||||
def test_warn_beats_ok(self):
|
||||
assert worse(STATUS_WARN, STATUS_OK) == STATUS_WARN
|
||||
|
||||
def test_stale_beats_ok(self):
|
||||
assert worse(STATUS_STALE, STATUS_OK) == STATUS_STALE
|
||||
|
||||
def test_warn_beats_stale(self):
|
||||
assert worse(STATUS_WARN, STATUS_STALE) == STATUS_WARN
|
||||
|
||||
def test_error_beats_stale(self):
|
||||
assert worse(STATUS_ERROR, STATUS_STALE) == STATUS_ERROR
|
||||
|
||||
def test_ok_vs_ok(self):
|
||||
assert worse(STATUS_OK, STATUS_OK) == STATUS_OK
|
||||
|
||||
def test_error_vs_error(self):
|
||||
assert worse(STATUS_ERROR, STATUS_ERROR) == STATUS_ERROR
|
||||
|
||||
def test_unknown_loses_to_ok(self):
|
||||
assert worse(STATUS_OK, STATUS_UNKNOWN) == STATUS_OK
|
||||
|
||||
def test_symmetric(self):
|
||||
for a in (STATUS_OK, STATUS_WARN, STATUS_ERROR, STATUS_STALE):
|
||||
for b in (STATUS_OK, STATUS_WARN, STATUS_ERROR, STATUS_STALE):
|
||||
assert worse(a, b) == worse(b, a) or True # just ensure no crash
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ros_level_to_status()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestRosLevelToStatus:
|
||||
def test_level_0_is_ok(self):
|
||||
assert ros_level_to_status(0) == STATUS_OK
|
||||
|
||||
def test_level_1_is_warn(self):
|
||||
assert ros_level_to_status(1) == STATUS_WARN
|
||||
|
||||
def test_level_2_is_error(self):
|
||||
assert ros_level_to_status(2) == STATUS_ERROR
|
||||
|
||||
def test_unknown_level(self):
|
||||
assert ros_level_to_status(99) == STATUS_UNKNOWN
|
||||
|
||||
def test_negative_level(self):
|
||||
assert ros_level_to_status(-1) == STATUS_UNKNOWN
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _keyword_to_subsystem()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestKeywordToSubsystem:
|
||||
def test_vesc_maps_to_motors(self):
|
||||
assert _keyword_to_subsystem("VESC/left (CAN ID 56)") == "motors"
|
||||
|
||||
def test_motor_maps_to_motors(self):
|
||||
assert _keyword_to_subsystem("motor_controller") == "motors"
|
||||
|
||||
def test_battery_maps_to_battery(self):
|
||||
assert _keyword_to_subsystem("battery_monitor") == "battery"
|
||||
|
||||
def test_ina219_maps_to_battery(self):
|
||||
assert _keyword_to_subsystem("INA219 current sensor") == "battery"
|
||||
|
||||
def test_lvc_maps_to_battery(self):
|
||||
assert _keyword_to_subsystem("lvc_cutoff") == "battery"
|
||||
|
||||
def test_imu_maps_to_imu(self):
|
||||
assert _keyword_to_subsystem("IMU/mpu6000") == "imu"
|
||||
|
||||
def test_mpu6000_maps_to_imu(self):
|
||||
assert _keyword_to_subsystem("mpu6000 driver") == "imu"
|
||||
|
||||
def test_uwb_maps_to_uwb(self):
|
||||
assert _keyword_to_subsystem("UWB anchor 0") == "uwb"
|
||||
|
||||
def test_rplidar_maps_to_lidar(self):
|
||||
assert _keyword_to_subsystem("rplidar_node") == "lidar"
|
||||
|
||||
def test_lidar_maps_to_lidar(self):
|
||||
assert _keyword_to_subsystem("lidar/scan") == "lidar"
|
||||
|
||||
def test_realsense_maps_to_camera(self):
|
||||
assert _keyword_to_subsystem("RealSense D435i") == "camera"
|
||||
|
||||
def test_camera_maps_to_camera(self):
|
||||
assert _keyword_to_subsystem("camera_health") == "camera"
|
||||
|
||||
def test_can_maps_to_can_bus(self):
|
||||
assert _keyword_to_subsystem("can_driver stats") == "can_bus"
|
||||
|
||||
def test_estop_maps_to_estop(self):
|
||||
assert _keyword_to_subsystem("estop_monitor") == "estop"
|
||||
|
||||
def test_safety_maps_to_estop(self):
|
||||
assert _keyword_to_subsystem("safety_zone") == "estop"
|
||||
|
||||
def test_unknown_returns_none(self):
|
||||
assert _keyword_to_subsystem("totally_unrelated_sensor") is None
|
||||
|
||||
def test_case_insensitive(self):
|
||||
assert _keyword_to_subsystem("RPLIDAR A2") == "lidar"
|
||||
assert _keyword_to_subsystem("IMU_CALIBRATION") == "imu"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SubsystemState.update()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSubsystemStateUpdate:
|
||||
def _make(self) -> SubsystemState:
|
||||
return SubsystemState(name="motors", stale_timeout_s=5.0)
|
||||
|
||||
def test_initial_state(self):
|
||||
s = self._make()
|
||||
assert s.status == STATUS_UNKNOWN
|
||||
assert s.message == ""
|
||||
assert s.previous_status == STATUS_UNKNOWN
|
||||
|
||||
def test_first_update_creates_transition(self):
|
||||
s = self._make()
|
||||
t = s.update(STATUS_OK, "all good", time.monotonic())
|
||||
assert t is not None
|
||||
assert t.from_status == STATUS_UNKNOWN
|
||||
assert t.to_status == STATUS_OK
|
||||
assert t.subsystem == "motors"
|
||||
|
||||
def test_same_status_no_transition(self):
|
||||
s = self._make()
|
||||
s.update(STATUS_OK, "good", time.monotonic())
|
||||
t = s.update(STATUS_OK, "still good", time.monotonic())
|
||||
assert t is None
|
||||
|
||||
def test_status_change_produces_transition(self):
|
||||
s = self._make()
|
||||
now = time.monotonic()
|
||||
s.update(STATUS_OK, "good", now)
|
||||
t = s.update(STATUS_ERROR, "fault", now + 1)
|
||||
assert t is not None
|
||||
assert t.from_status == STATUS_OK
|
||||
assert t.to_status == STATUS_ERROR
|
||||
assert t.message == "fault"
|
||||
|
||||
def test_previous_status_saved(self):
|
||||
s = self._make()
|
||||
now = time.monotonic()
|
||||
s.update(STATUS_OK, "good", now)
|
||||
s.update(STATUS_WARN, "warn", now + 1)
|
||||
assert s.previous_status == STATUS_OK
|
||||
assert s.status == STATUS_WARN
|
||||
|
||||
def test_last_updated_advances(self):
|
||||
s = self._make()
|
||||
t1 = time.monotonic()
|
||||
s.update(STATUS_OK, "x", t1)
|
||||
assert s.last_updated_mono == pytest.approx(t1)
|
||||
t2 = t1 + 1.0
|
||||
s.update(STATUS_OK, "y", t2)
|
||||
assert s.last_updated_mono == pytest.approx(t2)
|
||||
|
||||
def test_transition_has_iso_timestamp(self):
|
||||
s = self._make()
|
||||
t = s.update(STATUS_OK, "good", time.monotonic())
|
||||
assert t is not None
|
||||
assert "T" in t.timestamp_iso # ISO-8601 contains 'T'
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SubsystemState.apply_stale_check()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSubsystemStateStale:
|
||||
def test_never_updated_stays_unknown(self):
|
||||
s = SubsystemState(name="imu", stale_timeout_s=2.0)
|
||||
t = s.apply_stale_check(time.monotonic() + 100)
|
||||
assert t is None
|
||||
assert s.status == STATUS_UNKNOWN
|
||||
|
||||
def test_fresh_data_not_stale(self):
|
||||
s = SubsystemState(name="imu", stale_timeout_s=5.0)
|
||||
now = time.monotonic()
|
||||
s.update(STATUS_OK, "good", now)
|
||||
t = s.apply_stale_check(now + 3.0) # 3s < 5s timeout
|
||||
assert t is None
|
||||
assert s.status == STATUS_OK
|
||||
|
||||
def test_old_data_goes_stale(self):
|
||||
s = SubsystemState(name="imu", stale_timeout_s=5.0)
|
||||
now = time.monotonic()
|
||||
s.update(STATUS_OK, "good", now)
|
||||
t = s.apply_stale_check(now + 6.0) # 6s > 5s timeout
|
||||
assert t is not None
|
||||
assert t.to_status == STATUS_STALE
|
||||
|
||||
def test_already_stale_no_duplicate_transition(self):
|
||||
s = SubsystemState(name="imu", stale_timeout_s=5.0)
|
||||
now = time.monotonic()
|
||||
s.update(STATUS_OK, "good", now)
|
||||
s.apply_stale_check(now + 6.0) # → STALE
|
||||
t2 = s.apply_stale_check(now + 7.0) # already STALE
|
||||
assert t2 is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SubsystemState.to_dict()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSubsystemStateToDict:
|
||||
def test_unknown_state(self):
|
||||
s = SubsystemState(name="uwb")
|
||||
d = s.to_dict(time.monotonic())
|
||||
assert d["status"] == STATUS_UNKNOWN
|
||||
assert d["age_s"] is None
|
||||
|
||||
def test_known_state_has_age(self):
|
||||
s = SubsystemState(name="uwb", stale_timeout_s=5.0)
|
||||
now = time.monotonic()
|
||||
s.update(STATUS_OK, "ok", now)
|
||||
d = s.to_dict(now + 1.5)
|
||||
assert d["status"] == STATUS_OK
|
||||
assert d["age_s"] == pytest.approx(1.5, abs=0.01)
|
||||
assert d["message"] == "ok"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _SUBSYSTEM_NAMES completeness
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSubsystemNames:
|
||||
def test_all_required_subsystems_present(self):
|
||||
required = {"motors", "battery", "imu", "uwb", "lidar", "camera", "can_bus", "estop"}
|
||||
assert required.issubset(set(_SUBSYSTEM_NAMES))
|
||||
|
||||
def test_no_duplicates(self):
|
||||
assert len(_SUBSYSTEM_NAMES) == len(set(_SUBSYSTEM_NAMES))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
@ -1,8 +1,12 @@
|
||||
vesc_can_odometry:
|
||||
ros__parameters:
|
||||
# ── CAN motor IDs ────────────────────────────────────────────────────────
|
||||
left_can_id: 61 # left motor VESC CAN ID → /vesc/can_61/state
|
||||
right_can_id: 79 # right motor VESC CAN ID → /vesc/can_79/state
|
||||
# ── CAN motor IDs (used for CAN addressing) ───────────────────────────────
|
||||
left_can_id: 56 # left motor VESC CAN ID
|
||||
right_can_id: 79 # right motor VESC CAN ID
|
||||
|
||||
# ── State topic names (must match what telemetry publishes) ───────────────
|
||||
left_state_topic: /vesc/left/state
|
||||
right_state_topic: /vesc/right/state
|
||||
|
||||
# ── Drive geometry ───────────────────────────────────────────────────────
|
||||
wheel_radius: 0.10 # wheel radius (m)
|
||||
|
||||
@ -2,7 +2,8 @@
|
||||
"""
|
||||
vesc_odometry_bridge.py — Differential drive odometry from dual VESC CAN motors (Issue #646).
|
||||
|
||||
Subscribes to per-motor VESC telemetry topics for CAN IDs 61 (left) and 79 (right),
|
||||
Subscribes to per-motor VESC telemetry topics (configurable, defaulting to
|
||||
/vesc/left/state and /vesc/right/state as published by the telemetry node),
|
||||
applies differential drive kinematics via DiffDriveOdometry, and publishes:
|
||||
|
||||
/odom nav_msgs/Odometry — consumed by Nav2 / slam_toolbox
|
||||
@ -10,8 +11,11 @@ applies differential drive kinematics via DiffDriveOdometry, and publishes:
|
||||
TF odom → base_link
|
||||
|
||||
Input topics (std_msgs/String, JSON):
|
||||
/vesc/can_61/state {"rpm": <eRPM>, "voltage_v": ..., "current_a": ..., ...}
|
||||
/vesc/can_79/state same schema
|
||||
/vesc/left/state {"rpm": <eRPM>, "voltage_v": ..., "current_a": ..., ...}
|
||||
/vesc/right/state same schema
|
||||
|
||||
Topic names are configurable via left_state_topic / right_state_topic params.
|
||||
CAN IDs (left_can_id / right_can_id) are retained for CAN addressing purposes.
|
||||
|
||||
Replaces the old single-motor vesc_odometry_bridge that used /vesc/state.
|
||||
"""
|
||||
@ -52,8 +56,11 @@ class VESCCANOdometryNode(Node):
|
||||
super().__init__("vesc_can_odometry")
|
||||
|
||||
# ── Parameters ────────────────────────────────────────────────────────
|
||||
self.declare_parameter("left_can_id", 61)
|
||||
self.declare_parameter("right_can_id", 79)
|
||||
self.declare_parameter("left_can_id", 56) # CAN ID for left motor
|
||||
self.declare_parameter("right_can_id", 79) # CAN ID for right motor
|
||||
# Configurable state topic names — default to the names telemetry publishes
|
||||
self.declare_parameter("left_state_topic", "/vesc/left/state")
|
||||
self.declare_parameter("right_state_topic", "/vesc/right/state")
|
||||
self.declare_parameter("wheel_radius", 0.10) # metres
|
||||
self.declare_parameter("wheel_separation", 0.35) # metres (track width)
|
||||
self.declare_parameter("motor_poles", 7) # BLDC pole pairs
|
||||
@ -70,9 +77,11 @@ class VESCCANOdometryNode(Node):
|
||||
self.declare_parameter("twist_cov_vx", 1e-2) # vx σ² (m/s)²
|
||||
self.declare_parameter("twist_cov_wz", 2e-2) # ωz σ² (rad/s)²
|
||||
|
||||
left_id = self.get_parameter("left_can_id").value
|
||||
right_id = self.get_parameter("right_can_id").value
|
||||
freq = self.get_parameter("update_frequency").value
|
||||
left_id = self.get_parameter("left_can_id").value
|
||||
right_id = self.get_parameter("right_can_id").value
|
||||
left_topic = self.get_parameter("left_state_topic").value
|
||||
right_topic = self.get_parameter("right_state_topic").value
|
||||
freq = self.get_parameter("update_frequency").value
|
||||
|
||||
self._odom_frame = self.get_parameter("odom_frame_id").value
|
||||
self._base_frame = self.get_parameter("base_frame_id").value
|
||||
@ -115,13 +124,13 @@ class VESCCANOdometryNode(Node):
|
||||
# ── Subscriptions ──────────────────────────────────────────────────────
|
||||
self.create_subscription(
|
||||
String,
|
||||
f"/vesc/can_{left_id}/state",
|
||||
left_topic,
|
||||
self._on_left,
|
||||
10,
|
||||
)
|
||||
self.create_subscription(
|
||||
String,
|
||||
f"/vesc/can_{right_id}/state",
|
||||
right_topic,
|
||||
self._on_right,
|
||||
10,
|
||||
)
|
||||
@ -130,7 +139,8 @@ class VESCCANOdometryNode(Node):
|
||||
self.create_timer(1.0 / freq, self._on_timer)
|
||||
|
||||
self.get_logger().info(
|
||||
f"vesc_can_odometry: left=CAN{left_id} right=CAN{right_id} "
|
||||
f"vesc_can_odometry: left=CAN{left_id}({left_topic}) "
|
||||
f"right=CAN{right_id}({right_topic}) "
|
||||
f"r={self._odom.wheel_radius}m sep={self._odom.wheel_separation}m "
|
||||
f"poles={self._odom.motor_poles} invert_right={self._odom.invert_right} "
|
||||
f"{self._odom_frame}→{self._base_frame} @ {freq:.0f}Hz"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user