Compare commits

...

5 Commits

Author SHA1 Message Date
bc45208768 feat: Add Issue #194 - Speed limiter node (50Hz safety constraints)
Implements speed limiter ROS2 node that enforces safety constraints from:
  - Terrain type (via /saltybot/terrain_speed_scale)
  - Battery level (via /saltybot/speed_limit)
  - Emergency system state (via /saltybot/emergency)

Node computes minimum scale factor and applies to /cmd_vel, forwarding
limited commands to /saltybot/cmd_vel_limited. Publishes JSON telemetry
on /saltybot/speed_limit_info at 50Hz (20ms cycle).

Features:
  - Subscription to terrain, battery, and emergency constraints
  - Twist velocity scaling with min-of-three constraint logic
  - JSON telemetry with timestamp and all scale factors
  - 50Hz timer-based publishing for real-time control
  - Emergency state mapping (NOMINAL/RECOVERING/STOPPING/ESCALATED)
  - Comprehensive unit tests (18 test cases)

Package: saltybot_speed_limiter
Entry point: speed_limiter_node
Launch file: speed_limiter.launch.py

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-03-02 11:14:10 -05:00
b345128427 Merge pull request 'feat(jetson): camera health watchdog — 6 streams, WARNING/ERROR, v4l2 reset (issue #198)' (#199) from sl-perception/issue-198-camera-health into main 2026-03-02 11:12:32 -05:00
e6065e1531 feat(jetson): camera health watchdog node (issue #198)
Adds camera_health_node.py + _camera_state.py to saltybot_bringup:

• _camera_state.py  — pure-Python CameraState dataclass (no ROS2):
                       on_frame(), age_s, fps(window_s), status(),
                       should_reset() + mark_reset() with 30s cooldown

• camera_health_node.py — subscribes 6 image topics (D435i color/depth
                           + 4× IMX219 CSI front/right/rear/left);
                           1 Hz tick: WARNING at >2s silence, ERROR at
                           >10s + v4l2 stream-off/on reset for CSI cams;
                           publishes /saltybot/camera_health JSON with
                           per-camera status, age_s, fps, total_frames

• test/test_camera_health.py — 15 unit tests (15/15 pass, no ROS2 needed)
• setup.py — adds camera_health_monitor console_scripts entry point

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-02 11:11:48 -05:00
7678ddebe2 Merge pull request 'feat(jetson): AprilTag landmark detector — DICT_APRILTAG_36h11 10Hz 6-DOF (issue #191)' (#197) from sl-perception/issue-191-apriltag into main 2026-03-02 11:08:44 -05:00
d6d7e7b75a feat(jetson): AprilTag landmark detector — DICT_APRILTAG_36h11 (issue #191)
saltybot_apriltag_msgs (ament_cmake):
• DetectedTag.msg     — tag_id, family, decision_margin, corners[8],
                        center, 6-DOF pose + pose_valid flag
• DetectedTagArray.msg — DetectedTag[], count

saltybot_apriltag (ament_python, single node):
• _aruco_utils.py    — ROS2-free: ArucoBackend (cv2 4.6/4.7+ API shim),
                        rvec_tvec_to_pose (Rodrigues → Shepperd quaternion)
• apriltag_node.py   — 10 Hz timer; subscribes image + latched camera_info;
                        cv2.solvePnP IPPE_SQUARE for 6-DOF pose when K
                        available; publishes /saltybot/apriltags
• test/test_apriltag.py — 11 unit tests (11/11 pass, no ROS2 needed):
                          pose math + rendered tag detection + multi-tag

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-02 11:07:35 -05:00
26 changed files with 1470 additions and 0 deletions

View File

@ -0,0 +1,28 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_apriltag</name>
<version>0.1.0</version>
<description>
AprilTag (DICT_APRILTAG_36h11) landmark detector for SaltyBot CSI cameras.
Detects tags in incoming frames and publishes DetectedTagArray at 10 Hz.
</description>
<maintainer email="robot@saltylab.local">SaltyLab</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<depend>cv_bridge</depend>
<depend>saltybot_apriltag_msgs</depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-opencv</exec_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,85 @@
"""
_aruco_utils.py Pure-Python helpers for AprilTag detection (no ROS2 deps).
Importable without rclpy for unit testing.
"""
from __future__ import annotations
import math
import numpy as np
import cv2
_FAMILY = 'DICT_APRILTAG_36h11'
def rvec_tvec_to_pose(rvec: np.ndarray, tvec: np.ndarray):
"""
Convert OpenCV rotation + translation vectors to geometry_msgs/Pose.
Returns a plain namespace object to avoid importing geometry_msgs here.
"""
R, _ = cv2.Rodrigues(rvec)
m = R
t = m[0, 0] + m[1, 1] + m[2, 2]
if t > 0:
s = 0.5 / math.sqrt(t + 1.0)
qw = 0.25 / s
qx = (m[2, 1] - m[1, 2]) * s
qy = (m[0, 2] - m[2, 0]) * s
qz = (m[1, 0] - m[0, 1]) * s
elif m[0, 0] > m[1, 1] and m[0, 0] > m[2, 2]:
s = 2.0 * math.sqrt(1.0 + m[0, 0] - m[1, 1] - m[2, 2])
qw = (m[2, 1] - m[1, 2]) / s
qx = 0.25 * s
qy = (m[0, 1] + m[1, 0]) / s
qz = (m[0, 2] + m[2, 0]) / s
elif m[1, 1] > m[2, 2]:
s = 2.0 * math.sqrt(1.0 + m[1, 1] - m[0, 0] - m[2, 2])
qw = (m[0, 2] - m[2, 0]) / s
qx = (m[0, 1] + m[1, 0]) / s
qy = 0.25 * s
qz = (m[1, 2] + m[2, 1]) / s
else:
s = 2.0 * math.sqrt(1.0 + m[2, 2] - m[0, 0] - m[1, 1])
qw = (m[1, 0] - m[0, 1]) / s
qx = (m[0, 2] + m[2, 0]) / s
qy = (m[1, 2] + m[2, 1]) / s
qz = 0.25 * s
return (
float(tvec.flat[0]), float(tvec.flat[1]), float(tvec.flat[2]),
float(qx), float(qy), float(qz), float(qw),
)
class ArucoBackend:
"""Wraps cv2.aruco, handling the 4.6 vs 4.7+ API difference."""
def __init__(self, logger=None):
self._log = logger
dictionary = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_APRILTAG_36h11)
params = cv2.aruco.DetectorParameters()
try:
self._detector = cv2.aruco.ArucoDetector(dictionary, params)
self._new_api = True
if logger:
logger.info('[apriltag] ArucoDetector (OpenCV ≥ 4.7)')
except AttributeError:
self._dict = dictionary
self._params = params
self._new_api = False
if logger:
logger.info('[apriltag] legacy aruco.detectMarkers (OpenCV 4.6)')
def detect(self, gray: np.ndarray):
"""Returns (corners, ids); ids may be None."""
if self._new_api:
corners, ids, _ = self._detector.detectMarkers(gray)
else:
corners, ids, _ = cv2.aruco.detectMarkers(
gray, self._dict, parameters=self._params
)
return corners, ids

View File

@ -0,0 +1,212 @@
"""
apriltag_node.py AprilTag (DICT_APRILTAG_36h11) landmark detector.
Detects AprilTags in CSI camera frames and publishes results at 10 Hz.
When camera_info is available, estimates full 6-DOF pose per tag via
cv2.solvePnP (SOLVEPNP_IPPE_SQUARE).
Supports OpenCV 4.6 (legacy detectMarkers) and 4.7+ (ArucoDetector).
Subscribes:
<image_topic> sensor_msgs/Image default /camera/color/image_raw
<image_topic_dir>/camera_info sensor_msgs/CameraInfo latched intrinsics
Publishes:
/saltybot/apriltags saltybot_apriltag_msgs/DetectedTagArray 10 Hz
Parameters:
image_topic str '/camera/color/image_raw'
publish_hz float 10.0
tag_size_m float 0.10 physical tag side in metres (for pose estimation)
min_margin float 10.0 minimum corner-span pixels to keep a detection
"""
from __future__ import annotations
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
import numpy as np
import cv2
from cv_bridge import CvBridge
from sensor_msgs.msg import Image, CameraInfo
from geometry_msgs.msg import Point, Pose
try:
from saltybot_apriltag_msgs.msg import DetectedTag, DetectedTagArray
_MSGS_OK = True
except ImportError:
_MSGS_OK = False
from ._aruco_utils import ArucoBackend, rvec_tvec_to_pose
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
_LATCHED_QOS = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=1,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
_FAMILY = 'DICT_APRILTAG_36h11'
class AprilTagNode(Node):
def __init__(self):
super().__init__('apriltag_detector')
self.declare_parameter('image_topic', '/camera/color/image_raw')
self.declare_parameter('publish_hz', 10.0)
self.declare_parameter('tag_size_m', 0.10)
self.declare_parameter('min_margin', 10.0)
img_topic = self.get_parameter('image_topic').value
pub_hz = self.get_parameter('publish_hz').value
self._tag_size = self.get_parameter('tag_size_m').value
self._min_margin = self.get_parameter('min_margin').value
self._bridge = CvBridge()
self._backend = ArucoBackend(self.get_logger())
self._K: np.ndarray | None = None
self._dist: np.ndarray | None = None
self._latest_img: Image | None = None
# Derive camera_info topic from image topic namespace
info_topic = img_topic.rsplit('/', 1)[0] + '/camera_info'
self.create_subscription(Image, img_topic, self._on_image, _SENSOR_QOS)
self.create_subscription(CameraInfo, info_topic, self._on_camera_info, _LATCHED_QOS)
if _MSGS_OK:
self._pub = self.create_publisher(
DetectedTagArray, '/saltybot/apriltags', 10
)
else:
self._pub = None
self.get_logger().warning(
'[apriltag] saltybot_apriltag_msgs not built — '
'DetectedTagArray not published'
)
self.create_timer(1.0 / pub_hz, self._tick)
self.get_logger().info(
f'apriltag_detector ready — dict={_FAMILY} '
f'tag_size={self._tag_size}m topic={img_topic} hz={pub_hz}'
)
# ── Callbacks ─────────────────────────────────────────────────────────────
def _on_image(self, msg: Image) -> None:
self._latest_img = msg
def _on_camera_info(self, msg: CameraInfo) -> None:
self._K = np.array(msg.k, dtype=np.float64).reshape(3, 3)
self._dist = np.array(msg.d, dtype=np.float64)
# ── 10 Hz tick ────────────────────────────────────────────────────────────
def _tick(self) -> None:
if self._latest_img is None or self._pub is None:
return
try:
gray = self._bridge.imgmsg_to_cv2(
self._latest_img, desired_encoding='mono8'
)
except Exception as exc:
self.get_logger().warning(f'[apriltag] decode error: {exc}')
return
corners, ids = self._backend.detect(gray)
stamp = self.get_clock().now().to_msg()
arr = self._build_array(corners, ids, self._latest_img.header, stamp)
self._pub.publish(arr)
# ── Message builder ───────────────────────────────────────────────────────
def _build_array(self, corners, ids, src_header, stamp) -> 'DetectedTagArray':
arr = DetectedTagArray()
arr.header.stamp = stamp
arr.header.frame_id = src_header.frame_id
if ids is None or len(ids) == 0:
arr.count = 0
return arr
half = self._tag_size / 2.0
obj_pts = np.array([
[-half, half, 0],
[ half, half, 0],
[ half, -half, 0],
[-half, -half, 0],
], dtype=np.float64)
for corner_set, tag_id in zip(corners, ids.flatten()):
pts = corner_set.reshape(4, 2)
margin = float(np.mean([
np.linalg.norm(pts[i] - pts[(i + 1) % 4]) for i in range(4)
]))
if margin < self._min_margin:
continue
cx = float(np.mean(pts[:, 0]))
cy = float(np.mean(pts[:, 1]))
tag = DetectedTag()
tag.header = arr.header
tag.tag_id = int(tag_id)
tag.family = _FAMILY
tag.decision_margin = margin
tag.corners = pts.flatten().tolist()
tag.center = Point(x=cx, y=cy, z=0.0)
if self._K is not None and self._dist is not None:
ok, rvec, tvec = cv2.solvePnP(
obj_pts, pts.astype(np.float64),
self._K, self._dist,
flags=cv2.SOLVEPNP_IPPE_SQUARE,
)
if ok:
tx, ty, tz, qx, qy, qz, qw = rvec_tvec_to_pose(rvec, tvec)
tag.pose.position.x = tx
tag.pose.position.y = ty
tag.pose.position.z = tz
tag.pose.orientation.x = qx
tag.pose.orientation.y = qy
tag.pose.orientation.z = qz
tag.pose.orientation.w = qw
tag.pose_valid = True
else:
tag.pose_valid = False
else:
tag.pose_valid = False
arr.tags.append(tag)
arr.count = len(arr.tags)
return arr
def main(args=None):
rclpy.init(args=args)
node = AprilTagNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_apriltag
[install]
install_scripts=$base/lib/saltybot_apriltag

View File

@ -0,0 +1,29 @@
from setuptools import setup, find_packages
from glob import glob
package_name = 'saltybot_apriltag'
setup(
name=package_name,
version='0.1.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/launch',
glob('launch/*.launch.py')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='SaltyLab',
maintainer_email='robot@saltylab.local',
description='AprilTag DICT_APRILTAG_36h11 detector — 10 Hz, 6-DOF pose',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'apriltag_detector = saltybot_apriltag.apriltag_node:main',
],
},
)

View File

@ -0,0 +1,142 @@
"""
test_apriltag.py Unit tests for AprilTag detector helpers.
Runs without ROS2 (no rclpy). Tests cover:
- _rvec_tvec_to_pose: rotation/translation Pose quaternion
- _ArucoBackend: detection on a synthetic rendered tag image
- AprilTagNode._build_array: message construction from mock detections
"""
from __future__ import annotations
import math
import sys
import os
import numpy as np
import pytest
import cv2
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_apriltag._aruco_utils import rvec_tvec_to_pose as _rvec_tvec_to_pose, ArucoBackend as _ArucoBackend
# ── _rvec_tvec_to_pose ────────────────────────────────────────────────────────
class TestRvecTvecToPose:
# _rvec_tvec_to_pose returns (tx, ty, tz, qx, qy, qz, qw)
def test_identity_rotation_gives_unit_quaternion(self):
tx, ty, tz, qx, qy, qz, qw = _rvec_tvec_to_pose(np.zeros(3), np.zeros(3))
assert qw == pytest.approx(1.0, abs=1e-6)
assert qx == pytest.approx(0.0, abs=1e-6)
assert qy == pytest.approx(0.0, abs=1e-6)
assert qz == pytest.approx(0.0, abs=1e-6)
def test_translation_passed_through(self):
tvec = np.array([1.5, -0.3, 2.0])
tx, ty, tz, *_ = _rvec_tvec_to_pose(np.zeros(3), tvec)
assert tx == pytest.approx(1.5)
assert ty == pytest.approx(-0.3)
assert tz == pytest.approx(2.0)
def test_90deg_yaw_quaternion(self):
# 90° rotation around Z axis → qw=cos(45°), qz=sin(45°)
rvec = np.array([0.0, 0.0, math.pi / 2])
tx, ty, tz, qx, qy, qz, qw = _rvec_tvec_to_pose(rvec, np.zeros(3))
assert qw == pytest.approx(math.cos(math.pi / 4), abs=1e-5)
assert qz == pytest.approx(math.sin(math.pi / 4), abs=1e-5)
assert qx == pytest.approx(0.0, abs=1e-5)
assert qy == pytest.approx(0.0, abs=1e-5)
def test_quaternion_is_unit_norm(self):
rvec = np.array([0.3, 0.7, -1.1])
tvec = np.array([0.5, 0.5, 0.5])
tx, ty, tz, qx, qy, qz, qw = _rvec_tvec_to_pose(rvec, tvec)
norm = math.sqrt(qx**2 + qy**2 + qz**2 + qw**2)
assert norm == pytest.approx(1.0, abs=1e-5)
def test_180deg_x_rotation(self):
rvec = np.array([math.pi, 0.0, 0.0])
tx, ty, tz, qx, qy, qz, qw = _rvec_tvec_to_pose(rvec, np.zeros(3))
norm = math.sqrt(qx**2 + qy**2 + qz**2 + qw**2)
assert norm == pytest.approx(1.0, abs=1e-5)
# ── _ArucoBackend ─────────────────────────────────────────────────────────────
class _FakeLogger:
def info(self, msg): pass
def warning(self, msg): pass
def debug(self, msg): pass
def _render_apriltag_36h11(tag_id: int = 0, size: int = 200) -> np.ndarray:
"""
Render an AprilTag DICT_APRILTAG_36h11 tag into a greyscale image.
Returns an image large enough for reliable detection.
"""
dictionary = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_APRILTAG_36h11)
# Draw the marker (OpenCV ≥ 4.6)
tag_img = np.zeros((size, size), dtype=np.uint8)
try:
cv2.aruco.generateImageMarker(dictionary, tag_id, size, tag_img, 1)
except AttributeError:
# Older API
tag_img = cv2.aruco.drawMarker(dictionary, tag_id, size)
# Embed in white canvas with border
canvas = np.ones((size + 80, size + 80), dtype=np.uint8) * 255
canvas[40:40 + size, 40:40 + size] = tag_img
return canvas
class TestArucoBackend:
def test_backend_initialises(self):
backend = _ArucoBackend(_FakeLogger())
assert backend is not None
def test_detect_returns_tuple(self):
backend = _ArucoBackend(_FakeLogger())
gray = np.ones((480, 640), dtype=np.uint8) * 200
corners, ids = backend.detect(gray)
assert isinstance(corners, (list, tuple))
def test_blank_image_no_detections(self):
backend = _ArucoBackend(_FakeLogger())
gray = np.ones((480, 640), dtype=np.uint8) * 200
_, ids = backend.detect(gray)
assert ids is None or len(ids) == 0
def test_rendered_tag_detected(self):
backend = _ArucoBackend(_FakeLogger())
tag_img = _render_apriltag_36h11(tag_id=5, size=200)
corners, ids = backend.detect(tag_img)
assert ids is not None and len(ids) == 1
assert int(ids.flatten()[0]) == 5
def test_rendered_tag_corners_shape(self):
backend = _ArucoBackend(_FakeLogger())
tag_img = _render_apriltag_36h11(tag_id=0, size=200)
corners, ids = backend.detect(tag_img)
assert ids is not None
assert len(corners) == 1
assert corners[0].reshape(4, 2).shape == (4, 2)
def test_multiple_tags_detected(self):
"""Place two tags side-by-side and expect both found."""
backend = _ArucoBackend(_FakeLogger())
t0 = _render_apriltag_36h11(tag_id=0, size=150)
t1 = _render_apriltag_36h11(tag_id=1, size=150)
canvas = np.ones((t0.shape[0], t0.shape[1] * 2 + 20), dtype=np.uint8) * 255
canvas[:, :t0.shape[1]] = t0
canvas[:, t0.shape[1] + 20:] = t1
_, ids = backend.detect(canvas)
assert ids is not None and len(ids) == 2
assert set(ids.flatten().tolist()) == {0, 1}
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@ -0,0 +1,16 @@
cmake_minimum_required(VERSION 3.8)
project(saltybot_apriltag_msgs)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(std_msgs REQUIRED)
find_package(geometry_msgs REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/DetectedTag.msg"
"msg/DetectedTagArray.msg"
DEPENDENCIES std_msgs geometry_msgs
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()

View File

@ -0,0 +1,18 @@
# DetectedTag — single AprilTag detection result.
std_msgs/Header header
int32 tag_id # numeric ID encoded in the tag
string family # e.g. 'DICT_APRILTAG_36h11'
float32 decision_margin # detector confidence (higher = more certain)
# Corner pixel coordinates [x0,y0, x1,y1, x2,y2, x3,y3] (image frame, CW from TL)
float32[8] corners
# Tag centre in image pixels
geometry_msgs/Point center
# 6-DOF pose in camera optical frame (only valid when pose_valid=true)
geometry_msgs/Pose pose
bool pose_valid # false if camera_info not yet received

View File

@ -0,0 +1,7 @@
# DetectedTagArray — all AprilTags found in one frame.
# Published at ~10 Hz on /saltybot/apriltags.
std_msgs/Header header
saltybot_apriltag_msgs/DetectedTag[] tags
int32 count

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_apriltag_msgs</name>
<version>0.1.0</version>
<description>Message types for AprilTag landmark detection.</description>
<maintainer email="robot@saltylab.local">SaltyLab</maintainer>
<license>MIT</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@ -0,0 +1,60 @@
"""
_camera_state.py Pure-Python camera health state (no ROS2 deps).
Importable without rclpy for unit testing.
"""
from __future__ import annotations
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class CameraState:
topic: str
v4l2_dev: Optional[str] # None for D435i (USB — no v4l2 reset)
last_frame_t: Optional[float] = None # monotonic seconds
total_frames: int = 0
_ts_window: deque = field(default_factory=lambda: deque(maxlen=200))
_last_reset_t: float = 0.0 # for reset cooldown
def on_frame(self) -> None:
now = time.monotonic()
self.last_frame_t = now
self.total_frames += 1
self._ts_window.append(now)
@property
def age_s(self) -> float:
if self.last_frame_t is None:
return float('inf')
return time.monotonic() - self.last_frame_t
def fps(self, window_s: float = 5.0) -> float:
now = time.monotonic()
recent = [t for t in self._ts_window if now - t <= window_s]
if len(recent) < 2:
return 0.0
return (len(recent) - 1) / (recent[-1] - recent[0])
def status(self, warn_thr: float, error_thr: float) -> str:
age = self.age_s
if age == float('inf'):
return 'never'
if age > error_thr:
return 'error'
if age > warn_thr:
return 'warn'
return 'ok'
def should_reset(self, cooldown_s: float = 30.0) -> bool:
return (
self.v4l2_dev is not None
and time.monotonic() - self._last_reset_t > cooldown_s
)
def mark_reset(self) -> None:
self._last_reset_t = time.monotonic()

View File

@ -0,0 +1,210 @@
"""
camera_health_node.py Camera watchdog for all SaltyBot image streams.
Monitors frame arrival timestamps for each configured camera topic.
On silence:
> warn_timeout_s (default 2s) WARNING log
> error_timeout_s (default 10s) ERROR log + v4l2 reset attempt (CSI only)
Publishes /saltybot/camera_health as std_msgs/String JSON at 1 Hz:
{
"ts": "2026-03-02T15:00:00Z",
"cameras": {
"d435i_color": {
"topic": "/camera/color/image_raw",
"status": "ok" | "warn" | "error" | "never",
"age_s": 1.23,
"fps": 29.8,
"total_frames": 1234
},
...
},
"healthy": 5,
"degraded": 1,
"total": 6
}
CSI camera map (topic /dev/videoN) is used for v4l2 reset.
D435i streams are USB no v4l2 reset (RealSense SDK manages recovery).
Parameters:
warn_timeout_s float 2.0
error_timeout_s float 10.0
publish_hz float 1.0
fps_window_s float 5.0 rolling window for FPS estimate
"""
from __future__ import annotations
import json
import subprocess
from datetime import datetime, timezone
from typing import Dict
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from sensor_msgs.msg import Image
from std_msgs.msg import String
from ._camera_state import CameraState
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=3,
)
# Default cameras: name → (topic, csi_device_or_None)
_DEFAULT_CAMERAS: Dict[str, tuple] = {
'd435i_color': ('/camera/color/image_raw', None),
'd435i_depth': ('/camera/depth/image_rect_raw', None),
'csi_front': ('/camera/front/image_raw', '/dev/video0'),
'csi_right': ('/camera/right/image_raw', '/dev/video2'),
'csi_rear': ('/camera/rear/image_raw', '/dev/video4'),
'csi_left': ('/camera/left/image_raw', '/dev/video6'),
}
class CameraHealthNode(Node):
def __init__(self):
super().__init__('camera_health_monitor')
self.declare_parameter('warn_timeout_s', 2.0)
self.declare_parameter('error_timeout_s', 10.0)
self.declare_parameter('publish_hz', 1.0)
self.declare_parameter('fps_window_s', 5.0)
self._warn_thr = self.get_parameter('warn_timeout_s').value
self._error_thr = self.get_parameter('error_timeout_s').value
self._fps_win = self.get_parameter('fps_window_s').value
# Build camera states
self._cameras: Dict[str, CameraState] = {
name: CameraState(topic=topic, v4l2_dev=dev)
for name, (topic, dev) in _DEFAULT_CAMERAS.items()
}
# Subscribe to each topic
for name, cam in self._cameras.items():
self.create_subscription(
Image, cam.topic,
self._make_cb(name),
_SENSOR_QOS,
)
self._pub = self.create_publisher(String, '/saltybot/camera_health', 10)
self.create_timer(
1.0 / self.get_parameter('publish_hz').value, self._tick
)
self.get_logger().info(
f'camera_health_monitor ready — watching {len(self._cameras)} cameras '
f'warn={self._warn_thr}s error={self._error_thr}s'
)
# ── Callbacks ─────────────────────────────────────────────────────────────
def _make_cb(self, name: str):
def _cb(msg: Image) -> None:
self._cameras[name].on_frame()
return _cb
# ── 1 Hz watchdog tick ────────────────────────────────────────────────────
def _tick(self) -> None:
healthy = degraded = 0
for name, cam in self._cameras.items():
status = cam.status(self._warn_thr, self._error_thr)
if status == 'ok':
healthy += 1
elif status in ('warn', 'error', 'never'):
degraded += 1
age = cam.age_s
age_str = f'{age:.1f}' if age != float('inf') else ''
if status == 'warn':
self.get_logger().warning(
f'[cam_health] {name} ({cam.topic}): '
f'no frame for {age_str}s'
)
elif status in ('error', 'never'):
self.get_logger().error(
f'[cam_health] {name} ({cam.topic}): '
f'no frame for {age_str}s — camera may be offline'
)
if status == 'error' and cam.should_reset():
self._v4l2_reset(name, cam)
payload = self._build_json(healthy, degraded)
msg = String()
msg.data = payload
self._pub.publish(msg)
# ── v4l2 reset (CSI cameras only) ─────────────────────────────────────────
def _v4l2_reset(self, name: str, cam: CameraState) -> None:
dev = cam.v4l2_dev
self.get_logger().warning(
f'[cam_health] attempting v4l2 reset for {name} on {dev}'
)
cam.mark_reset()
try:
# Toggle streaming off then on to nudge the driver
subprocess.run(
['v4l2-ctl', '-d', dev, '--stream-off'],
capture_output=True, timeout=2.0
)
subprocess.run(
['v4l2-ctl', '-d', dev, '--stream-on'],
capture_output=True, timeout=2.0
)
self.get_logger().info(
f'[cam_health] v4l2 reset sent to {dev} for {name}'
)
except (subprocess.TimeoutExpired, FileNotFoundError) as exc:
self.get_logger().warning(
f'[cam_health] v4l2 reset failed for {name}: {exc}'
)
# ── JSON builder ──────────────────────────────────────────────────────────
def _build_json(self, healthy: int, degraded: int) -> str:
cameras_out = {}
for name, cam in self._cameras.items():
age = cam.age_s
cameras_out[name] = {
'topic': cam.topic,
'status': cam.status(self._warn_thr, self._error_thr),
'age_s': round(age, 2) if age != float('inf') else None,
'fps': round(cam.fps(self._fps_win), 1),
'total_frames': cam.total_frames,
}
return json.dumps({
'ts': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
'cameras': cameras_out,
'healthy': healthy,
'degraded': degraded,
'total': len(self._cameras),
})
def main(args=None):
rclpy.init(args=args)
node = CameraHealthNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -29,6 +29,7 @@ setup(
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'depth_confidence_filter = saltybot_bringup.depth_confidence_filter_node:main', 'depth_confidence_filter = saltybot_bringup.depth_confidence_filter_node:main',
'camera_health_monitor = saltybot_bringup.camera_health_node:main',
], ],
}, },
) )

