feat: VESC CAN health monitor (Issue #651)

New package: saltybot_vesc_health

- recovery_fsm.py: pure state machine (no ROS2/CAN deps; fully unit-tested)
  - VescHealthState: HEALTHY → DEGRADED (>500 ms) → ESTOP (>2 s) / BUS_OFF
  - VescMonitor.tick(): drives recovery sequence per VESC; startup-safe
  - VescMonitor.on_frame(): resets state on CAN frame arrival
  - VescMonitor.on_bus_off/on_bus_ok(): bus-off override + recovery
  - HealthFsm: dual-VESC wrapper aggregating both monitors

- health_monitor_node.py: ROS2 node
  - Subscribes /vesc/left/state + /vesc/right/state (JSON from vesc_telemetry)
  - Sends GET_VALUES alive frames via SocketCAN on DEGRADED state
  - Publishes /vesc/health (JSON, 10 Hz) — state, elapsed, recent faults
  - Publishes /diagnostics (DiagnosticArray, OK/WARN/ERROR per VESC)
  - Publishes /estop (JSON event) + zero /cmd_vel on e-stop trigger/clear
  - Polls ip link for bus-off state (1 Hz)
  - 200-entry fault event log included in /vesc/health

- test/test_vesc_health.py: 39 unit tests, all passing, no hardware needed
- config/vesc_health_params.yaml, launch/vesc_health.launch.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-jetson 2026-03-17 11:44:55 -04:00
parent d9b4b10b90
commit eab26c35c5
11 changed files with 1145 additions and 0 deletions

View File

@ -0,0 +1,13 @@
# VESC CAN Health Monitor — SaltyBot dual FSESC 6.7 Pro
# Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/651
vesc_health_monitor:
ros__parameters:
can_interface: "can0"
left_can_id: 61
right_can_id: 79
sender_can_id: 127
frame_timeout_s: 0.5
estop_timeout_s: 2.0
health_rate_hz: 10
bus_off_check_rate_hz: 1

View File

@ -0,0 +1,38 @@
"""Launch file for VESC CAN health monitor node."""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg_dir = get_package_share_directory("saltybot_vesc_health")
config_file = os.path.join(pkg_dir, "config", "vesc_health_params.yaml")
return LaunchDescription([
DeclareLaunchArgument("config_file", default_value=config_file,
description="Path to vesc_health_params.yaml"),
DeclareLaunchArgument("can_interface", default_value="can0",
description="SocketCAN interface name"),
DeclareLaunchArgument("frame_timeout_s", default_value="0.5",
description="Frame-drop detection threshold (s)"),
DeclareLaunchArgument("estop_timeout_s", default_value="2.0",
description="E-stop escalation threshold (s)"),
Node(
package="saltybot_vesc_health",
executable="vesc_health_node",
name="vesc_health_monitor",
output="screen",
parameters=[
LaunchConfiguration("config_file"),
{
"can_interface": LaunchConfiguration("can_interface"),
"frame_timeout_s": LaunchConfiguration("frame_timeout_s"),
"estop_timeout_s": LaunchConfiguration("estop_timeout_s"),
},
],
),
])

View File

@ -0,0 +1,33 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_vesc_health</name>
<version>0.1.0</version>
<description>
VESC CAN health monitor for SaltyBot dual FSESC 6.7 Pro (FW 6.6).
Monitors CAN IDs 61 (left) and 79 (right) for frame drops, bus-off errors,
and timeouts (&gt;500 ms). Drives a recovery sequence (GET_VALUES alive
frames), escalates to e-stop if unresponsive &gt;2 s. Publishes /vesc/health
(JSON), /diagnostics, /estop, and /cmd_vel (zero on e-stop). Issue #651.
</description>
<maintainer email="sl-jetson@saltylab.local">sl-jetson</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<depend>diagnostic_msgs</depend>
<exec_depend>python3-can</exec_depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,450 @@
#!/usr/bin/env python3
"""VESC CAN health monitor node for SaltyBot.
Monitors both VESC motor controllers (CAN IDs 61 and 79) for frame drops,
bus-off errors, and timeouts. Drives a recovery sequence on fault and
escalates to an e-stop if a VESC stays unresponsive beyond 2 s.
Subscribed topics
-----------------
/vesc/left/state (std_msgs/String) JSON telemetry from vesc_telemetry node
/vesc/right/state (std_msgs/String) JSON telemetry from vesc_telemetry node
Published topics
----------------
/vesc/health (std_msgs/String) JSON health summary (10 Hz)
/diagnostics (diagnostic_msgs/DiagnosticArray)
/estop (std_msgs/String) JSON e-stop event on state change
/cmd_vel (geometry_msgs/Twist) zero velocity when e-stop active
Parameters
----------
can_interface str 'can0' SocketCAN interface
left_can_id int 61
right_can_id int 79
sender_can_id int 127 This node's CAN ID for alive frames
frame_timeout_s float 0.5 Timeout before recovery starts
estop_timeout_s float 2.0 Timeout before e-stop escalation
health_rate_hz int 10 Publish + watchdog rate
bus_off_check_rate_hz int 1 How often to poll CAN interface state
"""
from __future__ import annotations
import json
import subprocess
import threading
import time
from typing import Optional
try:
import rclpy
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
from geometry_msgs.msg import Twist
from rclpy.node import Node
from std_msgs.msg import String
_ROS_AVAILABLE = True
except ImportError: # pragma: no cover — missing outside ROS2 env
_ROS_AVAILABLE = False
Node = object # type: ignore[assignment,misc]
try:
import can as python_can
_CAN_AVAILABLE = True
except ImportError:
_CAN_AVAILABLE = False
from .recovery_fsm import Action, HealthFsm, VescHealthState
class VescHealthMonitor(Node):
"""ROS2 node: VESC CAN health watchdog + auto-recovery."""
def __init__(self) -> None:
super().__init__("vesc_health_monitor")
# ----------------------------------------------------------------
# Parameters
# ----------------------------------------------------------------
self.declare_parameter("can_interface", "can0")
self.declare_parameter("left_can_id", 61)
self.declare_parameter("right_can_id", 79)
self.declare_parameter("sender_can_id", 127)
self.declare_parameter("frame_timeout_s", 0.5)
self.declare_parameter("estop_timeout_s", 2.0)
self.declare_parameter("health_rate_hz", 10)
self.declare_parameter("bus_off_check_rate_hz", 1)
self._iface = self.get_parameter("can_interface").value
self._left_id = self.get_parameter("left_can_id").value
self._right_id = self.get_parameter("right_can_id").value
self._sender_id = self.get_parameter("sender_can_id").value
timeout_s = self.get_parameter("frame_timeout_s").value
estop_s = self.get_parameter("estop_timeout_s").value
hz = max(1, int(self.get_parameter("health_rate_hz").value))
bus_hz = max(1, int(self.get_parameter("bus_off_check_rate_hz").value))
# ----------------------------------------------------------------
# State machine
# ----------------------------------------------------------------
self._fsm = HealthFsm(
left_id=self._left_id,
right_id=self._right_id,
timeout_s=timeout_s,
estop_s=estop_s,
)
self._fsm_lock = threading.Lock()
# ----------------------------------------------------------------
# Fault event log (ring buffer, newest last, max 200 entries)
# ----------------------------------------------------------------
self._fault_log: list[dict] = []
self._fault_log_lock = threading.Lock()
# ----------------------------------------------------------------
# SocketCAN (for sending alive / GET_VALUES recovery frames)
# ----------------------------------------------------------------
self._bus: Optional["python_can.BusABC"] = None
self._init_can()
# ----------------------------------------------------------------
# Publishers
# ----------------------------------------------------------------
self._pub_health = self.create_publisher(String, "/vesc/health", 10)
self._pub_diag = self.create_publisher(DiagnosticArray, "/diagnostics", 10)
self._pub_estop = self.create_publisher(String, "/estop", 10)
self._pub_cmd = self.create_publisher(Twist, "/cmd_vel", 10)
# ----------------------------------------------------------------
# Subscribers
# ----------------------------------------------------------------
self.create_subscription(String, "/vesc/left/state", self._on_left_state, 10)
self.create_subscription(String, "/vesc/right/state", self._on_right_state, 10)
# ----------------------------------------------------------------
# Timers
# ----------------------------------------------------------------
self.create_timer(1.0 / hz, self._on_health_tick)
self.create_timer(1.0 / bus_hz, self._check_bus_off)
self.get_logger().info(
f"vesc_health_monitor: iface={self._iface}, "
f"left={self._left_id}, right={self._right_id}, "
f"timeout={timeout_s}s, estop={estop_s}s, {hz} Hz"
)
# ----------------------------------------------------------------
# CAN initialisation
# ----------------------------------------------------------------
def _init_can(self) -> None:
if not _CAN_AVAILABLE:
self.get_logger().warn(
"python-can not installed — alive frame recovery disabled"
)
return
try:
self._bus = python_can.interface.Bus(
channel=self._iface, bustype="socketcan"
)
self.get_logger().info(f"CAN bus opened for recovery: {self._iface}")
except Exception as exc:
self.get_logger().warn(
f"Could not open CAN bus for recovery frames ({exc}); "
"health monitoring continues via topic subscription"
)
# ----------------------------------------------------------------
# State subscribers
# ----------------------------------------------------------------
def _on_left_state(self, msg: String) -> None:
self._handle_state_msg(msg, self._left_id)
def _on_right_state(self, msg: String) -> None:
self._handle_state_msg(msg, self._right_id)
def _handle_state_msg(self, msg: String, can_id: int) -> None:
try:
d = json.loads(msg.data)
except json.JSONDecodeError:
return
# The telemetry node sets "alive": true when the VESC is responding.
# We track our own timeout independently for tighter control.
now = time.monotonic()
if d.get("alive", False):
with self._fsm_lock:
actions = self._fsm.on_frame(can_id, now)
self._dispatch_actions(actions, can_id)
# ----------------------------------------------------------------
# Health timer (main watchdog loop)
# ----------------------------------------------------------------
def _on_health_tick(self) -> None:
now = time.monotonic()
with self._fsm_lock:
per_vesc = self._fsm.tick(now)
for can_id, actions in per_vesc.items():
self._dispatch_actions(actions, can_id)
self._publish_health(now)
self._publish_diagnostics(now)
# If any VESC is in e-stop, keep asserting zero velocity
with self._fsm_lock:
if self._fsm.any_estop:
self._pub_cmd.publish(Twist()) # zero Twist = stop
# ----------------------------------------------------------------
# Bus-off check
# ----------------------------------------------------------------
def _check_bus_off(self) -> None:
bus_off = self._is_can_bus_off()
now = time.monotonic()
with self._fsm_lock:
if bus_off:
actions = self._fsm.on_bus_off(now)
else:
actions = self._fsm.on_bus_ok(now)
self._dispatch_actions(actions, can_id=None)
def _is_can_bus_off(self) -> bool:
"""Return True if the CAN interface is in bus-off state."""
try:
result = subprocess.run(
["ip", "-json", "link", "show", self._iface],
capture_output=True, text=True, timeout=2.0
)
if result.returncode != 0:
return False
data = json.loads(result.stdout)
if not data:
return False
flags = data[0].get("flags", [])
# Linux SocketCAN sets "BUS-OFF" in the flags list
return "BUS-OFF" in [f.upper() for f in flags]
except Exception:
return False
# ----------------------------------------------------------------
# Action dispatcher
# ----------------------------------------------------------------
def _dispatch_actions(self, actions: list[Action], can_id: int | None) -> None:
label = self._id_to_label(can_id) if can_id is not None else "bus"
for action in actions:
if action == Action.SEND_ALIVE:
self._send_alive(can_id)
elif action == Action.TRIGGER_ESTOP:
self._on_estop_trigger(can_id, label)
elif action == Action.CLEAR_ESTOP:
self._on_estop_clear(can_id, label)
elif action == Action.LOG_WARN:
self.get_logger().warn(
f"VESC {label} (id={can_id}): "
"no CAN frames for >500 ms — sending alive frame"
)
self._log_fault_event("timeout", can_id, label)
elif action == Action.LOG_ERROR:
self.get_logger().error(
f"VESC {label} (id={can_id}): "
"unresponsive >2 s — e-stop triggered"
)
self._log_fault_event("estop", can_id, label)
elif action == Action.LOG_RECOVERY:
self.get_logger().info(
f"VESC {label} (id={can_id}): CAN frames resumed — recovered"
)
self._log_fault_event("recovery", can_id, label)
def _on_estop_trigger(self, can_id: int | None, label: str) -> None:
self._pub_cmd.publish(Twist()) # immediate zero velocity
payload = json.dumps({
"event": "estop_triggered",
"can_id": can_id,
"label": label,
"stamp": time.time(),
})
self._pub_estop.publish(String(data=payload))
def _on_estop_clear(self, can_id: int | None, label: str) -> None:
payload = json.dumps({
"event": "estop_cleared",
"can_id": can_id,
"label": label,
"stamp": time.time(),
})
self._pub_estop.publish(String(data=payload))
# ----------------------------------------------------------------
# Alive / recovery frame
# ----------------------------------------------------------------
def _send_alive(self, can_id: int | None) -> None:
"""Send GET_VALUES request to prompt telemetry refresh."""
if self._bus is None or can_id is None:
return
try:
arb_id, payload = _make_get_values_request(self._sender_id, can_id)
self._bus.send(python_can.Message(
arbitration_id=arb_id,
data=payload,
is_extended_id=True,
))
self.get_logger().debug(
f"Sent GET_VALUES alive frame to VESC id={can_id}"
)
except Exception as exc:
self.get_logger().warn(f"Failed to send alive frame (id={can_id}): {exc}")
# ----------------------------------------------------------------
# Publishers
# ----------------------------------------------------------------
def _publish_health(self, now: float) -> None:
with self._fsm_lock:
left_state = self._fsm.left.state.value
right_state = self._fsm.right.state.value
left_elapsed = self._fsm.left.elapsed(now)
right_elapsed = self._fsm.right.elapsed(now)
left_estop = self._fsm.left.estop_active
right_estop = self._fsm.right.estop_active
bus_off = self._fsm._bus_off
any_estop = self._fsm.any_estop
health = {
"stamp": time.time(),
"left": {
"can_id": self._left_id,
"state": left_state,
"elapsed_s": round(min(left_elapsed, 9999.0), 3),
"estop_active": left_estop,
},
"right": {
"can_id": self._right_id,
"state": right_state,
"elapsed_s": round(min(right_elapsed, 9999.0), 3),
"estop_active": right_estop,
},
"bus_off": bus_off,
"estop_active": any_estop,
}
with self._fault_log_lock:
health["recent_faults"] = list(self._fault_log[-10:])
self._pub_health.publish(String(data=json.dumps(health)))
def _publish_diagnostics(self, now: float) -> None:
array = DiagnosticArray()
array.header.stamp = self.get_clock().now().to_msg()
with self._fsm_lock:
monitors = [
(self._fsm.left, "left"),
(self._fsm.right, "right"),
]
for mon, label in monitors:
status = DiagnosticStatus()
status.name = f"VESC/health/{label} (CAN ID {mon.can_id})"
status.hardware_id = f"vesc_health_{mon.can_id}"
elapsed = mon.elapsed(now)
if mon.state == VescHealthState.ESTOP:
status.level = DiagnosticStatus.ERROR
status.message = "E-STOP: VESC unresponsive >2 s"
elif mon.state == VescHealthState.BUS_OFF:
status.level = DiagnosticStatus.ERROR
status.message = "CAN bus-off error"
elif mon.state == VescHealthState.DEGRADED:
status.level = DiagnosticStatus.WARN
status.message = f"Frame timeout ({elapsed:.2f} s) — recovery active"
else:
status.level = DiagnosticStatus.OK
status.message = "OK"
status.values = [
KeyValue(key="state", value=mon.state.value),
KeyValue(key="elapsed_s", value=f"{min(elapsed, 9999.0):.3f}"),
KeyValue(key="estop_active", value=str(mon.estop_active)),
]
array.status.append(status)
self._pub_diag.publish(array)
# ----------------------------------------------------------------
# Fault event log
# ----------------------------------------------------------------
def _log_fault_event(self, event: str, can_id: int | None, label: str) -> None:
entry = {
"event": event,
"can_id": can_id,
"label": label,
"stamp": time.time(),
}
with self._fault_log_lock:
self._fault_log.append(entry)
if len(self._fault_log) > 200:
self._fault_log = self._fault_log[-200:]
# ----------------------------------------------------------------
# Helpers
# ----------------------------------------------------------------
def _id_to_label(self, can_id: int | None) -> str:
if can_id == self._left_id:
return "left"
if can_id == self._right_id:
return "right"
return str(can_id)
# ----------------------------------------------------------------
# Cleanup
# ----------------------------------------------------------------
def destroy_node(self) -> None:
if self._bus is not None:
self._bus.shutdown()
super().destroy_node()
# ---------------------------------------------------------------------------
# Inline GET_VALUES request builder (avoids cross-package import at runtime)
# ---------------------------------------------------------------------------
_CAN_PACKET_PROCESS_SHORT_BUFFER = 32
_COMM_GET_VALUES = 4
def _make_get_values_request(sender_id: int, target_id: int) -> tuple[int, bytes]:
"""Build a GET_VALUES short-buffer CAN frame for the given VESC."""
arb_id = (_CAN_PACKET_PROCESS_SHORT_BUFFER << 8) | (target_id & 0xFF)
payload = bytes([sender_id & 0xFF, 0x00, _COMM_GET_VALUES])
return arb_id, payload
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main(args=None):
rclpy.init(args=args)
node = VescHealthMonitor()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,225 @@
"""VESC CAN health monitor — pure state machine.
No ROS2, python-can, or hardware dependencies. Safe to import in unit tests.
State transitions per VESC
--------------------------
HEALTHY (elapsed > timeout_s = 0.5 s) DEGRADED (send alive, log warn)
(frame received) (log recovery)
(elapsed > estop_s = 2.0 s) ESTOP (trigger e-stop)
(frame received) (clear e-stop)
BUS_OFF is a parallel override: set by node when the CAN interface reports
bus-off; cleared when the interface recovers. While active it always
produces TRIGGER_ESTOP.
The first tick records a startup timestamp so a node that starts before any
VESCs are powered on does not immediately e-stop it enters DEGRADED after
timeout_s and escalates to ESTOP only after estop_s from first tick.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
# ---------------------------------------------------------------------------
# Enumerations
# ---------------------------------------------------------------------------
class VescHealthState(str, Enum):
HEALTHY = "healthy"
DEGRADED = "degraded" # >timeout_s, alive frames being sent
ESTOP = "estop" # >estop_s unresponsive, e-stop asserted
BUS_OFF = "bus_off" # CAN bus-off error
class Action(str, Enum):
SEND_ALIVE = "send_alive" # send GET_VALUES request
TRIGGER_ESTOP = "trigger_estop" # publish zero cmd_vel / /estop
CLEAR_ESTOP = "clear_estop" # stop asserting e-stop
LOG_WARN = "log_warn" # WARN-level log
LOG_ERROR = "log_error" # ERROR-level log
LOG_RECOVERY = "log_recovery" # INFO-level recovery log
# ---------------------------------------------------------------------------
# Per-VESC state machine
# ---------------------------------------------------------------------------
@dataclass
class VescMonitor:
"""Monitors one VESC and drives the recovery state machine.
All timestamps are ``time.monotonic()`` seconds.
Usage::
mon = VescMonitor(can_id=61)
actions = mon.tick(now) # call at ~10 Hz
actions = mon.on_frame(now) # call when a CAN frame arrives
"""
can_id: int
timeout_s: float = 0.5 # frame-drop detection threshold
estop_s: float = 2.0 # escalation to e-stop threshold
alive_retry_s: float = 0.2 # interval between alive retries in DEGRADED
# Mutable runtime state (not init args)
state: VescHealthState = field(default=VescHealthState.HEALTHY, init=False)
last_frame_ts: float = field(default=0.0, init=False) # 0 = never
_first_tick_ts: float = field(default=0.0, init=False) # set on first tick
_last_alive_ts: float = field(default=0.0, init=False)
# ------------------------------------------------------------------
def on_frame(self, now: float) -> list[Action]:
"""Call whenever a STATUS frame arrives from this VESC."""
self.last_frame_ts = now
prev = self.state
if prev in (VescHealthState.DEGRADED, VescHealthState.ESTOP,
VescHealthState.BUS_OFF):
self.state = VescHealthState.HEALTHY
actions: list[Action] = [Action.LOG_RECOVERY]
if prev in (VescHealthState.ESTOP, VescHealthState.BUS_OFF):
actions.append(Action.CLEAR_ESTOP)
return actions
return []
def on_bus_off(self, now: float) -> list[Action]:
"""Call when the CAN interface enters bus-off state."""
if self.state != VescHealthState.BUS_OFF:
self.state = VescHealthState.BUS_OFF
return [Action.LOG_ERROR, Action.TRIGGER_ESTOP]
return [Action.TRIGGER_ESTOP]
def on_bus_ok(self, now: float) -> list[Action]:
"""Call when the CAN interface recovers from bus-off."""
if self.state == VescHealthState.BUS_OFF:
self.state = VescHealthState.HEALTHY
self.last_frame_ts = 0.0 # wait for first fresh frame
return [Action.LOG_RECOVERY, Action.CLEAR_ESTOP]
return []
def tick(self, now: float) -> list[Action]:
"""Periodic update (~10 Hz). Returns list of actions to take."""
# BUS_OFF and ESTOP keep asserting until a frame or bus recovery
if self.state == VescHealthState.BUS_OFF:
return [Action.TRIGGER_ESTOP]
if self.state == VescHealthState.ESTOP:
return [Action.TRIGGER_ESTOP]
# Track elapsed since startup for the "never received" case so we
# don't immediately e-stop on node startup before any frames arrive.
if self._first_tick_ts == 0.0:
self._first_tick_ts = now
if self.last_frame_ts > 0:
elapsed = now - self.last_frame_ts
else:
elapsed = now - self._first_tick_ts
if elapsed >= self.estop_s:
# Escalate to e-stop
if self.state != VescHealthState.ESTOP:
self.state = VescHealthState.ESTOP
return [Action.TRIGGER_ESTOP, Action.LOG_ERROR]
return [Action.TRIGGER_ESTOP]
if elapsed >= self.timeout_s:
if self.state == VescHealthState.HEALTHY:
# First timeout — transition to DEGRADED and send alive
self.state = VescHealthState.DEGRADED
self._last_alive_ts = now
return [Action.SEND_ALIVE, Action.LOG_WARN]
elif self.state == VescHealthState.DEGRADED:
# Retry alive at interval
if now - self._last_alive_ts >= self.alive_retry_s:
self._last_alive_ts = now
return [Action.SEND_ALIVE]
return []
# elapsed < timeout_s — connection is healthy
if self.state == VescHealthState.DEGRADED:
self.state = VescHealthState.HEALTHY
return [Action.LOG_RECOVERY]
return []
# ------------------------------------------------------------------
# Introspection helpers
# ------------------------------------------------------------------
@property
def is_healthy(self) -> bool:
return self.state == VescHealthState.HEALTHY
@property
def estop_active(self) -> bool:
return self.state in (VescHealthState.ESTOP, VescHealthState.BUS_OFF)
def elapsed(self, now: float) -> float:
"""Seconds since last frame (inf if never received)."""
return (now - self.last_frame_ts) if self.last_frame_ts > 0 else float("inf")
# ---------------------------------------------------------------------------
# Dual-VESC wrapper
# ---------------------------------------------------------------------------
@dataclass
class HealthFsm:
"""Manages two VescMonitor instances and aggregates bus-off state."""
left_id: int = 61
right_id: int = 79
timeout_s: float = 0.5
estop_s: float = 2.0
def __post_init__(self) -> None:
self.left = VescMonitor(self.left_id, self.timeout_s, self.estop_s)
self.right = VescMonitor(self.right_id, self.timeout_s, self.estop_s)
self._bus_off = False
def monitors(self) -> tuple[VescMonitor, VescMonitor]:
return self.left, self.right
def on_frame(self, can_id: int, now: float) -> list[Action]:
mon = self._get(can_id)
return mon.on_frame(now) if mon else []
def on_bus_off(self, now: float) -> list[Action]:
if not self._bus_off:
self._bus_off = True
return self.left.on_bus_off(now) + self.right.on_bus_off(now)
return [Action.TRIGGER_ESTOP]
def on_bus_ok(self, now: float) -> list[Action]:
if self._bus_off:
self._bus_off = False
return self.left.on_bus_ok(now) + self.right.on_bus_ok(now)
return []
def tick(self, now: float) -> dict[int, list[Action]]:
"""Returns {can_id: [actions]} for each VESC."""
return {
self.left_id: self.left.tick(now),
self.right_id: self.right.tick(now),
}
@property
def any_estop(self) -> bool:
return self.left.estop_active or self.right.estop_active
def _get(self, can_id: int) -> VescMonitor | None:
if can_id == self.left_id:
return self.left
if can_id == self.right_id:
return self.right
return None

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_vesc_health
[install]
install_scripts=$base/lib/saltybot_vesc_health

View File

@ -0,0 +1,27 @@
from setuptools import setup
package_name = "saltybot_vesc_health"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/vesc_health.launch.py"]),
(f"share/{package_name}/config", ["config/vesc_health_params.yaml"]),
],
install_requires=["setuptools", "python-can"],
zip_safe=True,
maintainer="sl-jetson",
maintainer_email="sl-jetson@saltylab.local",
description="VESC CAN health monitor with watchdog and auto-recovery",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"vesc_health_node = saltybot_vesc_health.health_monitor_node:main",
],
},
)

View File

@ -0,0 +1,355 @@
"""Unit tests for VESC CAN health monitor — recovery FSM.
No ROS2, python-can, or hardware required. Tests cover:
- VescMonitor state machine transitions
- Timeout DEGRADED ESTOP escalation
- Frame-received recovery from DEGRADED and ESTOP
- Bus-off override and recovery
- Alive retry interval in DEGRADED
- HealthFsm dual-VESC wrapper
- Edge cases (never seen, exact boundary, bus_ok when not bus_off)
"""
import pytest
from saltybot_vesc_health.recovery_fsm import (
Action,
HealthFsm,
VescHealthState,
VescMonitor,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
T0 = 1_000.0 # arbitrary monotonic baseline (seconds)
def fresh_mon(**kwargs) -> VescMonitor:
"""Return a VescMonitor that has never seen a frame."""
return VescMonitor(can_id=61, **kwargs)
def alive_mon(last_ts: float = T0, **kwargs) -> VescMonitor:
"""Return a VescMonitor whose last frame was at last_ts."""
mon = VescMonitor(can_id=61, **kwargs)
mon.last_frame_ts = last_ts
return mon
# ---------------------------------------------------------------------------
# VescMonitor — initial state
# ---------------------------------------------------------------------------
class TestVescMonitorInit:
def test_starts_healthy(self):
assert fresh_mon().state == VescHealthState.HEALTHY
def test_never_received_is_unhealthy_after_timeout(self):
mon = fresh_mon(timeout_s=0.5)
# First tick records startup time; second tick 600ms later sees timeout
mon.tick(T0) # startup: elapsed=0, still healthy
actions = mon.tick(T0 + 0.6) # 600 ms > timeout → DEGRADED
assert Action.SEND_ALIVE in actions
assert Action.LOG_WARN in actions
assert mon.state == VescHealthState.DEGRADED
def test_elapsed_never_received(self):
mon = fresh_mon()
assert mon.elapsed(T0) == float("inf")
# ---------------------------------------------------------------------------
# VescMonitor — healthy zone
# ---------------------------------------------------------------------------
class TestHealthyZone:
def test_no_actions_when_fresh_frames_arriving(self):
mon = alive_mon(last_ts=T0)
actions = mon.tick(T0 + 0.1) # 100 ms — well within 500 ms
assert actions == []
assert mon.state == VescHealthState.HEALTHY
def test_no_transition_at_just_under_timeout(self):
mon = alive_mon(last_ts=T0, timeout_s=0.5)
actions = mon.tick(T0 + 0.499)
assert actions == []
assert mon.state == VescHealthState.HEALTHY
# ---------------------------------------------------------------------------
# VescMonitor — timeout → DEGRADED
# ---------------------------------------------------------------------------
class TestTimeoutToDegraded:
def test_transitions_at_timeout(self):
mon = alive_mon(last_ts=T0, timeout_s=0.5)
actions = mon.tick(T0 + 0.5)
assert mon.state == VescHealthState.DEGRADED
assert Action.SEND_ALIVE in actions
assert Action.LOG_WARN in actions
def test_send_alive_not_repeated_immediately(self):
mon = alive_mon(last_ts=T0, timeout_s=0.5, alive_retry_s=0.2)
mon.tick(T0 + 0.5) # → DEGRADED, sends alive
actions = mon.tick(T0 + 0.55) # only 50 ms later → no retry yet
assert Action.SEND_ALIVE not in actions
def test_alive_retry_at_interval(self):
mon = alive_mon(last_ts=T0, timeout_s=0.5, alive_retry_s=0.2)
mon.tick(T0 + 0.5) # → DEGRADED, first alive
actions = mon.tick(T0 + 0.71) # 210 ms later → retry
assert Action.SEND_ALIVE in actions
# ---------------------------------------------------------------------------
# VescMonitor — DEGRADED → ESTOP
# ---------------------------------------------------------------------------
class TestDegradedToEstop:
def test_escalates_at_estop_threshold(self):
mon = alive_mon(last_ts=T0, timeout_s=0.5, estop_s=2.0)
mon.tick(T0 + 0.5) # → DEGRADED
actions = mon.tick(T0 + 2.0)
assert mon.state == VescHealthState.ESTOP
assert Action.TRIGGER_ESTOP in actions
assert Action.LOG_ERROR in actions
def test_estop_keeps_asserting(self):
mon = alive_mon(last_ts=T0, timeout_s=0.5, estop_s=2.0)
mon.tick(T0 + 0.5)
mon.tick(T0 + 2.0) # → ESTOP
actions1 = mon.tick(T0 + 2.1)
actions2 = mon.tick(T0 + 3.0)
assert Action.TRIGGER_ESTOP in actions1
assert Action.TRIGGER_ESTOP in actions2
def test_no_duplicate_log_error_while_in_estop(self):
mon = alive_mon(last_ts=T0, timeout_s=0.5, estop_s=2.0)
mon.tick(T0 + 0.5)
actions_at_escalation = mon.tick(T0 + 2.0) # LOG_ERROR here
assert Action.LOG_ERROR in actions_at_escalation
later_actions = mon.tick(T0 + 2.1)
assert Action.LOG_ERROR not in later_actions # not repeated
# ---------------------------------------------------------------------------
# VescMonitor — recovery from DEGRADED
# ---------------------------------------------------------------------------
class TestRecoveryFromDegraded:
def test_on_frame_clears_degraded(self):
mon = alive_mon(last_ts=T0, timeout_s=0.5)
mon.tick(T0 + 0.5) # → DEGRADED
actions = mon.on_frame(T0 + 0.9)
assert mon.state == VescHealthState.HEALTHY
assert Action.LOG_RECOVERY in actions
def test_on_frame_does_not_trigger_clear_estop_from_degraded(self):
mon = alive_mon(last_ts=T0, timeout_s=0.5)
mon.tick(T0 + 0.5) # → DEGRADED (not ESTOP)
actions = mon.on_frame(T0 + 0.9)
assert Action.CLEAR_ESTOP not in actions
# ---------------------------------------------------------------------------
# VescMonitor — recovery from ESTOP
# ---------------------------------------------------------------------------
class TestRecoveryFromEstop:
def test_on_frame_clears_estop(self):
mon = alive_mon(last_ts=T0, timeout_s=0.5, estop_s=2.0)
mon.tick(T0 + 0.5)
mon.tick(T0 + 2.0) # → ESTOP
actions = mon.on_frame(T0 + 2.5)
assert mon.state == VescHealthState.HEALTHY
assert Action.LOG_RECOVERY in actions
assert Action.CLEAR_ESTOP in actions
def test_tick_after_recovery_is_clean(self):
mon = alive_mon(last_ts=T0, timeout_s=0.5, estop_s=2.0)
mon.tick(T0 + 0.5)
mon.tick(T0 + 2.0)
mon.on_frame(T0 + 2.5)
actions = mon.tick(T0 + 2.6)
assert actions == []
# ---------------------------------------------------------------------------
# VescMonitor — bus-off
# ---------------------------------------------------------------------------
class TestBusOff:
def test_bus_off_triggers_estop(self):
mon = alive_mon(last_ts=T0)
actions = mon.on_bus_off(T0 + 1.0)
assert mon.state == VescHealthState.BUS_OFF
assert Action.TRIGGER_ESTOP in actions
assert Action.LOG_ERROR in actions
def test_bus_off_repeated_call_still_asserts_estop(self):
mon = alive_mon(last_ts=T0)
mon.on_bus_off(T0)
actions = mon.on_bus_off(T0 + 0.5)
assert Action.TRIGGER_ESTOP in actions
def test_bus_ok_clears_bus_off(self):
mon = alive_mon(last_ts=T0)
mon.on_bus_off(T0)
actions = mon.on_bus_ok(T0 + 1.0)
assert mon.state == VescHealthState.HEALTHY
assert Action.CLEAR_ESTOP in actions
assert Action.LOG_RECOVERY in actions
def test_bus_ok_when_not_bus_off_is_noop(self):
mon = alive_mon(last_ts=T0)
assert mon.on_bus_ok(T0) == []
def test_bus_off_tick_keeps_asserting_estop(self):
mon = alive_mon(last_ts=T0)
mon.on_bus_off(T0)
actions = mon.tick(T0 + 1.0)
assert Action.TRIGGER_ESTOP in actions
def test_on_frame_clears_bus_off(self):
mon = alive_mon(last_ts=T0)
mon.on_bus_off(T0)
actions = mon.on_frame(T0 + 0.5)
assert mon.state == VescHealthState.HEALTHY
assert Action.CLEAR_ESTOP in actions
# ---------------------------------------------------------------------------
# VescMonitor — elapsed helper
# ---------------------------------------------------------------------------
class TestElapsed:
def test_elapsed_after_frame(self):
mon = alive_mon(last_ts=T0)
assert mon.elapsed(T0 + 0.3) == pytest.approx(0.3)
def test_elapsed_never_received(self):
mon = fresh_mon()
assert mon.elapsed(T0) == float("inf")
# ---------------------------------------------------------------------------
# VescMonitor — estop_active property
# ---------------------------------------------------------------------------
class TestEstopActive:
def test_healthy_not_active(self):
assert not alive_mon().estop_active
def test_estop_state_is_active(self):
mon = alive_mon(last_ts=T0, timeout_s=0.5, estop_s=2.0)
mon.tick(T0 + 0.5)
mon.tick(T0 + 2.0)
assert mon.estop_active
def test_bus_off_is_active(self):
mon = alive_mon(last_ts=T0)
mon.on_bus_off(T0)
assert mon.estop_active
def test_degraded_not_active(self):
mon = alive_mon(last_ts=T0, timeout_s=0.5)
mon.tick(T0 + 0.5)
assert not mon.estop_active
# ---------------------------------------------------------------------------
# HealthFsm — dual-VESC wrapper
# ---------------------------------------------------------------------------
class TestHealthFsm:
def _make_fsm(self) -> HealthFsm:
fsm = HealthFsm(left_id=61, right_id=79, timeout_s=0.5, estop_s=2.0)
fsm.left.last_frame_ts = T0
fsm.right.last_frame_ts = T0
return fsm
def test_tick_returns_per_vesc_actions(self):
fsm = self._make_fsm()
result = fsm.tick(T0 + 0.1)
assert 61 in result
assert 79 in result
def test_on_frame_updates_left(self):
fsm = self._make_fsm()
fsm.left.last_frame_ts = T0 - 1.0 # stale
fsm.left.state = VescHealthState.DEGRADED
actions = fsm.on_frame(61, T0 + 0.1)
assert Action.LOG_RECOVERY in actions
assert fsm.left.state == VescHealthState.HEALTHY
def test_on_frame_unknown_id_is_noop(self):
fsm = self._make_fsm()
assert fsm.on_frame(99, T0) == []
def test_any_estop_true_when_one_vesc_estop(self):
fsm = self._make_fsm()
fsm.left.last_frame_ts = T0 - 3.0
fsm.tick(T0) # should escalate left to ESTOP
assert fsm.any_estop
def test_any_estop_false_when_both_healthy(self):
fsm = self._make_fsm()
fsm.tick(T0 + 0.1)
assert not fsm.any_estop
def test_bus_off_propagates_to_both(self):
fsm = self._make_fsm()
actions = fsm.on_bus_off(T0)
assert fsm.left.state == VescHealthState.BUS_OFF
assert fsm.right.state == VescHealthState.BUS_OFF
assert any(a == Action.TRIGGER_ESTOP for a in actions)
def test_bus_ok_clears_both(self):
fsm = self._make_fsm()
fsm.on_bus_off(T0)
actions = fsm.on_bus_ok(T0 + 1.0)
assert fsm.left.state == VescHealthState.HEALTHY
assert fsm.right.state == VescHealthState.HEALTHY
assert any(a == Action.CLEAR_ESTOP for a in actions)
def test_bus_ok_when_not_bus_off_is_noop(self):
fsm = self._make_fsm()
assert fsm.on_bus_ok(T0) == []
def test_left_timeout_right_healthy(self):
fsm = self._make_fsm()
fsm.left.last_frame_ts = T0 - 1.0 # stale
result = fsm.tick(T0)
assert Action.SEND_ALIVE in result[61]
assert result[79] == []
# ---------------------------------------------------------------------------
# _make_get_values_request (inline copy in node module)
# ---------------------------------------------------------------------------
class TestMakeGetValuesRequest:
def test_arb_id_encodes_target(self):
from saltybot_vesc_health.health_monitor_node import _make_get_values_request
arb_id, _ = _make_get_values_request(127, 61)
assert (arb_id & 0xFF) == 61
def test_payload_has_sender_and_comm_id(self):
from saltybot_vesc_health.health_monitor_node import (
_make_get_values_request,
_COMM_GET_VALUES,
)
_, payload = _make_get_values_request(42, 61)
assert payload[0] == 42
assert payload[2] == _COMM_GET_VALUES
def test_arb_id_is_extended(self):
from saltybot_vesc_health.health_monitor_node import _make_get_values_request
arb_id, _ = _make_get_values_request(127, 61)
assert arb_id > 0x7FF
if __name__ == "__main__":
pytest.main([__file__, "-v"])