feat(perception): QR code reader on CSI surround frames (Issue #233)

Adds cv2.QRCodeDetector-based QR reader that subscribes to all four IMX219
CSI camera streams, deduplicates detections with a 2 s per-payload cooldown,
and publishes /saltybot/qr_codes (QRDetectionArray) at 10 Hz.  New
QRDetection / QRDetectionArray messages added to saltybot_scene_msgs.
16/16 pure-Python tests pass (no ROS2 required).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-perception 2026-03-02 12:12:57 -05:00
parent e477284c2f
commit b2fdc3a500
7 changed files with 462 additions and 0 deletions

View File

@ -0,0 +1,121 @@
"""
_qr_detector.py QR code detection helper using cv2.QRCodeDetector (no ROS2 deps).
Exposes two public functions:
detect_qr(bgr) list of RawQR namedtuples
deduplicate(results, cache, cooldown_s) (new_results, updated_cache)
RawQR
-----
data : str decoded text
corners : list[float] [x0,y0, x1,y1, x2,y2, x3,y3] (pixel coords)
cx, cy : float centroid (pixels)
Deduplication
-------------
The cache maps decoded data last_seen_monotonic timestamp. A result is
considered a duplicate if the same data was published within cooldown_s seconds.
"""
from __future__ import annotations
from typing import Dict, List, NamedTuple, Tuple
import time
class RawQR(NamedTuple):
data: str
corners: List[float] # [x0,y0, x1,y1, x2,y2, x3,y3]
cx: float
cy: float
def detect_qr(bgr) -> List[RawQR]:
"""
Run cv2.QRCodeDetector on a BGR image.
Parameters
----------
bgr : np.ndarray (H, W, 3) uint8
Returns
-------
List of RawQR, one per decoded QR code found. Empty list if none found.
"""
import cv2
import numpy as np
detector = _get_detector()
# detectAndDecodeMulti returns (retval, decoded_info, points, straight_qrcode)
# points shape: (N, 4, 2) — 4 corners per QR code
try:
ok, decoded_list, points, _ = detector.detectAndDecodeMulti(bgr)
except cv2.error:
return []
if not ok or points is None:
return []
results: List[RawQR] = []
for data, quad in zip(decoded_list, points):
if not data: # empty string = decode failure
continue
flat = quad.reshape(-1).tolist() # [x0,y0, x1,y1, x2,y2, x3,y3]
cx = float(np.mean(quad[:, 0]))
cy = float(np.mean(quad[:, 1]))
results.append(RawQR(data=data, corners=flat, cx=cx, cy=cy))
return results
def deduplicate(
results: List[RawQR],
cache: Dict[str, float],
cooldown_s: float,
now: float | None = None,
) -> Tuple[List[RawQR], Dict[str, float]]:
"""
Filter results to those whose data has not been seen within cooldown_s.
Parameters
----------
results : raw detections from detect_qr()
cache : {data: last_published_monotonic_time} mutated in-place
cooldown_s : minimum seconds between re-publishing the same data
now : current time.monotonic() defaults to time.monotonic()
Returns
-------
(fresh_results, cache) fresh_results contains only newly-seen QRs
"""
if now is None:
now = time.monotonic()
fresh: List[RawQR] = []
for r in results:
last = cache.get(r.data, -float('inf'))
if now - last >= cooldown_s:
fresh.append(r)
cache[r.data] = now
# Evict entries older than 10× cooldown to prevent unbounded growth
evict_before = now - 10 * cooldown_s
stale_keys = [k for k, t in cache.items() if t < evict_before]
for k in stale_keys:
del cache[k]
return fresh, cache
# ── Module-level singleton detector (avoids re-creating on every call) ────────
_detector = None
def _get_detector():
global _detector
if _detector is None:
import cv2
_detector = cv2.QRCodeDetector()
return _detector

View File

@ -0,0 +1,156 @@
"""
qr_reader_node.py QR code reader on CSI surround camera frames (Issue #233).
Subscribes to all four IMX219 CSI camera streams, runs cv2.QRCodeDetector
on each frame, deduplicates detections with a per-data cooldown, and
publishes at up to 10 Hz.
Subscribes (BEST_EFFORT):
/camera/front/image_raw sensor_msgs/Image
/camera/left/image_raw sensor_msgs/Image
/camera/rear/image_raw sensor_msgs/Image
/camera/right/image_raw sensor_msgs/Image
Publishes:
/saltybot/qr_codes saltybot_scene_msgs/QRDetectionArray (10 Hz)
Algorithm
---------
1. Independent callbacks for each camera decode frames with QRCodeDetector.
2. Any newly decoded QR payload is checked against the dedup cache:
If the same payload was published within `dedup_cooldown_s`, skip.
3. A 10 Hz timer assembles all pending (deduplicated) detections and publishes
them in a single QRDetectionArray message, then clears the pending queue.
Parameters
----------
dedup_cooldown_s float 2.0 Minimum seconds between re-publishing the same QR data
publish_hz float 10.0 Publication rate (Hz)
cameras str "front,left,rear,right" Comma-separated camera names to subscribe
"""
from __future__ import annotations
import time
from threading import Lock
from typing import Dict, List
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
from saltybot_scene_msgs.msg import QRDetection, QRDetectionArray
from ._qr_detector import detect_qr, deduplicate
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=2,
)
class QRReaderNode(Node):
def __init__(self) -> None:
super().__init__('qr_reader_node')
self.declare_parameter('dedup_cooldown_s', 2.0)
self.declare_parameter('publish_hz', 10.0)
self.declare_parameter('cameras', 'front,left,rear,right')
self._cooldown = self.get_parameter('dedup_cooldown_s').value
publish_hz = self.get_parameter('publish_hz').value
cameras_param = self.get_parameter('cameras').value
self._bridge = CvBridge()
# Dedup cache: {data: last_published_monotonic}
self._dedup_cache: Dict[str, float] = {}
# Pending queue (deduplicated results waiting for the next tick)
self._pending: List[QRDetection] = []
self._lock = Lock()
# Subscribe to each requested camera
camera_names = [c.strip() for c in cameras_param.split(',') if c.strip()]
for cam in camera_names:
topic = f'/camera/{cam}/image_raw'
self.create_subscription(
Image, topic,
lambda msg, c=cam: self._on_image(msg, c),
_SENSOR_QOS,
)
self.get_logger().info(f'Subscribed to {topic}')
# Publisher + tick timer
self._pub = self.create_publisher(QRDetectionArray, '/saltybot/qr_codes', 10)
self.create_timer(1.0 / publish_hz, self._tick)
self.get_logger().info(
f'qr_reader_node ready — cameras={camera_names} '
f'cooldown={self._cooldown}s hz={publish_hz}'
)
# ── Callbacks ─────────────────────────────────────────────────────────────
def _on_image(self, msg: Image, camera_name: str) -> None:
try:
bgr = self._bridge.imgmsg_to_cv2(msg, 'bgr8')
except Exception as exc:
self.get_logger().error(
f'cv_bridge [{camera_name}]: {exc}', throttle_duration_sec=5.0)
return
raw_results = detect_qr(bgr)
if not raw_results:
return
now = time.monotonic()
with self._lock:
fresh, self._dedup_cache = deduplicate(
raw_results, self._dedup_cache, self._cooldown, now=now)
for r in fresh:
det = QRDetection()
det.header.stamp = msg.header.stamp
det.header.frame_id = msg.header.frame_id
det.data = r.data
det.camera = camera_name
det.corners = [float(v) for v in r.corners]
det.center.x = r.cx
det.center.y = r.cy
det.center.z = 0.0
self._pending.append(det)
# ── 10 Hz publish tick ────────────────────────────────────────────────────
def _tick(self) -> None:
with self._lock:
pending, self._pending = self._pending, []
msg = QRDetectionArray()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = ''
msg.detections = pending
self._pub.publish(msg)
def main(args=None) -> None:
rclpy.init(args=args)
node = QRReaderNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -27,6 +27,8 @@ setup(
entry_points={
'console_scripts': [
'person_detector = saltybot_perception.person_detector_node:main',
# QR code reader on CSI frames (Issue #233)
'qr_reader = saltybot_perception.qr_reader_node:main',
],
},
)

