Merge pull request 'feat(jetson): camera health watchdog — 6 streams, WARNING/ERROR, v4l2 reset (issue #198)' (#199) from sl-perception/issue-198-camera-health into main

This commit is contained in:
sl-jetson 2026-03-02 11:12:32 -05:00
commit b345128427
4 changed files with 379 additions and 0 deletions

View File

@ -0,0 +1,60 @@
"""
_camera_state.py Pure-Python camera health state (no ROS2 deps).
Importable without rclpy for unit testing.
"""
from __future__ import annotations
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class CameraState:
topic: str
v4l2_dev: Optional[str] # None for D435i (USB — no v4l2 reset)
last_frame_t: Optional[float] = None # monotonic seconds
total_frames: int = 0
_ts_window: deque = field(default_factory=lambda: deque(maxlen=200))
_last_reset_t: float = 0.0 # for reset cooldown
def on_frame(self) -> None:
now = time.monotonic()
self.last_frame_t = now
self.total_frames += 1
self._ts_window.append(now)
@property
def age_s(self) -> float:
if self.last_frame_t is None:
return float('inf')
return time.monotonic() - self.last_frame_t
def fps(self, window_s: float = 5.0) -> float:
now = time.monotonic()
recent = [t for t in self._ts_window if now - t <= window_s]
if len(recent) < 2:
return 0.0
return (len(recent) - 1) / (recent[-1] - recent[0])
def status(self, warn_thr: float, error_thr: float) -> str:
age = self.age_s
if age == float('inf'):
return 'never'
if age > error_thr:
return 'error'
if age > warn_thr:
return 'warn'
return 'ok'
def should_reset(self, cooldown_s: float = 30.0) -> bool:
return (
self.v4l2_dev is not None
and time.monotonic() - self._last_reset_t > cooldown_s
)
def mark_reset(self) -> None:
self._last_reset_t = time.monotonic()

View File

@ -0,0 +1,210 @@
"""
camera_health_node.py Camera watchdog for all SaltyBot image streams.
Monitors frame arrival timestamps for each configured camera topic.
On silence:
> warn_timeout_s (default 2s) WARNING log
> error_timeout_s (default 10s) ERROR log + v4l2 reset attempt (CSI only)
Publishes /saltybot/camera_health as std_msgs/String JSON at 1 Hz:
{
"ts": "2026-03-02T15:00:00Z",
"cameras": {
"d435i_color": {
"topic": "/camera/color/image_raw",
"status": "ok" | "warn" | "error" | "never",
"age_s": 1.23,
"fps": 29.8,
"total_frames": 1234
},
...
},
"healthy": 5,
"degraded": 1,
"total": 6
}
CSI camera map (topic /dev/videoN) is used for v4l2 reset.
D435i streams are USB no v4l2 reset (RealSense SDK manages recovery).
Parameters:
warn_timeout_s float 2.0
error_timeout_s float 10.0
publish_hz float 1.0
fps_window_s float 5.0 rolling window for FPS estimate
"""
from __future__ import annotations
import json
import subprocess
from datetime import datetime, timezone
from typing import Dict
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from sensor_msgs.msg import Image
from std_msgs.msg import String
from ._camera_state import CameraState
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=3,
)
# Default cameras: name → (topic, csi_device_or_None)
_DEFAULT_CAMERAS: Dict[str, tuple] = {
'd435i_color': ('/camera/color/image_raw', None),
'd435i_depth': ('/camera/depth/image_rect_raw', None),
'csi_front': ('/camera/front/image_raw', '/dev/video0'),
'csi_right': ('/camera/right/image_raw', '/dev/video2'),
'csi_rear': ('/camera/rear/image_raw', '/dev/video4'),
'csi_left': ('/camera/left/image_raw', '/dev/video6'),
}
class CameraHealthNode(Node):
def __init__(self):
super().__init__('camera_health_monitor')
self.declare_parameter('warn_timeout_s', 2.0)
self.declare_parameter('error_timeout_s', 10.0)
self.declare_parameter('publish_hz', 1.0)
self.declare_parameter('fps_window_s', 5.0)
self._warn_thr = self.get_parameter('warn_timeout_s').value
self._error_thr = self.get_parameter('error_timeout_s').value
self._fps_win = self.get_parameter('fps_window_s').value
# Build camera states
self._cameras: Dict[str, CameraState] = {
name: CameraState(topic=topic, v4l2_dev=dev)
for name, (topic, dev) in _DEFAULT_CAMERAS.items()
}
# Subscribe to each topic
for name, cam in self._cameras.items():
self.create_subscription(
Image, cam.topic,
self._make_cb(name),
_SENSOR_QOS,
)
self._pub = self.create_publisher(String, '/saltybot/camera_health', 10)
self.create_timer(
1.0 / self.get_parameter('publish_hz').value, self._tick
)
self.get_logger().info(
f'camera_health_monitor ready — watching {len(self._cameras)} cameras '
f'warn={self._warn_thr}s error={self._error_thr}s'
)
# ── Callbacks ─────────────────────────────────────────────────────────────
def _make_cb(self, name: str):
def _cb(msg: Image) -> None:
self._cameras[name].on_frame()
return _cb
# ── 1 Hz watchdog tick ────────────────────────────────────────────────────
def _tick(self) -> None:
healthy = degraded = 0
for name, cam in self._cameras.items():
status = cam.status(self._warn_thr, self._error_thr)
if status == 'ok':
healthy += 1
elif status in ('warn', 'error', 'never'):
degraded += 1
age = cam.age_s
age_str = f'{age:.1f}' if age != float('inf') else ''
if status == 'warn':
self.get_logger().warning(
f'[cam_health] {name} ({cam.topic}): '
f'no frame for {age_str}s'
)
elif status in ('error', 'never'):
self.get_logger().error(
f'[cam_health] {name} ({cam.topic}): '
f'no frame for {age_str}s — camera may be offline'
)
if status == 'error' and cam.should_reset():
self._v4l2_reset(name, cam)
payload = self._build_json(healthy, degraded)
msg = String()
msg.data = payload
self._pub.publish(msg)
# ── v4l2 reset (CSI cameras only) ─────────────────────────────────────────
def _v4l2_reset(self, name: str, cam: CameraState) -> None:
dev = cam.v4l2_dev
self.get_logger().warning(
f'[cam_health] attempting v4l2 reset for {name} on {dev}'
)
cam.mark_reset()
try:
# Toggle streaming off then on to nudge the driver
subprocess.run(
['v4l2-ctl', '-d', dev, '--stream-off'],
capture_output=True, timeout=2.0
)
subprocess.run(
['v4l2-ctl', '-d', dev, '--stream-on'],
capture_output=True, timeout=2.0
)
self.get_logger().info(
f'[cam_health] v4l2 reset sent to {dev} for {name}'
)
except (subprocess.TimeoutExpired, FileNotFoundError) as exc:
self.get_logger().warning(
f'[cam_health] v4l2 reset failed for {name}: {exc}'
)
# ── JSON builder ──────────────────────────────────────────────────────────
def _build_json(self, healthy: int, degraded: int) -> str:
cameras_out = {}
for name, cam in self._cameras.items():
age = cam.age_s
cameras_out[name] = {
'topic': cam.topic,
'status': cam.status(self._warn_thr, self._error_thr),
'age_s': round(age, 2) if age != float('inf') else None,
'fps': round(cam.fps(self._fps_win), 1),
'total_frames': cam.total_frames,
}
return json.dumps({
'ts': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
'cameras': cameras_out,
'healthy': healthy,
'degraded': degraded,
'total': len(self._cameras),
})
def main(args=None):
rclpy.init(args=args)
node = CameraHealthNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -29,6 +29,7 @@ setup(
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'depth_confidence_filter = saltybot_bringup.depth_confidence_filter_node:main', 'depth_confidence_filter = saltybot_bringup.depth_confidence_filter_node:main',
'camera_health_monitor = saltybot_bringup.camera_health_node:main',
], ],
}, },
) )

