Compare commits

..

1 Commits

Author SHA1 Message Date
6e1a2095b7 feat: Add Issue #194 - Speed limiter node (50Hz safety constraints)
Implements speed limiter ROS2 node that enforces safety constraints from:
  - Terrain type (via /saltybot/terrain_speed_scale)
  - Battery level (via /saltybot/speed_limit)
  - Emergency system state (via /saltybot/emergency)

Node computes minimum scale factor and applies to /cmd_vel, forwarding
limited commands to /saltybot/cmd_vel_limited. Publishes JSON telemetry
on /saltybot/speed_limit_info at 50Hz (20ms cycle).

Features:
  - Subscription to terrain, battery, and emergency constraints
  - Twist velocity scaling with min-of-three constraint logic
  - JSON telemetry with timestamp and all scale factors
  - 50Hz timer-based publishing for real-time control
  - Emergency state mapping (NOMINAL/RECOVERING/STOPPING/ESCALATED)
  - Comprehensive unit tests (18 test cases)

Package: saltybot_speed_limiter
Entry point: speed_limiter_node
Launch file: speed_limiter.launch.py

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-03-02 11:10:29 -05:00
16 changed files with 0 additions and 942 deletions

View File

@ -1,28 +0,0 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_apriltag</name>
<version>0.1.0</version>
<description>
AprilTag (DICT_APRILTAG_36h11) landmark detector for SaltyBot CSI cameras.
Detects tags in incoming frames and publishes DetectedTagArray at 10 Hz.
</description>
<maintainer email="robot@saltylab.local">SaltyLab</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<depend>cv_bridge</depend>
<depend>saltybot_apriltag_msgs</depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-opencv</exec_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -1,85 +0,0 @@
"""
_aruco_utils.py Pure-Python helpers for AprilTag detection (no ROS2 deps).
Importable without rclpy for unit testing.
"""
from __future__ import annotations
import math
import numpy as np
import cv2
_FAMILY = 'DICT_APRILTAG_36h11'
def rvec_tvec_to_pose(rvec: np.ndarray, tvec: np.ndarray):
"""
Convert OpenCV rotation + translation vectors to geometry_msgs/Pose.
Returns a plain namespace object to avoid importing geometry_msgs here.
"""
R, _ = cv2.Rodrigues(rvec)
m = R
t = m[0, 0] + m[1, 1] + m[2, 2]
if t > 0:
s = 0.5 / math.sqrt(t + 1.0)
qw = 0.25 / s
qx = (m[2, 1] - m[1, 2]) * s
qy = (m[0, 2] - m[2, 0]) * s
qz = (m[1, 0] - m[0, 1]) * s
elif m[0, 0] > m[1, 1] and m[0, 0] > m[2, 2]:
s = 2.0 * math.sqrt(1.0 + m[0, 0] - m[1, 1] - m[2, 2])
qw = (m[2, 1] - m[1, 2]) / s
qx = 0.25 * s
qy = (m[0, 1] + m[1, 0]) / s
qz = (m[0, 2] + m[2, 0]) / s
elif m[1, 1] > m[2, 2]:
s = 2.0 * math.sqrt(1.0 + m[1, 1] - m[0, 0] - m[2, 2])
qw = (m[0, 2] - m[2, 0]) / s
qx = (m[0, 1] + m[1, 0]) / s
qy = 0.25 * s
qz = (m[1, 2] + m[2, 1]) / s
else:
s = 2.0 * math.sqrt(1.0 + m[2, 2] - m[0, 0] - m[1, 1])
qw = (m[1, 0] - m[0, 1]) / s
qx = (m[0, 2] + m[2, 0]) / s
qy = (m[1, 2] + m[2, 1]) / s
qz = 0.25 * s
return (
float(tvec.flat[0]), float(tvec.flat[1]), float(tvec.flat[2]),
float(qx), float(qy), float(qz), float(qw),
)
class ArucoBackend:
"""Wraps cv2.aruco, handling the 4.6 vs 4.7+ API difference."""
def __init__(self, logger=None):
self._log = logger
dictionary = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_APRILTAG_36h11)
params = cv2.aruco.DetectorParameters()
try:
self._detector = cv2.aruco.ArucoDetector(dictionary, params)
self._new_api = True
if logger:
logger.info('[apriltag] ArucoDetector (OpenCV ≥ 4.7)')
except AttributeError:
self._dict = dictionary
self._params = params
self._new_api = False
if logger:
logger.info('[apriltag] legacy aruco.detectMarkers (OpenCV 4.6)')
def detect(self, gray: np.ndarray):
"""Returns (corners, ids); ids may be None."""
if self._new_api:
corners, ids, _ = self._detector.detectMarkers(gray)
else:
corners, ids, _ = cv2.aruco.detectMarkers(
gray, self._dict, parameters=self._params
)
return corners, ids

