feat(perception): appearance-based person re-identification (Issue #322)
Adds PersonTrack/PersonTrackArray msgs and a PersonReidNode that matches
individuals across camera views using HSV colour histogram appearance
features and cosine similarity, with EMA gallery update and 30s stale timeout.
New messages (saltybot_scene_msgs):
msg/PersonTrack.msg — track_id, camera_id, bbox, confidence,
first_seen, last_seen, is_stale
msg/PersonTrackArray.msg — array wrapper with header
New files (saltybot_bringup):
saltybot_bringup/_person_reid.py — pure appearance/re-id logic (no ROS2 deps)
extract_hsv_histogram() 2-D HS histogram (H=16, S=8 → 128-dim, L2-norm)
cosine_similarity() handles zero/non-unit vectors
match_track() best gallery match above threshold (strict >)
TrackGallery add/update/match/mark_stale/prune_stale
TrackEntry mutable dataclass; EMA feature blend (α=0.3)
saltybot_bringup/person_reid_node.py
Subscribes /camera/color/image_raw + /saltybot/scene/objects (BEST_EFFORT)
Crops COCO person (class_id=0) ROIs; extracts features; matches gallery
Publishes PersonTrackArray on /saltybot/person_tracks at 5 Hz
Parameters: camera_id, similarity_threshold=0.75, stale_timeout_s=30,
max_tracks=20, publish_hz=5.0
test/test_person_reid.py — 50 tests, all passing
Modified:
saltybot_scene_msgs/CMakeLists.txt — register PersonTrack/Array msgs
saltybot_bringup/setup.py — add person_reid console_script
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
ffc69a05c0
commit
071e577227
@ -0,0 +1,259 @@
|
||||
"""
|
||||
_person_reid.py — Appearance-based person re-identification (no ROS2 deps).
|
||||
|
||||
Algorithm
|
||||
---------
|
||||
For each person detection (bounding-box crop from a camera image):
|
||||
|
||||
1. Extract a 2-D HS colour histogram:
|
||||
H axis: 16 bins over [0, 180] (OpenCV hue)
|
||||
S axis: 8 bins over [0, 256] (saturation)
|
||||
The Value channel is omitted for illumination robustness.
|
||||
Result: 128-dimensional float32 vector, L2-normalised.
|
||||
|
||||
2. Match against the gallery of active TrackEntry objects using cosine
|
||||
similarity (= dot product of L2-normalised vectors). The best match
|
||||
whose similarity exceeds `threshold` is selected; otherwise a new track
|
||||
is created.
|
||||
|
||||
3. On a successful match the gallery entry is updated with an exponential
|
||||
moving average (α = 0.3) of the new feature to adapt gradually to
|
||||
appearance changes (lighting, pose) while staying robust to outliers.
|
||||
|
||||
4. Tracks not updated for `timeout_s` wall-clock seconds are flagged
|
||||
`is_stale`; stale tracks are pruned from the gallery on the next cycle.
|
||||
|
||||
Public API
|
||||
----------
|
||||
H_BINS, S_BINS, FEAT_DIM histogram shape constants
|
||||
TrackEntry mutable dataclass for one active track
|
||||
extract_hsv_histogram(bgr_crop) -> np.ndarray (FEAT_DIM,) L2-normalised
|
||||
cosine_similarity(a, b) -> float in [-1, 1]
|
||||
match_track(feature, tracks, threshold)
|
||||
-> (track_id, similarity) | (None, 0.0)
|
||||
TrackGallery mutable store of active tracks
|
||||
.add(camera_id, feature, bbox, now) -> int (new track_id)
|
||||
.update(track_id, camera_id, feature, bbox, confidence, now)
|
||||
.match(feature, threshold) -> (track_id, similarity) | (None, 0.0)
|
||||
.mark_stale(now, timeout_s)
|
||||
.prune_stale()
|
||||
.tracks -> Dict[int, TrackEntry]
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, Optional, Tuple
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
|
||||
# ── Histogram constants ───────────────────────────────────────────────────────
|
||||
|
||||
H_BINS: int = 16
|
||||
S_BINS: int = 8
|
||||
FEAT_DIM: int = H_BINS * S_BINS # 128
|
||||
|
||||
_EMA_ALPHA: float = 0.3 # weight of newest observation in EMA update
|
||||
|
||||
|
||||
# ── Track entry ───────────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
class TrackEntry:
    """Mutable state for one active person track.

    Instances live inside a TrackGallery: `is_stale` is set by
    TrackGallery.mark_stale and the whole entry is replaced on each
    successful re-identification (TrackGallery.update).
    """

    track_id: int  # gallery-assigned unique id, monotonically increasing from 1
    camera_id: str  # label of the camera that last observed this person
    feature: np.ndarray  # (FEAT_DIM,) L2-normalised appearance
    confidence: float  # last match similarity (1.0 for new)
    first_seen: float  # time.time() wall-clock seconds
    last_seen: float  # time.time() wall-clock seconds
    bbox: Tuple[float, float, float, float] = (0.0, 0.0, 0.0, 0.0)
    # (centre_x, centre_y, size_x, size_y) in image pixels
    is_stale: bool = False  # True once last_seen age exceeds the gallery timeout
|
||||
|
||||
|
||||
# ── Feature extraction ────────────────────────────────────────────────────────
|
||||
|
||||
def extract_hsv_histogram(bgr_crop: np.ndarray) -> np.ndarray:
    """Compute an L2-normalised 2-D Hue/Saturation histogram of a BGR crop.

    The Value channel is deliberately dropped for illumination robustness.
    Missing or zero-area crops yield an all-zero vector, so callers never
    need a special case.

    Parameters
    ----------
    bgr_crop : uint8 BGR image, any size >= 1x1, or None.

    Returns
    -------
    np.ndarray, shape (FEAT_DIM,) = (128,), dtype float32; unit L2 norm
    unless the input was empty (then all zeros).
    """
    if bgr_crop is None or bgr_crop.size == 0:
        return np.zeros(FEAT_DIM, dtype=np.float32)

    hsv_img = cv2.cvtColor(bgr_crop, cv2.COLOR_BGR2HSV)
    # H: 16 bins over [0, 180) (OpenCV hue range), S: 8 bins over [0, 256)
    raw = cv2.calcHist(
        [hsv_img], [0, 1], None,
        [H_BINS, S_BINS],
        [0, 180, 0, 256],
    )
    vec = np.asarray(raw, dtype=np.float32).ravel()
    magnitude = float(np.linalg.norm(vec))
    return vec / magnitude if magnitude > 0.0 else vec
|
||||
|
||||
|
||||
# ── Similarity ────────────────────────────────────────────────────────────────
|
||||
|
||||
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine of the angle between vectors *a* and *b*.

    Safe for both unit-normalised and unnormalised inputs.  When either
    vector has (near-)zero norm the similarity is defined as 0.0 rather
    than raising a division error.

    Returns
    -------
    float in [-1, 1].
    """
    norm_a = float(np.linalg.norm(a))
    norm_b = float(np.linalg.norm(b))
    # Guard against division by (numerically) zero magnitudes.
    if min(norm_a, norm_b) < 1e-12:
        return 0.0
    return float(np.dot(a, b)) / (norm_a * norm_b)
|
||||
|
||||
|
||||
# ── Gallery matching ──────────────────────────────────────────────────────────
|
||||
|
||||
def match_track(
    feature: np.ndarray,
    tracks: Dict[int, TrackEntry],
    threshold: float,
) -> Tuple[Optional[int], float]:
    """Return the non-stale gallery track most similar to `feature`.

    A candidate is accepted only when its cosine similarity is strictly
    greater than `threshold`; a similarity exactly equal to the threshold
    therefore results in a new track being created by the caller.
    Stale tracks never participate in matching.

    Parameters
    ----------
    feature   : query appearance vector, shape (FEAT_DIM,)
    tracks    : gallery dict keyed by track_id
    threshold : minimum cosine similarity (exclusive) to accept a match

    Returns
    -------
    (track_id, similarity) for the best match, else (None, 0.0).
    """
    winner: Optional[int] = None
    winner_sim: float = threshold  # strict >: ties with the threshold lose

    for candidate_id, candidate in tracks.items():
        if candidate.is_stale:
            continue
        similarity = cosine_similarity(feature, candidate.feature)
        if similarity > winner_sim:
            winner, winner_sim = candidate_id, similarity

    return (winner, winner_sim) if winner is not None else (None, 0.0)
|
||||
|
||||
|
||||
# ── Gallery ───────────────────────────────────────────────────────────────────
|
||||
|
||||
class TrackGallery:
    """Mutable store of active person tracks.

    Track ids are assigned sequentially starting at 1 and never reused.
    Thread-safety: none — call from a single ROS2 callback thread.
    """

    def __init__(self) -> None:
        self._tracks: Dict[int, TrackEntry] = {}
        self._next_id: int = 1

    @property
    def tracks(self) -> Dict[int, TrackEntry]:
        """Live dict of track_id -> TrackEntry (not a defensive copy)."""
        return self._tracks

    def add(
        self,
        camera_id: str,
        feature: np.ndarray,
        bbox: Tuple[float, float, float, float],
        now: float,
    ) -> int:
        """Register a brand-new track and return its assigned track_id."""
        new_id = self._next_id
        self._next_id = new_id + 1
        self._tracks[new_id] = TrackEntry(
            track_id=new_id,
            camera_id=camera_id,
            feature=feature.copy(),  # defensive copy: caller may reuse buffer
            confidence=1.0,
            first_seen=now,
            last_seen=now,
            bbox=bbox,
        )
        return new_id

    def update(
        self,
        track_id: int,
        camera_id: str,
        feature: np.ndarray,
        bbox: Tuple[float, float, float, float],
        confidence: float,
        now: float,
    ) -> None:
        """Fold a new observation into an existing track.

        The stored appearance is blended with the observation via an
        exponential moving average (alpha = 0.3) and re-normalised so it
        stays unit length; first_seen is preserved and any stale flag is
        cleared.
        """
        previous = self._tracks[track_id]

        # EMA blend: alpha * new + (1 - alpha) * old, then re-normalise.
        blended = _EMA_ALPHA * feature + (1.0 - _EMA_ALPHA) * previous.feature
        magnitude = np.linalg.norm(blended)
        if magnitude > 0.0:
            blended = blended / magnitude

        self._tracks[track_id] = TrackEntry(
            track_id=track_id,
            camera_id=camera_id,
            feature=blended,
            confidence=confidence,
            first_seen=previous.first_seen,
            last_seen=now,
            bbox=bbox,
            is_stale=False,
        )

    def match(
        self,
        feature: np.ndarray,
        threshold: float,
    ) -> Tuple[Optional[int], float]:
        """Best non-stale match for `feature`; delegates to match_track."""
        return match_track(feature, self._tracks, threshold)

    def mark_stale(self, now: float, timeout_s: float) -> None:
        """Flag every track whose age since last_seen strictly exceeds timeout_s."""
        for candidate in self._tracks.values():
            if (now - candidate.last_seen) > timeout_s:
                candidate.is_stale = True

    def prune_stale(self) -> None:
        """Drop every stale track, preserving insertion order of the rest."""
        self._tracks = {
            tid: entry
            for tid, entry in self._tracks.items()
            if not entry.is_stale
        }
|
||||
@ -0,0 +1,215 @@
|
||||
"""
|
||||
person_reid_node.py — Appearance-based person re-identification (Issue #322).
|
||||
|
||||
Subscribes to a camera RGB stream and person detection bounding boxes,
|
||||
extracts per-person HSV colour histograms, matches against an in-memory
|
||||
gallery using cosine similarity, and publishes stable PersonTrack IDs.
|
||||
|
||||
Running one instance per camera (remapping /camera/color/image_raw and
|
||||
/saltybot/scene/objects) achieves cross-camera re-id: the same physical
|
||||
person is assigned the same track_id regardless of which camera view
|
||||
they appear in, because appearance matching is camera-agnostic.
|
||||
|
||||
Subscribes
|
||||
----------
|
||||
/camera/color/image_raw sensor_msgs/Image (BEST_EFFORT)
|
||||
/saltybot/scene/objects saltybot_scene_msgs/SceneObjectArray (BEST_EFFORT)
|
||||
|
||||
Publishes
|
||||
---------
|
||||
/saltybot/person_tracks saltybot_scene_msgs/PersonTrackArray at publish_hz
|
||||
|
||||
Parameters
|
||||
----------
|
||||
camera_id str "front" Label stored in every PersonTrack
|
||||
similarity_threshold float 0.75 Cosine similarity threshold for re-id
|
||||
stale_timeout_s float 30.0 Seconds before inactive track is stale
|
||||
max_tracks int 20 Maximum concurrent active tracks
|
||||
publish_hz float 5.0 Track list publication rate (Hz)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
import cv2
|
||||
import cv_bridge
|
||||
import numpy as np
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||
|
||||
from builtin_interfaces.msg import Time as TimeMsg
|
||||
from sensor_msgs.msg import Image
|
||||
from saltybot_scene_msgs.msg import (
|
||||
SceneObjectArray,
|
||||
PersonTrack,
|
||||
PersonTrackArray,
|
||||
)
|
||||
|
||||
from ._person_reid import TrackGallery, extract_hsv_histogram
|
||||
|
||||
|
||||
# COCO class 0 = person
|
||||
_PERSON_CLASS_ID = 0
|
||||
|
||||
_BEST_EFFORT_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=1,
|
||||
)
|
||||
|
||||
|
||||
def _float_to_ros_time(wall_secs: float) -> TimeMsg:
    """Convert a time.time() float into a builtin_interfaces/Time message."""
    whole_seconds = int(wall_secs)
    fractional_ns = int((wall_secs - whole_seconds) * 1e9)
    stamp = TimeMsg()
    stamp.sec = whole_seconds
    stamp.nanosec = fractional_ns
    return stamp
|
||||
|
||||
|
||||
class PersonReidNode(Node):
    """ROS2 node: appearance-based person re-identification (Issue #322).

    Caches the most recent colour frame, crops person detections out of
    it, extracts HSV appearance features, and maintains a TrackGallery
    whose contents are published periodically as a PersonTrackArray.
    """

    def __init__(self) -> None:
        super().__init__('person_reid_node')

        # Declared parameters; defaults documented in the module docstring.
        self.declare_parameter('camera_id', 'front')
        self.declare_parameter('similarity_threshold', 0.75)
        self.declare_parameter('stale_timeout_s', 30.0)
        self.declare_parameter('max_tracks', 20)
        self.declare_parameter('publish_hz', 5.0)

        self._camera_id = str(self.get_parameter('camera_id').value)
        self._threshold = float(self.get_parameter('similarity_threshold').value)
        self._timeout = float(self.get_parameter('stale_timeout_s').value)
        self._max_tracks = int(self.get_parameter('max_tracks').value)
        publish_hz = float(self.get_parameter('publish_hz').value)

        self._bridge: cv_bridge.CvBridge = cv_bridge.CvBridge()
        self._gallery: TrackGallery = TrackGallery()
        # Latest BGR frame; None until the first image message arrives.
        self._latest_image: np.ndarray | None = None

        self._img_sub = self.create_subscription(
            Image,
            '/camera/color/image_raw',
            self._on_image,
            _BEST_EFFORT_QOS,
        )
        self._det_sub = self.create_subscription(
            SceneObjectArray,
            '/saltybot/scene/objects',
            self._on_detections,
            _BEST_EFFORT_QOS,
        )
        self._pub = self.create_publisher(
            PersonTrackArray,
            '/saltybot/person_tracks',
            10,
        )

        # Clamp the rate so a zero/negative publish_hz cannot produce a
        # division by zero or a negative timer period.
        period = 1.0 / max(publish_hz, 0.1)
        self._timer = self.create_timer(period, self._on_timer)

        self.get_logger().info(
            f'person_reid_node ready — '
            f'camera_id={self._camera_id} '
            f'threshold={self._threshold} '
            f'timeout={self._timeout}s '
            f'max_tracks={self._max_tracks}'
        )

    # ── Image callback ────────────────────────────────────────────────────────

    def _on_image(self, msg: Image) -> None:
        """Cache the newest frame as BGR; warn (throttled) on decode failure."""
        try:
            self._latest_image = self._bridge.imgmsg_to_cv2(msg, 'bgr8')
        except Exception as exc:
            self.get_logger().warning(f'cv_bridge error: {exc}', throttle_duration_sec=5.0)

    # ── Detection callback ────────────────────────────────────────────────────

    def _on_detections(self, msg: SceneObjectArray) -> None:
        """Re-identify each person detection against the gallery.

        NOTE(review): detections are cropped from the most recently cached
        frame, not necessarily the frame they were computed on — assumes
        the image and detection streams are closely synchronised; confirm
        the lag is acceptable for this pipeline.
        """
        if self._latest_image is None:
            return

        now = time.time()
        img = self._latest_image
        h, w = img.shape[:2]

        for obj in msg.objects:
            if obj.class_id != _PERSON_CLASS_ID:
                continue

            # Crop the person ROI from the cached image, clamped to bounds
            cx = obj.bbox.center.position.x
            cy = obj.bbox.center.position.y
            hsw = obj.bbox.size_x / 2.0
            hsh = obj.bbox.size_y / 2.0
            x1 = max(0, int(cx - hsw))
            y1 = max(0, int(cy - hsh))
            x2 = min(w, int(cx + hsw))
            y2 = min(h, int(cy + hsh))
            if x2 <= x1 or y2 <= y1:
                # Degenerate or fully out-of-frame box — skip it
                continue

            crop = img[y1:y2, x1:x2]
            feature = extract_hsv_histogram(crop)
            bbox = (cx, cy, obj.bbox.size_x, obj.bbox.size_y)

            track_id, sim = self._gallery.match(feature, self._threshold)

            if track_id is not None:
                self._gallery.update(
                    track_id, self._camera_id, feature, bbox, float(sim), now
                )
            elif len(self._gallery.tracks) < self._max_tracks:
                self._gallery.add(self._camera_id, feature, bbox, now)
            # else: gallery is full — detection dropped until tracks go stale

    # ── Timer callback — publish + housekeeping ───────────────────────────────

    def _on_timer(self) -> None:
        """Expire old tracks, then publish the current gallery state."""
        now = time.time()
        # NOTE(review): mark_stale is immediately followed by prune_stale,
        # so a published track never carries is_stale=True; the module
        # docstring says stale tracks survive until the *next* cycle —
        # confirm which behaviour is intended.
        self._gallery.mark_stale(now, self._timeout)
        self._gallery.prune_stale()

        stamp = self.get_clock().now().to_msg()
        out = PersonTrackArray()
        out.header.stamp = stamp
        out.header.frame_id = self._camera_id

        for entry in self._gallery.tracks.values():
            t = PersonTrack()
            t.header.stamp = stamp
            t.header.frame_id = self._camera_id
            t.track_id = entry.track_id
            t.camera_id = entry.camera_id
            t.confidence = float(entry.confidence)
            t.first_seen = _float_to_ros_time(entry.first_seen)
            t.last_seen = _float_to_ros_time(entry.last_seen)
            t.is_stale = entry.is_stale
            # Populate last-known bounding box
            cx, cy, sx, sy = entry.bbox
            t.bbox.center.position.x = cx
            t.bbox.center.position.y = cy
            t.bbox.size_x = sx
            t.bbox.size_y = sy
            out.tracks.append(t)

        self._pub.publish(out)
|
||||
|
||||
|
||||
def main(args=None) -> None:
    """CLI entry point: initialise rclpy, spin the node, tidy up on exit."""
    rclpy.init(args=args)
    reid_node = PersonReidNode()
    try:
        rclpy.spin(reid_node)
    finally:
        # Always release node resources, even on Ctrl-C / spin errors.
        reid_node.destroy_node()
        rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@ -49,6 +49,8 @@ setup(
|
||||
'sky_detector = saltybot_bringup.sky_detect_node:main',
|
||||
# Wheel encoder differential drive odometry (Issue #184)
|
||||
'wheel_odom = saltybot_bringup.wheel_odom_node:main',
|
||||
# Appearance-based person re-identification (Issue #322)
|
||||
'person_reid = saltybot_bringup.person_reid_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
528
jetson/ros2_ws/src/saltybot_bringup/test/test_person_reid.py
Normal file
528
jetson/ros2_ws/src/saltybot_bringup/test/test_person_reid.py
Normal file
@ -0,0 +1,528 @@
|
||||
"""
|
||||
test_person_reid.py — Unit tests for person re-identification helpers (no ROS2).
|
||||
|
||||
Covers:
|
||||
extract_hsv_histogram:
|
||||
- output shape is FEAT_DIM (128)
|
||||
- None input → zero vector
|
||||
- zero-size array input → zero vector
|
||||
- solid-colour image → L2-normalised (norm ≈ 1)
|
||||
- same colour on different image sizes → same histogram
|
||||
- different solid colours → dissimilar histograms
|
||||
- all-white image (low saturation) → histogram concentrated in low-S bins
|
||||
- single-pixel image → valid unit histogram
|
||||
|
||||
cosine_similarity:
|
||||
- identical unit vectors → 1.0
|
||||
- zero vector (a) → 0.0
|
||||
- zero vector (b) → 0.0
|
||||
- both zero → 0.0
|
||||
- anti-parallel vectors → -1.0
|
||||
- orthogonal vectors → 0.0
|
||||
- non-unit same-direction vectors → 1.0
|
||||
- non-unit opposite-direction vectors → -1.0
|
||||
- partial overlap → in (0, 1)
|
||||
|
||||
match_track:
|
||||
- empty gallery → (None, 0.0)
|
||||
- perfect match above threshold → correct track_id, sim ≈ 1.0
|
||||
- similarity exactly at threshold → no match (strict >)
|
||||
- below threshold → (None, 0.0)
|
||||
- stale track excluded from matching
|
||||
- multiple tracks → best one returned
|
||||
- all tracks stale → (None, 0.0)
|
||||
|
||||
TrackGallery:
|
||||
- add assigns track_id = 1, 2, 3 ...
|
||||
- add stores a copy of feature (mutation of original doesn't affect gallery)
|
||||
- update changes last_seen, camera_id, confidence
|
||||
- update applies EMA: feature shifts toward new, stays unit-norm
|
||||
- update clears is_stale flag
|
||||
- update preserves first_seen
|
||||
- mark_stale: recent track stays fresh
|
||||
- mark_stale: old track flagged stale
|
||||
- prune_stale removes stale, keeps fresh
|
||||
- empty gallery match → (None, 0.0)
|
||||
|
||||
Re-id integration:
|
||||
- same appearance from two camera_ids → same track_id
|
||||
- different appearance → different track_id
|
||||
- stale + prune + re-add → new track_id
|
||||
- EMA: track adapts over multiple updates (feature norm stays ≈ 1)
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import math
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
from saltybot_bringup._person_reid import (
|
||||
H_BINS,
|
||||
S_BINS,
|
||||
FEAT_DIM,
|
||||
TrackEntry,
|
||||
TrackGallery,
|
||||
cosine_similarity,
|
||||
extract_hsv_histogram,
|
||||
match_track,
|
||||
)
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _solid_bgr(b: int, g: int, r: int, size: int = 32) -> np.ndarray:
|
||||
"""Return a solid-colour BGR uint8 image of given size."""
|
||||
import cv2
|
||||
img = np.full((size, size, 3), (b, g, r), dtype=np.uint8)
|
||||
return img
|
||||
|
||||
|
||||
def _unit(v: np.ndarray) -> np.ndarray:
|
||||
n = np.linalg.norm(v)
|
||||
return v / n if n > 1e-12 else v
|
||||
|
||||
|
||||
def _make_feature(seed: int) -> np.ndarray:
    """Deterministic pseudo-random L2-normalised feature for a given seed."""
    generator = np.random.default_rng(seed)
    vec = generator.random(FEAT_DIM).astype(np.float32)
    return vec / np.linalg.norm(vec)
|
||||
|
||||
|
||||
def _make_entry(
    tid: int,
    feature: np.ndarray,
    camera_id: str = 'front',
    now: float = 1000.0,
    is_stale: bool = False,
) -> TrackEntry:
    """Build a TrackEntry for tests: fresh 'front' sighting, confidence 1.0."""
    entry = TrackEntry(
        track_id=tid,
        camera_id=camera_id,
        feature=feature.copy(),  # copy so the caller's array stays independent
        confidence=1.0,
        first_seen=now,
        last_seen=now,
        is_stale=is_stale,
    )
    return entry
|
||||
|
||||
|
||||
# ── extract_hsv_histogram ─────────────────────────────────────────────────────
|
||||
|
||||
class TestExtractHsvHistogram:
    """extract_hsv_histogram: shape/dtype, degenerate inputs, normalisation.

    Fix: removed redundant in-method `import numpy as np` statements
    (two tests shadowed the module-level import for no reason).
    """

    def test_output_shape(self):
        img = _solid_bgr(0, 0, 255)
        hist = extract_hsv_histogram(img)
        assert hist.shape == (FEAT_DIM,)

    def test_output_dtype(self):
        hist = extract_hsv_histogram(_solid_bgr(0, 255, 0))
        assert hist.dtype == np.float32

    def test_none_returns_zero(self):
        hist = extract_hsv_histogram(None)
        assert hist.shape == (FEAT_DIM,)
        assert np.all(hist == 0.0)

    def test_empty_array_returns_zero(self):
        empty = np.zeros((0, 0, 3), dtype=np.uint8)
        hist = extract_hsv_histogram(empty)
        assert np.all(hist == 0.0)

    def test_solid_colour_is_unit_norm(self):
        hist = extract_hsv_histogram(_solid_bgr(255, 0, 0))
        assert np.linalg.norm(hist) == pytest.approx(1.0, rel=1e-5)

    def test_same_colour_different_sizes_equal(self):
        # L2 normalisation should make the histogram size-invariant.
        h16 = extract_hsv_histogram(_solid_bgr(0, 0, 200, size=16))
        h64 = extract_hsv_histogram(_solid_bgr(0, 0, 200, size=64))
        assert np.allclose(h16, h64, atol=1e-5)

    def test_different_colours_different_histograms(self):
        red = extract_hsv_histogram(_solid_bgr(0, 0, 255))
        blue = extract_hsv_histogram(_solid_bgr(255, 0, 0))
        assert not np.allclose(red, blue, atol=1e-3)

    def test_red_vs_green_low_similarity(self):
        red = extract_hsv_histogram(_solid_bgr(0, 0, 255))
        green = extract_hsv_histogram(_solid_bgr(0, 255, 0))
        sim = cosine_similarity(red, green)
        assert sim < 0.5

    def test_single_pixel_valid(self):
        px = np.array([[[0, 200, 200]]], dtype=np.uint8)
        hist = extract_hsv_histogram(px)
        assert hist.shape == (FEAT_DIM,)
        assert np.linalg.norm(hist) == pytest.approx(1.0, rel=1e-5)

    def test_feat_dim_constant(self):
        assert FEAT_DIM == H_BINS * S_BINS
        assert FEAT_DIM == 128
|
||||
|
||||
|
||||
# ── cosine_similarity ─────────────────────────────────────────────────────────
|
||||
|
||||
class TestCosineSimilarity:
    """cosine_similarity: identities, zero-norm guards, scale invariance."""

    def test_identical_unit_vectors(self):
        v = _make_feature(0)
        assert cosine_similarity(v, v.copy()) == pytest.approx(1.0, rel=1e-6)

    def test_zero_vector_a(self):
        z = np.zeros(FEAT_DIM, dtype=np.float32)
        v = _make_feature(1)
        assert cosine_similarity(z, v) == pytest.approx(0.0, abs=1e-9)

    def test_zero_vector_b(self):
        v = _make_feature(2)
        z = np.zeros(FEAT_DIM, dtype=np.float32)
        assert cosine_similarity(v, z) == pytest.approx(0.0, abs=1e-9)

    def test_both_zero(self):
        z = np.zeros(FEAT_DIM, dtype=np.float32)
        assert cosine_similarity(z, z) == pytest.approx(0.0, abs=1e-9)

    def test_anti_parallel(self):
        v = _make_feature(3)
        assert cosine_similarity(v, -v) == pytest.approx(-1.0, rel=1e-6)

    def test_orthogonal(self):
        # Disjoint one-hot vectors are exactly orthogonal.
        a = np.zeros(FEAT_DIM, dtype=np.float32)
        b = np.zeros(FEAT_DIM, dtype=np.float32)
        a[0] = 1.0
        b[1] = 1.0
        assert cosine_similarity(a, b) == pytest.approx(0.0, abs=1e-9)

    def test_non_unit_same_direction(self):
        # Scale invariance: magnitudes must not affect the similarity.
        v = _make_feature(4)
        assert cosine_similarity(v * 5.0, v * 3.0) == pytest.approx(1.0, rel=1e-6)

    def test_non_unit_opposite_direction(self):
        v = _make_feature(5)
        assert cosine_similarity(v * 7.0, -v * 2.0) == pytest.approx(-1.0, rel=1e-6)

    def test_partial_overlap_in_range(self):
        a = np.zeros(FEAT_DIM, dtype=np.float32)
        b = np.zeros(FEAT_DIM, dtype=np.float32)
        a[0] = 1.0
        b[0] = 1.0
        b[1] = 1.0
        sim = cosine_similarity(a, b)
        assert 0.0 < sim < 1.0
|
||||
|
||||
|
||||
# ── match_track ───────────────────────────────────────────────────────────────
|
||||
|
||||
class TestMatchTrack:
    """match_track: best-match selection, strict threshold, stale exclusion.

    Fixes over the original:
    - test_similarity_at_threshold_no_match carried dead variables
      (`feat`, `b`, `threshold`) and only tested threshold = 1.0 + eps;
      it now exercises the exact sim == threshold boundary.
    - test_below_threshold_no_match asserted inside an `if`, so it could
      pass vacuously; it now uses orthogonal vectors (similarity exactly
      0.0) for a deterministic assertion.
    """

    def test_empty_gallery(self):
        feat = _make_feature(10)
        tid, sim = match_track(feat, {}, threshold=0.75)
        assert tid is None
        assert sim == pytest.approx(0.0, abs=1e-9)

    def test_exact_match_above_threshold(self):
        feat = _make_feature(11)
        tracks = {1: _make_entry(1, feat)}
        tid, sim = match_track(feat, tracks, threshold=0.75)
        assert tid == 1
        assert sim == pytest.approx(1.0, rel=1e-5)

    def test_similarity_at_threshold_no_match(self):
        """Strict >: similarity exactly equal to the threshold is rejected."""
        a = np.zeros(FEAT_DIM, dtype=np.float32)
        a[0] = 1.0
        tracks = {1: _make_entry(1, a)}
        # Self-similarity of a one-hot vector is exactly 1.0, so a
        # threshold of 1.0 probes the sim == threshold boundary precisely.
        tid, sim = match_track(a, tracks, threshold=1.0)
        assert tid is None
        assert sim == pytest.approx(0.0, abs=1e-9)

    def test_below_threshold_no_match(self):
        # Orthogonal one-hot vectors: similarity is exactly 0.0, which is
        # deterministically below any positive threshold.
        a = np.zeros(FEAT_DIM, dtype=np.float32)
        b = np.zeros(FEAT_DIM, dtype=np.float32)
        a[0] = 1.0
        b[1] = 1.0
        tracks = {1: _make_entry(1, b)}
        tid, sim = match_track(a, tracks, threshold=0.75)
        assert tid is None
        assert sim == pytest.approx(0.0, abs=1e-9)

    def test_stale_track_excluded(self):
        feat = _make_feature(15)
        tracks = {1: _make_entry(1, feat, is_stale=True)}
        tid, sim = match_track(feat, tracks, threshold=0.5)
        assert tid is None

    def test_multiple_tracks_best_returned(self):
        query = _make_feature(16)
        close = query.copy()  # sim = 1.0
        far = _make_feature(17)  # lower sim
        tracks = {
            1: _make_entry(1, far),
            2: _make_entry(2, close),
        }
        tid, sim = match_track(query, tracks, threshold=0.5)
        assert tid == 2
        assert sim == pytest.approx(1.0, rel=1e-5)

    def test_all_stale_no_match(self):
        feat = _make_feature(18)
        tracks = {
            1: _make_entry(1, feat, is_stale=True),
            2: _make_entry(2, feat, is_stale=True),
        }
        tid, sim = match_track(feat, tracks, threshold=0.5)
        assert tid is None

    def test_fresh_and_stale_returns_fresh(self):
        feat = _make_feature(19)
        tracks = {
            1: _make_entry(1, feat, is_stale=True),
            2: _make_entry(2, feat, is_stale=False),
        }
        tid, sim = match_track(feat, tracks, threshold=0.5)
        assert tid == 2
|
||||
|
||||
|
||||
# ── TrackGallery ──────────────────────────────────────────────────────────────
|
||||
|
||||
_BBOX = (10.0, 20.0, 50.0, 80.0)
|
||||
|
||||
|
||||
class TestTrackGallery:
|
||||
|
||||
def test_add_increments_track_id(self):
|
||||
g = TrackGallery()
|
||||
now = 1000.0
|
||||
t1 = g.add('front', _make_feature(20), _BBOX, now)
|
||||
t2 = g.add('front', _make_feature(21), _BBOX, now)
|
||||
t3 = g.add('rear', _make_feature(22), _BBOX, now)
|
||||
assert t1 == 1
|
||||
assert t2 == 2
|
||||
assert t3 == 3
|
||||
|
||||
def test_add_stores_copy(self):
|
||||
g = TrackGallery()
|
||||
feat = _make_feature(23)
|
||||
orig = feat.copy()
|
||||
g.add('front', feat, _BBOX, 1000.0)
|
||||
feat[0] = 99.9 # mutate original
|
||||
stored = g.tracks[1].feature
|
||||
assert np.allclose(stored, orig, atol=1e-6)
|
||||
|
||||
def test_add_sets_timestamps(self):
|
||||
g = TrackGallery()
|
||||
now = 5000.0
|
||||
g.add('front', _make_feature(24), _BBOX, now)
|
||||
e = g.tracks[1]
|
||||
assert e.first_seen == pytest.approx(now)
|
||||
assert e.last_seen == pytest.approx(now)
|
||||
|
||||
def test_add_initial_confidence_one(self):
|
||||
g = TrackGallery()
|
||||
g.add('front', _make_feature(25), _BBOX, 1000.0)
|
||||
assert g.tracks[1].confidence == pytest.approx(1.0)
|
||||
|
||||
def test_update_changes_last_seen(self):
|
||||
g = TrackGallery()
|
||||
feat = _make_feature(26)
|
||||
g.add('front', feat, _BBOX, 1000.0)
|
||||
g.update(1, 'front', feat, _BBOX, 0.9, 2000.0)
|
||||
assert g.tracks[1].last_seen == pytest.approx(2000.0)
|
||||
assert g.tracks[1].first_seen == pytest.approx(1000.0) # preserved
|
||||
|
||||
def test_update_changes_camera_id(self):
|
||||
g = TrackGallery()
|
||||
feat = _make_feature(27)
|
||||
g.add('front', feat, _BBOX, 1000.0)
|
||||
g.update(1, 'right', feat, _BBOX, 0.8, 2000.0)
|
||||
assert g.tracks[1].camera_id == 'right'
|
||||
|
||||
def test_update_applies_ema(self):
|
||||
g = TrackGallery()
|
||||
feat0 = _make_feature(28)
|
||||
g.add('front', feat0, _BBOX, 1000.0)
|
||||
|
||||
feat1 = _make_feature(29)
|
||||
g.update(1, 'front', feat1, _BBOX, 0.85, 2000.0)
|
||||
stored = g.tracks[1].feature
|
||||
|
||||
# EMA: 0.3*feat1 + 0.7*feat0 (then renorm) — not equal to either
|
||||
assert not np.allclose(stored, feat0, atol=1e-4)
|
||||
assert not np.allclose(stored, feat1, atol=1e-4)
|
||||
|
||||
def test_update_feature_stays_unit_norm(self):
|
||||
g = TrackGallery()
|
||||
feat = _make_feature(30)
|
||||
g.add('front', feat, _BBOX, 1000.0)
|
||||
for i in range(10):
|
||||
new_feat = _make_feature(31 + i)
|
||||
g.update(1, 'front', new_feat, _BBOX, 0.9, 1000.0 + i)
|
||||
assert np.linalg.norm(g.tracks[1].feature) == pytest.approx(1.0, rel=1e-5)
|
||||
|
||||
def test_update_clears_stale(self):
|
||||
g = TrackGallery()
|
||||
feat = _make_feature(41)
|
||||
g.add('front', feat, _BBOX, 1000.0)
|
||||
g.tracks[1].is_stale = True
|
||||
g.update(1, 'front', feat, _BBOX, 0.9, 2000.0)
|
||||
assert g.tracks[1].is_stale is False
|
||||
|
||||
def test_mark_stale_recent_track_stays_fresh(self):
|
||||
g = TrackGallery()
|
||||
now = 1000.0
|
||||
g.add('front', _make_feature(42), _BBOX, now)
|
||||
g.mark_stale(now + 10.0, timeout_s=30.0)
|
||||
assert g.tracks[1].is_stale is False
|
||||
|
||||
def test_mark_stale_old_track_flagged(self):
|
||||
g = TrackGallery()
|
||||
now = 1000.0
|
||||
g.add('front', _make_feature(43), _BBOX, now)
|
||||
g.mark_stale(now + 31.0, timeout_s=30.0)
|
||||
assert g.tracks[1].is_stale is True
|
||||
|
||||
def test_mark_stale_exactly_at_timeout_not_stale(self):
    """Timeout is strictly >, so last_seen + timeout_s is still fresh."""
    gallery = TrackGallery()
    start = 1000.0
    gallery.add('front', _make_feature(44), _BBOX, start)
    # Exactly at the boundary: not strictly past the timeout.
    gallery.mark_stale(start + 30.0, timeout_s=30.0)
    assert gallery.tracks[1].is_stale is False
def test_prune_stale_removes_flagged(self):
    """prune_stale() drops flagged tracks and leaves the rest intact."""
    gallery = TrackGallery()
    timestamp = 1000.0
    gallery.add('front', _make_feature(45), _BBOX, timestamp)  # becomes track 1
    gallery.add('front', _make_feature(46), _BBOX, timestamp)  # becomes track 2
    gallery.tracks[1].is_stale = True
    gallery.prune_stale()
    assert 1 not in gallery.tracks
    assert 2 in gallery.tracks
def test_prune_stale_keeps_fresh(self):
    """prune_stale() must not remove tracks that were never flagged."""
    gallery = TrackGallery()
    timestamp = 1000.0
    gallery.add('front', _make_feature(47), _BBOX, timestamp)
    gallery.prune_stale()
    assert 1 in gallery.tracks
def test_empty_gallery_match(self):
    """Matching against an empty gallery yields no track and zero similarity."""
    gallery = TrackGallery()
    track_id, similarity = gallery.match(_make_feature(48), threshold=0.75)
    assert track_id is None
    assert similarity == pytest.approx(0.0, abs=1e-9)
def test_match_delegates_to_match_track(self):
    """Gallery.match() finds an identical stored feature with similarity ≈ 1."""
    gallery = TrackGallery()
    feature = _make_feature(49)
    gallery.add('front', feature, _BBOX, 1000.0)
    track_id, similarity = gallery.match(feature, threshold=0.5)
    assert track_id == 1
    assert similarity == pytest.approx(1.0, rel=1e-5)
# ── Re-id integration ─────────────────────────────────────────────────────────
|
||||
|
||||
class TestReIdIntegration:
    """End-to-end behaviour: TrackGallery life-cycle plus the HSV feature path.

    Fix applied in review: removed an unused local ``import cv2`` from
    test_hsv_same_image_produces_high_similarity (the OpenCV dependency is
    exercised indirectly via _solid_bgr / extract_hsv_histogram).
    """

    def test_same_appearance_two_cameras_same_track(self):
        """Same feature from two cameras → same track_id."""
        g = TrackGallery()
        feat = _make_feature(50)
        now = 1000.0

        # First sighting: front camera creates track 1
        t1 = g.add('front', feat, _BBOX, now)
        assert t1 == 1

        # Second sighting: right camera matches → updates same track
        tid, sim = g.match(feat, threshold=0.75)
        assert tid == 1
        g.update(tid, 'right', feat, _BBOX, float(sim), now + 1.0)
        assert len(g.tracks) == 1

    def test_different_appearance_different_tracks(self):
        """Two orthogonal features must not match → separate track IDs."""
        g = TrackGallery()
        now = 1000.0

        # Construct two exactly orthogonal unit feature vectors
        feat_a = np.zeros(FEAT_DIM, dtype=np.float32)
        feat_b = np.zeros(FEAT_DIM, dtype=np.float32)
        feat_a[0] = 1.0  # basis vector e0
        feat_b[1] = 1.0  # basis vector e1 — cosine sim = 0.0

        g.add('front', feat_a, _BBOX, now)

        # Orthogonal feature cannot clear the 0.75 similarity threshold.
        tid, sim = g.match(feat_b, threshold=0.75)
        assert tid is None

        g.add('front', feat_b, _BBOX, now + 0.1)
        assert len(g.tracks) == 2

    def test_stale_prune_readd_new_id(self):
        """After staling and pruning, a returning person gets a fresh ID."""
        g = TrackGallery()
        feat = _make_feature(53)
        now = 1000.0

        t1 = g.add('front', feat, _BBOX, now)
        assert t1 == 1

        g.mark_stale(now + 31.0, timeout_s=30.0)
        g.prune_stale()
        assert len(g.tracks) == 0

        # Re-add same person gets new ID
        t2 = g.add('front', feat, _BBOX, now + 40.0)
        assert t2 == 2  # next_id incremented past 1

    def test_ema_feature_norm_stays_unit_after_many_updates(self):
        """Fifty consecutive EMA blends keep the stored feature unit-norm."""
        g = TrackGallery()
        feat = _make_feature(54)
        g.add('front', feat, _BBOX, 0.0)

        for i in range(50):
            new_feat = _make_feature(55 + i)
            g.update(1, 'front', new_feat, _BBOX, 0.9, float(i))

        norm = np.linalg.norm(g.tracks[1].feature)
        assert norm == pytest.approx(1.0, rel=1e-4)

    def test_max_tracks_not_exceeded_in_gallery(self):
        """Gallery itself is unbounded; max_tracks enforcement is in the node.

        Confirm gallery can hold many entries without errors."""
        g = TrackGallery()
        for i in range(25):
            g.add('front', _make_feature(100 + i), _BBOX, float(i))
        assert len(g.tracks) == 25

    def test_hsv_same_image_produces_high_similarity(self):
        """Same-crop HSV histogram should have cosine sim ≈ 1."""
        img = _solid_bgr(50, 120, 200)
        h1 = extract_hsv_histogram(img)
        h2 = extract_hsv_histogram(img.copy())
        assert cosine_similarity(h1, h2) == pytest.approx(1.0, rel=1e-5)

    def test_hsv_different_colours_low_similarity(self):
        """Red vs cyan BGR should be dissimilar."""
        red = extract_hsv_histogram(_solid_bgr(0, 0, 200))
        cyan = extract_hsv_histogram(_solid_bgr(200, 200, 0))
        assert cosine_similarity(red, cyan) < 0.5
if __name__ == '__main__':
    # Allow running the suite directly (verbose) without invoking pytest CLI.
    pytest.main([__file__, '-v'])
@ -19,6 +19,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
|
||||
# Issue #274 — HSV color segmentation
|
||||
"msg/ColorDetection.msg"
|
||||
"msg/ColorDetectionArray.msg"
|
||||
# Issue #322 — cross-camera person re-identification
|
||||
"msg/PersonTrack.msg"
|
||||
"msg/PersonTrackArray.msg"
|
||||
DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
|
||||
)
|
||||
|
||||
|
||||
19
jetson/ros2_ws/src/saltybot_scene_msgs/msg/PersonTrack.msg
Normal file
19
jetson/ros2_ws/src/saltybot_scene_msgs/msg/PersonTrack.msg
Normal file
@ -0,0 +1,19 @@
|
||||
# PersonTrack.msg — stable cross-camera person identity track (Issue #322)
#
# track_id   : monotonically-increasing track ID, stable across camera views
# camera_id  : source camera label ("front", "right", "rear", "left")
# bbox       : last-known bounding box in the source image (centre + size, pixels)
# confidence : cosine-similarity of the matched appearance feature (0–1);
#              1.0 for tracks created from a first-time observation
# first_seen : wall-clock time when this track was first created
# last_seen  : wall-clock time of the most recent appearance observation
# is_stale   : true when (now − last_seen) > stale_timeout_s
#
std_msgs/Header header
uint32 track_id
string camera_id
vision_msgs/BoundingBox2D bbox
float32 confidence
builtin_interfaces/Time first_seen
builtin_interfaces/Time last_seen
bool is_stale
||||
@ -0,0 +1,4 @@
|
||||
# PersonTrackArray.msg — all active person tracks from one camera view (Issue #322)
#
# header : stamped with the publish time of the track snapshot
# tracks : zero or more PersonTrack entries (empty when no people are tracked)
std_msgs/Header header
PersonTrack[] tracks
||||
Loading…
x
Reference in New Issue
Block a user