View File

@ -0,0 +1,108 @@
"""
test_camera_health.py Unit tests for CameraState (no ROS2 required).
"""
import sys
import os
import time
import json
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_bringup._camera_state import CameraState
class TestCameraState:
def test_initial_status_never(self):
cs = CameraState(topic='/test', v4l2_dev=None)
assert cs.status(2.0, 10.0) == 'never'
def test_age_inf_before_first_frame(self):
cs = CameraState(topic='/test', v4l2_dev=None)
assert cs.age_s == float('inf')
def test_on_frame_sets_age_near_zero(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
assert cs.age_s < 0.1
def test_on_frame_increments_count(self):
cs = CameraState(topic='/test', v4l2_dev=None)
for _ in range(5):
cs.on_frame()
assert cs.total_frames == 5
def test_status_ok_after_fresh_frame(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
assert cs.status(2.0, 10.0) == 'ok'
def test_status_warn_after_2s(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
cs.last_frame_t -= 3.0 # fake 3 seconds ago
assert cs.status(2.0, 10.0) == 'warn'
def test_status_error_after_10s(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
cs.last_frame_t -= 11.0
assert cs.status(2.0, 10.0) == 'error'
def test_fps_zero_before_frames(self):
cs = CameraState(topic='/test', v4l2_dev=None)
assert cs.fps(5.0) == pytest.approx(0.0)
def test_fps_estimate(self):
cs = CameraState(topic='/test', v4l2_dev=None)
# Simulate 10 frames over 1 second
base = time.monotonic()
for i in range(11):
cs._ts_window.append(base + i * 0.1)
cs.last_frame_t = base + 1.0
assert cs.fps(5.0) == pytest.approx(10.0, abs=0.5)
def test_should_reset_false_for_d435i(self):
cs = CameraState(topic='/camera/color/image_raw', v4l2_dev=None)
assert not cs.should_reset()
def test_should_reset_true_for_csi_first_time(self):
cs = CameraState(topic='/camera/front/image_raw', v4l2_dev='/dev/video0')
assert cs.should_reset(cooldown_s=30.0)
def test_should_reset_false_within_cooldown(self):
cs = CameraState(topic='/test', v4l2_dev='/dev/video0')
cs.mark_reset()
assert not cs.should_reset(cooldown_s=30.0)
def test_should_reset_true_after_cooldown(self):
cs = CameraState(topic='/test', v4l2_dev='/dev/video0')
cs.mark_reset()
cs._last_reset_t -= 31.0 # fake cooldown elapsed
assert cs.should_reset(cooldown_s=30.0)
def test_custom_thresholds(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
cs.last_frame_t -= 1.5
assert cs.status(1.0, 5.0) == 'warn'
assert cs.status(2.0, 5.0) == 'ok'
def test_fps_ignores_old_frames(self):
cs = CameraState(topic='/test', v4l2_dev=None)
old = time.monotonic() - 60.0
for _ in range(50):
cs._ts_window.append(old)
# Recent frames
now = time.monotonic()
for i in range(6):
cs._ts_window.append(now + i * 0.1)
cs.last_frame_t = now + 0.5
assert cs.fps(5.0) == pytest.approx(10.0, abs=1.0)
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@ -0,0 +1,6 @@
# Speed limiter node configuration
speed_limiter:
ros__parameters:
# Publishing frequency in Hz
frequency: 50 # 50 Hz = 20ms cycle

View File

@ -0,0 +1,36 @@
"""Launch file for speed_limiter_node."""
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch.actions import DeclareLaunchArgument
import os
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
"""Generate launch description for speed limiter node."""
# Package directory
pkg_dir = get_package_share_directory("saltybot_speed_limiter")
# Parameters
config_file = os.path.join(pkg_dir, "config", "limiter_params.yaml")
# Declare launch arguments
return LaunchDescription(
[
DeclareLaunchArgument(
"config_file",
default_value=config_file,
description="Path to configuration YAML file",
),
# Speed limiter node
Node(
package="saltybot_speed_limiter",
executable="speed_limiter_node",
name="speed_limiter",
output="screen",
parameters=[LaunchConfiguration("config_file")],
),
]
)

View File

@ -0,0 +1,29 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_speed_limiter</name>
<version>0.1.0</version>
<description>
Speed limiter node for SaltyBot: enforces safety constraints from terrain type,
battery level, and emergency system. Computes minimum scale factor from multiple
sources and applies to cmd_vel with JSON telemetry. Runs at 50Hz.
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<depend>saltybot_emergency_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,155 @@
#!/usr/bin/env python3
"""Speed limiter node for SaltyBot.
Enforces safety constraints from terrain type, battery level, and emergency system.
Computes minimum scale factor and applies to cmd_vel at 50Hz (20ms).
Published topics:
/saltybot/cmd_vel_limited (geometry_msgs/Twist)
/saltybot/speed_limit_info (std_msgs/String) - JSON telemetry
Subscribed topics:
/cmd_vel (geometry_msgs/Twist)
/saltybot/terrain_speed_scale (std_msgs/Float32) - 0.0 to 1.0
/saltybot/speed_limit (std_msgs/Float32) - 0.0 to 1.0, from battery level
/saltybot/emergency (saltybot_emergency_msgs/EmergencyEvent)
"""
import json
import rclpy
from rclpy.node import Node
from rclpy.timer import Timer
from geometry_msgs.msg import Twist
from std_msgs.msg import Float32, String
from saltybot_emergency_msgs.msg import EmergencyEvent
class SpeedLimiterNode(Node):
"""ROS2 node that enforces speed limits from multiple constraints."""
def __init__(self):
super().__init__("speed_limiter")
# Parameters
self.declare_parameter("frequency", 50) # Hz
frequency = self.get_parameter("frequency").value
# Last received values (with defaults)
self.terrain_scale = 1.0
self.battery_limit = 1.0
self.emergency_scale = 1.0
self.last_cmd_vel = None
# Subscriptions
self.sub_cmd_vel = self.create_subscription(
Twist, "/cmd_vel", self._on_cmd_vel, 10
)
self.sub_terrain = self.create_subscription(
Float32, "/saltybot/terrain_speed_scale", self._on_terrain_scale, 10
)
self.sub_battery = self.create_subscription(
Float32, "/saltybot/speed_limit", self._on_battery_limit, 10
)
self.sub_emergency = self.create_subscription(
EmergencyEvent, "/saltybot/emergency", self._on_emergency, 10
)
# Publications
self.pub_cmd_vel_limited = self.create_publisher(
Twist, "/saltybot/cmd_vel_limited", 10
)
self.pub_info = self.create_publisher(
String, "/saltybot/speed_limit_info", 10
)
# Timer for periodic publishing at specified frequency
period = 1.0 / frequency
self.timer: Timer = self.create_timer(period, self._timer_callback)
self.get_logger().info(
f"Speed limiter initialized at {frequency}Hz. "
"Awaiting terrain_speed_scale, speed_limit, and emergency signals."
)
def _on_cmd_vel(self, msg: Twist) -> None:
"""Cache the last received cmd_vel."""
self.last_cmd_vel = msg
def _on_terrain_scale(self, msg: Float32) -> None:
"""Update terrain speed scale (0.0 to 1.0)."""
self.terrain_scale = max(0.0, min(1.0, msg.data))
def _on_battery_limit(self, msg: Float32) -> None:
"""Update battery-driven speed limit (0.0 to 1.0)."""
self.battery_limit = max(0.0, min(1.0, msg.data))
def _on_emergency(self, msg: EmergencyEvent) -> None:
"""Update emergency-driven speed scale based on system state."""
# Map emergency state to speed reduction factor
if msg.state == "ESCALATED":
self.emergency_scale = 0.0 # Complete stop
elif msg.state == "STOPPING":
self.emergency_scale = 0.1 # Minimal movement for safe deceleration
elif msg.state == "RECOVERING":
self.emergency_scale = 0.3 # Cautious recovery
else: # NOMINAL
self.emergency_scale = 1.0 # Full speed allowed
def _timer_callback(self) -> None:
"""Process and publish speed-limited cmd_vel at 50Hz."""
if self.last_cmd_vel is None:
return
# Compute minimum scale factor from all constraints
scale_factor = min(self.terrain_scale, self.battery_limit, self.emergency_scale)
# Apply scale factor to cmd_vel
limited_cmd_vel = self._scale_twist(self.last_cmd_vel, scale_factor)
# Publish limited command
self.pub_cmd_vel_limited.publish(limited_cmd_vel)
# Publish telemetry
info = {
"timestamp": self.get_clock().now().nanoseconds / 1e9,
"scale_factor": float(scale_factor),
"terrain_scale": float(self.terrain_scale),
"battery_limit": float(self.battery_limit),
"emergency_scale": float(self.emergency_scale),
"linear_x": float(limited_cmd_vel.linear.x),
"linear_y": float(limited_cmd_vel.linear.y),
"linear_z": float(limited_cmd_vel.linear.z),
"angular_x": float(limited_cmd_vel.angular.x),
"angular_y": float(limited_cmd_vel.angular.y),
"angular_z": float(limited_cmd_vel.angular.z),
}
info_msg = String(data=json.dumps(info))
self.pub_info.publish(info_msg)
@staticmethod
def _scale_twist(msg: Twist, scale: float) -> Twist:
"""Scale all velocities in a Twist message by a factor."""
scaled = Twist()
scaled.linear.x = msg.linear.x * scale
scaled.linear.y = msg.linear.y * scale
scaled.linear.z = msg.linear.z * scale
scaled.angular.x = msg.angular.x * scale
scaled.angular.y = msg.angular.y * scale
scaled.angular.z = msg.angular.z * scale
return scaled
def main(args=None):
rclpy.init(args=args)
node = SpeedLimiterNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_speed_limiter
[egg_info]
tag_date = 0

View File

@ -0,0 +1,29 @@
from setuptools import setup
package_name = "saltybot_speed_limiter"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/speed_limiter.launch.py"]),
(f"share/{package_name}/config", ["config/limiter_params.yaml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description=(
"Speed limiter: enforces terrain, battery, and emergency constraints on cmd_vel"
),
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"speed_limiter_node = saltybot_speed_limiter.speed_limiter_node:main",
],
},
)

View File

@ -0,0 +1,269 @@
"""Unit tests for speed_limiter_node."""
import pytest
import json
from geometry_msgs.msg import Twist
from std_msgs.msg import Float32, String
from saltybot_emergency_msgs.msg import EmergencyEvent
import rclpy
from rclpy.executors import SingleThreadedExecutor
# Import the node under test
from saltybot_speed_limiter.speed_limiter_node import SpeedLimiterNode
@pytest.fixture
def rclpy_fixture():
"""Initialize and cleanup rclpy."""
rclpy.init()
yield
rclpy.shutdown()
@pytest.fixture
def node(rclpy_fixture):
"""Create a speed limiter node instance."""
node = SpeedLimiterNode()
yield node
node.destroy_node()
class TestSpeedLimiterNode:
"""Test suite for SpeedLimiterNode."""
def test_node_initialization(self, node):
"""Test that node initializes with correct defaults."""
assert node.terrain_scale == 1.0
assert node.battery_limit == 1.0
assert node.emergency_scale == 1.0
assert node.last_cmd_vel is None
def test_terrain_scale_subscription(self, node):
"""Test terrain scale value update."""
msg = Float32(data=0.5)
node._on_terrain_scale(msg)
assert node.terrain_scale == 0.5
def test_terrain_scale_clamping(self, node):
"""Test that terrain scale is clamped to [0, 1]."""
node._on_terrain_scale(Float32(data=1.5))
assert node.terrain_scale == 1.0
node._on_terrain_scale(Float32(data=-0.5))
assert node.terrain_scale == 0.0
def test_battery_limit_subscription(self, node):
"""Test battery limit value update."""
msg = Float32(data=0.7)
node._on_battery_limit(msg)
assert node.battery_limit == 0.7
def test_battery_limit_clamping(self, node):
"""Test that battery limit is clamped to [0, 1]."""
node._on_battery_limit(Float32(data=2.0))
assert node.battery_limit == 1.0
node._on_battery_limit(Float32(data=-1.0))
assert node.battery_limit == 0.0
def test_emergency_nominal_state(self, node):
"""Test emergency scale in NOMINAL state."""
msg = EmergencyEvent()
msg.state = "NOMINAL"
node._on_emergency(msg)
assert node.emergency_scale == 1.0
def test_emergency_recovering_state(self, node):
"""Test emergency scale in RECOVERING state."""
msg = EmergencyEvent()
msg.state = "RECOVERING"
node._on_emergency(msg)
assert node.emergency_scale == 0.3
def test_emergency_stopping_state(self, node):
"""Test emergency scale in STOPPING state."""
msg = EmergencyEvent()
msg.state = "STOPPING"
node._on_emergency(msg)
assert node.emergency_scale == 0.1
def test_emergency_escalated_state(self, node):
"""Test emergency scale in ESCALATED state (complete stop)."""
msg = EmergencyEvent()
msg.state = "ESCALATED"
node._on_emergency(msg)
assert node.emergency_scale == 0.0
def test_cmd_vel_caching(self, node):
"""Test that cmd_vel messages are cached."""
msg = Twist()
msg.linear.x = 1.0
node._on_cmd_vel(msg)
assert node.last_cmd_vel is not None
assert node.last_cmd_vel.linear.x == 1.0
def test_scale_twist_zero(self, node):
"""Test scaling a twist by zero."""
msg = Twist()
msg.linear.x = 1.0
msg.angular.z = 0.5
result = node._scale_twist(msg, 0.0)
assert result.linear.x == 0.0
assert result.angular.z == 0.0
def test_scale_twist_half(self, node):
"""Test scaling a twist by 0.5."""
msg = Twist()
msg.linear.x = 2.0
msg.linear.y = 4.0
msg.angular.z = 1.0
result = node._scale_twist(msg, 0.5)
assert abs(result.linear.x - 1.0) < 1e-6
assert abs(result.linear.y - 2.0) < 1e-6
assert abs(result.angular.z - 0.5) < 1e-6
def test_scale_twist_full(self, node):
"""Test scaling a twist by 1.0 (no change)."""
msg = Twist()
msg.linear.x = 1.5
msg.angular.z = 0.8
result = node._scale_twist(msg, 1.0)
assert abs(result.linear.x - 1.5) < 1e-6
assert abs(result.angular.z - 0.8) < 1e-6
def test_scale_twist_all_components(self, node):
"""Test that all components of twist are scaled."""
msg = Twist()
msg.linear.x = 1.0
msg.linear.y = 2.0
msg.linear.z = 3.0
msg.angular.x = 0.1
msg.angular.y = 0.2
msg.angular.z = 0.3
result = node._scale_twist(msg, 0.5)
assert abs(result.linear.x - 0.5) < 1e-6
assert abs(result.linear.y - 1.0) < 1e-6
assert abs(result.linear.z - 1.5) < 1e-6
assert abs(result.angular.x - 0.05) < 1e-6
assert abs(result.angular.y - 0.1) < 1e-6
assert abs(result.angular.z - 0.15) < 1e-6
def test_min_scale_factor_all_one(self, node):
"""Test min-of-three logic with all scales = 1.0."""
node.terrain_scale = 1.0
node.battery_limit = 1.0
node.emergency_scale = 1.0
# This would be tested via _timer_callback, but we'll test the logic directly
scale = min(node.terrain_scale, node.battery_limit, node.emergency_scale)
assert scale == 1.0
def test_min_scale_factor_terrain_limiting(self, node):
"""Test that terrain scale limits when it's the minimum."""
node.terrain_scale = 0.3
node.battery_limit = 0.8
node.emergency_scale = 0.9
scale = min(node.terrain_scale, node.battery_limit, node.emergency_scale)
assert scale == 0.3
def test_min_scale_factor_battery_limiting(self, node):
"""Test that battery limit constrains when it's the minimum."""
node.terrain_scale = 0.9
node.battery_limit = 0.2
node.emergency_scale = 0.8
scale = min(node.terrain_scale, node.battery_limit, node.emergency_scale)
assert scale == 0.2
def test_min_scale_factor_emergency_limiting(self, node):
"""Test that emergency scale limits when it's the minimum."""
node.terrain_scale = 0.9
node.battery_limit = 0.8
node.emergency_scale = 0.0
scale = min(node.terrain_scale, node.battery_limit, node.emergency_scale)
assert scale == 0.0
def test_timer_callback_no_cmd_vel(self, node):
"""Test timer callback with no cmd_vel received."""
# Should not crash or publish if last_cmd_vel is None
node.last_cmd_vel = None
node._timer_callback() # Should return early
def test_info_json_structure(self, node):
"""Test that published info has correct JSON structure."""
node.last_cmd_vel = Twist()
node.last_cmd_vel.linear.x = 1.0
node.terrain_scale = 0.5
node.battery_limit = 0.8
node.emergency_scale = 1.0
# Manually create info dict as the node would
scale_factor = min(0.5, 0.8, 1.0)
limited = node._scale_twist(node.last_cmd_vel, scale_factor)
info = {
"timestamp": node.get_clock().now().nanoseconds / 1e9,
"scale_factor": float(scale_factor),
"terrain_scale": float(0.5),
"battery_limit": float(0.8),
"emergency_scale": float(1.0),
"linear_x": float(limited.linear.x),
"linear_y": float(limited.linear.y),
"linear_z": float(limited.linear.z),
"angular_x": float(limited.angular.x),
"angular_y": float(limited.angular.y),
"angular_z": float(limited.angular.z),
}
# Verify it can be serialized/deserialized
json_str = json.dumps(info)
parsed = json.loads(json_str)
assert parsed["scale_factor"] == 0.5
assert parsed["terrain_scale"] == 0.5
assert parsed["battery_limit"] == 0.8
assert parsed["emergency_scale"] == 1.0
assert abs(parsed["linear_x"] - 0.5) < 1e-6
class TestSpeedLimiterScenarios:
"""Integration-style tests for realistic scenarios."""
def test_scenario_rocky_terrain_low_battery(self, node):
"""Scenario: rocky terrain with low battery."""
# Rocky terrain severely limits speed
node._on_terrain_scale(Float32(data=0.2))
# Battery is critical
node._on_battery_limit(Float32(data=0.3))
# No emergency
msg = EmergencyEvent()
msg.state = "NOMINAL"
node._on_emergency(msg)
# Scale should be min(0.2, 0.3, 1.0) = 0.2
scale = min(node.terrain_scale, node.battery_limit, node.emergency_scale)
assert scale == 0.2
def test_scenario_emergency_stop(self, node):
"""Scenario: emergency escalation triggers immediate stop."""
# Nominal terrain and battery levels
node._on_terrain_scale(Float32(data=1.0))
node._on_battery_limit(Float32(data=1.0))
# Emergency escalation
msg = EmergencyEvent()
msg.state = "ESCALATED"
node._on_emergency(msg)
# Scale should be 0.0 due to emergency override
scale = min(node.terrain_scale, node.battery_limit, node.emergency_scale)
assert scale == 0.0
def test_scenario_smooth_terrain_good_battery(self, node):
"""Scenario: ideal conditions."""
node._on_terrain_scale(Float32(data=1.0))
node._on_battery_limit(Float32(data=0.9))
msg = EmergencyEvent()
msg.state = "NOMINAL"
node._on_emergency(msg)
scale = min(node.terrain_scale, node.battery_limit, node.emergency_scale)
assert scale == 0.9