View File

@ -1,212 +0,0 @@
"""
apriltag_node.py AprilTag (DICT_APRILTAG_36h11) landmark detector.
Detects AprilTags in CSI camera frames and publishes results at 10 Hz.
When camera_info is available, estimates full 6-DOF pose per tag via
cv2.solvePnP (SOLVEPNP_IPPE_SQUARE).
Supports OpenCV 4.6 (legacy detectMarkers) and 4.7+ (ArucoDetector).
Subscribes:
<image_topic> sensor_msgs/Image default /camera/color/image_raw
<image_topic_dir>/camera_info sensor_msgs/CameraInfo latched intrinsics
Publishes:
/saltybot/apriltags saltybot_apriltag_msgs/DetectedTagArray 10 Hz
Parameters:
image_topic str '/camera/color/image_raw'
publish_hz float 10.0
tag_size_m float 0.10 physical tag side in metres (for pose estimation)
min_margin float 10.0 minimum corner-span pixels to keep a detection
"""
from __future__ import annotations
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
import numpy as np
import cv2
from cv_bridge import CvBridge
from sensor_msgs.msg import Image, CameraInfo
from geometry_msgs.msg import Point, Pose
try:
from saltybot_apriltag_msgs.msg import DetectedTag, DetectedTagArray
_MSGS_OK = True
except ImportError:
_MSGS_OK = False
from ._aruco_utils import ArucoBackend, rvec_tvec_to_pose
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
_LATCHED_QOS = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=1,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
_FAMILY = 'DICT_APRILTAG_36h11'
class AprilTagNode(Node):
def __init__(self):
super().__init__('apriltag_detector')
self.declare_parameter('image_topic', '/camera/color/image_raw')
self.declare_parameter('publish_hz', 10.0)
self.declare_parameter('tag_size_m', 0.10)
self.declare_parameter('min_margin', 10.0)
img_topic = self.get_parameter('image_topic').value
pub_hz = self.get_parameter('publish_hz').value
self._tag_size = self.get_parameter('tag_size_m').value
self._min_margin = self.get_parameter('min_margin').value
self._bridge = CvBridge()
self._backend = ArucoBackend(self.get_logger())
self._K: np.ndarray | None = None
self._dist: np.ndarray | None = None
self._latest_img: Image | None = None
# Derive camera_info topic from image topic namespace
info_topic = img_topic.rsplit('/', 1)[0] + '/camera_info'
self.create_subscription(Image, img_topic, self._on_image, _SENSOR_QOS)
self.create_subscription(CameraInfo, info_topic, self._on_camera_info, _LATCHED_QOS)
if _MSGS_OK:
self._pub = self.create_publisher(
DetectedTagArray, '/saltybot/apriltags', 10
)
else:
self._pub = None
self.get_logger().warning(
'[apriltag] saltybot_apriltag_msgs not built — '
'DetectedTagArray not published'
)
self.create_timer(1.0 / pub_hz, self._tick)
self.get_logger().info(
f'apriltag_detector ready — dict={_FAMILY} '
f'tag_size={self._tag_size}m topic={img_topic} hz={pub_hz}'
)
# ── Callbacks ─────────────────────────────────────────────────────────────
def _on_image(self, msg: Image) -> None:
self._latest_img = msg
def _on_camera_info(self, msg: CameraInfo) -> None:
self._K = np.array(msg.k, dtype=np.float64).reshape(3, 3)
self._dist = np.array(msg.d, dtype=np.float64)
# ── 10 Hz tick ────────────────────────────────────────────────────────────
def _tick(self) -> None:
if self._latest_img is None or self._pub is None:
return
try:
gray = self._bridge.imgmsg_to_cv2(
self._latest_img, desired_encoding='mono8'
)
except Exception as exc:
self.get_logger().warning(f'[apriltag] decode error: {exc}')
return
corners, ids = self._backend.detect(gray)
stamp = self.get_clock().now().to_msg()
arr = self._build_array(corners, ids, self._latest_img.header, stamp)
self._pub.publish(arr)
# ── Message builder ───────────────────────────────────────────────────────
def _build_array(self, corners, ids, src_header, stamp) -> 'DetectedTagArray':
arr = DetectedTagArray()
arr.header.stamp = stamp
arr.header.frame_id = src_header.frame_id
if ids is None or len(ids) == 0:
arr.count = 0
return arr
half = self._tag_size / 2.0
obj_pts = np.array([
[-half, half, 0],
[ half, half, 0],
[ half, -half, 0],
[-half, -half, 0],
], dtype=np.float64)
for corner_set, tag_id in zip(corners, ids.flatten()):
pts = corner_set.reshape(4, 2)
margin = float(np.mean([
np.linalg.norm(pts[i] - pts[(i + 1) % 4]) for i in range(4)
]))
if margin < self._min_margin:
continue
cx = float(np.mean(pts[:, 0]))
cy = float(np.mean(pts[:, 1]))
tag = DetectedTag()
tag.header = arr.header
tag.tag_id = int(tag_id)
tag.family = _FAMILY
tag.decision_margin = margin
tag.corners = pts.flatten().tolist()
tag.center = Point(x=cx, y=cy, z=0.0)
if self._K is not None and self._dist is not None:
ok, rvec, tvec = cv2.solvePnP(
obj_pts, pts.astype(np.float64),
self._K, self._dist,
flags=cv2.SOLVEPNP_IPPE_SQUARE,
)
if ok:
tx, ty, tz, qx, qy, qz, qw = rvec_tvec_to_pose(rvec, tvec)
tag.pose.position.x = tx
tag.pose.position.y = ty
tag.pose.position.z = tz
tag.pose.orientation.x = qx
tag.pose.orientation.y = qy
tag.pose.orientation.z = qz
tag.pose.orientation.w = qw
tag.pose_valid = True
else:
tag.pose_valid = False
else:
tag.pose_valid = False
arr.tags.append(tag)
arr.count = len(arr.tags)
return arr
def main(args=None):
rclpy.init(args=args)
node = AprilTagNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -1,4 +0,0 @@
[develop]
script_dir=$base/lib/saltybot_apriltag
[install]
install_scripts=$base/lib/saltybot_apriltag