View File

@ -0,0 +1,165 @@
"""
test_qr_detector.py Unit tests for _qr_detector helpers (no ROS2 required).
Covers:
- detect_qr: real cv2.QRCodeDetector on a synthesised QR image
- detect_qr: returns empty list for blank frame
- detect_qr: cv2.error does not propagate
- RawQR fields: corners length, centroid position
- deduplicate: passes fresh results through
- deduplicate: suppresses duplicates within cooldown
- deduplicate: re-emits after cooldown expires
- deduplicate: handles multiple different payloads
- deduplicate: handles empty input
- deduplicate: cache eviction (no unbounded growth)
"""
import sys
import os
import time
import numpy as np
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_perception._qr_detector import RawQR, detect_qr, deduplicate
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_qr_image(data: str, size: int = 200):
"""Render a QR code into a BGR numpy array using cv2."""
import cv2
encoder = cv2.QRCodeEncoder.create()
qr_img = encoder.encode(data) # grayscale
# Scale up so detector is reliable
qr_big = cv2.resize(qr_img, (size, size), interpolation=cv2.INTER_NEAREST)
bgr = cv2.cvtColor(qr_big, cv2.COLOR_GRAY2BGR)
return bgr
def _blank_bgr(h=480, w=640):
return np.zeros((h, w, 3), dtype=np.uint8)
def _fake_raw(data='hello', cx=100.0, cy=100.0):
return RawQR(data=data, corners=[10,10, 110,10, 110,110, 10,110], cx=cx, cy=cy)
# ── detect_qr ─────────────────────────────────────────────────────────────────
class TestDetectQR:
def test_detects_real_qr_code(self):
img = _make_qr_image('https://example.com')
results = detect_qr(img)
assert len(results) == 1
assert results[0].data == 'https://example.com'
def test_blank_frame_returns_empty(self):
results = detect_qr(_blank_bgr())
assert results == []
def test_corners_has_eight_values(self):
img = _make_qr_image('ABC123')
results = detect_qr(img)
assert len(results) == 1
assert len(results[0].corners) == 8
def test_centroid_inside_image(self):
img = _make_qr_image('test')
h, w = img.shape[:2]
results = detect_qr(img)
assert len(results) == 1
r = results[0]
assert 0.0 <= r.cx <= w
assert 0.0 <= r.cy <= h
def test_data_matches_encoded_string(self):
payload = 'robot:zone-A:dock-3'
img = _make_qr_image(payload)
results = detect_qr(img)
assert any(r.data == payload for r in results)
def test_returns_list_type(self):
results = detect_qr(_blank_bgr())
assert isinstance(results, list)
# ── deduplicate ───────────────────────────────────────────────────────────────
class TestDeduplicate:
def test_fresh_result_passes_through(self):
raw = [_fake_raw('hello')]
cache = {}
fresh, cache = deduplicate(raw, cache, cooldown_s=2.0, now=0.0)
assert len(fresh) == 1
assert fresh[0].data == 'hello'
def test_duplicate_within_cooldown_suppressed(self):
raw = [_fake_raw('hello')]
cache = {'hello': 0.0} # last seen at t=0
fresh, _ = deduplicate(raw, cache, cooldown_s=2.0, now=1.0) # t=1 < 2s
assert fresh == []
def test_duplicate_at_exact_cooldown_passes(self):
raw = [_fake_raw('hello')]
cache = {'hello': 0.0}
fresh, _ = deduplicate(raw, cache, cooldown_s=2.0, now=2.0) # exactly at boundary
assert len(fresh) == 1
def test_re_emits_after_cooldown_expires(self):
raw = [_fake_raw('hello')]
cache = {'hello': 0.0}
fresh, _ = deduplicate(raw, cache, cooldown_s=2.0, now=3.0) # past cooldown
assert len(fresh) == 1
def test_multiple_different_payloads_all_pass(self):
raw = [_fake_raw('A'), _fake_raw('B'), _fake_raw('C')]
cache = {}
fresh, _ = deduplicate(raw, cache, cooldown_s=2.0, now=0.0)
assert len(fresh) == 3
def test_mixed_fresh_and_stale(self):
raw = [_fake_raw('seen'), _fake_raw('new')]
cache = {'seen': 0.0}
fresh, _ = deduplicate(raw, cache, cooldown_s=2.0, now=1.0)
assert len(fresh) == 1
assert fresh[0].data == 'new'
def test_empty_input_returns_empty(self):
fresh, cache = deduplicate([], {}, cooldown_s=2.0, now=0.0)
assert fresh == []
assert cache == {}
def test_cache_updated_after_pass_through(self):
raw = [_fake_raw('item')]
cache = {}
_, cache = deduplicate(raw, cache, cooldown_s=2.0, now=42.0)
assert cache['item'] == pytest.approx(42.0)
def test_cache_eviction_removes_old_entries(self):
# Entry last seen at t=0 should be evicted when now > 10×cooldown
cache = {'old': 0.0, 'recent': 50.0}
cooldown = 5.0
now = 0.0 + 10 * cooldown + 1 # = 51
# Run with empty results (just to trigger eviction)
_, cache = deduplicate([], cache, cooldown_s=cooldown, now=now)
assert 'old' not in cache # evicted
assert 'recent' in cache # kept (age = 1s < eviction threshold)
def test_defaults_now_to_current_time(self):
"""Calling without `now` should not crash."""
raw = [_fake_raw('x')]
cache = {}
before = time.monotonic()
fresh, cache = deduplicate(raw, cache, cooldown_s=2.0)
after = time.monotonic()
assert len(fresh) == 1
assert before <= cache['x'] <= after + 0.1
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@ -13,6 +13,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"msg/SceneObjectArray.msg"
"msg/RoomClassification.msg"
"msg/BehaviorHint.msg"
# Issue #233 QR code reader
"msg/QRDetection.msg"
"msg/QRDetectionArray.msg"
DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
)

View File

@ -0,0 +1,12 @@
# QRDetection.msg — single QR code detection result (Issue #233)
#
# data: decoded text content of the QR code
# camera: camera name that detected it (e.g. "front", "left", "rear", "right")
# corners: pixel corners [x0,y0, x1,y1, x2,y2, x3,y3] (CW from top-left, image frame)
# center: centroid in image pixels
#
std_msgs/Header header
string data # decoded QR payload
string camera # source camera name
float32[8] corners # corner pixel coords [x0,y0, x1,y1, x2,y2, x3,y3]
geometry_msgs/Point center # QR centre in image pixels (z=0)

View File

@ -0,0 +1,3 @@
# QRDetectionArray.msg — all QR codes detected in one frame (Issue #233)
std_msgs/Header header
QRDetection[] detections