Merge pull request 'feat(perception): QR code reader on CSI surround frames (Issue #233)' (#237) from sl-perception/issue-233-qr-reader into main
This commit is contained in:
commit
80a049fb0e
@ -0,0 +1,121 @@
|
||||
"""
|
||||
_qr_detector.py — QR code detection helper using cv2.QRCodeDetector (no ROS2 deps).
|
||||
|
||||
Exposes two public functions:
|
||||
|
||||
detect_qr(bgr) → list of RawQR namedtuples
|
||||
deduplicate(results, cache, cooldown_s) → (new_results, updated_cache)
|
||||
|
||||
RawQR
|
||||
-----
|
||||
data : str — decoded text
|
||||
corners : list[float] — [x0,y0, x1,y1, x2,y2, x3,y3] (pixel coords)
|
||||
cx, cy : float — centroid (pixels)
|
||||
|
||||
Deduplication
|
||||
-------------
|
||||
The cache maps decoded data → last_seen_monotonic timestamp. A result is
|
||||
considered a duplicate if the same data was published within cooldown_s seconds.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Dict, List, NamedTuple, Tuple
|
||||
import time
|
||||
|
||||
|
||||
class RawQR(NamedTuple):
|
||||
data: str
|
||||
corners: List[float] # [x0,y0, x1,y1, x2,y2, x3,y3]
|
||||
cx: float
|
||||
cy: float
|
||||
|
||||
|
||||
def detect_qr(bgr) -> List[RawQR]:
|
||||
"""
|
||||
Run cv2.QRCodeDetector on a BGR image.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
bgr : np.ndarray (H, W, 3) uint8
|
||||
|
||||
Returns
|
||||
-------
|
||||
List of RawQR, one per decoded QR code found. Empty list if none found.
|
||||
"""
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
detector = _get_detector()
|
||||
# detectAndDecodeMulti returns (retval, decoded_info, points, straight_qrcode)
|
||||
# points shape: (N, 4, 2) — 4 corners per QR code
|
||||
try:
|
||||
ok, decoded_list, points, _ = detector.detectAndDecodeMulti(bgr)
|
||||
except cv2.error:
|
||||
return []
|
||||
|
||||
if not ok or points is None:
|
||||
return []
|
||||
|
||||
results: List[RawQR] = []
|
||||
for data, quad in zip(decoded_list, points):
|
||||
if not data: # empty string = decode failure
|
||||
continue
|
||||
flat = quad.reshape(-1).tolist() # [x0,y0, x1,y1, x2,y2, x3,y3]
|
||||
cx = float(np.mean(quad[:, 0]))
|
||||
cy = float(np.mean(quad[:, 1]))
|
||||
results.append(RawQR(data=data, corners=flat, cx=cx, cy=cy))
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def deduplicate(
|
||||
results: List[RawQR],
|
||||
cache: Dict[str, float],
|
||||
cooldown_s: float,
|
||||
now: float | None = None,
|
||||
) -> Tuple[List[RawQR], Dict[str, float]]:
|
||||
"""
|
||||
Filter results to those whose data has not been seen within cooldown_s.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
results : raw detections from detect_qr()
|
||||
cache : {data: last_published_monotonic_time} — mutated in-place
|
||||
cooldown_s : minimum seconds between re-publishing the same data
|
||||
now : current time.monotonic() — defaults to time.monotonic()
|
||||
|
||||
Returns
|
||||
-------
|
||||
(fresh_results, cache) — fresh_results contains only newly-seen QRs
|
||||
"""
|
||||
if now is None:
|
||||
now = time.monotonic()
|
||||
|
||||
fresh: List[RawQR] = []
|
||||
for r in results:
|
||||
last = cache.get(r.data, -float('inf'))
|
||||
if now - last >= cooldown_s:
|
||||
fresh.append(r)
|
||||
cache[r.data] = now
|
||||
|
||||
# Evict entries older than 10× cooldown to prevent unbounded growth
|
||||
evict_before = now - 10 * cooldown_s
|
||||
stale_keys = [k for k, t in cache.items() if t < evict_before]
|
||||
for k in stale_keys:
|
||||
del cache[k]
|
||||
|
||||
return fresh, cache
|
||||
|
||||
|
||||
# ── Module-level singleton detector (avoids re-creating on every call) ────────
|
||||
|
||||
_detector = None
|
||||
|
||||
|
||||
def _get_detector():
|
||||
global _detector
|
||||
if _detector is None:
|
||||
import cv2
|
||||
_detector = cv2.QRCodeDetector()
|
||||
return _detector
|
||||
@ -0,0 +1,156 @@
|
||||
"""
|
||||
qr_reader_node.py — QR code reader on CSI surround camera frames (Issue #233).
|
||||
|
||||
Subscribes to all four IMX219 CSI camera streams, runs cv2.QRCodeDetector
|
||||
on each frame, deduplicates detections with a per-data cooldown, and
|
||||
publishes at up to 10 Hz.
|
||||
|
||||
Subscribes (BEST_EFFORT):
|
||||
/camera/front/image_raw sensor_msgs/Image
|
||||
/camera/left/image_raw sensor_msgs/Image
|
||||
/camera/rear/image_raw sensor_msgs/Image
|
||||
/camera/right/image_raw sensor_msgs/Image
|
||||
|
||||
Publishes:
|
||||
/saltybot/qr_codes saltybot_scene_msgs/QRDetectionArray (≤10 Hz)
|
||||
|
||||
Algorithm
|
||||
---------
|
||||
1. Independent callbacks for each camera decode frames with QRCodeDetector.
|
||||
2. Any newly decoded QR payload is checked against the dedup cache:
|
||||
• If the same payload was published within `dedup_cooldown_s`, skip.
|
||||
3. A 10 Hz timer assembles all pending (deduplicated) detections and publishes
|
||||
them in a single QRDetectionArray message, then clears the pending queue.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
dedup_cooldown_s float 2.0 Minimum seconds between re-publishing the same QR data
|
||||
publish_hz float 10.0 Publication rate (Hz)
|
||||
cameras str "front,left,rear,right" Comma-separated camera names to subscribe
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from threading import Lock
|
||||
from typing import Dict, List
|
||||
|
||||
import numpy as np
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||
|
||||
from cv_bridge import CvBridge
|
||||
|
||||
from sensor_msgs.msg import Image
|
||||
from saltybot_scene_msgs.msg import QRDetection, QRDetectionArray
|
||||
|
||||
from ._qr_detector import detect_qr, deduplicate
|
||||
|
||||
|
||||
_SENSOR_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=2,
|
||||
)
|
||||
|
||||
|
||||
class QRReaderNode(Node):
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__('qr_reader_node')
|
||||
|
||||
self.declare_parameter('dedup_cooldown_s', 2.0)
|
||||
self.declare_parameter('publish_hz', 10.0)
|
||||
self.declare_parameter('cameras', 'front,left,rear,right')
|
||||
|
||||
self._cooldown = self.get_parameter('dedup_cooldown_s').value
|
||||
publish_hz = self.get_parameter('publish_hz').value
|
||||
cameras_param = self.get_parameter('cameras').value
|
||||
|
||||
self._bridge = CvBridge()
|
||||
|
||||
# Dedup cache: {data: last_published_monotonic}
|
||||
self._dedup_cache: Dict[str, float] = {}
|
||||
|
||||
# Pending queue (deduplicated results waiting for the next tick)
|
||||
self._pending: List[QRDetection] = []
|
||||
self._lock = Lock()
|
||||
|
||||
# Subscribe to each requested camera
|
||||
camera_names = [c.strip() for c in cameras_param.split(',') if c.strip()]
|
||||
for cam in camera_names:
|
||||
topic = f'/camera/{cam}/image_raw'
|
||||
self.create_subscription(
|
||||
Image, topic,
|
||||
lambda msg, c=cam: self._on_image(msg, c),
|
||||
_SENSOR_QOS,
|
||||
)
|
||||
self.get_logger().info(f'Subscribed to {topic}')
|
||||
|
||||
# Publisher + tick timer
|
||||
self._pub = self.create_publisher(QRDetectionArray, '/saltybot/qr_codes', 10)
|
||||
self.create_timer(1.0 / publish_hz, self._tick)
|
||||
|
||||
self.get_logger().info(
|
||||
f'qr_reader_node ready — cameras={camera_names} '
|
||||
f'cooldown={self._cooldown}s hz={publish_hz}'
|
||||
)
|
||||
|
||||
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||
|
||||
def _on_image(self, msg: Image, camera_name: str) -> None:
|
||||
try:
|
||||
bgr = self._bridge.imgmsg_to_cv2(msg, 'bgr8')
|
||||
except Exception as exc:
|
||||
self.get_logger().error(
|
||||
f'cv_bridge [{camera_name}]: {exc}', throttle_duration_sec=5.0)
|
||||
return
|
||||
|
||||
raw_results = detect_qr(bgr)
|
||||
if not raw_results:
|
||||
return
|
||||
|
||||
now = time.monotonic()
|
||||
with self._lock:
|
||||
fresh, self._dedup_cache = deduplicate(
|
||||
raw_results, self._dedup_cache, self._cooldown, now=now)
|
||||
|
||||
for r in fresh:
|
||||
det = QRDetection()
|
||||
det.header.stamp = msg.header.stamp
|
||||
det.header.frame_id = msg.header.frame_id
|
||||
det.data = r.data
|
||||
det.camera = camera_name
|
||||
det.corners = [float(v) for v in r.corners]
|
||||
det.center.x = r.cx
|
||||
det.center.y = r.cy
|
||||
det.center.z = 0.0
|
||||
self._pending.append(det)
|
||||
|
||||
# ── 10 Hz publish tick ────────────────────────────────────────────────────
|
||||
|
||||
def _tick(self) -> None:
|
||||
with self._lock:
|
||||
pending, self._pending = self._pending, []
|
||||
|
||||
msg = QRDetectionArray()
|
||||
msg.header.stamp = self.get_clock().now().to_msg()
|
||||
msg.header.frame_id = ''
|
||||
msg.detections = pending
|
||||
self._pub.publish(msg)
|
||||
|
||||
|
||||
def main(args=None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = QRReaderNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@ -27,6 +27,8 @@ setup(
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'person_detector = saltybot_perception.person_detector_node:main',
|
||||
# QR code reader on CSI frames (Issue #233)
|
||||
'qr_reader = saltybot_perception.qr_reader_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
165
jetson/ros2_ws/src/saltybot_perception/test/test_qr_detector.py
Normal file
165
jetson/ros2_ws/src/saltybot_perception/test/test_qr_detector.py
Normal file
@ -0,0 +1,165 @@
|
||||
"""
|
||||
test_qr_detector.py — Unit tests for _qr_detector helpers (no ROS2 required).
|
||||
|
||||
Covers:
|
||||
- detect_qr: real cv2.QRCodeDetector on a synthesised QR image
|
||||
- detect_qr: returns empty list for blank frame
|
||||
- detect_qr: cv2.error does not propagate
|
||||
- RawQR fields: corners length, centroid position
|
||||
- deduplicate: passes fresh results through
|
||||
- deduplicate: suppresses duplicates within cooldown
|
||||
- deduplicate: re-emits after cooldown expires
|
||||
- deduplicate: handles multiple different payloads
|
||||
- deduplicate: handles empty input
|
||||
- deduplicate: cache eviction (no unbounded growth)
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
from saltybot_perception._qr_detector import RawQR, detect_qr, deduplicate
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _make_qr_image(data: str, size: int = 200):
|
||||
"""Render a QR code into a BGR numpy array using cv2."""
|
||||
import cv2
|
||||
encoder = cv2.QRCodeEncoder.create()
|
||||
qr_img = encoder.encode(data) # grayscale
|
||||
# Scale up so detector is reliable
|
||||
qr_big = cv2.resize(qr_img, (size, size), interpolation=cv2.INTER_NEAREST)
|
||||
bgr = cv2.cvtColor(qr_big, cv2.COLOR_GRAY2BGR)
|
||||
return bgr
|
||||
|
||||
|
||||
def _blank_bgr(h=480, w=640):
|
||||
return np.zeros((h, w, 3), dtype=np.uint8)
|
||||
|
||||
|
||||
def _fake_raw(data='hello', cx=100.0, cy=100.0):
|
||||
return RawQR(data=data, corners=[10,10, 110,10, 110,110, 10,110], cx=cx, cy=cy)
|
||||
|
||||
|
||||
# ── detect_qr ─────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestDetectQR:
|
||||
|
||||
def test_detects_real_qr_code(self):
|
||||
img = _make_qr_image('https://example.com')
|
||||
results = detect_qr(img)
|
||||
assert len(results) == 1
|
||||
assert results[0].data == 'https://example.com'
|
||||
|
||||
def test_blank_frame_returns_empty(self):
|
||||
results = detect_qr(_blank_bgr())
|
||||
assert results == []
|
||||
|
||||
def test_corners_has_eight_values(self):
|
||||
img = _make_qr_image('ABC123')
|
||||
results = detect_qr(img)
|
||||
assert len(results) == 1
|
||||
assert len(results[0].corners) == 8
|
||||
|
||||
def test_centroid_inside_image(self):
|
||||
img = _make_qr_image('test')
|
||||
h, w = img.shape[:2]
|
||||
results = detect_qr(img)
|
||||
assert len(results) == 1
|
||||
r = results[0]
|
||||
assert 0.0 <= r.cx <= w
|
||||
assert 0.0 <= r.cy <= h
|
||||
|
||||
def test_data_matches_encoded_string(self):
|
||||
payload = 'robot:zone-A:dock-3'
|
||||
img = _make_qr_image(payload)
|
||||
results = detect_qr(img)
|
||||
assert any(r.data == payload for r in results)
|
||||
|
||||
def test_returns_list_type(self):
|
||||
results = detect_qr(_blank_bgr())
|
||||
assert isinstance(results, list)
|
||||
|
||||
|
||||
# ── deduplicate ───────────────────────────────────────────────────────────────
|
||||
|
||||
class TestDeduplicate:
|
||||
|
||||
def test_fresh_result_passes_through(self):
|
||||
raw = [_fake_raw('hello')]
|
||||
cache = {}
|
||||
fresh, cache = deduplicate(raw, cache, cooldown_s=2.0, now=0.0)
|
||||
assert len(fresh) == 1
|
||||
assert fresh[0].data == 'hello'
|
||||
|
||||
def test_duplicate_within_cooldown_suppressed(self):
|
||||
raw = [_fake_raw('hello')]
|
||||
cache = {'hello': 0.0} # last seen at t=0
|
||||
fresh, _ = deduplicate(raw, cache, cooldown_s=2.0, now=1.0) # t=1 < 2s
|
||||
assert fresh == []
|
||||
|
||||
def test_duplicate_at_exact_cooldown_passes(self):
|
||||
raw = [_fake_raw('hello')]
|
||||
cache = {'hello': 0.0}
|
||||
fresh, _ = deduplicate(raw, cache, cooldown_s=2.0, now=2.0) # exactly at boundary
|
||||
assert len(fresh) == 1
|
||||
|
||||
def test_re_emits_after_cooldown_expires(self):
|
||||
raw = [_fake_raw('hello')]
|
||||
cache = {'hello': 0.0}
|
||||
fresh, _ = deduplicate(raw, cache, cooldown_s=2.0, now=3.0) # past cooldown
|
||||
assert len(fresh) == 1
|
||||
|
||||
def test_multiple_different_payloads_all_pass(self):
|
||||
raw = [_fake_raw('A'), _fake_raw('B'), _fake_raw('C')]
|
||||
cache = {}
|
||||
fresh, _ = deduplicate(raw, cache, cooldown_s=2.0, now=0.0)
|
||||
assert len(fresh) == 3
|
||||
|
||||
def test_mixed_fresh_and_stale(self):
|
||||
raw = [_fake_raw('seen'), _fake_raw('new')]
|
||||
cache = {'seen': 0.0}
|
||||
fresh, _ = deduplicate(raw, cache, cooldown_s=2.0, now=1.0)
|
||||
assert len(fresh) == 1
|
||||
assert fresh[0].data == 'new'
|
||||
|
||||
def test_empty_input_returns_empty(self):
|
||||
fresh, cache = deduplicate([], {}, cooldown_s=2.0, now=0.0)
|
||||
assert fresh == []
|
||||
assert cache == {}
|
||||
|
||||
def test_cache_updated_after_pass_through(self):
|
||||
raw = [_fake_raw('item')]
|
||||
cache = {}
|
||||
_, cache = deduplicate(raw, cache, cooldown_s=2.0, now=42.0)
|
||||
assert cache['item'] == pytest.approx(42.0)
|
||||
|
||||
def test_cache_eviction_removes_old_entries(self):
|
||||
# Entry last seen at t=0 should be evicted when now > 10×cooldown
|
||||
cache = {'old': 0.0, 'recent': 50.0}
|
||||
cooldown = 5.0
|
||||
now = 0.0 + 10 * cooldown + 1 # = 51
|
||||
# Run with empty results (just to trigger eviction)
|
||||
_, cache = deduplicate([], cache, cooldown_s=cooldown, now=now)
|
||||
assert 'old' not in cache # evicted
|
||||
assert 'recent' in cache # kept (age = 1s < eviction threshold)
|
||||
|
||||
def test_defaults_now_to_current_time(self):
|
||||
"""Calling without `now` should not crash."""
|
||||
raw = [_fake_raw('x')]
|
||||
cache = {}
|
||||
before = time.monotonic()
|
||||
fresh, cache = deduplicate(raw, cache, cooldown_s=2.0)
|
||||
after = time.monotonic()
|
||||
assert len(fresh) == 1
|
||||
assert before <= cache['x'] <= after + 0.1
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
@ -13,6 +13,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
|
||||
"msg/SceneObjectArray.msg"
|
||||
"msg/RoomClassification.msg"
|
||||
"msg/BehaviorHint.msg"
|
||||
# Issue #233 — QR code reader
|
||||
"msg/QRDetection.msg"
|
||||
"msg/QRDetectionArray.msg"
|
||||
DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
|
||||
)
|
||||
|
||||
|
||||
12
jetson/ros2_ws/src/saltybot_scene_msgs/msg/QRDetection.msg
Normal file
12
jetson/ros2_ws/src/saltybot_scene_msgs/msg/QRDetection.msg
Normal file
@ -0,0 +1,12 @@
|
||||
# QRDetection.msg — single QR code detection result (Issue #233)
|
||||
#
|
||||
# data: decoded text content of the QR code
|
||||
# camera: camera name that detected it (e.g. "front", "left", "rear", "right")
|
||||
# corners: pixel corners [x0,y0, x1,y1, x2,y2, x3,y3] (CW from top-left, image frame)
|
||||
# center: centroid in image pixels
|
||||
#
|
||||
std_msgs/Header header
|
||||
string data # decoded QR payload
|
||||
string camera # source camera name
|
||||
float32[8] corners # corner pixel coords [x0,y0, x1,y1, x2,y2, x3,y3]
|
||||
geometry_msgs/Point center # QR centre in image pixels (z=0)
|
||||
@ -0,0 +1,3 @@
|
||||
# QRDetectionArray.msg — all QR codes detected in one frame (Issue #233)
|
||||
std_msgs/Header header
|
||||
QRDetection[] detections
|
||||
Loading…
x
Reference in New Issue
Block a user