View File

@ -1,29 +0,0 @@
from setuptools import setup, find_packages
from glob import glob
package_name = 'saltybot_apriltag'
setup(
name=package_name,
version='0.1.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/launch',
glob('launch/*.launch.py')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='SaltyLab',
maintainer_email='robot@saltylab.local',
description='AprilTag DICT_APRILTAG_36h11 detector — 10 Hz, 6-DOF pose',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'apriltag_detector = saltybot_apriltag.apriltag_node:main',
],
},
)

View File

@ -1,142 +0,0 @@
"""
test_apriltag.py Unit tests for AprilTag detector helpers.
Runs without ROS2 (no rclpy). Tests cover:
- _rvec_tvec_to_pose: rotation/translation Pose quaternion
- _ArucoBackend: detection on a synthetic rendered tag image
- AprilTagNode._build_array: message construction from mock detections
"""
from __future__ import annotations
import math
import sys
import os
import numpy as np
import pytest
import cv2
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_apriltag._aruco_utils import rvec_tvec_to_pose as _rvec_tvec_to_pose, ArucoBackend as _ArucoBackend
# ── _rvec_tvec_to_pose ────────────────────────────────────────────────────────
class TestRvecTvecToPose:
# _rvec_tvec_to_pose returns (tx, ty, tz, qx, qy, qz, qw)
def test_identity_rotation_gives_unit_quaternion(self):
tx, ty, tz, qx, qy, qz, qw = _rvec_tvec_to_pose(np.zeros(3), np.zeros(3))
assert qw == pytest.approx(1.0, abs=1e-6)
assert qx == pytest.approx(0.0, abs=1e-6)
assert qy == pytest.approx(0.0, abs=1e-6)
assert qz == pytest.approx(0.0, abs=1e-6)
def test_translation_passed_through(self):
tvec = np.array([1.5, -0.3, 2.0])
tx, ty, tz, *_ = _rvec_tvec_to_pose(np.zeros(3), tvec)
assert tx == pytest.approx(1.5)
assert ty == pytest.approx(-0.3)
assert tz == pytest.approx(2.0)
def test_90deg_yaw_quaternion(self):
# 90° rotation around Z axis → qw=cos(45°), qz=sin(45°)
rvec = np.array([0.0, 0.0, math.pi / 2])
tx, ty, tz, qx, qy, qz, qw = _rvec_tvec_to_pose(rvec, np.zeros(3))
assert qw == pytest.approx(math.cos(math.pi / 4), abs=1e-5)
assert qz == pytest.approx(math.sin(math.pi / 4), abs=1e-5)
assert qx == pytest.approx(0.0, abs=1e-5)
assert qy == pytest.approx(0.0, abs=1e-5)
def test_quaternion_is_unit_norm(self):
rvec = np.array([0.3, 0.7, -1.1])
tvec = np.array([0.5, 0.5, 0.5])
tx, ty, tz, qx, qy, qz, qw = _rvec_tvec_to_pose(rvec, tvec)
norm = math.sqrt(qx**2 + qy**2 + qz**2 + qw**2)
assert norm == pytest.approx(1.0, abs=1e-5)
def test_180deg_x_rotation(self):
rvec = np.array([math.pi, 0.0, 0.0])
tx, ty, tz, qx, qy, qz, qw = _rvec_tvec_to_pose(rvec, np.zeros(3))
norm = math.sqrt(qx**2 + qy**2 + qz**2 + qw**2)
assert norm == pytest.approx(1.0, abs=1e-5)
# ── _ArucoBackend ─────────────────────────────────────────────────────────────
class _FakeLogger:
def info(self, msg): pass
def warning(self, msg): pass
def debug(self, msg): pass
def _render_apriltag_36h11(tag_id: int = 0, size: int = 200) -> np.ndarray:
"""
Render an AprilTag DICT_APRILTAG_36h11 tag into a greyscale image.
Returns an image large enough for reliable detection.
"""
dictionary = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_APRILTAG_36h11)
# Draw the marker (OpenCV ≥ 4.6)
tag_img = np.zeros((size, size), dtype=np.uint8)
try:
cv2.aruco.generateImageMarker(dictionary, tag_id, size, tag_img, 1)
except AttributeError:
# Older API
tag_img = cv2.aruco.drawMarker(dictionary, tag_id, size)
# Embed in white canvas with border
canvas = np.ones((size + 80, size + 80), dtype=np.uint8) * 255
canvas[40:40 + size, 40:40 + size] = tag_img
return canvas
class TestArucoBackend:
def test_backend_initialises(self):
backend = _ArucoBackend(_FakeLogger())
assert backend is not None
def test_detect_returns_tuple(self):
backend = _ArucoBackend(_FakeLogger())
gray = np.ones((480, 640), dtype=np.uint8) * 200
corners, ids = backend.detect(gray)
assert isinstance(corners, (list, tuple))
def test_blank_image_no_detections(self):
backend = _ArucoBackend(_FakeLogger())
gray = np.ones((480, 640), dtype=np.uint8) * 200
_, ids = backend.detect(gray)
assert ids is None or len(ids) == 0
def test_rendered_tag_detected(self):
backend = _ArucoBackend(_FakeLogger())
tag_img = _render_apriltag_36h11(tag_id=5, size=200)
corners, ids = backend.detect(tag_img)
assert ids is not None and len(ids) == 1
assert int(ids.flatten()[0]) == 5
def test_rendered_tag_corners_shape(self):
backend = _ArucoBackend(_FakeLogger())
tag_img = _render_apriltag_36h11(tag_id=0, size=200)
corners, ids = backend.detect(tag_img)
assert ids is not None
assert len(corners) == 1
assert corners[0].reshape(4, 2).shape == (4, 2)
def test_multiple_tags_detected(self):
"""Place two tags side-by-side and expect both found."""
backend = _ArucoBackend(_FakeLogger())
t0 = _render_apriltag_36h11(tag_id=0, size=150)
t1 = _render_apriltag_36h11(tag_id=1, size=150)
canvas = np.ones((t0.shape[0], t0.shape[1] * 2 + 20), dtype=np.uint8) * 255
canvas[:, :t0.shape[1]] = t0
canvas[:, t0.shape[1] + 20:] = t1
_, ids = backend.detect(canvas)
assert ids is not None and len(ids) == 2
assert set(ids.flatten().tolist()) == {0, 1}
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@ -1,16 +0,0 @@
cmake_minimum_required(VERSION 3.8)
project(saltybot_apriltag_msgs)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(std_msgs REQUIRED)
find_package(geometry_msgs REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/DetectedTag.msg"
"msg/DetectedTagArray.msg"
DEPENDENCIES std_msgs geometry_msgs
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()

View File

@ -1,18 +0,0 @@
# DetectedTag — single AprilTag detection result.
std_msgs/Header header
int32 tag_id # numeric ID encoded in the tag
string family # e.g. 'DICT_APRILTAG_36h11'
float32 decision_margin # detector confidence (higher = more certain)
# Corner pixel coordinates [x0,y0, x1,y1, x2,y2, x3,y3] (image frame, CW from TL)
float32[8] corners
# Tag centre in image pixels
geometry_msgs/Point center
# 6-DOF pose in camera optical frame (only valid when pose_valid=true)
geometry_msgs/Pose pose
bool pose_valid # false if camera_info not yet received

View File

@ -1,7 +0,0 @@
# DetectedTagArray — all AprilTags found in one frame.
# Published at ~10 Hz on /saltybot/apriltags.
std_msgs/Header header
saltybot_apriltag_msgs/DetectedTag[] tags
int32 count

View File

@ -1,22 +0,0 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_apriltag_msgs</name>
<version>0.1.0</version>
<description>Message types for AprilTag landmark detection.</description>
<maintainer email="robot@saltylab.local">SaltyLab</maintainer>
<license>MIT</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@ -1,60 +0,0 @@
"""
_camera_state.py Pure-Python camera health state (no ROS2 deps).
Importable without rclpy for unit testing.
"""
from __future__ import annotations
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class CameraState:
topic: str
v4l2_dev: Optional[str] # None for D435i (USB — no v4l2 reset)
last_frame_t: Optional[float] = None # monotonic seconds
total_frames: int = 0
_ts_window: deque = field(default_factory=lambda: deque(maxlen=200))
_last_reset_t: float = 0.0 # for reset cooldown
def on_frame(self) -> None:
now = time.monotonic()
self.last_frame_t = now
self.total_frames += 1
self._ts_window.append(now)
@property
def age_s(self) -> float:
if self.last_frame_t is None:
return float('inf')
return time.monotonic() - self.last_frame_t
def fps(self, window_s: float = 5.0) -> float:
now = time.monotonic()
recent = [t for t in self._ts_window if now - t <= window_s]
if len(recent) < 2:
return 0.0
return (len(recent) - 1) / (recent[-1] - recent[0])
def status(self, warn_thr: float, error_thr: float) -> str:
age = self.age_s
if age == float('inf'):
return 'never'
if age > error_thr:
return 'error'
if age > warn_thr:
return 'warn'
return 'ok'
def should_reset(self, cooldown_s: float = 30.0) -> bool:
return (
self.v4l2_dev is not None
and time.monotonic() - self._last_reset_t > cooldown_s
)
def mark_reset(self) -> None:
self._last_reset_t = time.monotonic()

View File

@ -1,210 +0,0 @@
"""
camera_health_node.py Camera watchdog for all SaltyBot image streams.
Monitors frame arrival timestamps for each configured camera topic.
On silence:
> warn_timeout_s (default 2s) WARNING log
> error_timeout_s (default 10s) ERROR log + v4l2 reset attempt (CSI only)
Publishes /saltybot/camera_health as std_msgs/String JSON at 1 Hz:
{
"ts": "2026-03-02T15:00:00Z",
"cameras": {
"d435i_color": {
"topic": "/camera/color/image_raw",
"status": "ok" | "warn" | "error" | "never",
"age_s": 1.23,
"fps": 29.8,
"total_frames": 1234
},
...
},
"healthy": 5,
"degraded": 1,
"total": 6
}
CSI camera map (topic /dev/videoN) is used for v4l2 reset.
D435i streams are USB no v4l2 reset (RealSense SDK manages recovery).
Parameters:
warn_timeout_s float 2.0
error_timeout_s float 10.0
publish_hz float 1.0
fps_window_s float 5.0 rolling window for FPS estimate
"""
from __future__ import annotations
import json
import subprocess
from datetime import datetime, timezone
from typing import Dict
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from sensor_msgs.msg import Image
from std_msgs.msg import String
from ._camera_state import CameraState
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=3,
)
# Default cameras: name → (topic, csi_device_or_None)
_DEFAULT_CAMERAS: Dict[str, tuple] = {
'd435i_color': ('/camera/color/image_raw', None),
'd435i_depth': ('/camera/depth/image_rect_raw', None),
'csi_front': ('/camera/front/image_raw', '/dev/video0'),
'csi_right': ('/camera/right/image_raw', '/dev/video2'),
'csi_rear': ('/camera/rear/image_raw', '/dev/video4'),
'csi_left': ('/camera/left/image_raw', '/dev/video6'),
}
class CameraHealthNode(Node):
def __init__(self):
super().__init__('camera_health_monitor')
self.declare_parameter('warn_timeout_s', 2.0)
self.declare_parameter('error_timeout_s', 10.0)
self.declare_parameter('publish_hz', 1.0)
self.declare_parameter('fps_window_s', 5.0)
self._warn_thr = self.get_parameter('warn_timeout_s').value
self._error_thr = self.get_parameter('error_timeout_s').value
self._fps_win = self.get_parameter('fps_window_s').value
# Build camera states
self._cameras: Dict[str, CameraState] = {
name: CameraState(topic=topic, v4l2_dev=dev)
for name, (topic, dev) in _DEFAULT_CAMERAS.items()
}
# Subscribe to each topic
for name, cam in self._cameras.items():
self.create_subscription(
Image, cam.topic,
self._make_cb(name),
_SENSOR_QOS,
)
self._pub = self.create_publisher(String, '/saltybot/camera_health', 10)
self.create_timer(
1.0 / self.get_parameter('publish_hz').value, self._tick
)
self.get_logger().info(
f'camera_health_monitor ready — watching {len(self._cameras)} cameras '
f'warn={self._warn_thr}s error={self._error_thr}s'
)
# ── Callbacks ─────────────────────────────────────────────────────────────
def _make_cb(self, name: str):
def _cb(msg: Image) -> None:
self._cameras[name].on_frame()
return _cb
# ── 1 Hz watchdog tick ────────────────────────────────────────────────────
def _tick(self) -> None:
healthy = degraded = 0
for name, cam in self._cameras.items():
status = cam.status(self._warn_thr, self._error_thr)
if status == 'ok':
healthy += 1
elif status in ('warn', 'error', 'never'):
degraded += 1
age = cam.age_s
age_str = f'{age:.1f}' if age != float('inf') else ''
if status == 'warn':
self.get_logger().warning(
f'[cam_health] {name} ({cam.topic}): '
f'no frame for {age_str}s'
)
elif status in ('error', 'never'):
self.get_logger().error(
f'[cam_health] {name} ({cam.topic}): '
f'no frame for {age_str}s — camera may be offline'
)
if status == 'error' and cam.should_reset():
self._v4l2_reset(name, cam)
payload = self._build_json(healthy, degraded)
msg = String()
msg.data = payload
self._pub.publish(msg)
# ── v4l2 reset (CSI cameras only) ─────────────────────────────────────────
def _v4l2_reset(self, name: str, cam: CameraState) -> None:
dev = cam.v4l2_dev
self.get_logger().warning(
f'[cam_health] attempting v4l2 reset for {name} on {dev}'
)
cam.mark_reset()
try:
# Toggle streaming off then on to nudge the driver
subprocess.run(
['v4l2-ctl', '-d', dev, '--stream-off'],
capture_output=True, timeout=2.0
)
subprocess.run(
['v4l2-ctl', '-d', dev, '--stream-on'],
capture_output=True, timeout=2.0
)
self.get_logger().info(
f'[cam_health] v4l2 reset sent to {dev} for {name}'
)
except (subprocess.TimeoutExpired, FileNotFoundError) as exc:
self.get_logger().warning(
f'[cam_health] v4l2 reset failed for {name}: {exc}'
)
# ── JSON builder ──────────────────────────────────────────────────────────
def _build_json(self, healthy: int, degraded: int) -> str:
cameras_out = {}
for name, cam in self._cameras.items():
age = cam.age_s
cameras_out[name] = {
'topic': cam.topic,
'status': cam.status(self._warn_thr, self._error_thr),
'age_s': round(age, 2) if age != float('inf') else None,
'fps': round(cam.fps(self._fps_win), 1),
'total_frames': cam.total_frames,
}
return json.dumps({
'ts': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
'cameras': cameras_out,
'healthy': healthy,
'degraded': degraded,
'total': len(self._cameras),
})
def main(args=None):
rclpy.init(args=args)
node = CameraHealthNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -29,7 +29,6 @@ setup(
entry_points={
'console_scripts': [
'depth_confidence_filter = saltybot_bringup.depth_confidence_filter_node:main',
'camera_health_monitor = saltybot_bringup.camera_health_node:main',
],
},
)