View File

@ -0,0 +1,108 @@
"""
test_camera_health.py Unit tests for CameraState (no ROS2 required).
"""
import sys
import os
import time
import json
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_bringup._camera_state import CameraState
class TestCameraState:
def test_initial_status_never(self):
cs = CameraState(topic='/test', v4l2_dev=None)
assert cs.status(2.0, 10.0) == 'never'
def test_age_inf_before_first_frame(self):
cs = CameraState(topic='/test', v4l2_dev=None)
assert cs.age_s == float('inf')
def test_on_frame_sets_age_near_zero(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
assert cs.age_s < 0.1
def test_on_frame_increments_count(self):
cs = CameraState(topic='/test', v4l2_dev=None)
for _ in range(5):
cs.on_frame()
assert cs.total_frames == 5
def test_status_ok_after_fresh_frame(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
assert cs.status(2.0, 10.0) == 'ok'
def test_status_warn_after_2s(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
cs.last_frame_t -= 3.0 # fake 3 seconds ago
assert cs.status(2.0, 10.0) == 'warn'
def test_status_error_after_10s(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
cs.last_frame_t -= 11.0
assert cs.status(2.0, 10.0) == 'error'
def test_fps_zero_before_frames(self):
cs = CameraState(topic='/test', v4l2_dev=None)
assert cs.fps(5.0) == pytest.approx(0.0)
def test_fps_estimate(self):
cs = CameraState(topic='/test', v4l2_dev=None)
# Simulate 10 frames over 1 second
base = time.monotonic()
for i in range(11):
cs._ts_window.append(base + i * 0.1)
cs.last_frame_t = base + 1.0
assert cs.fps(5.0) == pytest.approx(10.0, abs=0.5)
def test_should_reset_false_for_d435i(self):
cs = CameraState(topic='/camera/color/image_raw', v4l2_dev=None)
assert not cs.should_reset()
def test_should_reset_true_for_csi_first_time(self):
cs = CameraState(topic='/camera/front/image_raw', v4l2_dev='/dev/video0')
assert cs.should_reset(cooldown_s=30.0)
def test_should_reset_false_within_cooldown(self):
cs = CameraState(topic='/test', v4l2_dev='/dev/video0')
cs.mark_reset()
assert not cs.should_reset(cooldown_s=30.0)
def test_should_reset_true_after_cooldown(self):
cs = CameraState(topic='/test', v4l2_dev='/dev/video0')
cs.mark_reset()
cs._last_reset_t -= 31.0 # fake cooldown elapsed
assert cs.should_reset(cooldown_s=30.0)
def test_custom_thresholds(self):
cs = CameraState(topic='/test', v4l2_dev=None)
cs.on_frame()
cs.last_frame_t -= 1.5
assert cs.status(1.0, 5.0) == 'warn'
assert cs.status(2.0, 5.0) == 'ok'
def test_fps_ignores_old_frames(self):
cs = CameraState(topic='/test', v4l2_dev=None)
old = time.monotonic() - 60.0
for _ in range(50):
cs._ts_window.append(old)
# Recent frames
now = time.monotonic()
for i in range(6):
cs._ts_window.append(now + i * 0.1)
cs.last_frame_t = now + 0.5
assert cs.fps(5.0) == pytest.approx(10.0, abs=1.0)
if __name__ == '__main__':
pytest.main([__file__, '-v'])