Compare commits
5 Commits
6e1a2095b7
...
bc45208768
| Author | SHA1 | Date | |
|---|---|---|---|
| bc45208768 | |||
| b345128427 | |||
| e6065e1531 | |||
| 7678ddebe2 | |||
| d6d7e7b75a |
28
jetson/ros2_ws/src/saltybot_apriltag/package.xml
Normal file
28
jetson/ros2_ws/src/saltybot_apriltag/package.xml
Normal file
@ -0,0 +1,28 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_apriltag</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
AprilTag (DICT_APRILTAG_36h11) landmark detector for SaltyBot CSI cameras.
|
||||
Detects tags in incoming frames and publishes DetectedTagArray at 10 Hz.
|
||||
</description>
|
||||
<maintainer email="robot@saltylab.local">SaltyLab</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>cv_bridge</depend>
|
||||
<depend>saltybot_apriltag_msgs</depend>
|
||||
|
||||
<exec_depend>python3-numpy</exec_depend>
|
||||
<exec_depend>python3-opencv</exec_depend>
|
||||
|
||||
<test_depend>pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,85 @@
|
||||
"""
|
||||
_aruco_utils.py — Pure-Python helpers for AprilTag detection (no ROS2 deps).
|
||||
|
||||
Importable without rclpy for unit testing.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import numpy as np
|
||||
import cv2
|
||||
|
||||
|
||||
_FAMILY = 'DICT_APRILTAG_36h11'
|
||||
|
||||
|
||||
def rvec_tvec_to_pose(rvec: np.ndarray, tvec: np.ndarray):
|
||||
"""
|
||||
Convert OpenCV rotation + translation vectors to geometry_msgs/Pose.
|
||||
Returns a plain namespace object to avoid importing geometry_msgs here.
|
||||
"""
|
||||
R, _ = cv2.Rodrigues(rvec)
|
||||
m = R
|
||||
|
||||
t = m[0, 0] + m[1, 1] + m[2, 2]
|
||||
if t > 0:
|
||||
s = 0.5 / math.sqrt(t + 1.0)
|
||||
qw = 0.25 / s
|
||||
qx = (m[2, 1] - m[1, 2]) * s
|
||||
qy = (m[0, 2] - m[2, 0]) * s
|
||||
qz = (m[1, 0] - m[0, 1]) * s
|
||||
elif m[0, 0] > m[1, 1] and m[0, 0] > m[2, 2]:
|
||||
s = 2.0 * math.sqrt(1.0 + m[0, 0] - m[1, 1] - m[2, 2])
|
||||
qw = (m[2, 1] - m[1, 2]) / s
|
||||
qx = 0.25 * s
|
||||
qy = (m[0, 1] + m[1, 0]) / s
|
||||
qz = (m[0, 2] + m[2, 0]) / s
|
||||
elif m[1, 1] > m[2, 2]:
|
||||
s = 2.0 * math.sqrt(1.0 + m[1, 1] - m[0, 0] - m[2, 2])
|
||||
qw = (m[0, 2] - m[2, 0]) / s
|
||||
qx = (m[0, 1] + m[1, 0]) / s
|
||||
qy = 0.25 * s
|
||||
qz = (m[1, 2] + m[2, 1]) / s
|
||||
else:
|
||||
s = 2.0 * math.sqrt(1.0 + m[2, 2] - m[0, 0] - m[1, 1])
|
||||
qw = (m[1, 0] - m[0, 1]) / s
|
||||
qx = (m[0, 2] + m[2, 0]) / s
|
||||
qy = (m[1, 2] + m[2, 1]) / s
|
||||
qz = 0.25 * s
|
||||
|
||||
return (
|
||||
float(tvec.flat[0]), float(tvec.flat[1]), float(tvec.flat[2]),
|
||||
float(qx), float(qy), float(qz), float(qw),
|
||||
)
|
||||
|
||||
|
||||
class ArucoBackend:
|
||||
"""Wraps cv2.aruco, handling the 4.6 vs 4.7+ API difference."""
|
||||
|
||||
def __init__(self, logger=None):
|
||||
self._log = logger
|
||||
dictionary = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_APRILTAG_36h11)
|
||||
params = cv2.aruco.DetectorParameters()
|
||||
|
||||
try:
|
||||
self._detector = cv2.aruco.ArucoDetector(dictionary, params)
|
||||
self._new_api = True
|
||||
if logger:
|
||||
logger.info('[apriltag] ArucoDetector (OpenCV ≥ 4.7)')
|
||||
except AttributeError:
|
||||
self._dict = dictionary
|
||||
self._params = params
|
||||
self._new_api = False
|
||||
if logger:
|
||||
logger.info('[apriltag] legacy aruco.detectMarkers (OpenCV 4.6)')
|
||||
|
||||
def detect(self, gray: np.ndarray):
|
||||
"""Returns (corners, ids); ids may be None."""
|
||||
if self._new_api:
|
||||
corners, ids, _ = self._detector.detectMarkers(gray)
|
||||
else:
|
||||
corners, ids, _ = cv2.aruco.detectMarkers(
|
||||
gray, self._dict, parameters=self._params
|
||||
)
|
||||
return corners, ids
|
||||
@ -0,0 +1,212 @@
|
||||
"""
|
||||
apriltag_node.py — AprilTag (DICT_APRILTAG_36h11) landmark detector.
|
||||
|
||||
Detects AprilTags in CSI camera frames and publishes results at 10 Hz.
|
||||
When camera_info is available, estimates full 6-DOF pose per tag via
|
||||
cv2.solvePnP (SOLVEPNP_IPPE_SQUARE).
|
||||
|
||||
Supports OpenCV 4.6 (legacy detectMarkers) and 4.7+ (ArucoDetector).
|
||||
|
||||
Subscribes:
|
||||
<image_topic> sensor_msgs/Image default /camera/color/image_raw
|
||||
<image_topic_dir>/camera_info sensor_msgs/CameraInfo latched intrinsics
|
||||
|
||||
Publishes:
|
||||
/saltybot/apriltags saltybot_apriltag_msgs/DetectedTagArray 10 Hz
|
||||
|
||||
Parameters:
|
||||
image_topic str '/camera/color/image_raw'
|
||||
publish_hz float 10.0
|
||||
tag_size_m float 0.10 physical tag side in metres (for pose estimation)
|
||||
min_margin float 10.0 minimum corner-span pixels to keep a detection
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
|
||||
|
||||
import numpy as np
|
||||
import cv2
|
||||
from cv_bridge import CvBridge
|
||||
|
||||
from sensor_msgs.msg import Image, CameraInfo
|
||||
from geometry_msgs.msg import Point, Pose
|
||||
|
||||
try:
|
||||
from saltybot_apriltag_msgs.msg import DetectedTag, DetectedTagArray
|
||||
_MSGS_OK = True
|
||||
except ImportError:
|
||||
_MSGS_OK = False
|
||||
|
||||
from ._aruco_utils import ArucoBackend, rvec_tvec_to_pose
|
||||
|
||||
|
||||
_SENSOR_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=5,
|
||||
)
|
||||
_LATCHED_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=1,
|
||||
durability=DurabilityPolicy.TRANSIENT_LOCAL,
|
||||
)
|
||||
|
||||
_FAMILY = 'DICT_APRILTAG_36h11'
|
||||
|
||||
|
||||
class AprilTagNode(Node):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('apriltag_detector')
|
||||
|
||||
self.declare_parameter('image_topic', '/camera/color/image_raw')
|
||||
self.declare_parameter('publish_hz', 10.0)
|
||||
self.declare_parameter('tag_size_m', 0.10)
|
||||
self.declare_parameter('min_margin', 10.0)
|
||||
|
||||
img_topic = self.get_parameter('image_topic').value
|
||||
pub_hz = self.get_parameter('publish_hz').value
|
||||
self._tag_size = self.get_parameter('tag_size_m').value
|
||||
self._min_margin = self.get_parameter('min_margin').value
|
||||
|
||||
self._bridge = CvBridge()
|
||||
self._backend = ArucoBackend(self.get_logger())
|
||||
|
||||
self._K: np.ndarray | None = None
|
||||
self._dist: np.ndarray | None = None
|
||||
self._latest_img: Image | None = None
|
||||
|
||||
# Derive camera_info topic from image topic namespace
|
||||
info_topic = img_topic.rsplit('/', 1)[0] + '/camera_info'
|
||||
|
||||
self.create_subscription(Image, img_topic, self._on_image, _SENSOR_QOS)
|
||||
self.create_subscription(CameraInfo, info_topic, self._on_camera_info, _LATCHED_QOS)
|
||||
|
||||
if _MSGS_OK:
|
||||
self._pub = self.create_publisher(
|
||||
DetectedTagArray, '/saltybot/apriltags', 10
|
||||
)
|
||||
else:
|
||||
self._pub = None
|
||||
self.get_logger().warning(
|
||||
'[apriltag] saltybot_apriltag_msgs not built — '
|
||||
'DetectedTagArray not published'
|
||||
)
|
||||
|
||||
self.create_timer(1.0 / pub_hz, self._tick)
|
||||
|
||||
self.get_logger().info(
|
||||
f'apriltag_detector ready — dict={_FAMILY} '
|
||||
f'tag_size={self._tag_size}m topic={img_topic} hz={pub_hz}'
|
||||
)
|
||||
|
||||
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||
|
||||
def _on_image(self, msg: Image) -> None:
|
||||
self._latest_img = msg
|
||||
|
||||
def _on_camera_info(self, msg: CameraInfo) -> None:
|
||||
self._K = np.array(msg.k, dtype=np.float64).reshape(3, 3)
|
||||
self._dist = np.array(msg.d, dtype=np.float64)
|
||||
|
||||
# ── 10 Hz tick ────────────────────────────────────────────────────────────
|
||||
|
||||
def _tick(self) -> None:
|
||||
if self._latest_img is None or self._pub is None:
|
||||
return
|
||||
|
||||
try:
|
||||
gray = self._bridge.imgmsg_to_cv2(
|
||||
self._latest_img, desired_encoding='mono8'
|
||||
)
|
||||
except Exception as exc:
|
||||
self.get_logger().warning(f'[apriltag] decode error: {exc}')
|
||||
return
|
||||
|
||||
corners, ids = self._backend.detect(gray)
|
||||
|
||||
stamp = self.get_clock().now().to_msg()
|
||||
arr = self._build_array(corners, ids, self._latest_img.header, stamp)
|
||||
self._pub.publish(arr)
|
||||
|
||||
# ── Message builder ───────────────────────────────────────────────────────
|
||||
|
||||
def _build_array(self, corners, ids, src_header, stamp) -> 'DetectedTagArray':
|
||||
arr = DetectedTagArray()
|
||||
arr.header.stamp = stamp
|
||||
arr.header.frame_id = src_header.frame_id
|
||||
|
||||
if ids is None or len(ids) == 0:
|
||||
arr.count = 0
|
||||
return arr
|
||||
|
||||
half = self._tag_size / 2.0
|
||||
obj_pts = np.array([
|
||||
[-half, half, 0],
|
||||
[ half, half, 0],
|
||||
[ half, -half, 0],
|
||||
[-half, -half, 0],
|
||||
], dtype=np.float64)
|
||||
|
||||
for corner_set, tag_id in zip(corners, ids.flatten()):
|
||||
pts = corner_set.reshape(4, 2)
|
||||
margin = float(np.mean([
|
||||
np.linalg.norm(pts[i] - pts[(i + 1) % 4]) for i in range(4)
|
||||
]))
|
||||
if margin < self._min_margin:
|
||||
continue
|
||||
|
||||
cx = float(np.mean(pts[:, 0]))
|
||||
cy = float(np.mean(pts[:, 1]))
|
||||
|
||||
tag = DetectedTag()
|
||||
tag.header = arr.header
|
||||
tag.tag_id = int(tag_id)
|
||||
tag.family = _FAMILY
|
||||
tag.decision_margin = margin
|
||||
tag.corners = pts.flatten().tolist()
|
||||
tag.center = Point(x=cx, y=cy, z=0.0)
|
||||
|
||||
if self._K is not None and self._dist is not None:
|
||||
ok, rvec, tvec = cv2.solvePnP(
|
||||
obj_pts, pts.astype(np.float64),
|
||||
self._K, self._dist,
|
||||
flags=cv2.SOLVEPNP_IPPE_SQUARE,
|
||||
)
|
||||
if ok:
|
||||
tx, ty, tz, qx, qy, qz, qw = rvec_tvec_to_pose(rvec, tvec)
|
||||
tag.pose.position.x = tx
|
||||
tag.pose.position.y = ty
|
||||
tag.pose.position.z = tz
|
||||
tag.pose.orientation.x = qx
|
||||
tag.pose.orientation.y = qy
|
||||
tag.pose.orientation.z = qz
|
||||
tag.pose.orientation.w = qw
|
||||
tag.pose_valid = True
|
||||
else:
|
||||
tag.pose_valid = False
|
||||
else:
|
||||
tag.pose_valid = False
|
||||
|
||||
arr.tags.append(tag)
|
||||
|
||||
arr.count = len(arr.tags)
|
||||
return arr
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = AprilTagNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
4
jetson/ros2_ws/src/saltybot_apriltag/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_apriltag/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_apriltag
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_apriltag
|
||||
29
jetson/ros2_ws/src/saltybot_apriltag/setup.py
Normal file
29
jetson/ros2_ws/src/saltybot_apriltag/setup.py
Normal file
@ -0,0 +1,29 @@
|
||||
from setuptools import setup, find_packages
|
||||
from glob import glob
|
||||
|
||||
package_name = 'saltybot_apriltag'
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version='0.1.0',
|
||||
packages=find_packages(exclude=['test']),
|
||||
data_files=[
|
||||
('share/ament_index/resource_index/packages',
|
||||
['resource/' + package_name]),
|
||||
('share/' + package_name, ['package.xml']),
|
||||
('share/' + package_name + '/launch',
|
||||
glob('launch/*.launch.py')),
|
||||
],
|
||||
install_requires=['setuptools'],
|
||||
zip_safe=True,
|
||||
maintainer='SaltyLab',
|
||||
maintainer_email='robot@saltylab.local',
|
||||
description='AprilTag DICT_APRILTAG_36h11 detector — 10 Hz, 6-DOF pose',
|
||||
license='MIT',
|
||||
tests_require=['pytest'],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'apriltag_detector = saltybot_apriltag.apriltag_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
142
jetson/ros2_ws/src/saltybot_apriltag/test/test_apriltag.py
Normal file
142
jetson/ros2_ws/src/saltybot_apriltag/test/test_apriltag.py
Normal file
@ -0,0 +1,142 @@
|
||||
"""
|
||||
test_apriltag.py — Unit tests for AprilTag detector helpers.
|
||||
|
||||
Runs without ROS2 (no rclpy). Tests cover:
|
||||
- _rvec_tvec_to_pose: rotation/translation → Pose quaternion
|
||||
- _ArucoBackend: detection on a synthetic rendered tag image
|
||||
- AprilTagNode._build_array: message construction from mock detections
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import sys
|
||||
import os
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
import cv2
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
from saltybot_apriltag._aruco_utils import rvec_tvec_to_pose as _rvec_tvec_to_pose, ArucoBackend as _ArucoBackend
|
||||
|
||||
|
||||
# ── _rvec_tvec_to_pose ────────────────────────────────────────────────────────
|
||||
|
||||
class TestRvecTvecToPose:
|
||||
# _rvec_tvec_to_pose returns (tx, ty, tz, qx, qy, qz, qw)
|
||||
|
||||
def test_identity_rotation_gives_unit_quaternion(self):
|
||||
tx, ty, tz, qx, qy, qz, qw = _rvec_tvec_to_pose(np.zeros(3), np.zeros(3))
|
||||
assert qw == pytest.approx(1.0, abs=1e-6)
|
||||
assert qx == pytest.approx(0.0, abs=1e-6)
|
||||
assert qy == pytest.approx(0.0, abs=1e-6)
|
||||
assert qz == pytest.approx(0.0, abs=1e-6)
|
||||
|
||||
def test_translation_passed_through(self):
|
||||
tvec = np.array([1.5, -0.3, 2.0])
|
||||
tx, ty, tz, *_ = _rvec_tvec_to_pose(np.zeros(3), tvec)
|
||||
assert tx == pytest.approx(1.5)
|
||||
assert ty == pytest.approx(-0.3)
|
||||
assert tz == pytest.approx(2.0)
|
||||
|
||||
def test_90deg_yaw_quaternion(self):
|
||||
# 90° rotation around Z axis → qw=cos(45°), qz=sin(45°)
|
||||
rvec = np.array([0.0, 0.0, math.pi / 2])
|
||||
tx, ty, tz, qx, qy, qz, qw = _rvec_tvec_to_pose(rvec, np.zeros(3))
|
||||
assert qw == pytest.approx(math.cos(math.pi / 4), abs=1e-5)
|
||||
assert qz == pytest.approx(math.sin(math.pi / 4), abs=1e-5)
|
||||
assert qx == pytest.approx(0.0, abs=1e-5)
|
||||
assert qy == pytest.approx(0.0, abs=1e-5)
|
||||
|
||||
def test_quaternion_is_unit_norm(self):
|
||||
rvec = np.array([0.3, 0.7, -1.1])
|
||||
tvec = np.array([0.5, 0.5, 0.5])
|
||||
tx, ty, tz, qx, qy, qz, qw = _rvec_tvec_to_pose(rvec, tvec)
|
||||
norm = math.sqrt(qx**2 + qy**2 + qz**2 + qw**2)
|
||||
assert norm == pytest.approx(1.0, abs=1e-5)
|
||||
|
||||
def test_180deg_x_rotation(self):
|
||||
rvec = np.array([math.pi, 0.0, 0.0])
|
||||
tx, ty, tz, qx, qy, qz, qw = _rvec_tvec_to_pose(rvec, np.zeros(3))
|
||||
norm = math.sqrt(qx**2 + qy**2 + qz**2 + qw**2)
|
||||
assert norm == pytest.approx(1.0, abs=1e-5)
|
||||
|
||||
|
||||
# ── _ArucoBackend ─────────────────────────────────────────────────────────────
|
||||
|
||||
class _FakeLogger:
|
||||
def info(self, msg): pass
|
||||
def warning(self, msg): pass
|
||||
def debug(self, msg): pass
|
||||
|
||||
|
||||
def _render_apriltag_36h11(tag_id: int = 0, size: int = 200) -> np.ndarray:
|
||||
"""
|
||||
Render an AprilTag DICT_APRILTAG_36h11 tag into a greyscale image.
|
||||
Returns an image large enough for reliable detection.
|
||||
"""
|
||||
dictionary = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_APRILTAG_36h11)
|
||||
# Draw the marker (OpenCV ≥ 4.6)
|
||||
tag_img = np.zeros((size, size), dtype=np.uint8)
|
||||
try:
|
||||
cv2.aruco.generateImageMarker(dictionary, tag_id, size, tag_img, 1)
|
||||
except AttributeError:
|
||||
# Older API
|
||||
tag_img = cv2.aruco.drawMarker(dictionary, tag_id, size)
|
||||
|
||||
# Embed in white canvas with border
|
||||
canvas = np.ones((size + 80, size + 80), dtype=np.uint8) * 255
|
||||
canvas[40:40 + size, 40:40 + size] = tag_img
|
||||
return canvas
|
||||
|
||||
|
||||
class TestArucoBackend:
|
||||
|
||||
def test_backend_initialises(self):
|
||||
backend = _ArucoBackend(_FakeLogger())
|
||||
assert backend is not None
|
||||
|
||||
def test_detect_returns_tuple(self):
|
||||
backend = _ArucoBackend(_FakeLogger())
|
||||
gray = np.ones((480, 640), dtype=np.uint8) * 200
|
||||
corners, ids = backend.detect(gray)
|
||||
assert isinstance(corners, (list, tuple))
|
||||
|
||||
def test_blank_image_no_detections(self):
|
||||
backend = _ArucoBackend(_FakeLogger())
|
||||
gray = np.ones((480, 640), dtype=np.uint8) * 200
|
||||
_, ids = backend.detect(gray)
|
||||
assert ids is None or len(ids) == 0
|
||||
|
||||
def test_rendered_tag_detected(self):
|
||||
backend = _ArucoBackend(_FakeLogger())
|
||||
tag_img = _render_apriltag_36h11(tag_id=5, size=200)
|
||||
corners, ids = backend.detect(tag_img)
|
||||
assert ids is not None and len(ids) == 1
|
||||
assert int(ids.flatten()[0]) == 5
|
||||
|
||||
def test_rendered_tag_corners_shape(self):
|
||||
backend = _ArucoBackend(_FakeLogger())
|
||||
tag_img = _render_apriltag_36h11(tag_id=0, size=200)
|
||||
corners, ids = backend.detect(tag_img)
|
||||
assert ids is not None
|
||||
assert len(corners) == 1
|
||||
assert corners[0].reshape(4, 2).shape == (4, 2)
|
||||
|
||||
def test_multiple_tags_detected(self):
|
||||
"""Place two tags side-by-side and expect both found."""
|
||||
backend = _ArucoBackend(_FakeLogger())
|
||||
t0 = _render_apriltag_36h11(tag_id=0, size=150)
|
||||
t1 = _render_apriltag_36h11(tag_id=1, size=150)
|
||||
canvas = np.ones((t0.shape[0], t0.shape[1] * 2 + 20), dtype=np.uint8) * 255
|
||||
canvas[:, :t0.shape[1]] = t0
|
||||
canvas[:, t0.shape[1] + 20:] = t1
|
||||
_, ids = backend.detect(canvas)
|
||||
assert ids is not None and len(ids) == 2
|
||||
assert set(ids.flatten().tolist()) == {0, 1}
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
16
jetson/ros2_ws/src/saltybot_apriltag_msgs/CMakeLists.txt
Normal file
16
jetson/ros2_ws/src/saltybot_apriltag_msgs/CMakeLists.txt
Normal file
@ -0,0 +1,16 @@
|
||||
cmake_minimum_required(VERSION 3.8)
|
||||
project(saltybot_apriltag_msgs)
|
||||
|
||||
find_package(ament_cmake REQUIRED)
|
||||
find_package(rosidl_default_generators REQUIRED)
|
||||
find_package(std_msgs REQUIRED)
|
||||
find_package(geometry_msgs REQUIRED)
|
||||
|
||||
rosidl_generate_interfaces(${PROJECT_NAME}
|
||||
"msg/DetectedTag.msg"
|
||||
"msg/DetectedTagArray.msg"
|
||||
DEPENDENCIES std_msgs geometry_msgs
|
||||
)
|
||||
|
||||
ament_export_dependencies(rosidl_default_runtime)
|
||||
ament_package()
|
||||
@ -0,0 +1,18 @@
|
||||
# DetectedTag — single AprilTag detection result.
|
||||
|
||||
std_msgs/Header header
|
||||
|
||||
int32 tag_id # numeric ID encoded in the tag
|
||||
string family # e.g. 'DICT_APRILTAG_36h11'
|
||||
|
||||
float32 decision_margin # detector confidence (higher = more certain)
|
||||
|
||||
# Corner pixel coordinates [x0,y0, x1,y1, x2,y2, x3,y3] (image frame, CW from TL)
|
||||
float32[8] corners
|
||||
|
||||
# Tag centre in image pixels
|
||||
geometry_msgs/Point center
|
||||
|
||||
# 6-DOF pose in camera optical frame (only valid when pose_valid=true)
|
||||
geometry_msgs/Pose pose
|
||||
bool pose_valid # false if camera_info not yet received
|
||||
@ -0,0 +1,7 @@
|
||||
# DetectedTagArray — all AprilTags found in one frame.
|
||||
# Published at ~10 Hz on /saltybot/apriltags.
|
||||
|
||||
std_msgs/Header header
|
||||
|
||||
saltybot_apriltag_msgs/DetectedTag[] tags
|
||||
int32 count
|
||||
22
jetson/ros2_ws/src/saltybot_apriltag_msgs/package.xml
Normal file
22
jetson/ros2_ws/src/saltybot_apriltag_msgs/package.xml
Normal file
@ -0,0 +1,22 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_apriltag_msgs</name>
|
||||
<version>0.1.0</version>
|
||||
<description>Message types for AprilTag landmark detection.</description>
|
||||
<maintainer email="robot@saltylab.local">SaltyLab</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<buildtool_depend>ament_cmake</buildtool_depend>
|
||||
<buildtool_depend>rosidl_default_generators</buildtool_depend>
|
||||
|
||||
<depend>std_msgs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
|
||||
<exec_depend>rosidl_default_runtime</exec_depend>
|
||||
<member_of_group>rosidl_interface_packages</member_of_group>
|
||||
|
||||
<export>
|
||||
<build_type>ament_cmake</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,60 @@
|
||||
"""
|
||||
_camera_state.py — Pure-Python camera health state (no ROS2 deps).
|
||||
Importable without rclpy for unit testing.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from collections import deque
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class CameraState:
|
||||
topic: str
|
||||
v4l2_dev: Optional[str] # None for D435i (USB — no v4l2 reset)
|
||||
|
||||
last_frame_t: Optional[float] = None # monotonic seconds
|
||||
total_frames: int = 0
|
||||
_ts_window: deque = field(default_factory=lambda: deque(maxlen=200))
|
||||
_last_reset_t: float = 0.0 # for reset cooldown
|
||||
|
||||
def on_frame(self) -> None:
|
||||
now = time.monotonic()
|
||||
self.last_frame_t = now
|
||||
self.total_frames += 1
|
||||
self._ts_window.append(now)
|
||||
|
||||
@property
|
||||
def age_s(self) -> float:
|
||||
if self.last_frame_t is None:
|
||||
return float('inf')
|
||||
return time.monotonic() - self.last_frame_t
|
||||
|
||||
def fps(self, window_s: float = 5.0) -> float:
|
||||
now = time.monotonic()
|
||||
recent = [t for t in self._ts_window if now - t <= window_s]
|
||||
if len(recent) < 2:
|
||||
return 0.0
|
||||
return (len(recent) - 1) / (recent[-1] - recent[0])
|
||||
|
||||
def status(self, warn_thr: float, error_thr: float) -> str:
|
||||
age = self.age_s
|
||||
if age == float('inf'):
|
||||
return 'never'
|
||||
if age > error_thr:
|
||||
return 'error'
|
||||
if age > warn_thr:
|
||||
return 'warn'
|
||||
return 'ok'
|
||||
|
||||
def should_reset(self, cooldown_s: float = 30.0) -> bool:
|
||||
return (
|
||||
self.v4l2_dev is not None
|
||||
and time.monotonic() - self._last_reset_t > cooldown_s
|
||||
)
|
||||
|
||||
def mark_reset(self) -> None:
|
||||
self._last_reset_t = time.monotonic()
|
||||
@ -0,0 +1,210 @@
|
||||
"""
|
||||
camera_health_node.py — Camera watchdog for all SaltyBot image streams.
|
||||
|
||||
Monitors frame arrival timestamps for each configured camera topic.
|
||||
On silence:
|
||||
> warn_timeout_s (default 2s) → WARNING log
|
||||
> error_timeout_s (default 10s) → ERROR log + v4l2 reset attempt (CSI only)
|
||||
|
||||
Publishes /saltybot/camera_health as std_msgs/String JSON at 1 Hz:
|
||||
{
|
||||
"ts": "2026-03-02T15:00:00Z",
|
||||
"cameras": {
|
||||
"d435i_color": {
|
||||
"topic": "/camera/color/image_raw",
|
||||
"status": "ok" | "warn" | "error" | "never",
|
||||
"age_s": 1.23,
|
||||
"fps": 29.8,
|
||||
"total_frames": 1234
|
||||
},
|
||||
...
|
||||
},
|
||||
"healthy": 5,
|
||||
"degraded": 1,
|
||||
"total": 6
|
||||
}
|
||||
|
||||
CSI camera map (topic → /dev/videoN) is used for v4l2 reset.
|
||||
D435i streams are USB — no v4l2 reset (RealSense SDK manages recovery).
|
||||
|
||||
Parameters:
|
||||
warn_timeout_s float 2.0
|
||||
error_timeout_s float 10.0
|
||||
publish_hz float 1.0
|
||||
fps_window_s float 5.0 rolling window for FPS estimate
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
from datetime import datetime, timezone
|
||||
from typing import Dict
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||
|
||||
from sensor_msgs.msg import Image
|
||||
from std_msgs.msg import String
|
||||
|
||||
from ._camera_state import CameraState
|
||||
|
||||
|
||||
_SENSOR_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=3,
|
||||
)
|
||||
|
||||
# Default cameras: name → (topic, csi_device_or_None)
|
||||
_DEFAULT_CAMERAS: Dict[str, tuple] = {
|
||||
'd435i_color': ('/camera/color/image_raw', None),
|
||||
'd435i_depth': ('/camera/depth/image_rect_raw', None),
|
||||
'csi_front': ('/camera/front/image_raw', '/dev/video0'),
|
||||
'csi_right': ('/camera/right/image_raw', '/dev/video2'),
|
||||
'csi_rear': ('/camera/rear/image_raw', '/dev/video4'),
|
||||
'csi_left': ('/camera/left/image_raw', '/dev/video6'),
|
||||
}
|
||||
|
||||
|
||||
class CameraHealthNode(Node):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('camera_health_monitor')
|
||||
|
||||
self.declare_parameter('warn_timeout_s', 2.0)
|
||||
self.declare_parameter('error_timeout_s', 10.0)
|
||||
self.declare_parameter('publish_hz', 1.0)
|
||||
self.declare_parameter('fps_window_s', 5.0)
|
||||
|
||||
self._warn_thr = self.get_parameter('warn_timeout_s').value
|
||||
self._error_thr = self.get_parameter('error_timeout_s').value
|
||||
self._fps_win = self.get_parameter('fps_window_s').value
|
||||
|
||||
# Build camera states
|
||||
self._cameras: Dict[str, CameraState] = {
|
||||
name: CameraState(topic=topic, v4l2_dev=dev)
|
||||
for name, (topic, dev) in _DEFAULT_CAMERAS.items()
|
||||
}
|
||||
|
||||
# Subscribe to each topic
|
||||
for name, cam in self._cameras.items():
|
||||
self.create_subscription(
|
||||
Image, cam.topic,
|
||||
self._make_cb(name),
|
||||
_SENSOR_QOS,
|
||||
)
|
||||
|
||||
self._pub = self.create_publisher(String, '/saltybot/camera_health', 10)
|
||||
self.create_timer(
|
||||
1.0 / self.get_parameter('publish_hz').value, self._tick
|
||||
)
|
||||
|
||||
self.get_logger().info(
|
||||
f'camera_health_monitor ready — watching {len(self._cameras)} cameras '
|
||||
f'warn={self._warn_thr}s error={self._error_thr}s'
|
||||
)
|
||||
|
||||
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||
|
||||
def _make_cb(self, name: str):
|
||||
def _cb(msg: Image) -> None:
|
||||
self._cameras[name].on_frame()
|
||||
return _cb
|
||||
|
||||
# ── 1 Hz watchdog tick ────────────────────────────────────────────────────
|
||||
|
||||
def _tick(self) -> None:
|
||||
healthy = degraded = 0
|
||||
|
||||
for name, cam in self._cameras.items():
|
||||
status = cam.status(self._warn_thr, self._error_thr)
|
||||
|
||||
if status == 'ok':
|
||||
healthy += 1
|
||||
elif status in ('warn', 'error', 'never'):
|
||||
degraded += 1
|
||||
|
||||
age = cam.age_s
|
||||
age_str = f'{age:.1f}' if age != float('inf') else '∞'
|
||||
|
||||
if status == 'warn':
|
||||
self.get_logger().warning(
|
||||
f'[cam_health] {name} ({cam.topic}): '
|
||||
f'no frame for {age_str}s'
|
||||
)
|
||||
elif status in ('error', 'never'):
|
||||
self.get_logger().error(
|
||||
f'[cam_health] {name} ({cam.topic}): '
|
||||
f'no frame for {age_str}s — camera may be offline'
|
||||
)
|
||||
if status == 'error' and cam.should_reset():
|
||||
self._v4l2_reset(name, cam)
|
||||
|
||||
payload = self._build_json(healthy, degraded)
|
||||
msg = String()
|
||||
msg.data = payload
|
||||
self._pub.publish(msg)
|
||||
|
||||
# ── v4l2 reset (CSI cameras only) ─────────────────────────────────────────
|
||||
|
||||
def _v4l2_reset(self, name: str, cam: CameraState) -> None:
|
||||
dev = cam.v4l2_dev
|
||||
self.get_logger().warning(
|
||||
f'[cam_health] attempting v4l2 reset for {name} on {dev}'
|
||||
)
|
||||
cam.mark_reset()
|
||||
try:
|
||||
# Toggle streaming off then on to nudge the driver
|
||||
subprocess.run(
|
||||
['v4l2-ctl', '-d', dev, '--stream-off'],
|
||||
capture_output=True, timeout=2.0
|
||||
)
|
||||
subprocess.run(
|
||||
['v4l2-ctl', '-d', dev, '--stream-on'],
|
||||
capture_output=True, timeout=2.0
|
||||
)
|
||||
self.get_logger().info(
|
||||
f'[cam_health] v4l2 reset sent to {dev} for {name}'
|
||||
)
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError) as exc:
|
||||
self.get_logger().warning(
|
||||
f'[cam_health] v4l2 reset failed for {name}: {exc}'
|
||||
)
|
||||
|
||||
# ── JSON builder ──────────────────────────────────────────────────────────
|
||||
|
||||
def _build_json(self, healthy: int, degraded: int) -> str:
|
||||
cameras_out = {}
|
||||
for name, cam in self._cameras.items():
|
||||
age = cam.age_s
|
||||
cameras_out[name] = {
|
||||
'topic': cam.topic,
|
||||
'status': cam.status(self._warn_thr, self._error_thr),
|
||||
'age_s': round(age, 2) if age != float('inf') else None,
|
||||
'fps': round(cam.fps(self._fps_win), 1),
|
||||
'total_frames': cam.total_frames,
|
||||
}
|
||||
|
||||
return json.dumps({
|
||||
'ts': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
|
||||
'cameras': cameras_out,
|
||||
'healthy': healthy,
|
||||
'degraded': degraded,
|
||||
'total': len(self._cameras),
|
||||
})
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = CameraHealthNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@ -29,6 +29,7 @@ setup(
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'depth_confidence_filter = saltybot_bringup.depth_confidence_filter_node:main',
|
||||
'camera_health_monitor = saltybot_bringup.camera_health_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
108
jetson/ros2_ws/src/saltybot_bringup/test/test_camera_health.py
Normal file
108
jetson/ros2_ws/src/saltybot_bringup/test/test_camera_health.py
Normal file
@ -0,0 +1,108 @@
|
||||
"""
|
||||
test_camera_health.py — Unit tests for CameraState (no ROS2 required).
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
from saltybot_bringup._camera_state import CameraState
|
||||
|
||||
|
||||
class TestCameraState:
|
||||
|
||||
def test_initial_status_never(self):
|
||||
cs = CameraState(topic='/test', v4l2_dev=None)
|
||||
assert cs.status(2.0, 10.0) == 'never'
|
||||
|
||||
def test_age_inf_before_first_frame(self):
|
||||
cs = CameraState(topic='/test', v4l2_dev=None)
|
||||
assert cs.age_s == float('inf')
|
||||
|
||||
def test_on_frame_sets_age_near_zero(self):
|
||||
cs = CameraState(topic='/test', v4l2_dev=None)
|
||||
cs.on_frame()
|
||||
assert cs.age_s < 0.1
|
||||
|
||||
def test_on_frame_increments_count(self):
|
||||
cs = CameraState(topic='/test', v4l2_dev=None)
|
||||
for _ in range(5):
|
||||
cs.on_frame()
|
||||
assert cs.total_frames == 5
|
||||
|
||||
def test_status_ok_after_fresh_frame(self):
|
||||
cs = CameraState(topic='/test', v4l2_dev=None)
|
||||
cs.on_frame()
|
||||
assert cs.status(2.0, 10.0) == 'ok'
|
||||
|
||||
def test_status_warn_after_2s(self):
|
||||
cs = CameraState(topic='/test', v4l2_dev=None)
|
||||
cs.on_frame()
|
||||
cs.last_frame_t -= 3.0 # fake 3 seconds ago
|
||||
assert cs.status(2.0, 10.0) == 'warn'
|
||||
|
||||
def test_status_error_after_10s(self):
|
||||
cs = CameraState(topic='/test', v4l2_dev=None)
|
||||
cs.on_frame()
|
||||
cs.last_frame_t -= 11.0
|
||||
assert cs.status(2.0, 10.0) == 'error'
|
||||
|
||||
def test_fps_zero_before_frames(self):
|
||||
cs = CameraState(topic='/test', v4l2_dev=None)
|
||||
assert cs.fps(5.0) == pytest.approx(0.0)
|
||||
|
||||
def test_fps_estimate(self):
|
||||
cs = CameraState(topic='/test', v4l2_dev=None)
|
||||
# Simulate 10 frames over 1 second
|
||||
base = time.monotonic()
|
||||
for i in range(11):
|
||||
cs._ts_window.append(base + i * 0.1)
|
||||
cs.last_frame_t = base + 1.0
|
||||
assert cs.fps(5.0) == pytest.approx(10.0, abs=0.5)
|
||||
|
||||
def test_should_reset_false_for_d435i(self):
|
||||
cs = CameraState(topic='/camera/color/image_raw', v4l2_dev=None)
|
||||
assert not cs.should_reset()
|
||||
|
||||
def test_should_reset_true_for_csi_first_time(self):
|
||||
cs = CameraState(topic='/camera/front/image_raw', v4l2_dev='/dev/video0')
|
||||
assert cs.should_reset(cooldown_s=30.0)
|
||||
|
||||
def test_should_reset_false_within_cooldown(self):
|
||||
cs = CameraState(topic='/test', v4l2_dev='/dev/video0')
|
||||
cs.mark_reset()
|
||||
assert not cs.should_reset(cooldown_s=30.0)
|
||||
|
||||
def test_should_reset_true_after_cooldown(self):
|
||||
cs = CameraState(topic='/test', v4l2_dev='/dev/video0')
|
||||
cs.mark_reset()
|
||||
cs._last_reset_t -= 31.0 # fake cooldown elapsed
|
||||
assert cs.should_reset(cooldown_s=30.0)
|
||||
|
||||
def test_custom_thresholds(self):
|
||||
cs = CameraState(topic='/test', v4l2_dev=None)
|
||||
cs.on_frame()
|
||||
cs.last_frame_t -= 1.5
|
||||
assert cs.status(1.0, 5.0) == 'warn'
|
||||
assert cs.status(2.0, 5.0) == 'ok'
|
||||
|
||||
def test_fps_ignores_old_frames(self):
|
||||
cs = CameraState(topic='/test', v4l2_dev=None)
|
||||
old = time.monotonic() - 60.0
|
||||
for _ in range(50):
|
||||
cs._ts_window.append(old)
|
||||
# Recent frames
|
||||
now = time.monotonic()
|
||||
for i in range(6):
|
||||
cs._ts_window.append(now + i * 0.1)
|
||||
cs.last_frame_t = now + 0.5
|
||||
assert cs.fps(5.0) == pytest.approx(10.0, abs=1.0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
Loading…
x
Reference in New Issue
Block a user