View File

@ -1,108 +0,0 @@
"""
test_camera_health.py Unit tests for CameraState (no ROS2 required).
"""
import sys
import os
import time
import json
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_bringup._camera_state import CameraState
class TestCameraState:
def test_initial_status_never(self):
cs = CameraState(topic='/test', v4l2_dev=None)
assert cs.status(2.0, 10.0) == 'never'
def test_age_inf_before_first_frame(self):
cs = CameraState(topic='/test', v4l2_dev=None)
assert cs.age_s == float('inf')
def test_on_frame_sets_age_near_zero(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
assert cs.age_s < 0.1
def test_on_frame_increments_count(self):
cs = CameraState(topic='/test', v4l2_dev=None)
for _ in range(5):
cs.on_frame()
assert cs.total_frames == 5
def test_status_ok_after_fresh_frame(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
assert cs.status(2.0, 10.0) == 'ok'
def test_status_warn_after_2s(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
cs.last_frame_t -= 3.0 # fake 3 seconds ago
assert cs.status(2.0, 10.0) == 'warn'
def test_status_error_after_10s(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
cs.last_frame_t -= 11.0
assert cs.status(2.0, 10.0) == 'error'
def test_fps_zero_before_frames(self):
cs = CameraState(topic='/test', v4l2_dev=None)
assert cs.fps(5.0) == pytest.approx(0.0)
def test_fps_estimate(self):
cs = CameraState(topic='/test', v4l2_dev=None)
# Simulate 10 frames over 1 second
base = time.monotonic()
for i in range(11):
cs._ts_window.append(base + i * 0.1)
cs.last_frame_t = base + 1.0
assert cs.fps(5.0) == pytest.approx(10.0, abs=0.5)
def test_should_reset_false_for_d435i(self):
cs = CameraState(topic='/camera/color/image_raw', v4l2_dev=None)
assert not cs.should_reset()
def test_should_reset_true_for_csi_first_time(self):
cs = CameraState(topic='/camera/front/image_raw', v4l2_dev='/dev/video0')
assert cs.should_reset(cooldown_s=30.0)
def test_should_reset_false_within_cooldown(self):
cs = CameraState(topic='/test', v4l2_dev='/dev/video0')
cs.mark_reset()
assert not cs.should_reset(cooldown_s=30.0)
def test_should_reset_true_after_cooldown(self):
cs = CameraState(topic='/test', v4l2_dev='/dev/video0')
cs.mark_reset()
cs._last_reset_t -= 31.0 # fake cooldown elapsed
assert cs.should_reset(cooldown_s=30.0)
def test_custom_thresholds(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
cs.last_frame_t -= 1.5
assert cs.status(1.0, 5.0) == 'warn'
assert cs.status(2.0, 5.0) == 'ok'
def test_fps_ignores_old_frames(self):
cs = CameraState(topic='/test', v4l2_dev=None)
old = time.monotonic() - 60.0
for _ in range(50):
cs._ts_window.append(old)
# Recent frames
now = time.monotonic()
for i in range(6):
cs._ts_window.append(now + i * 0.1)
cs.last_frame_t = now + 0.5
assert cs.fps(5.0) == pytest.approx(10.0, abs=1.0)
if __name__ == '__main__':
pytest.main([__file__, '-v'])