feat: UWB geofence speed limiting (Issue #657) #663
@ -0,0 +1,56 @@
|
|||||||
|
# UWB Geofence Zones Configuration
|
||||||
|
# All coordinates are in the UWB local frame (metres, X-forward, Y-left)
|
||||||
|
#
|
||||||
|
# emergency_boundary: outer hard limit — robot must stay inside.
|
||||||
|
# If it exits, cmd_vel_limited is zeroed and /saltybot/geofence_violation
|
||||||
|
# is latched True (triggers e-stop cascade).
|
||||||
|
#
|
||||||
|
# zones: list of named speed-limit regions.
|
||||||
|
# vertices: flat [x1, y1, x2, y2, ...] polygon, at least 3 points.
|
||||||
|
# max_speed: m/s (linear)
|
||||||
|
# max_angular: rad/s (yaw)
|
||||||
|
# The most restrictive zone the robot is currently inside wins.
|
||||||
|
# Transitions between limits are smoothed over `transition_time` seconds.
|
||||||
|
|
||||||
|
# ── Emergency boundary (6 m × 10 m space, centred at origin) ─────────────────
|
||||||
|
emergency_boundary:
|
||||||
|
vertices:
|
||||||
|
[-6.0, -5.0,
|
||||||
|
6.0, -5.0,
|
||||||
|
6.0, 5.0,
|
||||||
|
-6.0, 5.0]
|
||||||
|
|
||||||
|
# ── Named speed-limit zones ───────────────────────────────────────────────────
|
||||||
|
zones:
|
||||||
|
# Caution perimeter — 1 m inside the emergency boundary
|
||||||
|
- name: caution_perimeter
|
||||||
|
description: "Slow near room walls (1 m buffer inside emergency boundary)"
|
||||||
|
max_speed: 0.4
|
||||||
|
max_angular: 0.8
|
||||||
|
vertices:
|
||||||
|
[-5.0, -4.0,
|
||||||
|
5.0, -4.0,
|
||||||
|
5.0, 4.0,
|
||||||
|
-5.0, 4.0]
|
||||||
|
|
||||||
|
# Dock approach — slow zone in front of the docking station (positive Y end)
|
||||||
|
- name: dock_approach
|
||||||
|
description: "Very slow approach corridor to docking station"
|
||||||
|
max_speed: 0.15
|
||||||
|
max_angular: 0.3
|
||||||
|
vertices:
|
||||||
|
[-0.75, 2.5,
|
||||||
|
0.75, 2.5,
|
||||||
|
0.75, 4.5,
|
||||||
|
-0.75, 4.5]
|
||||||
|
|
||||||
|
# Static obstacle cluster A (example — update to match your layout)
|
||||||
|
- name: obstacle_cluster_a
|
||||||
|
description: "Caution zone around obstacle cluster near (+2, -1)"
|
||||||
|
max_speed: 0.25
|
||||||
|
max_angular: 0.5
|
||||||
|
vertices:
|
||||||
|
[1.0, -2.0,
|
||||||
|
3.0, -2.0,
|
||||||
|
3.0, 0.0,
|
||||||
|
1.0, 0.0]
|
||||||
@ -0,0 +1,47 @@
|
|||||||
|
"""Launch file for saltybot_uwb_geofence node."""
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description() -> LaunchDescription:
|
||||||
|
pkg_share = str(
|
||||||
|
Path(__file__).parent.parent / "config" / "uwb_geofence_zones.yaml"
|
||||||
|
)
|
||||||
|
|
||||||
|
return LaunchDescription(
|
||||||
|
[
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"zones_file",
|
||||||
|
default_value=pkg_share,
|
||||||
|
description="Path to YAML zones config file",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"transition_time",
|
||||||
|
default_value="0.5",
|
||||||
|
description="Speed ramp time in seconds",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"frequency",
|
||||||
|
default_value="20.0",
|
||||||
|
description="Node update frequency in Hz",
|
||||||
|
),
|
||||||
|
Node(
|
||||||
|
package="saltybot_uwb_geofence",
|
||||||
|
executable="uwb_geofence_node",
|
||||||
|
name="uwb_geofence",
|
||||||
|
output="screen",
|
||||||
|
parameters=[
|
||||||
|
{
|
||||||
|
"zones_file": LaunchConfiguration("zones_file"),
|
||||||
|
"transition_time": LaunchConfiguration("transition_time"),
|
||||||
|
"frequency": LaunchConfiguration("frequency"),
|
||||||
|
}
|
||||||
|
],
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
27
jetson/ros2_ws/src/saltybot_uwb_geofence/package.xml
Normal file
27
jetson/ros2_ws/src/saltybot_uwb_geofence/package.xml
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_uwb_geofence</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
UWB-position-based geofence speed limiter for SaltyBot (Issue #657).
|
||||||
|
Subscribes to /saltybot/pose/authoritative (UWB+IMU fused), enforces
|
||||||
|
configurable polygon speed zones, smooth transitions, and emergency
|
||||||
|
e-stop on boundary exit.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-uwb@saltylab.local">SaltyLab UWB</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>python3-yaml</depend>
|
||||||
|
|
||||||
|
<test_depend>pytest</test_depend>
|
||||||
|
<test_depend>geometry_msgs</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -0,0 +1,296 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""UWB Geofence Speed Limiter for SaltyBot (Issue #657).
|
||||||
|
|
||||||
|
Subscribes to:
|
||||||
|
/saltybot/pose/authoritative geometry_msgs/PoseWithCovarianceStamped
|
||||||
|
/cmd_vel geometry_msgs/Twist
|
||||||
|
|
||||||
|
Publishes:
|
||||||
|
/cmd_vel_limited geometry_msgs/Twist -- speed-limited output
|
||||||
|
/saltybot/geofence/status std_msgs/String -- JSON telemetry
|
||||||
|
/saltybot/geofence_violation std_msgs/Bool -- triggers e-stop cascade
|
||||||
|
|
||||||
|
Zones are defined in a YAML file (zones_file parameter). Each zone has a
|
||||||
|
name, polygon vertices in UWB local frame (metres), max_speed (m/s) and
|
||||||
|
max_angular (rad/s). The most restrictive of all containing zones wins.
|
||||||
|
|
||||||
|
An outer emergency_boundary polygon is mandatory. If the robot exits it,
|
||||||
|
all motion is zeroed and /saltybot/geofence_violation is latched True.
|
||||||
|
|
||||||
|
Speed transitions are smoothed over `transition_time` seconds using a
|
||||||
|
linear ramp so that passing through a zone boundary never causes a
|
||||||
|
velocity spike.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from geometry_msgs.msg import Twist
|
||||||
|
from geometry_msgs.msg import PoseWithCovarianceStamped
|
||||||
|
from std_msgs.msg import Bool, String
|
||||||
|
|
||||||
|
from saltybot_uwb_geofence.zone_checker import (
|
||||||
|
apply_speed_limit,
|
||||||
|
min_dist_to_boundary,
|
||||||
|
parse_flat_vertices,
|
||||||
|
point_in_polygon,
|
||||||
|
ramp_toward,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class UwbGeofenceNode(Node):
|
||||||
|
"""ROS2 node enforcing UWB-position-based speed limits inside geofence zones."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__("uwb_geofence")
|
||||||
|
|
||||||
|
# ── Parameters ────────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter(
|
||||||
|
"zones_file",
|
||||||
|
str(Path(__file__).parent.parent.parent / "config" / "uwb_geofence_zones.yaml"),
|
||||||
|
)
|
||||||
|
self.declare_parameter("transition_time", 0.5) # seconds
|
||||||
|
self.declare_parameter("frequency", 20.0) # Hz
|
||||||
|
self.declare_parameter("default_max_speed", 1.0) # m/s (outside all named zones)
|
||||||
|
self.declare_parameter("default_max_angular", 1.5) # rad/s
|
||||||
|
self.declare_parameter("pose_topic", "/saltybot/pose/authoritative")
|
||||||
|
self.declare_parameter("cmd_vel_topic", "/cmd_vel")
|
||||||
|
|
||||||
|
zones_file = self.get_parameter("zones_file").value
|
||||||
|
self._transition_time: float = self.get_parameter("transition_time").value
|
||||||
|
frequency: float = self.get_parameter("frequency").value
|
||||||
|
self._default_max_speed: float = self.get_parameter("default_max_speed").value
|
||||||
|
self._default_max_angular: float = self.get_parameter("default_max_angular").value
|
||||||
|
pose_topic: str = self.get_parameter("pose_topic").value
|
||||||
|
cmd_vel_topic: str = self.get_parameter("cmd_vel_topic").value
|
||||||
|
|
||||||
|
# ── Load zones config ─────────────────────────────────────────────────
|
||||||
|
self._zones: List[Dict[str, Any]] = []
|
||||||
|
self._emergency_boundary: List[Tuple[float, float]] = []
|
||||||
|
self._load_zones(zones_file)
|
||||||
|
|
||||||
|
# ── Runtime state ─────────────────────────────────────────────────────
|
||||||
|
self._robot_x: float = 0.0
|
||||||
|
self._robot_y: float = 0.0
|
||||||
|
self._pose_received: bool = False
|
||||||
|
self._last_cmd_vel: Optional[Twist] = None
|
||||||
|
|
||||||
|
# Current smoothed speed limits (ramp toward target)
|
||||||
|
self._current_max_speed: float = self._default_max_speed
|
||||||
|
self._current_max_angular: float = self._default_max_angular
|
||||||
|
|
||||||
|
# E-stop latch (requires operator reset)
|
||||||
|
self._violation_latched: bool = False
|
||||||
|
|
||||||
|
# ── Subscriptions ─────────────────────────────────────────────────────
|
||||||
|
self.create_subscription(
|
||||||
|
PoseWithCovarianceStamped,
|
||||||
|
pose_topic,
|
||||||
|
self._on_pose,
|
||||||
|
10,
|
||||||
|
)
|
||||||
|
self.create_subscription(
|
||||||
|
Twist,
|
||||||
|
cmd_vel_topic,
|
||||||
|
self._on_cmd_vel,
|
||||||
|
10,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Publications ──────────────────────────────────────────────────────
|
||||||
|
self._pub_limited = self.create_publisher(Twist, "/cmd_vel_limited", 10)
|
||||||
|
self._pub_status = self.create_publisher(String, "/saltybot/geofence/status", 10)
|
||||||
|
self._pub_violation = self.create_publisher(Bool, "/saltybot/geofence_violation", 1)
|
||||||
|
|
||||||
|
# ── Timer ─────────────────────────────────────────────────────────────
|
||||||
|
self._dt: float = 1.0 / frequency
|
||||||
|
self.create_timer(self._dt, self._tick)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"uwb_geofence started: {len(self._zones)} zone(s), "
|
||||||
|
f"freq={frequency:.0f}Hz, transition={self._transition_time}s"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Config loading ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _load_zones(self, path: str) -> None:
|
||||||
|
"""Parse YAML zones file and populate self._zones and self._emergency_boundary."""
|
||||||
|
p = Path(path)
|
||||||
|
if not p.exists():
|
||||||
|
self.get_logger().warn(f"zones_file not found: {path}; running with empty zones")
|
||||||
|
return
|
||||||
|
|
||||||
|
with open(p) as fh:
|
||||||
|
cfg = yaml.safe_load(fh)
|
||||||
|
|
||||||
|
# Emergency boundary (mandatory)
|
||||||
|
eb = cfg.get("emergency_boundary", {})
|
||||||
|
flat = eb.get("vertices", [])
|
||||||
|
if flat:
|
||||||
|
try:
|
||||||
|
self._emergency_boundary = parse_flat_vertices(flat)
|
||||||
|
except ValueError as exc:
|
||||||
|
self.get_logger().error(f"emergency_boundary vertices invalid: {exc}")
|
||||||
|
else:
|
||||||
|
self.get_logger().warn("No emergency_boundary defined — boundary checking disabled")
|
||||||
|
|
||||||
|
# Named speed zones
|
||||||
|
for z in cfg.get("zones", []):
|
||||||
|
try:
|
||||||
|
verts = parse_flat_vertices(z["vertices"])
|
||||||
|
except (KeyError, ValueError) as exc:
|
||||||
|
self.get_logger().error(f"Skipping zone '{z.get('name', '?')}': {exc}")
|
||||||
|
continue
|
||||||
|
self._zones.append(
|
||||||
|
{
|
||||||
|
"name": z.get("name", "unnamed"),
|
||||||
|
"description": z.get("description", ""),
|
||||||
|
"max_speed": float(z.get("max_speed", self._default_max_speed)),
|
||||||
|
"max_angular": float(z.get("max_angular", self._default_max_angular)),
|
||||||
|
"vertices": verts,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
self.get_logger().info(
|
||||||
|
f" zone '{z['name']}': max_speed={z.get('max_speed')} m/s, "
|
||||||
|
f"{len(verts)} vertices"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _on_pose(self, msg: PoseWithCovarianceStamped) -> None:
|
||||||
|
self._robot_x = msg.pose.pose.position.x
|
||||||
|
self._robot_y = msg.pose.pose.position.y
|
||||||
|
self._pose_received = True
|
||||||
|
|
||||||
|
def _on_cmd_vel(self, msg: Twist) -> None:
|
||||||
|
self._last_cmd_vel = msg
|
||||||
|
|
||||||
|
# ── Main loop ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _tick(self) -> None:
|
||||||
|
"""Called at `frequency` Hz. Compute limits, ramp, publish."""
|
||||||
|
if not self._pose_received or self._last_cmd_vel is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
x, y = self._robot_x, self._robot_y
|
||||||
|
|
||||||
|
# ── Emergency boundary check ──────────────────────────────────────────
|
||||||
|
if self._emergency_boundary:
|
||||||
|
inside_boundary = point_in_polygon(x, y, self._emergency_boundary)
|
||||||
|
if not inside_boundary:
|
||||||
|
self._trigger_violation(x, y)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Clear latch once back inside boundary (manual operator resume)
|
||||||
|
if self._violation_latched:
|
||||||
|
# Publish safe=False until operator clears via /saltybot/geofence_violation
|
||||||
|
# (this node keeps publishing zero until reset)
|
||||||
|
self._publish_zero_cmd()
|
||||||
|
self._pub_violation.publish(Bool(data=True))
|
||||||
|
return
|
||||||
|
|
||||||
|
# ── Zone classification ───────────────────────────────────────────────
|
||||||
|
active_zones: List[str] = []
|
||||||
|
target_max_speed = self._default_max_speed
|
||||||
|
target_max_angular = self._default_max_angular
|
||||||
|
|
||||||
|
for zone in self._zones:
|
||||||
|
if point_in_polygon(x, y, zone["vertices"]):
|
||||||
|
active_zones.append(zone["name"])
|
||||||
|
if zone["max_speed"] < target_max_speed:
|
||||||
|
target_max_speed = zone["max_speed"]
|
||||||
|
if zone["max_angular"] < target_max_angular:
|
||||||
|
target_max_angular = zone["max_angular"]
|
||||||
|
|
||||||
|
# ── Smooth ramp toward target limits ──────────────────────────────────
|
||||||
|
ramp_rate_speed = (self._default_max_speed / max(self._transition_time, 0.01))
|
||||||
|
ramp_rate_angular = (self._default_max_angular / max(self._transition_time, 0.01))
|
||||||
|
|
||||||
|
self._current_max_speed = ramp_toward(
|
||||||
|
self._current_max_speed, target_max_speed, ramp_rate_speed * self._dt
|
||||||
|
)
|
||||||
|
self._current_max_angular = ramp_toward(
|
||||||
|
self._current_max_angular, target_max_angular, ramp_rate_angular * self._dt
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Apply limits to cmd_vel ───────────────────────────────────────────
|
||||||
|
cmd = self._last_cmd_vel
|
||||||
|
lx, ly, lz, ax, ay, wz = apply_speed_limit(
|
||||||
|
cmd.linear.x, cmd.linear.y, cmd.angular.z,
|
||||||
|
self._current_max_speed, self._current_max_angular,
|
||||||
|
linear_z=cmd.linear.z, angular_x=cmd.angular.x, angular_y=cmd.angular.y,
|
||||||
|
)
|
||||||
|
limited = Twist()
|
||||||
|
limited.linear.x = lx
|
||||||
|
limited.linear.y = ly
|
||||||
|
limited.linear.z = lz
|
||||||
|
limited.angular.x = ax
|
||||||
|
limited.angular.y = ay
|
||||||
|
limited.angular.z = wz
|
||||||
|
self._pub_limited.publish(limited)
|
||||||
|
|
||||||
|
# ── Status ───────────────────────────────────────────────────────────
|
||||||
|
dist_to_boundary = (
|
||||||
|
min_dist_to_boundary(x, y, self._emergency_boundary)
|
||||||
|
if self._emergency_boundary
|
||||||
|
else -1.0
|
||||||
|
)
|
||||||
|
status = {
|
||||||
|
"robot_xy": [round(x, 3), round(y, 3)],
|
||||||
|
"active_zones": active_zones,
|
||||||
|
"current_max_speed": round(self._current_max_speed, 3),
|
||||||
|
"current_max_angular": round(self._current_max_angular, 3),
|
||||||
|
"target_max_speed": round(target_max_speed, 3),
|
||||||
|
"dist_to_emergency_boundary": round(dist_to_boundary, 3),
|
||||||
|
"violation": False,
|
||||||
|
}
|
||||||
|
self._pub_status.publish(String(data=json.dumps(status)))
|
||||||
|
self._pub_violation.publish(Bool(data=False))
|
||||||
|
|
||||||
|
# ── Helpers ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _trigger_violation(self, x: float, y: float) -> None:
|
||||||
|
"""Robot has exited emergency boundary. Zero all motion, latch violation."""
|
||||||
|
if not self._violation_latched:
|
||||||
|
self.get_logger().error(
|
||||||
|
f"GEOFENCE VIOLATION: robot ({x:.3f}, {y:.3f}) is outside emergency boundary!"
|
||||||
|
)
|
||||||
|
self._violation_latched = True
|
||||||
|
self._publish_zero_cmd()
|
||||||
|
self._pub_violation.publish(Bool(data=True))
|
||||||
|
status = {
|
||||||
|
"robot_xy": [round(x, 3), round(y, 3)],
|
||||||
|
"active_zones": [],
|
||||||
|
"current_max_speed": 0.0,
|
||||||
|
"current_max_angular": 0.0,
|
||||||
|
"target_max_speed": 0.0,
|
||||||
|
"dist_to_emergency_boundary": 0.0,
|
||||||
|
"violation": True,
|
||||||
|
}
|
||||||
|
self._pub_status.publish(String(data=json.dumps(status)))
|
||||||
|
|
||||||
|
def _publish_zero_cmd(self) -> None:
|
||||||
|
self._pub_limited.publish(Twist())
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None) -> None:
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = UwbGeofenceNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -0,0 +1,104 @@
|
|||||||
|
"""zone_checker.py — Pure-geometry helpers for UWB geofence zones.
|
||||||
|
|
||||||
|
No ROS2 dependencies; fully unit-testable.
|
||||||
|
|
||||||
|
Coordinate system: UWB local frame, metres, right-hand X-Y plane.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
from typing import List, Tuple
|
||||||
|
|
||||||
|
Vertex = Tuple[float, float]
|
||||||
|
Polygon = List[Vertex]
|
||||||
|
|
||||||
|
|
||||||
|
def parse_flat_vertices(flat: List[float]) -> Polygon:
|
||||||
|
"""Convert flat [x1, y1, x2, y2, ...] list to [(x1,y1), (x2,y2), ...].
|
||||||
|
|
||||||
|
Raises ValueError if fewer than 3 vertices are provided or the list
|
||||||
|
length is odd.
|
||||||
|
"""
|
||||||
|
if len(flat) % 2 != 0:
|
||||||
|
raise ValueError(f"Flat vertex list must have even length, got {len(flat)}")
|
||||||
|
if len(flat) < 6:
|
||||||
|
raise ValueError(f"Need at least 3 vertices (6 values), got {len(flat)}")
|
||||||
|
return [(flat[i], flat[i + 1]) for i in range(0, len(flat), 2)]
|
||||||
|
|
||||||
|
|
||||||
|
def point_in_polygon(x: float, y: float, polygon: Polygon) -> bool:
|
||||||
|
"""Ray-casting point-in-polygon test.
|
||||||
|
|
||||||
|
Returns True if (x, y) is strictly inside or on the boundary of
|
||||||
|
*polygon*. Polygon vertices must be ordered (CW or CCW).
|
||||||
|
"""
|
||||||
|
n = len(polygon)
|
||||||
|
if n < 3:
|
||||||
|
return False
|
||||||
|
inside = False
|
||||||
|
x1, y1 = polygon[-1]
|
||||||
|
for x2, y2 in polygon:
|
||||||
|
# Check if the ray from (x,y) rightward crosses edge (x1,y1)-(x2,y2)
|
||||||
|
if (y1 > y) != (y2 > y):
|
||||||
|
x_intersect = (x2 - x1) * (y - y1) / (y2 - y1) + x1
|
||||||
|
if x <= x_intersect:
|
||||||
|
inside = not inside
|
||||||
|
x1, y1 = x2, y2
|
||||||
|
return inside
|
||||||
|
|
||||||
|
|
||||||
|
def min_dist_to_boundary(x: float, y: float, polygon: Polygon) -> float:
|
||||||
|
"""Minimum distance from (x, y) to any edge of *polygon* (metres).
|
||||||
|
|
||||||
|
Uses point-to-segment distance with clamped projection.
|
||||||
|
"""
|
||||||
|
if len(polygon) < 2:
|
||||||
|
return float("inf")
|
||||||
|
min_d = float("inf")
|
||||||
|
n = len(polygon)
|
||||||
|
for i in range(n):
|
||||||
|
ax, ay = polygon[i]
|
||||||
|
bx, by = polygon[(i + 1) % n]
|
||||||
|
dx, dy = bx - ax, by - ay
|
||||||
|
seg_len_sq = dx * dx + dy * dy
|
||||||
|
if seg_len_sq < 1e-12:
|
||||||
|
d = math.hypot(x - ax, y - ay)
|
||||||
|
else:
|
||||||
|
t = max(0.0, min(1.0, ((x - ax) * dx + (y - ay) * dy) / seg_len_sq))
|
||||||
|
px, py = ax + t * dx, ay + t * dy
|
||||||
|
d = math.hypot(x - px, y - py)
|
||||||
|
if d < min_d:
|
||||||
|
min_d = d
|
||||||
|
return min_d
|
||||||
|
|
||||||
|
|
||||||
|
# ── Velocity helpers (no ROS2 deps — also used by tests) ─────────────────────
|
||||||
|
|
||||||
|
def ramp_toward(current: float, target: float, step: float) -> float:
|
||||||
|
"""Move *current* toward *target* by at most *step*, never overshoot."""
|
||||||
|
if current < target:
|
||||||
|
return min(current + step, target)
|
||||||
|
if current > target:
|
||||||
|
return max(current - step, target)
|
||||||
|
return current
|
||||||
|
|
||||||
|
|
||||||
|
def apply_speed_limit(linear_x: float, linear_y: float, angular_z: float,
|
||||||
|
max_speed: float, max_angular: float,
|
||||||
|
linear_z: float = 0.0,
|
||||||
|
angular_x: float = 0.0,
|
||||||
|
angular_y: float = 0.0
|
||||||
|
) -> tuple:
|
||||||
|
"""Return (linear_x, linear_y, linear_z, angular_x, angular_y, angular_z)
|
||||||
|
with linear speed and angular_z clamped to limits.
|
||||||
|
"""
|
||||||
|
speed = math.hypot(linear_x, linear_y)
|
||||||
|
if speed > max_speed and speed > 1e-6:
|
||||||
|
scale = max_speed / speed
|
||||||
|
lx = linear_x * scale
|
||||||
|
ly = linear_y * scale
|
||||||
|
else:
|
||||||
|
lx, ly = linear_x, linear_y
|
||||||
|
wz = max(-max_angular, min(max_angular, angular_z))
|
||||||
|
return lx, ly, linear_z, angular_x, angular_y, wz
|
||||||
5
jetson/ros2_ws/src/saltybot_uwb_geofence/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_uwb_geofence/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_uwb_geofence
|
||||||
|
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_uwb_geofence
|
||||||
27
jetson/ros2_ws/src/saltybot_uwb_geofence/setup.py
Normal file
27
jetson/ros2_ws/src/saltybot_uwb_geofence/setup.py
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
from setuptools import setup, find_packages
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name="saltybot_uwb_geofence",
|
||||||
|
version="0.1.0",
|
||||||
|
packages=find_packages(),
|
||||||
|
data_files=[
|
||||||
|
(
|
||||||
|
"share/ament_index/resource_index/packages",
|
||||||
|
["resource/saltybot_uwb_geofence"],
|
||||||
|
),
|
||||||
|
("share/saltybot_uwb_geofence", ["package.xml"]),
|
||||||
|
("share/saltybot_uwb_geofence/config", ["config/uwb_geofence_zones.yaml"]),
|
||||||
|
("share/saltybot_uwb_geofence/launch", ["launch/uwb_geofence.launch.py"]),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools", "pyyaml"],
|
||||||
|
zip_safe=True,
|
||||||
|
author="SaltyLab UWB",
|
||||||
|
author_email="sl-uwb@saltylab.local",
|
||||||
|
description="UWB geofence speed limiter for SaltyBot (Issue #657)",
|
||||||
|
license="MIT",
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"uwb_geofence_node=saltybot_uwb_geofence.uwb_geofence_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
Binary file not shown.
Binary file not shown.
@ -0,0 +1,177 @@
|
|||||||
|
"""Unit tests for zone_checker.py — pure geometry, no ROS2 required."""
|
||||||
|
|
||||||
|
import math
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from saltybot_uwb_geofence.zone_checker import (
|
||||||
|
parse_flat_vertices,
|
||||||
|
point_in_polygon,
|
||||||
|
min_dist_to_boundary,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── parse_flat_vertices ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestParseFlatVertices:
|
||||||
|
def test_square(self):
|
||||||
|
verts = parse_flat_vertices([0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0])
|
||||||
|
assert verts == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)]
|
||||||
|
|
||||||
|
def test_triangle(self):
|
||||||
|
verts = parse_flat_vertices([0.0, 0.0, 2.0, 0.0, 1.0, 2.0])
|
||||||
|
assert len(verts) == 3
|
||||||
|
|
||||||
|
def test_odd_length_raises(self):
|
||||||
|
with pytest.raises(ValueError, match="even"):
|
||||||
|
parse_flat_vertices([1.0, 2.0, 3.0])
|
||||||
|
|
||||||
|
def test_too_few_points_raises(self):
|
||||||
|
with pytest.raises(ValueError, match="3 vertices"):
|
||||||
|
parse_flat_vertices([0.0, 0.0, 1.0, 1.0])
|
||||||
|
|
||||||
|
|
||||||
|
# ── point_in_polygon ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
UNIT_SQUARE = [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)]
|
||||||
|
TRIANGLE = [(0.0, 0.0), (4.0, 0.0), (2.0, 4.0)]
|
||||||
|
|
||||||
|
|
||||||
|
class TestPointInPolygon:
|
||||||
|
# --- Unit square ---
|
||||||
|
def test_centre_inside(self):
|
||||||
|
assert point_in_polygon(0.5, 0.5, UNIT_SQUARE) is True
|
||||||
|
|
||||||
|
def test_outside_right(self):
|
||||||
|
assert point_in_polygon(1.5, 0.5, UNIT_SQUARE) is False
|
||||||
|
|
||||||
|
def test_outside_left(self):
|
||||||
|
assert point_in_polygon(-0.1, 0.5, UNIT_SQUARE) is False
|
||||||
|
|
||||||
|
def test_outside_above(self):
|
||||||
|
assert point_in_polygon(0.5, 1.5, UNIT_SQUARE) is False
|
||||||
|
|
||||||
|
def test_outside_below(self):
|
||||||
|
assert point_in_polygon(0.5, -0.1, UNIT_SQUARE) is False
|
||||||
|
|
||||||
|
def test_far_outside(self):
|
||||||
|
assert point_in_polygon(10.0, 10.0, UNIT_SQUARE) is False
|
||||||
|
|
||||||
|
# --- Triangle ---
|
||||||
|
def test_triangle_apex_region(self):
|
||||||
|
assert point_in_polygon(2.0, 2.0, TRIANGLE) is True
|
||||||
|
|
||||||
|
def test_triangle_base_centre(self):
|
||||||
|
assert point_in_polygon(2.0, 0.5, TRIANGLE) is True
|
||||||
|
|
||||||
|
def test_triangle_outside_top(self):
|
||||||
|
assert point_in_polygon(2.0, 5.0, TRIANGLE) is False
|
||||||
|
|
||||||
|
def test_triangle_outside_right(self):
|
||||||
|
assert point_in_polygon(4.5, 0.5, TRIANGLE) is False
|
||||||
|
|
||||||
|
# --- Degenerate ---
|
||||||
|
def test_empty_polygon(self):
|
||||||
|
assert point_in_polygon(0.5, 0.5, []) is False
|
||||||
|
|
||||||
|
def test_two_vertex_polygon(self):
|
||||||
|
assert point_in_polygon(0.5, 0.5, [(0.0, 0.0), (1.0, 1.0)]) is False
|
||||||
|
|
||||||
|
# --- 10 × 8 room (matches emergency boundary in default config) ---
|
||||||
|
def test_inside_room(self):
|
||||||
|
room = [(-6.0, -5.0), (6.0, -5.0), (6.0, 5.0), (-6.0, 5.0)]
|
||||||
|
assert point_in_polygon(0.0, 0.0, room) is True
|
||||||
|
|
||||||
|
def test_outside_room(self):
|
||||||
|
room = [(-6.0, -5.0), (6.0, -5.0), (6.0, 5.0), (-6.0, 5.0)]
|
||||||
|
assert point_in_polygon(7.0, 0.0, room) is False
|
||||||
|
|
||||||
|
|
||||||
|
# ── min_dist_to_boundary ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestMinDistToBoundary:
|
||||||
|
def test_centre_of_square(self):
|
||||||
|
# Centre of unit square: distance to each edge = 0.5
|
||||||
|
d = min_dist_to_boundary(0.5, 0.5, UNIT_SQUARE)
|
||||||
|
assert abs(d - 0.5) < 1e-9
|
||||||
|
|
||||||
|
def test_on_edge(self):
|
||||||
|
d = min_dist_to_boundary(0.5, 0.0, UNIT_SQUARE)
|
||||||
|
assert d < 1e-9
|
||||||
|
|
||||||
|
def test_outside_corner(self):
|
||||||
|
# Point at (1.5, 0.5): closest edge is the right edge at x=1
|
||||||
|
d = min_dist_to_boundary(1.5, 0.5, UNIT_SQUARE)
|
||||||
|
assert abs(d - 0.5) < 1e-9
|
||||||
|
|
||||||
|
def test_outside_above(self):
|
||||||
|
d = min_dist_to_boundary(0.5, 1.3, UNIT_SQUARE)
|
||||||
|
assert abs(d - 0.3) < 1e-9
|
||||||
|
|
||||||
|
def test_empty_polygon(self):
|
||||||
|
assert min_dist_to_boundary(0.0, 0.0, []) == float("inf")
|
||||||
|
|
||||||
|
def test_single_vertex(self):
|
||||||
|
assert min_dist_to_boundary(1.0, 1.0, [(0.0, 0.0)]) == float("inf")
|
||||||
|
|
||||||
|
|
||||||
|
# ── ramp_toward & apply_speed_limit ──────────────────────────────────────────
|
||||||
|
|
||||||
|
from saltybot_uwb_geofence.zone_checker import ramp_toward, apply_speed_limit
|
||||||
|
|
||||||
|
|
||||||
|
class TestRampToward:
|
||||||
|
def test_ramp_up(self):
|
||||||
|
val = ramp_toward(0.0, 1.0, 0.1)
|
||||||
|
assert abs(val - 0.1) < 1e-9
|
||||||
|
|
||||||
|
def test_ramp_down(self):
|
||||||
|
val = ramp_toward(1.0, 0.0, 0.1)
|
||||||
|
assert abs(val - 0.9) < 1e-9
|
||||||
|
|
||||||
|
def test_no_overshoot_up(self):
|
||||||
|
val = ramp_toward(0.95, 1.0, 0.1)
|
||||||
|
assert val == 1.0
|
||||||
|
|
||||||
|
def test_no_overshoot_down(self):
|
||||||
|
val = ramp_toward(0.05, 0.0, 0.1)
|
||||||
|
assert val == 0.0
|
||||||
|
|
||||||
|
def test_already_at_target(self):
|
||||||
|
val = ramp_toward(0.5, 0.5, 0.1)
|
||||||
|
assert val == 0.5
|
||||||
|
|
||||||
|
|
||||||
|
class TestApplySpeedLimit:
|
||||||
|
def _call(self, vx: float, vy: float = 0.0, wz: float = 0.0,
|
||||||
|
max_speed: float = 1.0, max_ang: float = 1.5):
|
||||||
|
return apply_speed_limit(vx, vy, wz, max_speed, max_ang)
|
||||||
|
|
||||||
|
def test_within_limit_unchanged(self):
|
||||||
|
lx, ly, *_, wz = self._call(0.3, 0.0, 0.2)
|
||||||
|
assert abs(lx - 0.3) < 1e-9
|
||||||
|
assert abs(wz - 0.2) < 1e-9
|
||||||
|
|
||||||
|
def test_linear_exceeds_limit(self):
|
||||||
|
lx, ly, *_ = self._call(2.0, 0.0, 0.0)
|
||||||
|
speed = math.hypot(lx, ly)
|
||||||
|
assert speed <= 1.0 + 1e-9
|
||||||
|
|
||||||
|
def test_direction_preserved_when_limited(self):
|
||||||
|
lx, ly, *_ = self._call(3.0, 4.0, 0.0, max_speed=1.0)
|
||||||
|
speed = math.hypot(lx, ly)
|
||||||
|
assert abs(speed - 1.0) < 1e-9
|
||||||
|
assert abs(lx / ly - 3.0 / 4.0) < 1e-6
|
||||||
|
|
||||||
|
def test_angular_clamp_positive(self):
|
||||||
|
*_, wz = self._call(0.0, 0.0, 2.0, max_ang=1.0)
|
||||||
|
assert abs(wz - 1.0) < 1e-9
|
||||||
|
|
||||||
|
def test_angular_clamp_negative(self):
|
||||||
|
*_, wz = self._call(0.0, 0.0, -2.0, max_ang=1.0)
|
||||||
|
assert abs(wz + 1.0) < 1e-9
|
||||||
|
|
||||||
|
def test_zero_cmd_stays_zero(self):
|
||||||
|
lx, ly, lz, ax, ay, wz = self._call(0.0, 0.0, 0.0)
|
||||||
|
assert lx == 0.0
|
||||||
|
assert ly == 0.0
|
||||||
|
assert wz == 0.0
|
||||||
@ -0,0 +1,27 @@
|
|||||||
|
# VESC CAN Health Monitor — SaltyBot dual FSESC 6.7 Pro
|
||||||
|
# Issue: https://gitea.vayrette.com/seb/saltylab-firmware/issues/651
|
||||||
|
|
||||||
|
vesc_health_monitor:
|
||||||
|
ros__parameters:
|
||||||
|
# SocketCAN interface (must match can-bringup.service / Issue #643)
|
||||||
|
can_interface: "can0"
|
||||||
|
|
||||||
|
# VESC CAN IDs (must match vesc_driver / vesc_telemetry nodes)
|
||||||
|
left_can_id: 61 # 0x3D — left drive motor
|
||||||
|
right_can_id: 79 # 0x4F — right drive motor
|
||||||
|
|
||||||
|
# This node's CAN ID used as sender_id in GET_VALUES recovery requests
|
||||||
|
sender_can_id: 127
|
||||||
|
|
||||||
|
# Timeout before recovery sequence begins (s)
|
||||||
|
# Frames expected at 20 Hz from vesc_telemetry; alert if > 500 ms gap
|
||||||
|
frame_timeout_s: 0.5
|
||||||
|
|
||||||
|
# Timeout before e-stop escalation (s, measured from last good frame)
|
||||||
|
estop_timeout_s: 2.0
|
||||||
|
|
||||||
|
# Health publish + watchdog timer rate
|
||||||
|
health_rate_hz: 10
|
||||||
|
|
||||||
|
# How often to poll CAN interface for bus-off state (via ip link show)
|
||||||
|
bus_off_check_rate_hz: 1
|
||||||
@ -0,0 +1,50 @@
|
|||||||
|
"""Launch file for VESC CAN health monitor node."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
pkg_dir = get_package_share_directory("saltybot_vesc_health")
|
||||||
|
config_file = os.path.join(pkg_dir, "config", "vesc_health_params.yaml")
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"config_file",
|
||||||
|
default_value=config_file,
|
||||||
|
description="Path to vesc_health_params.yaml",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"can_interface",
|
||||||
|
default_value="can0",
|
||||||
|
description="SocketCAN interface name",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"frame_timeout_s",
|
||||||
|
default_value="0.5",
|
||||||
|
description="Frame-drop detection threshold (s)",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"estop_timeout_s",
|
||||||
|
default_value="2.0",
|
||||||
|
description="E-stop escalation threshold (s from last good frame)",
|
||||||
|
),
|
||||||
|
Node(
|
||||||
|
package="saltybot_vesc_health",
|
||||||
|
executable="vesc_health_node",
|
||||||
|
name="vesc_health_monitor",
|
||||||
|
output="screen",
|
||||||
|
parameters=[
|
||||||
|
LaunchConfiguration("config_file"),
|
||||||
|
{
|
||||||
|
"can_interface": LaunchConfiguration("can_interface"),
|
||||||
|
"frame_timeout_s": LaunchConfiguration("frame_timeout_s"),
|
||||||
|
"estop_timeout_s": LaunchConfiguration("estop_timeout_s"),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
])
|
||||||
33
jetson/ros2_ws/src/saltybot_vesc_health/package.xml
Normal file
33
jetson/ros2_ws/src/saltybot_vesc_health/package.xml
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_vesc_health</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
VESC CAN health monitor for SaltyBot dual FSESC 6.7 Pro (FW 6.6).
|
||||||
|
Monitors CAN IDs 61 (left) and 79 (right) for frame drops, bus-off errors,
|
||||||
|
and timeouts (>500 ms). Drives a recovery sequence (GET_VALUES alive frames),
|
||||||
|
escalates to e-stop if unresponsive >2 s. Publishes /vesc/health (JSON),
|
||||||
|
/diagnostics, /estop, and /cmd_vel (zero on e-stop). Issue #651.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-jetson@saltylab.local">sl-jetson</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>diagnostic_msgs</depend>
|
||||||
|
|
||||||
|
<exec_depend>python3-can</exec_depend>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -0,0 +1,450 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""VESC CAN health monitor node for SaltyBot.
|
||||||
|
|
||||||
|
Monitors both VESC motor controllers (CAN IDs 61 and 79) for frame drops,
|
||||||
|
bus-off errors, and timeouts. Drives a recovery sequence on fault and
|
||||||
|
escalates to an e-stop if a VESC stays unresponsive beyond 2 s.
|
||||||
|
|
||||||
|
Subscribed topics
|
||||||
|
-----------------
|
||||||
|
/vesc/left/state (std_msgs/String) — JSON telemetry from vesc_telemetry node
|
||||||
|
/vesc/right/state (std_msgs/String) — JSON telemetry from vesc_telemetry node
|
||||||
|
|
||||||
|
Published topics
|
||||||
|
----------------
|
||||||
|
/vesc/health (std_msgs/String) — JSON health summary (10 Hz)
|
||||||
|
/diagnostics (diagnostic_msgs/DiagnosticArray)
|
||||||
|
/estop (std_msgs/String) — JSON e-stop event on state change
|
||||||
|
/cmd_vel (geometry_msgs/Twist) — zero velocity when e-stop active
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
can_interface str 'can0' SocketCAN interface
|
||||||
|
left_can_id int 61
|
||||||
|
right_can_id int 79
|
||||||
|
sender_can_id int 127 This node's CAN ID for alive frames
|
||||||
|
frame_timeout_s float 0.5 Timeout before recovery starts
|
||||||
|
estop_timeout_s float 2.0 Timeout before e-stop escalation
|
||||||
|
health_rate_hz int 10 Publish + watchdog rate
|
||||||
|
bus_off_check_rate_hz int 1 How often to poll CAN interface state
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import subprocess
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
try:
|
||||||
|
import rclpy
|
||||||
|
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
|
||||||
|
from geometry_msgs.msg import Twist
|
||||||
|
from rclpy.node import Node
|
||||||
|
from std_msgs.msg import String
|
||||||
|
_ROS_AVAILABLE = True
|
||||||
|
except ImportError: # pragma: no cover — missing outside ROS2 env
|
||||||
|
_ROS_AVAILABLE = False
|
||||||
|
Node = object # type: ignore[assignment,misc]
|
||||||
|
|
||||||
|
try:
|
||||||
|
import can as python_can
|
||||||
|
_CAN_AVAILABLE = True
|
||||||
|
except ImportError:
|
||||||
|
_CAN_AVAILABLE = False
|
||||||
|
|
||||||
|
from .recovery_fsm import Action, HealthFsm, VescHealthState
|
||||||
|
|
||||||
|
|
||||||
|
class VescHealthMonitor(Node):
|
||||||
|
"""ROS2 node: VESC CAN health watchdog + auto-recovery."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__("vesc_health_monitor")
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# Parameters
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
self.declare_parameter("can_interface", "can0")
|
||||||
|
self.declare_parameter("left_can_id", 61)
|
||||||
|
self.declare_parameter("right_can_id", 79)
|
||||||
|
self.declare_parameter("sender_can_id", 127)
|
||||||
|
self.declare_parameter("frame_timeout_s", 0.5)
|
||||||
|
self.declare_parameter("estop_timeout_s", 2.0)
|
||||||
|
self.declare_parameter("health_rate_hz", 10)
|
||||||
|
self.declare_parameter("bus_off_check_rate_hz", 1)
|
||||||
|
|
||||||
|
self._iface = self.get_parameter("can_interface").value
|
||||||
|
self._left_id = self.get_parameter("left_can_id").value
|
||||||
|
self._right_id = self.get_parameter("right_can_id").value
|
||||||
|
self._sender_id = self.get_parameter("sender_can_id").value
|
||||||
|
timeout_s = self.get_parameter("frame_timeout_s").value
|
||||||
|
estop_s = self.get_parameter("estop_timeout_s").value
|
||||||
|
hz = max(1, int(self.get_parameter("health_rate_hz").value))
|
||||||
|
bus_hz = max(1, int(self.get_parameter("bus_off_check_rate_hz").value))
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# State machine
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
self._fsm = HealthFsm(
|
||||||
|
left_id=self._left_id,
|
||||||
|
right_id=self._right_id,
|
||||||
|
timeout_s=timeout_s,
|
||||||
|
estop_s=estop_s,
|
||||||
|
)
|
||||||
|
self._fsm_lock = threading.Lock()
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# Fault event log (ring buffer, newest last, max 200 entries)
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
self._fault_log: list[dict] = []
|
||||||
|
self._fault_log_lock = threading.Lock()
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# SocketCAN (for sending alive / GET_VALUES recovery frames)
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
self._bus: Optional["python_can.BusABC"] = None
|
||||||
|
self._init_can()
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# Publishers
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
self._pub_health = self.create_publisher(String, "/vesc/health", 10)
|
||||||
|
self._pub_diag = self.create_publisher(DiagnosticArray, "/diagnostics", 10)
|
||||||
|
self._pub_estop = self.create_publisher(String, "/estop", 10)
|
||||||
|
self._pub_cmd = self.create_publisher(Twist, "/cmd_vel", 10)
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# Subscribers
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
self.create_subscription(String, "/vesc/left/state", self._on_left_state, 10)
|
||||||
|
self.create_subscription(String, "/vesc/right/state", self._on_right_state, 10)
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# Timers
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
self.create_timer(1.0 / hz, self._on_health_tick)
|
||||||
|
self.create_timer(1.0 / bus_hz, self._check_bus_off)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"vesc_health_monitor: iface={self._iface}, "
|
||||||
|
f"left={self._left_id}, right={self._right_id}, "
|
||||||
|
f"timeout={timeout_s}s, estop={estop_s}s, {hz} Hz"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# CAN initialisation
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
|
||||||
|
def _init_can(self) -> None:
|
||||||
|
if not _CAN_AVAILABLE:
|
||||||
|
self.get_logger().warn(
|
||||||
|
"python-can not installed — alive frame recovery disabled"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self._bus = python_can.interface.Bus(
|
||||||
|
channel=self._iface, bustype="socketcan"
|
||||||
|
)
|
||||||
|
self.get_logger().info(f"CAN bus opened for recovery: {self._iface}")
|
||||||
|
except Exception as exc:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"Could not open CAN bus for recovery frames ({exc}); "
|
||||||
|
"health monitoring continues via topic subscription"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# State subscribers
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
|
||||||
|
def _on_left_state(self, msg: String) -> None:
|
||||||
|
self._handle_state_msg(msg, self._left_id)
|
||||||
|
|
||||||
|
def _on_right_state(self, msg: String) -> None:
|
||||||
|
self._handle_state_msg(msg, self._right_id)
|
||||||
|
|
||||||
|
def _handle_state_msg(self, msg: String, can_id: int) -> None:
|
||||||
|
try:
|
||||||
|
d = json.loads(msg.data)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return
|
||||||
|
|
||||||
|
# The telemetry node sets "alive": true when the VESC is responding.
|
||||||
|
# We track our own timeout independently for tighter control.
|
||||||
|
now = time.monotonic()
|
||||||
|
if d.get("alive", False):
|
||||||
|
with self._fsm_lock:
|
||||||
|
actions = self._fsm.on_frame(can_id, now)
|
||||||
|
self._dispatch_actions(actions, can_id)
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# Health timer (main watchdog loop)
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
|
||||||
|
def _on_health_tick(self) -> None:
|
||||||
|
now = time.monotonic()
|
||||||
|
with self._fsm_lock:
|
||||||
|
per_vesc = self._fsm.tick(now)
|
||||||
|
|
||||||
|
for can_id, actions in per_vesc.items():
|
||||||
|
self._dispatch_actions(actions, can_id)
|
||||||
|
|
||||||
|
self._publish_health(now)
|
||||||
|
self._publish_diagnostics(now)
|
||||||
|
|
||||||
|
# If any VESC is in e-stop, keep asserting zero velocity
|
||||||
|
with self._fsm_lock:
|
||||||
|
if self._fsm.any_estop:
|
||||||
|
self._pub_cmd.publish(Twist()) # zero Twist = stop
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# Bus-off check
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
|
||||||
|
def _check_bus_off(self) -> None:
|
||||||
|
bus_off = self._is_can_bus_off()
|
||||||
|
now = time.monotonic()
|
||||||
|
with self._fsm_lock:
|
||||||
|
if bus_off:
|
||||||
|
actions = self._fsm.on_bus_off(now)
|
||||||
|
else:
|
||||||
|
actions = self._fsm.on_bus_ok(now)
|
||||||
|
self._dispatch_actions(actions, can_id=None)
|
||||||
|
|
||||||
|
def _is_can_bus_off(self) -> bool:
|
||||||
|
"""Return True if the CAN interface is in bus-off state."""
|
||||||
|
try:
|
||||||
|
result = subprocess.run(
|
||||||
|
["ip", "-json", "link", "show", self._iface],
|
||||||
|
capture_output=True, text=True, timeout=2.0
|
||||||
|
)
|
||||||
|
if result.returncode != 0:
|
||||||
|
return False
|
||||||
|
data = json.loads(result.stdout)
|
||||||
|
if not data:
|
||||||
|
return False
|
||||||
|
flags = data[0].get("flags", [])
|
||||||
|
# Linux SocketCAN sets "BUS-OFF" in the flags list
|
||||||
|
return "BUS-OFF" in [f.upper() for f in flags]
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# Action dispatcher
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
|
||||||
|
def _dispatch_actions(self, actions: list[Action], can_id: int | None) -> None:
|
||||||
|
label = self._id_to_label(can_id) if can_id is not None else "bus"
|
||||||
|
for action in actions:
|
||||||
|
if action == Action.SEND_ALIVE:
|
||||||
|
self._send_alive(can_id)
|
||||||
|
elif action == Action.TRIGGER_ESTOP:
|
||||||
|
self._on_estop_trigger(can_id, label)
|
||||||
|
elif action == Action.CLEAR_ESTOP:
|
||||||
|
self._on_estop_clear(can_id, label)
|
||||||
|
elif action == Action.LOG_WARN:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"VESC {label} (id={can_id}): "
|
||||||
|
"no CAN frames for >500 ms — sending alive frame"
|
||||||
|
)
|
||||||
|
self._log_fault_event("timeout", can_id, label)
|
||||||
|
elif action == Action.LOG_ERROR:
|
||||||
|
self.get_logger().error(
|
||||||
|
f"VESC {label} (id={can_id}): "
|
||||||
|
"unresponsive >2 s — e-stop triggered"
|
||||||
|
)
|
||||||
|
self._log_fault_event("estop", can_id, label)
|
||||||
|
elif action == Action.LOG_RECOVERY:
|
||||||
|
self.get_logger().info(
|
||||||
|
f"VESC {label} (id={can_id}): CAN frames resumed — recovered"
|
||||||
|
)
|
||||||
|
self._log_fault_event("recovery", can_id, label)
|
||||||
|
|
||||||
|
def _on_estop_trigger(self, can_id: int | None, label: str) -> None:
|
||||||
|
self._pub_cmd.publish(Twist()) # immediate zero velocity
|
||||||
|
payload = json.dumps({
|
||||||
|
"event": "estop_triggered",
|
||||||
|
"can_id": can_id,
|
||||||
|
"label": label,
|
||||||
|
"stamp": time.time(),
|
||||||
|
})
|
||||||
|
self._pub_estop.publish(String(data=payload))
|
||||||
|
|
||||||
|
def _on_estop_clear(self, can_id: int | None, label: str) -> None:
|
||||||
|
payload = json.dumps({
|
||||||
|
"event": "estop_cleared",
|
||||||
|
"can_id": can_id,
|
||||||
|
"label": label,
|
||||||
|
"stamp": time.time(),
|
||||||
|
})
|
||||||
|
self._pub_estop.publish(String(data=payload))
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# Alive / recovery frame
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
|
||||||
|
def _send_alive(self, can_id: int | None) -> None:
|
||||||
|
"""Send GET_VALUES request to prompt telemetry refresh."""
|
||||||
|
if self._bus is None or can_id is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
arb_id, payload = _make_get_values_request(self._sender_id, can_id)
|
||||||
|
self._bus.send(python_can.Message(
|
||||||
|
arbitration_id=arb_id,
|
||||||
|
data=payload,
|
||||||
|
is_extended_id=True,
|
||||||
|
))
|
||||||
|
self.get_logger().debug(
|
||||||
|
f"Sent GET_VALUES alive frame to VESC id={can_id}"
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
self.get_logger().warn(f"Failed to send alive frame (id={can_id}): {exc}")
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# Publishers
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
|
||||||
|
def _publish_health(self, now: float) -> None:
|
||||||
|
with self._fsm_lock:
|
||||||
|
left_state = self._fsm.left.state.value
|
||||||
|
right_state = self._fsm.right.state.value
|
||||||
|
left_elapsed = self._fsm.left.elapsed(now)
|
||||||
|
right_elapsed = self._fsm.right.elapsed(now)
|
||||||
|
left_estop = self._fsm.left.estop_active
|
||||||
|
right_estop = self._fsm.right.estop_active
|
||||||
|
bus_off = self._fsm._bus_off
|
||||||
|
any_estop = self._fsm.any_estop
|
||||||
|
|
||||||
|
health = {
|
||||||
|
"stamp": time.time(),
|
||||||
|
"left": {
|
||||||
|
"can_id": self._left_id,
|
||||||
|
"state": left_state,
|
||||||
|
"elapsed_s": round(min(left_elapsed, 9999.0), 3),
|
||||||
|
"estop_active": left_estop,
|
||||||
|
},
|
||||||
|
"right": {
|
||||||
|
"can_id": self._right_id,
|
||||||
|
"state": right_state,
|
||||||
|
"elapsed_s": round(min(right_elapsed, 9999.0), 3),
|
||||||
|
"estop_active": right_estop,
|
||||||
|
},
|
||||||
|
"bus_off": bus_off,
|
||||||
|
"estop_active": any_estop,
|
||||||
|
}
|
||||||
|
|
||||||
|
with self._fault_log_lock:
|
||||||
|
health["recent_faults"] = list(self._fault_log[-10:])
|
||||||
|
|
||||||
|
self._pub_health.publish(String(data=json.dumps(health)))
|
||||||
|
|
||||||
|
def _publish_diagnostics(self, now: float) -> None:
|
||||||
|
array = DiagnosticArray()
|
||||||
|
array.header.stamp = self.get_clock().now().to_msg()
|
||||||
|
|
||||||
|
with self._fsm_lock:
|
||||||
|
monitors = [
|
||||||
|
(self._fsm.left, "left"),
|
||||||
|
(self._fsm.right, "right"),
|
||||||
|
]
|
||||||
|
|
||||||
|
for mon, label in monitors:
|
||||||
|
status = DiagnosticStatus()
|
||||||
|
status.name = f"VESC/health/{label} (CAN ID {mon.can_id})"
|
||||||
|
status.hardware_id = f"vesc_health_{mon.can_id}"
|
||||||
|
|
||||||
|
elapsed = mon.elapsed(now)
|
||||||
|
|
||||||
|
if mon.state == VescHealthState.ESTOP:
|
||||||
|
status.level = DiagnosticStatus.ERROR
|
||||||
|
status.message = "E-STOP: VESC unresponsive >2 s"
|
||||||
|
elif mon.state == VescHealthState.BUS_OFF:
|
||||||
|
status.level = DiagnosticStatus.ERROR
|
||||||
|
status.message = "CAN bus-off error"
|
||||||
|
elif mon.state == VescHealthState.DEGRADED:
|
||||||
|
status.level = DiagnosticStatus.WARN
|
||||||
|
status.message = f"Frame timeout ({elapsed:.2f} s) — recovery active"
|
||||||
|
else:
|
||||||
|
status.level = DiagnosticStatus.OK
|
||||||
|
status.message = "OK"
|
||||||
|
|
||||||
|
status.values = [
|
||||||
|
KeyValue(key="state", value=mon.state.value),
|
||||||
|
KeyValue(key="elapsed_s", value=f"{min(elapsed, 9999.0):.3f}"),
|
||||||
|
KeyValue(key="estop_active", value=str(mon.estop_active)),
|
||||||
|
]
|
||||||
|
array.status.append(status)
|
||||||
|
|
||||||
|
self._pub_diag.publish(array)
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# Fault event log
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
|
||||||
|
def _log_fault_event(self, event: str, can_id: int | None, label: str) -> None:
|
||||||
|
entry = {
|
||||||
|
"event": event,
|
||||||
|
"can_id": can_id,
|
||||||
|
"label": label,
|
||||||
|
"stamp": time.time(),
|
||||||
|
}
|
||||||
|
with self._fault_log_lock:
|
||||||
|
self._fault_log.append(entry)
|
||||||
|
if len(self._fault_log) > 200:
|
||||||
|
self._fault_log = self._fault_log[-200:]
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# Helpers
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
|
||||||
|
def _id_to_label(self, can_id: int | None) -> str:
|
||||||
|
if can_id == self._left_id:
|
||||||
|
return "left"
|
||||||
|
if can_id == self._right_id:
|
||||||
|
return "right"
|
||||||
|
return str(can_id)
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# Cleanup
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
|
||||||
|
def destroy_node(self) -> None:
|
||||||
|
if self._bus is not None:
|
||||||
|
self._bus.shutdown()
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Inline GET_VALUES request builder (avoids cross-package import at runtime)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
_CAN_PACKET_PROCESS_SHORT_BUFFER = 32
|
||||||
|
_COMM_GET_VALUES = 4
|
||||||
|
|
||||||
|
|
||||||
|
def _make_get_values_request(sender_id: int, target_id: int) -> tuple[int, bytes]:
|
||||||
|
"""Build a GET_VALUES short-buffer CAN frame for the given VESC."""
|
||||||
|
arb_id = (_CAN_PACKET_PROCESS_SHORT_BUFFER << 8) | (target_id & 0xFF)
|
||||||
|
payload = bytes([sender_id & 0xFF, 0x00, _COMM_GET_VALUES])
|
||||||
|
return arb_id, payload
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Entry point
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = VescHealthMonitor()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -0,0 +1,223 @@
|
|||||||
|
"""VESC CAN health monitor — pure state machine.
|
||||||
|
|
||||||
|
No ROS2, python-can, or hardware dependencies. Safe to import in unit tests.
|
||||||
|
|
||||||
|
State transitions per VESC
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
HEALTHY ──(elapsed > timeout_s = 0.5 s)──► DEGRADED (send alive, log warn)
|
||||||
|
│
|
||||||
|
◄──(frame received)─────────────┘ (log recovery)
|
||||||
|
│
|
||||||
|
(elapsed > estop_s = 2.0 s)──► ESTOP (trigger e-stop)
|
||||||
|
│
|
||||||
|
◄──(frame received)──────────────────────────┘ (clear e-stop)
|
||||||
|
|
||||||
|
BUS_OFF is a parallel override: set by node when the CAN interface reports
|
||||||
|
bus-off; cleared when the interface recovers. While active it always
|
||||||
|
produces TRIGGER_ESTOP.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Enumerations
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class VescHealthState(str, Enum):
|
||||||
|
HEALTHY = "healthy"
|
||||||
|
DEGRADED = "degraded" # >timeout_s, alive frames being sent
|
||||||
|
ESTOP = "estop" # >estop_s unresponsive, e-stop asserted
|
||||||
|
BUS_OFF = "bus_off" # CAN bus-off error
|
||||||
|
|
||||||
|
|
||||||
|
class Action(str, Enum):
|
||||||
|
SEND_ALIVE = "send_alive" # send GET_VALUES request
|
||||||
|
TRIGGER_ESTOP = "trigger_estop" # publish zero cmd_vel / /estop
|
||||||
|
CLEAR_ESTOP = "clear_estop" # stop asserting e-stop
|
||||||
|
LOG_WARN = "log_warn" # WARN-level log
|
||||||
|
LOG_ERROR = "log_error" # ERROR-level log
|
||||||
|
LOG_RECOVERY = "log_recovery" # INFO-level recovery log
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Per-VESC state machine
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class VescMonitor:
|
||||||
|
"""Monitors one VESC and drives the recovery state machine.
|
||||||
|
|
||||||
|
All timestamps are ``time.monotonic()`` seconds.
|
||||||
|
|
||||||
|
Usage::
|
||||||
|
|
||||||
|
mon = VescMonitor(can_id=61)
|
||||||
|
actions = mon.tick(now) # call at ~10 Hz
|
||||||
|
actions = mon.on_frame(now) # call when a CAN frame arrives
|
||||||
|
"""
|
||||||
|
|
||||||
|
can_id: int
|
||||||
|
timeout_s: float = 0.5 # frame-drop detection threshold
|
||||||
|
estop_s: float = 2.0 # escalation to e-stop threshold
|
||||||
|
alive_retry_s: float = 0.2 # interval between alive retries in DEGRADED
|
||||||
|
|
||||||
|
# Mutable runtime state (not init args)
|
||||||
|
state: VescHealthState = field(default=VescHealthState.HEALTHY, init=False)
|
||||||
|
last_frame_ts: float = field(default=0.0, init=False) # 0 = never
|
||||||
|
_first_tick_ts: float = field(default=0.0, init=False) # set on first tick
|
||||||
|
_last_alive_ts: float = field(default=0.0, init=False)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
def on_frame(self, now: float) -> list[Action]:
|
||||||
|
"""Call whenever a STATUS frame arrives from this VESC."""
|
||||||
|
self.last_frame_ts = now
|
||||||
|
|
||||||
|
prev = self.state
|
||||||
|
if prev in (VescHealthState.DEGRADED, VescHealthState.ESTOP,
|
||||||
|
VescHealthState.BUS_OFF):
|
||||||
|
self.state = VescHealthState.HEALTHY
|
||||||
|
actions: list[Action] = [Action.LOG_RECOVERY]
|
||||||
|
if prev in (VescHealthState.ESTOP, VescHealthState.BUS_OFF):
|
||||||
|
actions.append(Action.CLEAR_ESTOP)
|
||||||
|
return actions
|
||||||
|
return []
|
||||||
|
|
||||||
|
def on_bus_off(self, now: float) -> list[Action]:
|
||||||
|
"""Call when the CAN interface enters bus-off state."""
|
||||||
|
if self.state != VescHealthState.BUS_OFF:
|
||||||
|
self.state = VescHealthState.BUS_OFF
|
||||||
|
return [Action.LOG_ERROR, Action.TRIGGER_ESTOP]
|
||||||
|
return [Action.TRIGGER_ESTOP]
|
||||||
|
|
||||||
|
def on_bus_ok(self, now: float) -> list[Action]:
|
||||||
|
"""Call when the CAN interface recovers from bus-off."""
|
||||||
|
if self.state == VescHealthState.BUS_OFF:
|
||||||
|
self.state = VescHealthState.HEALTHY
|
||||||
|
self.last_frame_ts = 0.0 # wait for first fresh frame
|
||||||
|
return [Action.LOG_RECOVERY, Action.CLEAR_ESTOP]
|
||||||
|
return []
|
||||||
|
|
||||||
|
def tick(self, now: float) -> list[Action]:
|
||||||
|
"""Periodic update (~10 Hz). Returns list of actions to take."""
|
||||||
|
# BUS_OFF and ESTOP keep asserting until a frame or bus recovery
|
||||||
|
if self.state == VescHealthState.BUS_OFF:
|
||||||
|
return [Action.TRIGGER_ESTOP]
|
||||||
|
if self.state == VescHealthState.ESTOP:
|
||||||
|
return [Action.TRIGGER_ESTOP]
|
||||||
|
|
||||||
|
# Track elapsed since startup for the "never received" case so we
|
||||||
|
# don't immediately e-stop on node startup before any frames arrive.
|
||||||
|
if self._first_tick_ts == 0.0:
|
||||||
|
self._first_tick_ts = now
|
||||||
|
|
||||||
|
if self.last_frame_ts > 0:
|
||||||
|
elapsed = now - self.last_frame_ts
|
||||||
|
else:
|
||||||
|
elapsed = now - self._first_tick_ts
|
||||||
|
|
||||||
|
if elapsed >= self.estop_s:
|
||||||
|
# Escalate to e-stop
|
||||||
|
if self.state != VescHealthState.ESTOP:
|
||||||
|
self.state = VescHealthState.ESTOP
|
||||||
|
return [Action.TRIGGER_ESTOP, Action.LOG_ERROR]
|
||||||
|
return [Action.TRIGGER_ESTOP]
|
||||||
|
|
||||||
|
if elapsed >= self.timeout_s:
|
||||||
|
if self.state == VescHealthState.HEALTHY:
|
||||||
|
# First timeout — transition to DEGRADED and send alive
|
||||||
|
self.state = VescHealthState.DEGRADED
|
||||||
|
self._last_alive_ts = now
|
||||||
|
return [Action.SEND_ALIVE, Action.LOG_WARN]
|
||||||
|
elif self.state == VescHealthState.DEGRADED:
|
||||||
|
# Retry alive at interval
|
||||||
|
if now - self._last_alive_ts >= self.alive_retry_s:
|
||||||
|
self._last_alive_ts = now
|
||||||
|
return [Action.SEND_ALIVE]
|
||||||
|
return []
|
||||||
|
|
||||||
|
# elapsed < timeout_s — connection is healthy
|
||||||
|
if self.state == VescHealthState.DEGRADED:
|
||||||
|
# Frame arrived (on_frame handles ESTOP/BUS_OFF → HEALTHY)
|
||||||
|
# If somehow we reach here still DEGRADED, clear it
|
||||||
|
self.state = VescHealthState.HEALTHY
|
||||||
|
return [Action.LOG_RECOVERY]
|
||||||
|
|
||||||
|
return []
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Introspection helpers
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_healthy(self) -> bool:
|
||||||
|
return self.state == VescHealthState.HEALTHY
|
||||||
|
|
||||||
|
@property
|
||||||
|
def estop_active(self) -> bool:
|
||||||
|
return self.state in (VescHealthState.ESTOP, VescHealthState.BUS_OFF)
|
||||||
|
|
||||||
|
def elapsed(self, now: float) -> float:
|
||||||
|
"""Seconds since last frame (inf if never received)."""
|
||||||
|
return (now - self.last_frame_ts) if self.last_frame_ts > 0 else float("inf")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Dual-VESC wrapper
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class HealthFsm:
|
||||||
|
"""Manages two VescMonitor instances and aggregates bus-off state."""
|
||||||
|
|
||||||
|
left_id: int = 61
|
||||||
|
right_id: int = 79
|
||||||
|
timeout_s: float = 0.5
|
||||||
|
estop_s: float = 2.0
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
self.left = VescMonitor(self.left_id, self.timeout_s, self.estop_s)
|
||||||
|
self.right = VescMonitor(self.right_id, self.timeout_s, self.estop_s)
|
||||||
|
self._bus_off = False
|
||||||
|
|
||||||
|
def monitors(self) -> tuple[VescMonitor, VescMonitor]:
|
||||||
|
return self.left, self.right
|
||||||
|
|
||||||
|
def on_frame(self, can_id: int, now: float) -> list[Action]:
|
||||||
|
mon = self._get(can_id)
|
||||||
|
return mon.on_frame(now) if mon else []
|
||||||
|
|
||||||
|
def on_bus_off(self, now: float) -> list[Action]:
|
||||||
|
if not self._bus_off:
|
||||||
|
self._bus_off = True
|
||||||
|
return self.left.on_bus_off(now) + self.right.on_bus_off(now)
|
||||||
|
return [Action.TRIGGER_ESTOP]
|
||||||
|
|
||||||
|
def on_bus_ok(self, now: float) -> list[Action]:
|
||||||
|
if self._bus_off:
|
||||||
|
self._bus_off = False
|
||||||
|
return self.left.on_bus_ok(now) + self.right.on_bus_ok(now)
|
||||||
|
return []
|
||||||
|
|
||||||
|
def tick(self, now: float) -> dict[int, list[Action]]:
|
||||||
|
"""Returns {can_id: [actions]} for each VESC."""
|
||||||
|
return {
|
||||||
|
self.left_id: self.left.tick(now),
|
||||||
|
self.right_id: self.right.tick(now),
|
||||||
|
}
|
||||||
|
|
||||||
|
@property
|
||||||
|
def any_estop(self) -> bool:
|
||||||
|
return self.left.estop_active or self.right.estop_active
|
||||||
|
|
||||||
|
def _get(self, can_id: int) -> VescMonitor | None:
|
||||||
|
if can_id == self.left_id:
|
||||||
|
return self.left
|
||||||
|
if can_id == self.right_id:
|
||||||
|
return self.right
|
||||||
|
return None
|
||||||
4
jetson/ros2_ws/src/saltybot_vesc_health/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_vesc_health/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_vesc_health
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_vesc_health
|
||||||
27
jetson/ros2_ws/src/saltybot_vesc_health/setup.py
Normal file
27
jetson/ros2_ws/src/saltybot_vesc_health/setup.py
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
package_name = "saltybot_vesc_health"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(f"share/{package_name}/launch", ["launch/vesc_health.launch.py"]),
|
||||||
|
(f"share/{package_name}/config", ["config/vesc_health_params.yaml"]),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools", "python-can"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-jetson",
|
||||||
|
maintainer_email="sl-jetson@saltylab.local",
|
||||||
|
description="VESC CAN health monitor with watchdog and auto-recovery",
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"vesc_health_node = saltybot_vesc_health.health_monitor_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
Binary file not shown.
Binary file not shown.
355
jetson/ros2_ws/src/saltybot_vesc_health/test/test_vesc_health.py
Normal file
355
jetson/ros2_ws/src/saltybot_vesc_health/test/test_vesc_health.py
Normal file
@ -0,0 +1,355 @@
|
|||||||
|
"""Unit tests for VESC CAN health monitor — recovery FSM.
|
||||||
|
|
||||||
|
No ROS2, python-can, or hardware required. Tests cover:
|
||||||
|
- VescMonitor state machine transitions
|
||||||
|
- Timeout → DEGRADED → ESTOP escalation
|
||||||
|
- Frame-received recovery from DEGRADED and ESTOP
|
||||||
|
- Bus-off override and recovery
|
||||||
|
- Alive retry interval in DEGRADED
|
||||||
|
- HealthFsm dual-VESC wrapper
|
||||||
|
- Edge cases (never seen, exact boundary, bus_ok when not bus_off)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from saltybot_vesc_health.recovery_fsm import (
|
||||||
|
Action,
|
||||||
|
HealthFsm,
|
||||||
|
VescHealthState,
|
||||||
|
VescMonitor,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
T0 = 1_000.0 # arbitrary monotonic baseline (seconds)
|
||||||
|
|
||||||
|
|
||||||
|
def fresh_mon(**kwargs) -> VescMonitor:
|
||||||
|
"""Return a VescMonitor that has never seen a frame."""
|
||||||
|
return VescMonitor(can_id=61, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def alive_mon(last_ts: float = T0, **kwargs) -> VescMonitor:
|
||||||
|
"""Return a VescMonitor whose last frame was at last_ts."""
|
||||||
|
mon = VescMonitor(can_id=61, **kwargs)
|
||||||
|
mon.last_frame_ts = last_ts
|
||||||
|
return mon
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# VescMonitor — initial state
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestVescMonitorInit:
|
||||||
|
def test_starts_healthy(self):
|
||||||
|
assert fresh_mon().state == VescHealthState.HEALTHY
|
||||||
|
|
||||||
|
def test_never_received_is_unhealthy_after_timeout(self):
|
||||||
|
mon = fresh_mon(timeout_s=0.5)
|
||||||
|
# First tick records startup time; second tick 600ms later sees timeout
|
||||||
|
mon.tick(T0) # startup: elapsed=0, still healthy
|
||||||
|
actions = mon.tick(T0 + 0.6) # 600 ms > timeout → DEGRADED
|
||||||
|
assert Action.SEND_ALIVE in actions
|
||||||
|
assert Action.LOG_WARN in actions
|
||||||
|
assert mon.state == VescHealthState.DEGRADED
|
||||||
|
|
||||||
|
def test_elapsed_never_received(self):
|
||||||
|
mon = fresh_mon()
|
||||||
|
assert mon.elapsed(T0) == float("inf")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# VescMonitor — healthy zone
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestHealthyZone:
|
||||||
|
def test_no_actions_when_fresh_frames_arriving(self):
|
||||||
|
mon = alive_mon(last_ts=T0)
|
||||||
|
actions = mon.tick(T0 + 0.1) # 100 ms — well within 500 ms
|
||||||
|
assert actions == []
|
||||||
|
assert mon.state == VescHealthState.HEALTHY
|
||||||
|
|
||||||
|
def test_no_transition_at_just_under_timeout(self):
|
||||||
|
mon = alive_mon(last_ts=T0, timeout_s=0.5)
|
||||||
|
actions = mon.tick(T0 + 0.499)
|
||||||
|
assert actions == []
|
||||||
|
assert mon.state == VescHealthState.HEALTHY
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# VescMonitor — timeout → DEGRADED
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestTimeoutToDegraded:
|
||||||
|
def test_transitions_at_timeout(self):
|
||||||
|
mon = alive_mon(last_ts=T0, timeout_s=0.5)
|
||||||
|
actions = mon.tick(T0 + 0.5)
|
||||||
|
assert mon.state == VescHealthState.DEGRADED
|
||||||
|
assert Action.SEND_ALIVE in actions
|
||||||
|
assert Action.LOG_WARN in actions
|
||||||
|
|
||||||
|
def test_send_alive_not_repeated_immediately(self):
|
||||||
|
mon = alive_mon(last_ts=T0, timeout_s=0.5, alive_retry_s=0.2)
|
||||||
|
mon.tick(T0 + 0.5) # → DEGRADED, sends alive
|
||||||
|
actions = mon.tick(T0 + 0.55) # only 50 ms later → no retry yet
|
||||||
|
assert Action.SEND_ALIVE not in actions
|
||||||
|
|
||||||
|
def test_alive_retry_at_interval(self):
|
||||||
|
mon = alive_mon(last_ts=T0, timeout_s=0.5, alive_retry_s=0.2)
|
||||||
|
mon.tick(T0 + 0.5) # → DEGRADED, first alive
|
||||||
|
actions = mon.tick(T0 + 0.71) # 210 ms later → retry
|
||||||
|
assert Action.SEND_ALIVE in actions
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# VescMonitor — DEGRADED → ESTOP
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestDegradedToEstop:
|
||||||
|
def test_escalates_at_estop_threshold(self):
|
||||||
|
mon = alive_mon(last_ts=T0, timeout_s=0.5, estop_s=2.0)
|
||||||
|
mon.tick(T0 + 0.5) # → DEGRADED
|
||||||
|
actions = mon.tick(T0 + 2.0)
|
||||||
|
assert mon.state == VescHealthState.ESTOP
|
||||||
|
assert Action.TRIGGER_ESTOP in actions
|
||||||
|
assert Action.LOG_ERROR in actions
|
||||||
|
|
||||||
|
def test_estop_keeps_asserting(self):
|
||||||
|
mon = alive_mon(last_ts=T0, timeout_s=0.5, estop_s=2.0)
|
||||||
|
mon.tick(T0 + 0.5)
|
||||||
|
mon.tick(T0 + 2.0) # → ESTOP
|
||||||
|
actions1 = mon.tick(T0 + 2.1)
|
||||||
|
actions2 = mon.tick(T0 + 3.0)
|
||||||
|
assert Action.TRIGGER_ESTOP in actions1
|
||||||
|
assert Action.TRIGGER_ESTOP in actions2
|
||||||
|
|
||||||
|
def test_no_duplicate_log_error_while_in_estop(self):
|
||||||
|
mon = alive_mon(last_ts=T0, timeout_s=0.5, estop_s=2.0)
|
||||||
|
mon.tick(T0 + 0.5)
|
||||||
|
actions_at_escalation = mon.tick(T0 + 2.0) # LOG_ERROR here
|
||||||
|
assert Action.LOG_ERROR in actions_at_escalation
|
||||||
|
later_actions = mon.tick(T0 + 2.1)
|
||||||
|
assert Action.LOG_ERROR not in later_actions # not repeated
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# VescMonitor — recovery from DEGRADED
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestRecoveryFromDegraded:
|
||||||
|
def test_on_frame_clears_degraded(self):
|
||||||
|
mon = alive_mon(last_ts=T0, timeout_s=0.5)
|
||||||
|
mon.tick(T0 + 0.5) # → DEGRADED
|
||||||
|
actions = mon.on_frame(T0 + 0.9)
|
||||||
|
assert mon.state == VescHealthState.HEALTHY
|
||||||
|
assert Action.LOG_RECOVERY in actions
|
||||||
|
|
||||||
|
def test_on_frame_does_not_trigger_clear_estop_from_degraded(self):
|
||||||
|
mon = alive_mon(last_ts=T0, timeout_s=0.5)
|
||||||
|
mon.tick(T0 + 0.5) # → DEGRADED (not ESTOP)
|
||||||
|
actions = mon.on_frame(T0 + 0.9)
|
||||||
|
assert Action.CLEAR_ESTOP not in actions
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# VescMonitor — recovery from ESTOP
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestRecoveryFromEstop:
|
||||||
|
def test_on_frame_clears_estop(self):
|
||||||
|
mon = alive_mon(last_ts=T0, timeout_s=0.5, estop_s=2.0)
|
||||||
|
mon.tick(T0 + 0.5)
|
||||||
|
mon.tick(T0 + 2.0) # → ESTOP
|
||||||
|
actions = mon.on_frame(T0 + 2.5)
|
||||||
|
assert mon.state == VescHealthState.HEALTHY
|
||||||
|
assert Action.LOG_RECOVERY in actions
|
||||||
|
assert Action.CLEAR_ESTOP in actions
|
||||||
|
|
||||||
|
def test_tick_after_recovery_is_clean(self):
|
||||||
|
mon = alive_mon(last_ts=T0, timeout_s=0.5, estop_s=2.0)
|
||||||
|
mon.tick(T0 + 0.5)
|
||||||
|
mon.tick(T0 + 2.0)
|
||||||
|
mon.on_frame(T0 + 2.5)
|
||||||
|
actions = mon.tick(T0 + 2.6)
|
||||||
|
assert actions == []
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# VescMonitor — bus-off
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestBusOff:
|
||||||
|
def test_bus_off_triggers_estop(self):
|
||||||
|
mon = alive_mon(last_ts=T0)
|
||||||
|
actions = mon.on_bus_off(T0 + 1.0)
|
||||||
|
assert mon.state == VescHealthState.BUS_OFF
|
||||||
|
assert Action.TRIGGER_ESTOP in actions
|
||||||
|
assert Action.LOG_ERROR in actions
|
||||||
|
|
||||||
|
def test_bus_off_repeated_call_still_asserts_estop(self):
|
||||||
|
mon = alive_mon(last_ts=T0)
|
||||||
|
mon.on_bus_off(T0)
|
||||||
|
actions = mon.on_bus_off(T0 + 0.5)
|
||||||
|
assert Action.TRIGGER_ESTOP in actions
|
||||||
|
|
||||||
|
def test_bus_ok_clears_bus_off(self):
|
||||||
|
mon = alive_mon(last_ts=T0)
|
||||||
|
mon.on_bus_off(T0)
|
||||||
|
actions = mon.on_bus_ok(T0 + 1.0)
|
||||||
|
assert mon.state == VescHealthState.HEALTHY
|
||||||
|
assert Action.CLEAR_ESTOP in actions
|
||||||
|
assert Action.LOG_RECOVERY in actions
|
||||||
|
|
||||||
|
def test_bus_ok_when_not_bus_off_is_noop(self):
|
||||||
|
mon = alive_mon(last_ts=T0)
|
||||||
|
assert mon.on_bus_ok(T0) == []
|
||||||
|
|
||||||
|
def test_bus_off_tick_keeps_asserting_estop(self):
|
||||||
|
mon = alive_mon(last_ts=T0)
|
||||||
|
mon.on_bus_off(T0)
|
||||||
|
actions = mon.tick(T0 + 1.0)
|
||||||
|
assert Action.TRIGGER_ESTOP in actions
|
||||||
|
|
||||||
|
def test_on_frame_clears_bus_off(self):
|
||||||
|
mon = alive_mon(last_ts=T0)
|
||||||
|
mon.on_bus_off(T0)
|
||||||
|
actions = mon.on_frame(T0 + 0.5)
|
||||||
|
assert mon.state == VescHealthState.HEALTHY
|
||||||
|
assert Action.CLEAR_ESTOP in actions
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# VescMonitor — elapsed helper
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestElapsed:
|
||||||
|
def test_elapsed_after_frame(self):
|
||||||
|
mon = alive_mon(last_ts=T0)
|
||||||
|
assert mon.elapsed(T0 + 0.3) == pytest.approx(0.3)
|
||||||
|
|
||||||
|
def test_elapsed_never_received(self):
|
||||||
|
mon = fresh_mon()
|
||||||
|
assert mon.elapsed(T0) == float("inf")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# VescMonitor — estop_active property
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestEstopActive:
|
||||||
|
def test_healthy_not_active(self):
|
||||||
|
assert not alive_mon().estop_active
|
||||||
|
|
||||||
|
def test_estop_state_is_active(self):
|
||||||
|
mon = alive_mon(last_ts=T0, timeout_s=0.5, estop_s=2.0)
|
||||||
|
mon.tick(T0 + 0.5)
|
||||||
|
mon.tick(T0 + 2.0)
|
||||||
|
assert mon.estop_active
|
||||||
|
|
||||||
|
def test_bus_off_is_active(self):
|
||||||
|
mon = alive_mon(last_ts=T0)
|
||||||
|
mon.on_bus_off(T0)
|
||||||
|
assert mon.estop_active
|
||||||
|
|
||||||
|
def test_degraded_not_active(self):
|
||||||
|
mon = alive_mon(last_ts=T0, timeout_s=0.5)
|
||||||
|
mon.tick(T0 + 0.5)
|
||||||
|
assert not mon.estop_active
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# HealthFsm — dual-VESC wrapper
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestHealthFsm:
|
||||||
|
def _make_fsm(self) -> HealthFsm:
|
||||||
|
fsm = HealthFsm(left_id=61, right_id=79, timeout_s=0.5, estop_s=2.0)
|
||||||
|
fsm.left.last_frame_ts = T0
|
||||||
|
fsm.right.last_frame_ts = T0
|
||||||
|
return fsm
|
||||||
|
|
||||||
|
def test_tick_returns_per_vesc_actions(self):
|
||||||
|
fsm = self._make_fsm()
|
||||||
|
result = fsm.tick(T0 + 0.1)
|
||||||
|
assert 61 in result
|
||||||
|
assert 79 in result
|
||||||
|
|
||||||
|
def test_on_frame_updates_left(self):
|
||||||
|
fsm = self._make_fsm()
|
||||||
|
fsm.left.last_frame_ts = T0 - 1.0 # stale
|
||||||
|
fsm.left.state = VescHealthState.DEGRADED
|
||||||
|
actions = fsm.on_frame(61, T0 + 0.1)
|
||||||
|
assert Action.LOG_RECOVERY in actions
|
||||||
|
assert fsm.left.state == VescHealthState.HEALTHY
|
||||||
|
|
||||||
|
def test_on_frame_unknown_id_is_noop(self):
|
||||||
|
fsm = self._make_fsm()
|
||||||
|
assert fsm.on_frame(99, T0) == []
|
||||||
|
|
||||||
|
def test_any_estop_true_when_one_vesc_estop(self):
|
||||||
|
fsm = self._make_fsm()
|
||||||
|
fsm.left.last_frame_ts = T0 - 3.0
|
||||||
|
fsm.tick(T0) # should escalate left to ESTOP
|
||||||
|
assert fsm.any_estop
|
||||||
|
|
||||||
|
def test_any_estop_false_when_both_healthy(self):
|
||||||
|
fsm = self._make_fsm()
|
||||||
|
fsm.tick(T0 + 0.1)
|
||||||
|
assert not fsm.any_estop
|
||||||
|
|
||||||
|
def test_bus_off_propagates_to_both(self):
|
||||||
|
fsm = self._make_fsm()
|
||||||
|
actions = fsm.on_bus_off(T0)
|
||||||
|
assert fsm.left.state == VescHealthState.BUS_OFF
|
||||||
|
assert fsm.right.state == VescHealthState.BUS_OFF
|
||||||
|
assert any(a == Action.TRIGGER_ESTOP for a in actions)
|
||||||
|
|
||||||
|
def test_bus_ok_clears_both(self):
|
||||||
|
fsm = self._make_fsm()
|
||||||
|
fsm.on_bus_off(T0)
|
||||||
|
actions = fsm.on_bus_ok(T0 + 1.0)
|
||||||
|
assert fsm.left.state == VescHealthState.HEALTHY
|
||||||
|
assert fsm.right.state == VescHealthState.HEALTHY
|
||||||
|
assert any(a == Action.CLEAR_ESTOP for a in actions)
|
||||||
|
|
||||||
|
def test_bus_ok_when_not_bus_off_is_noop(self):
|
||||||
|
fsm = self._make_fsm()
|
||||||
|
assert fsm.on_bus_ok(T0) == []
|
||||||
|
|
||||||
|
def test_left_timeout_right_healthy(self):
|
||||||
|
fsm = self._make_fsm()
|
||||||
|
fsm.left.last_frame_ts = T0 - 1.0 # stale
|
||||||
|
result = fsm.tick(T0)
|
||||||
|
assert Action.SEND_ALIVE in result[61]
|
||||||
|
assert result[79] == []
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# _make_get_values_request (inline copy in node module)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestMakeGetValuesRequest:
|
||||||
|
def test_arb_id_encodes_target(self):
|
||||||
|
from saltybot_vesc_health.health_monitor_node import _make_get_values_request
|
||||||
|
arb_id, _ = _make_get_values_request(127, 61)
|
||||||
|
assert (arb_id & 0xFF) == 61
|
||||||
|
|
||||||
|
def test_payload_has_sender_and_comm_id(self):
|
||||||
|
from saltybot_vesc_health.health_monitor_node import (
|
||||||
|
_make_get_values_request,
|
||||||
|
_COMM_GET_VALUES,
|
||||||
|
)
|
||||||
|
_, payload = _make_get_values_request(42, 61)
|
||||||
|
assert payload[0] == 42
|
||||||
|
assert payload[2] == _COMM_GET_VALUES
|
||||||
|
|
||||||
|
def test_arb_id_is_extended(self):
|
||||||
|
from saltybot_vesc_health.health_monitor_node import _make_get_values_request
|
||||||
|
arb_id, _ = _make_get_values_request(127, 61)
|
||||||
|
assert arb_id > 0x7FF
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
pytest.main([__file__, "-v"])
|
||||||
Loading…
x
Reference in New Issue
Block a user