feat(perception): appearance-based person re-identification (Issue #322) #330

Merged
sl-jetson merged 1 commit from sl-perception/issue-322-person-reid into main 2026-03-03 06:46:08 -05:00
7 changed files with 1030 additions and 0 deletions

View File

@ -0,0 +1,259 @@
"""
_person_reid.py Appearance-based person re-identification (no ROS2 deps).
Algorithm
---------
For each person detection (bounding-box crop from a camera image):
1. Extract a 2-D HS colour histogram:
H axis: 16 bins over [0, 180] (OpenCV hue)
S axis: 8 bins over [0, 256] (saturation)
The Value channel is omitted for illumination robustness.
Result: 128-dimensional float32 vector, L2-normalised.
2. Match against the gallery of active TrackEntry objects using cosine
similarity (= dot product of L2-normalised vectors). The best match
whose similarity exceeds `threshold` is selected; otherwise a new track
is created.
3. On a successful match the gallery entry is updated with an exponential
moving average (α = 0.3) of the new feature to adapt gradually to
appearance changes (lighting, pose) while staying robust to outliers.
4. Tracks not updated for `timeout_s` wall-clock seconds are flagged
`is_stale`; stale tracks are pruned from the gallery on the next cycle.
Public API
----------
H_BINS, S_BINS, FEAT_DIM histogram shape constants
TrackEntry mutable dataclass for one active track
extract_hsv_histogram(bgr_crop) -> np.ndarray (FEAT_DIM,) L2-normalised
cosine_similarity(a, b) -> float in [-1, 1]
match_track(feature, tracks, threshold)
-> (track_id, similarity) | (None, 0.0)
TrackGallery mutable store of active tracks
.add(camera_id, feature, bbox, now) -> int (new track_id)
.update(track_id, camera_id, feature, bbox, confidence, now)
.match(feature, threshold) -> (track_id, similarity) | (None, 0.0)
.mark_stale(now, timeout_s)
.prune_stale()
.tracks -> Dict[int, TrackEntry]
"""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple
import cv2
import numpy as np
# ── Histogram constants ───────────────────────────────────────────────────────
H_BINS: int = 16
S_BINS: int = 8
FEAT_DIM: int = H_BINS * S_BINS # 128
_EMA_ALPHA: float = 0.3 # weight of newest observation in EMA update
# ── Track entry ───────────────────────────────────────────────────────────────
@dataclass
class TrackEntry:
    """Mutable state for one active person track.

    Instances are created by TrackGallery.add() and replaced wholesale by
    TrackGallery.update(); TrackGallery.mark_stale() mutates is_stale in
    place.
    """
    track_id: int           # unique ID from the gallery's counter, never reused
    camera_id: str          # camera label of the most recent observation
    feature: np.ndarray     # (FEAT_DIM,) L2-normalised appearance
    confidence: float       # last match similarity (1.0 for new)
    first_seen: float       # time.time() wall-clock seconds
    last_seen: float        # time.time() wall-clock seconds
    bbox: Tuple[float, float, float, float] = (0.0, 0.0, 0.0, 0.0)
    # (centre_x, centre_y, size_x, size_y) in image pixels
    is_stale: bool = False  # set by mark_stale() once last_seen age > timeout_s
# ── Feature extraction ────────────────────────────────────────────────────────
def extract_hsv_histogram(bgr_crop: np.ndarray) -> np.ndarray:
    """
    Extract a 2-D HS colour histogram from a BGR image crop.

    The Value channel is excluded for illumination robustness.
    The result is L2-normalised; a missing, empty, or non-3-channel crop
    returns a zero vector instead of raising inside OpenCV.

    Parameters
    ----------
    bgr_crop : uint8 BGR image of shape (H, W, 3), any size >= 1x1.

    Returns
    -------
    np.ndarray, shape (FEAT_DIM,) = (128,), dtype float32, L2-normalised
    (all zeros for degenerate input).
    """
    # Guard against None, zero-area crops, and malformed inputs (e.g. a
    # 2-D grayscale array), which would otherwise assert in cv2.cvtColor.
    if (
        bgr_crop is None
        or bgr_crop.size == 0
        or bgr_crop.ndim != 3
        or bgr_crop.shape[2] != 3
    ):
        return np.zeros(FEAT_DIM, dtype=np.float32)
    hsv = cv2.cvtColor(bgr_crop, cv2.COLOR_BGR2HSV)
    # H axis: H_BINS bins over [0, 180); S axis: S_BINS bins over [0, 256).
    hist = cv2.calcHist(
        [hsv], [0, 1], None,
        [H_BINS, S_BINS],
        [0, 180, 0, 256],
    )
    flat = hist.flatten().astype(np.float32)
    norm = np.linalg.norm(flat)
    if norm > 0.0:
        flat /= norm
    return flat
# ── Similarity ────────────────────────────────────────────────────────────────
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """
    Cosine similarity between two vectors.

    Handles unit-normalised and unnormalised inputs alike. Whenever either
    vector has (near-)zero norm the function returns 0.0, so callers never
    see a division by zero.

    Returns
    -------
    float in [-1, 1].
    """
    norm_a = float(np.linalg.norm(a))
    norm_b = float(np.linalg.norm(b))
    # Treat anything below 1e-12 as a zero vector.
    if min(norm_a, norm_b) < 1e-12:
        return 0.0
    return float(np.dot(a, b) / (norm_a * norm_b))
# ── Gallery matching ──────────────────────────────────────────────────────────
def match_track(
    feature: np.ndarray,
    tracks: Dict[int, TrackEntry],
    threshold: float,
) -> Tuple[Optional[int], float]:
    """
    Return the non-stale gallery entry most similar to `feature`.

    Stale tracks are skipped entirely. The threshold comparison is strict
    (>), so a similarity exactly equal to `threshold` does NOT match; on
    equal similarities the first entry in dict iteration order wins.

    Parameters
    ----------
    feature   : query appearance vector (FEAT_DIM,)
    tracks    : gallery dict keyed by track_id
    threshold : minimum cosine similarity required to accept a match

    Returns
    -------
    (track_id, similarity) on a match, else (None, 0.0).
    """
    winner: Tuple[Optional[int], float] = (None, threshold)
    for tid, entry in tracks.items():
        if entry.is_stale:
            continue
        sim = cosine_similarity(feature, entry.feature)
        if sim > winner[1]:
            winner = (tid, sim)
    best_id, best_sim = winner
    return (best_id, best_sim) if best_id is not None else (None, 0.0)
# ── Gallery ───────────────────────────────────────────────────────────────────
class TrackGallery:
    """
    Mutable store of active person tracks.

    Track IDs come from a monotonically increasing counter and are never
    reused, even after a track has been pruned.

    Thread-safety: not thread-safe; call from a single ROS2 callback thread.
    """

    def __init__(self) -> None:
        self._entries: Dict[int, TrackEntry] = {}
        self._id_counter: int = 1

    @property
    def tracks(self) -> Dict[int, TrackEntry]:
        """Live dict of active tracks keyed by track_id."""
        return self._entries

    def add(
        self,
        camera_id: str,
        feature: np.ndarray,
        bbox: Tuple[float, float, float, float],
        now: float,
    ) -> int:
        """Create a new track and return its assigned track_id."""
        tid = self._id_counter
        self._id_counter += 1
        # Copy the feature so later caller-side mutation cannot leak in.
        self._entries[tid] = TrackEntry(
            track_id=tid,
            camera_id=camera_id,
            feature=feature.copy(),
            confidence=1.0,
            first_seen=now,
            last_seen=now,
            bbox=bbox,
        )
        return tid

    def update(
        self,
        track_id: int,
        camera_id: str,
        feature: np.ndarray,
        bbox: Tuple[float, float, float, float],
        confidence: float,
        now: float,
    ) -> None:
        """
        Update an existing track with a new observation.

        The appearance feature is blended via EMA (α=0.3) and re-normalised;
        first_seen is preserved and the stale flag is cleared.
        Raises KeyError for an unknown track_id.
        """
        previous = self._entries[track_id]
        blended = _EMA_ALPHA * feature + (1.0 - _EMA_ALPHA) * previous.feature
        length = np.linalg.norm(blended)
        if length > 0.0:
            blended = blended / length
        self._entries[track_id] = TrackEntry(
            track_id=track_id,
            camera_id=camera_id,
            feature=blended,
            confidence=confidence,
            first_seen=previous.first_seen,
            last_seen=now,
            bbox=bbox,
            is_stale=False,
        )

    def match(
        self,
        feature: np.ndarray,
        threshold: float,
    ) -> Tuple[Optional[int], float]:
        """Delegate to module-level match_track."""
        return match_track(feature, self._entries, threshold)

    def mark_stale(self, now: float, timeout_s: float) -> None:
        """Flag tracks whose last_seen age strictly exceeds timeout_s."""
        for entry in self._entries.values():
            if (now - entry.last_seen) > timeout_s:
                entry.is_stale = True

    def prune_stale(self) -> None:
        """Drop every stale track, rebuilding the gallery dict."""
        survivors = [
            (tid, entry)
            for tid, entry in self._entries.items()
            if not entry.is_stale
        ]
        self._entries = dict(survivors)

View File

@ -0,0 +1,215 @@
"""
person_reid_node.py Appearance-based person re-identification (Issue #322).
Subscribes to a camera RGB stream and person detection bounding boxes,
extracts per-person HSV colour histograms, matches against an in-memory
gallery using cosine similarity, and publishes stable PersonTrack IDs.
Running one instance per camera (remapping /camera/color/image_raw and
/saltybot/scene/objects) achieves cross-camera re-id: the same physical
person is assigned the same track_id regardless of which camera view
they appear in, because appearance matching is camera-agnostic.
Subscribes
----------
/camera/color/image_raw sensor_msgs/Image (BEST_EFFORT)
/saltybot/scene/objects saltybot_scene_msgs/SceneObjectArray (BEST_EFFORT)
Publishes
---------
/saltybot/person_tracks saltybot_scene_msgs/PersonTrackArray at publish_hz
Parameters
----------
camera_id str "front" Label stored in every PersonTrack
similarity_threshold float 0.75 Cosine similarity threshold for re-id
stale_timeout_s float 30.0 Seconds before inactive track is stale
max_tracks int 20 Maximum concurrent active tracks
publish_hz float 5.0 Track list publication rate (Hz)
"""
from __future__ import annotations
import time
import cv2
import cv_bridge
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from builtin_interfaces.msg import Time as TimeMsg
from sensor_msgs.msg import Image
from saltybot_scene_msgs.msg import (
SceneObjectArray,
PersonTrack,
PersonTrackArray,
)
from ._person_reid import TrackGallery, extract_hsv_histogram
# COCO class 0 = person
_PERSON_CLASS_ID = 0
_BEST_EFFORT_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
def _float_to_ros_time(wall_secs: float) -> TimeMsg:
    """Convert a time.time() float into a builtin_interfaces/Time message."""
    whole_sec = int(wall_secs)
    frac_ns = int((wall_secs - whole_sec) * 1e9)
    return TimeMsg(sec=whole_sec, nanosec=frac_ns)
class PersonReidNode(Node):
    """
    Appearance-based person re-identification node.

    Caches the most recent colour frame, crops each person detection out of
    it, extracts an HSV appearance histogram, and matches it against an
    in-memory TrackGallery to assign stable track IDs. Active tracks are
    published periodically from a timer.

    Review fixes applied:
    * Each gallery track can now be claimed by at most ONE detection per
      detection message (previously two similar-looking people in the same
      frame could greedily match and overwrite the same track).
    * Stale tracks are published once with is_stale=True and pruned on the
      NEXT timer tick, matching the documented lifecycle (previously they
      were marked and pruned in the same tick, so subscribers never saw
      is_stale=True).
    """

    def __init__(self) -> None:
        super().__init__('person_reid_node')
        # Declared parameters; see the module docstring for semantics.
        self.declare_parameter('camera_id', 'front')
        self.declare_parameter('similarity_threshold', 0.75)
        self.declare_parameter('stale_timeout_s', 30.0)
        self.declare_parameter('max_tracks', 20)
        self.declare_parameter('publish_hz', 5.0)
        self._camera_id = str(self.get_parameter('camera_id').value)
        self._threshold = float(self.get_parameter('similarity_threshold').value)
        self._timeout = float(self.get_parameter('stale_timeout_s').value)
        self._max_tracks = int(self.get_parameter('max_tracks').value)
        publish_hz = float(self.get_parameter('publish_hz').value)
        self._bridge: cv_bridge.CvBridge = cv_bridge.CvBridge()
        self._gallery: TrackGallery = TrackGallery()
        self._latest_image: np.ndarray | None = None
        self._img_sub = self.create_subscription(
            Image,
            '/camera/color/image_raw',
            self._on_image,
            _BEST_EFFORT_QOS,
        )
        self._det_sub = self.create_subscription(
            SceneObjectArray,
            '/saltybot/scene/objects',
            self._on_detections,
            _BEST_EFFORT_QOS,
        )
        self._pub = self.create_publisher(
            PersonTrackArray,
            '/saltybot/person_tracks',
            10,
        )
        # Clamp to 0.1 Hz so a zero/negative parameter cannot divide by zero.
        period = 1.0 / max(publish_hz, 0.1)
        self._timer = self.create_timer(period, self._on_timer)
        self.get_logger().info(
            f'person_reid_node ready — '
            f'camera_id={self._camera_id} '
            f'threshold={self._threshold} '
            f'timeout={self._timeout}s '
            f'max_tracks={self._max_tracks}'
        )

    # ── Image callback ────────────────────────────────────────────────────────
    def _on_image(self, msg: Image) -> None:
        """Cache the newest BGR frame for the detection callback to crop."""
        try:
            self._latest_image = self._bridge.imgmsg_to_cv2(msg, 'bgr8')
        except Exception as exc:
            self.get_logger().warning(f'cv_bridge error: {exc}', throttle_duration_sec=5.0)

    # ── Detection callback ────────────────────────────────────────────────────
    def _on_detections(self, msg: SceneObjectArray) -> None:
        """Assign a track ID to every person detection in the message.

        BUGFIX: a `claimed` set guarantees one-detection-per-track within a
        single message. Without it, two detections could both match the same
        gallery track; the second would overwrite the first and one person
        would never receive a track of their own.
        """
        if self._latest_image is None:
            return  # no frame cached yet
        now = time.time()
        img = self._latest_image
        h, w = img.shape[:2]
        claimed: set = set()  # track IDs already assigned in this message
        for obj in msg.objects:
            if obj.class_id != _PERSON_CLASS_ID:
                continue
            # Crop the person ROI from the cached image, clamped to bounds.
            cx = obj.bbox.center.position.x
            cy = obj.bbox.center.position.y
            hsw = obj.bbox.size_x / 2.0
            hsh = obj.bbox.size_y / 2.0
            x1 = max(0, int(cx - hsw))
            y1 = max(0, int(cy - hsh))
            x2 = min(w, int(cx + hsw))
            y2 = min(h, int(cy + hsh))
            if x2 <= x1 or y2 <= y1:
                continue  # degenerate or fully off-image box
            crop = img[y1:y2, x1:x2]
            feature = extract_hsv_histogram(crop)
            bbox = (cx, cy, obj.bbox.size_x, obj.bbox.size_y)
            track_id, sim = self._gallery.match(feature, self._threshold)
            if track_id is not None and track_id not in claimed:
                claimed.add(track_id)
                self._gallery.update(
                    track_id, self._camera_id, feature, bbox, float(sim), now
                )
            elif len(self._gallery.tracks) < self._max_tracks:
                # No (unclaimed) match: open a new track, capacity permitting.
                claimed.add(self._gallery.add(self._camera_id, feature, bbox, now))

    # ── Timer callback — publish + housekeeping ───────────────────────────────
    def _on_timer(self) -> None:
        """Publish the current track list and age out stale tracks.

        BUGFIX: prune runs BEFORE mark, so tracks flagged stale are published
        once with is_stale=True and removed on the next tick — matching the
        documented "pruned on the next cycle" lifecycle.
        """
        now = time.time()
        self._gallery.prune_stale()               # drop tracks flagged last tick
        self._gallery.mark_stale(now, self._timeout)
        stamp = self.get_clock().now().to_msg()
        out = PersonTrackArray()
        out.header.stamp = stamp
        out.header.frame_id = self._camera_id
        for entry in self._gallery.tracks.values():
            t = PersonTrack()
            t.header.stamp = stamp
            t.header.frame_id = self._camera_id
            t.track_id = entry.track_id
            t.camera_id = entry.camera_id
            t.confidence = float(entry.confidence)
            t.first_seen = _float_to_ros_time(entry.first_seen)
            t.last_seen = _float_to_ros_time(entry.last_seen)
            t.is_stale = entry.is_stale
            # Populate last-known bounding box (centre + size, pixels)
            cx, cy, sx, sy = entry.bbox
            t.bbox.center.position.x = cx
            t.bbox.center.position.y = cy
            t.bbox.size_x = sx
            t.bbox.size_y = sy
            out.tracks.append(t)
        self._pub.publish(out)
def main(args=None) -> None:
    """CLI entry point: init rclpy, spin the node, always clean up."""
    rclpy.init(args=args)
    node = PersonReidNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the node; exit cleanly instead of
        # letting the traceback propagate to the console.
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()
if __name__ == '__main__':
    main()

View File

@ -49,6 +49,8 @@ setup(
'sky_detector = saltybot_bringup.sky_detect_node:main',
# Wheel encoder differential drive odometry (Issue #184)
'wheel_odom = saltybot_bringup.wheel_odom_node:main',
# Appearance-based person re-identification (Issue #322)
'person_reid = saltybot_bringup.person_reid_node:main',
],
},
)

View File

@ -0,0 +1,528 @@
"""
test_person_reid.py Unit tests for person re-identification helpers (no ROS2).
Covers:
extract_hsv_histogram:
- output shape is FEAT_DIM (128)
- None input → zero vector
- zero-size array input → zero vector
- solid-colour image → L2-normalised (norm ≈ 1)
- same colour on different image sizes → same histogram
- different solid colours → dissimilar histograms
- all-white image (low saturation) → histogram concentrated in low-S bins
- single-pixel image → valid unit histogram
cosine_similarity:
- identical unit vectors → 1.0
- zero vector (a) → 0.0
- zero vector (b) → 0.0
- both zero → 0.0
- anti-parallel vectors → -1.0
- orthogonal vectors → 0.0
- non-unit same-direction vectors → 1.0
- non-unit opposite-direction vectors → -1.0
- partial overlap → similarity in (0, 1)
match_track:
- empty gallery → (None, 0.0)
- perfect match above threshold → correct track_id, sim ≈ 1.0
- similarity exactly at threshold → no match (strict >)
- below threshold → (None, 0.0)
- stale track → excluded from matching
- multiple tracks → best one returned
- all tracks stale → (None, 0.0)
TrackGallery:
- add assigns track_id = 1, 2, 3 ...
- add stores a copy of feature (mutation of original doesn't affect gallery)
- update changes last_seen, camera_id, confidence
- update applies EMA: feature shifts toward new, stays unit-norm
- update clears is_stale flag
- update preserves first_seen
- mark_stale: recent track stays fresh
- mark_stale: old track flagged stale
- prune_stale removes stale, keeps fresh
- empty gallery match → (None, 0.0)
Re-id integration:
- same appearance from two camera_ids → same track_id
- different appearance → different track_id
- stale + prune + re-add → new track_id
- EMA: track adapts over multiple updates (feature norm stays 1)
"""
import sys
import os
import math
import numpy as np
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_bringup._person_reid import (
H_BINS,
S_BINS,
FEAT_DIM,
TrackEntry,
TrackGallery,
cosine_similarity,
extract_hsv_histogram,
match_track,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _solid_bgr(b: int, g: int, r: int, size: int = 32) -> np.ndarray:
"""Return a solid-colour BGR uint8 image of given size."""
import cv2
img = np.full((size, size, 3), (b, g, r), dtype=np.uint8)
return img
def _unit(v: np.ndarray) -> np.ndarray:
n = np.linalg.norm(v)
return v / n if n > 1e-12 else v
def _make_feature(seed: int) -> np.ndarray:
    """Reproducible random L2-normalised float32 feature of length FEAT_DIM."""
    raw = np.random.default_rng(seed).random(FEAT_DIM).astype(np.float32)
    return raw / np.linalg.norm(raw)
def _make_entry(
    tid: int,
    feature: np.ndarray,
    camera_id: str = 'front',
    now: float = 1000.0,
    is_stale: bool = False,
) -> TrackEntry:
    """Build a TrackEntry fixture: confidence 1.0, first_seen == last_seen
    == `now`, bbox left at the dataclass default. `feature` is copied so
    callers may mutate their array afterwards."""
    return TrackEntry(
        track_id=tid,
        camera_id=camera_id,
        feature=feature.copy(),
        confidence=1.0,
        first_seen=now,
        last_seen=now,
        is_stale=is_stale,
    )
# ── extract_hsv_histogram ─────────────────────────────────────────────────────
class TestExtractHsvHistogram:
    """Unit tests for HSV histogram extraction and its degenerate inputs."""

    def test_output_shape(self):
        img = _solid_bgr(0, 0, 255)
        hist = extract_hsv_histogram(img)
        assert hist.shape == (FEAT_DIM,)

    def test_output_dtype(self):
        hist = extract_hsv_histogram(_solid_bgr(0, 255, 0))
        assert hist.dtype == np.float32

    def test_none_returns_zero(self):
        hist = extract_hsv_histogram(None)
        assert hist.shape == (FEAT_DIM,)
        assert np.all(hist == 0.0)

    def test_empty_array_returns_zero(self):
        # (review) removed a redundant method-local `import numpy as np`;
        # np is already imported at module scope.
        empty = np.zeros((0, 0, 3), dtype=np.uint8)
        hist = extract_hsv_histogram(empty)
        assert np.all(hist == 0.0)

    def test_solid_colour_is_unit_norm(self):
        hist = extract_hsv_histogram(_solid_bgr(255, 0, 0))
        assert np.linalg.norm(hist) == pytest.approx(1.0, rel=1e-5)

    def test_same_colour_different_sizes_equal(self):
        h16 = extract_hsv_histogram(_solid_bgr(0, 0, 200, size=16))
        h64 = extract_hsv_histogram(_solid_bgr(0, 0, 200, size=64))
        assert np.allclose(h16, h64, atol=1e-5)

    def test_different_colours_different_histograms(self):
        red = extract_hsv_histogram(_solid_bgr(0, 0, 255))
        blue = extract_hsv_histogram(_solid_bgr(255, 0, 0))
        assert not np.allclose(red, blue, atol=1e-3)

    def test_red_vs_green_low_similarity(self):
        red = extract_hsv_histogram(_solid_bgr(0, 0, 255))
        green = extract_hsv_histogram(_solid_bgr(0, 255, 0))
        sim = cosine_similarity(red, green)
        assert sim < 0.5

    def test_single_pixel_valid(self):
        # (review) removed a redundant method-local `import numpy as np`.
        px = np.array([[[0, 200, 200]]], dtype=np.uint8)
        hist = extract_hsv_histogram(px)
        assert hist.shape == (FEAT_DIM,)
        assert np.linalg.norm(hist) == pytest.approx(1.0, rel=1e-5)

    def test_feat_dim_constant(self):
        assert FEAT_DIM == H_BINS * S_BINS
        assert FEAT_DIM == 128
# ── cosine_similarity ─────────────────────────────────────────────────────────
class TestCosineSimilarity:
    """Unit tests for cosine_similarity sign, range, and zero-vector cases."""

    def test_identical_unit_vectors(self):
        vec = _make_feature(0)
        assert cosine_similarity(vec, vec.copy()) == pytest.approx(1.0, rel=1e-6)

    def test_zero_vector_a(self):
        zero = np.zeros(FEAT_DIM, dtype=np.float32)
        assert cosine_similarity(zero, _make_feature(1)) == pytest.approx(0.0, abs=1e-9)

    def test_zero_vector_b(self):
        zero = np.zeros(FEAT_DIM, dtype=np.float32)
        assert cosine_similarity(_make_feature(2), zero) == pytest.approx(0.0, abs=1e-9)

    def test_both_zero(self):
        zero = np.zeros(FEAT_DIM, dtype=np.float32)
        assert cosine_similarity(zero, zero) == pytest.approx(0.0, abs=1e-9)

    def test_anti_parallel(self):
        vec = _make_feature(3)
        assert cosine_similarity(vec, -vec) == pytest.approx(-1.0, rel=1e-6)

    def test_orthogonal(self):
        e0 = np.zeros(FEAT_DIM, dtype=np.float32)
        e1 = np.zeros(FEAT_DIM, dtype=np.float32)
        e0[0] = 1.0
        e1[1] = 1.0
        assert cosine_similarity(e0, e1) == pytest.approx(0.0, abs=1e-9)

    def test_non_unit_same_direction(self):
        vec = _make_feature(4)
        assert cosine_similarity(vec * 5.0, vec * 3.0) == pytest.approx(1.0, rel=1e-6)

    def test_non_unit_opposite_direction(self):
        vec = _make_feature(5)
        assert cosine_similarity(vec * 7.0, -vec * 2.0) == pytest.approx(-1.0, rel=1e-6)

    def test_partial_overlap_in_range(self):
        # `both` shares one axis with `shared` and adds a second — the
        # similarity must land strictly between 0 and 1.
        shared = np.zeros(FEAT_DIM, dtype=np.float32)
        both = np.zeros(FEAT_DIM, dtype=np.float32)
        shared[0] = 1.0
        both[0] = 1.0
        both[1] = 1.0
        assert 0.0 < cosine_similarity(shared, both) < 1.0
# ── match_track ───────────────────────────────────────────────────────────────
class TestMatchTrack:
    """Unit tests for gallery matching, including threshold strictness."""

    def test_empty_gallery(self):
        feat = _make_feature(10)
        tid, sim = match_track(feat, {}, threshold=0.75)
        assert tid is None
        assert sim == pytest.approx(0.0, abs=1e-9)

    def test_exact_match_above_threshold(self):
        feat = _make_feature(11)
        tracks = {1: _make_entry(1, feat)}
        tid, sim = match_track(feat, tracks, threshold=0.75)
        assert tid == 1
        assert sim == pytest.approx(1.0, rel=1e-5)

    def test_similarity_at_threshold_no_match(self):
        """Strict >: similarity exactly equal to the threshold is rejected.

        (review) Rewritten: the old version carried dead locals and tested
        threshold=1.0+1e-9 instead of the at-threshold case. A basis vector
        against itself yields a cosine similarity of exactly 1.0, so
        threshold=1.0 puts the similarity precisely AT the threshold.
        """
        feat = np.zeros(FEAT_DIM, dtype=np.float32)
        feat[0] = 1.0
        tracks = {1: _make_entry(1, feat)}
        tid, sim = match_track(feat, tracks, threshold=1.0)
        assert tid is None
        assert sim == pytest.approx(0.0, abs=1e-9)

    def test_below_threshold_no_match(self):
        # (review) Rewritten: the old version asserted inside an `if` and
        # could silently pass. Orthogonal basis vectors give similarity
        # exactly 0.0 — deterministically below any positive threshold.
        query = np.zeros(FEAT_DIM, dtype=np.float32)
        stored = np.zeros(FEAT_DIM, dtype=np.float32)
        query[0] = 1.0
        stored[1] = 1.0
        tracks = {1: _make_entry(1, stored)}
        tid, sim = match_track(query, tracks, threshold=0.75)
        assert tid is None
        assert sim == pytest.approx(0.0, abs=1e-9)

    def test_stale_track_excluded(self):
        feat = _make_feature(15)
        tracks = {1: _make_entry(1, feat, is_stale=True)}
        tid, sim = match_track(feat, tracks, threshold=0.5)
        assert tid is None

    def test_multiple_tracks_best_returned(self):
        query = _make_feature(16)
        close = query.copy()  # sim = 1.0
        far = _make_feature(17)  # lower sim
        tracks = {
            1: _make_entry(1, far),
            2: _make_entry(2, close),
        }
        tid, sim = match_track(query, tracks, threshold=0.5)
        assert tid == 2
        assert sim == pytest.approx(1.0, rel=1e-5)

    def test_all_stale_no_match(self):
        feat = _make_feature(18)
        tracks = {
            1: _make_entry(1, feat, is_stale=True),
            2: _make_entry(2, feat, is_stale=True),
        }
        tid, sim = match_track(feat, tracks, threshold=0.5)
        assert tid is None

    def test_fresh_and_stale_returns_fresh(self):
        feat = _make_feature(19)
        tracks = {
            1: _make_entry(1, feat, is_stale=True),
            2: _make_entry(2, feat, is_stale=False),
        }
        tid, sim = match_track(feat, tracks, threshold=0.5)
        assert tid == 2
# ── TrackGallery ──────────────────────────────────────────────────────────────
# Shared last-known bounding box fixture: (centre_x, centre_y, size_x, size_y)
_BBOX = (10.0, 20.0, 50.0, 80.0)
class TestTrackGallery:
    """Unit tests for the TrackGallery container."""

    def test_add_increments_track_id(self):
        gal = TrackGallery()
        t0 = 1000.0
        ids = [
            gal.add('front', _make_feature(20), _BBOX, t0),
            gal.add('front', _make_feature(21), _BBOX, t0),
            gal.add('rear', _make_feature(22), _BBOX, t0),
        ]
        assert ids == [1, 2, 3]

    def test_add_stores_copy(self):
        gal = TrackGallery()
        vec = _make_feature(23)
        snapshot = vec.copy()
        gal.add('front', vec, _BBOX, 1000.0)
        vec[0] = 99.9  # mutating the caller's array must not leak into the gallery
        assert np.allclose(gal.tracks[1].feature, snapshot, atol=1e-6)

    def test_add_sets_timestamps(self):
        gal = TrackGallery()
        t0 = 5000.0
        gal.add('front', _make_feature(24), _BBOX, t0)
        entry = gal.tracks[1]
        assert entry.first_seen == pytest.approx(t0)
        assert entry.last_seen == pytest.approx(t0)

    def test_add_initial_confidence_one(self):
        gal = TrackGallery()
        gal.add('front', _make_feature(25), _BBOX, 1000.0)
        assert gal.tracks[1].confidence == pytest.approx(1.0)

    def test_update_changes_last_seen(self):
        gal = TrackGallery()
        vec = _make_feature(26)
        gal.add('front', vec, _BBOX, 1000.0)
        gal.update(1, 'front', vec, _BBOX, 0.9, 2000.0)
        entry = gal.tracks[1]
        assert entry.last_seen == pytest.approx(2000.0)
        assert entry.first_seen == pytest.approx(1000.0)  # creation time kept

    def test_update_changes_camera_id(self):
        gal = TrackGallery()
        vec = _make_feature(27)
        gal.add('front', vec, _BBOX, 1000.0)
        gal.update(1, 'right', vec, _BBOX, 0.8, 2000.0)
        assert gal.tracks[1].camera_id == 'right'

    def test_update_applies_ema(self):
        gal = TrackGallery()
        old_vec = _make_feature(28)
        gal.add('front', old_vec, _BBOX, 1000.0)
        new_vec = _make_feature(29)
        gal.update(1, 'front', new_vec, _BBOX, 0.85, 2000.0)
        blended = gal.tracks[1].feature
        # The EMA result must differ from both inputs.
        assert not np.allclose(blended, old_vec, atol=1e-4)
        assert not np.allclose(blended, new_vec, atol=1e-4)

    def test_update_feature_stays_unit_norm(self):
        gal = TrackGallery()
        gal.add('front', _make_feature(30), _BBOX, 1000.0)
        for step in range(10):
            gal.update(1, 'front', _make_feature(31 + step), _BBOX, 0.9, 1000.0 + step)
        assert np.linalg.norm(gal.tracks[1].feature) == pytest.approx(1.0, rel=1e-5)

    def test_update_clears_stale(self):
        gal = TrackGallery()
        vec = _make_feature(41)
        gal.add('front', vec, _BBOX, 1000.0)
        gal.tracks[1].is_stale = True
        gal.update(1, 'front', vec, _BBOX, 0.9, 2000.0)
        assert gal.tracks[1].is_stale is False

    def test_mark_stale_recent_track_stays_fresh(self):
        gal = TrackGallery()
        gal.add('front', _make_feature(42), _BBOX, 1000.0)
        gal.mark_stale(1010.0, timeout_s=30.0)
        assert gal.tracks[1].is_stale is False

    def test_mark_stale_old_track_flagged(self):
        gal = TrackGallery()
        gal.add('front', _make_feature(43), _BBOX, 1000.0)
        gal.mark_stale(1031.0, timeout_s=30.0)
        assert gal.tracks[1].is_stale is True

    def test_mark_stale_exactly_at_timeout_not_stale(self):
        """Timeout uses strict >, so last_seen + timeout_s is still fresh."""
        gal = TrackGallery()
        gal.add('front', _make_feature(44), _BBOX, 1000.0)
        gal.mark_stale(1030.0, timeout_s=30.0)
        assert gal.tracks[1].is_stale is False

    def test_prune_stale_removes_flagged(self):
        gal = TrackGallery()
        gal.add('front', _make_feature(45), _BBOX, 1000.0)  # -> id 1
        gal.add('front', _make_feature(46), _BBOX, 1000.0)  # -> id 2
        gal.tracks[1].is_stale = True
        gal.prune_stale()
        assert 1 not in gal.tracks
        assert 2 in gal.tracks

    def test_prune_stale_keeps_fresh(self):
        gal = TrackGallery()
        gal.add('front', _make_feature(47), _BBOX, 1000.0)
        gal.prune_stale()
        assert 1 in gal.tracks

    def test_empty_gallery_match(self):
        gal = TrackGallery()
        tid, sim = gal.match(_make_feature(48), threshold=0.75)
        assert tid is None
        assert sim == pytest.approx(0.0, abs=1e-9)

    def test_match_delegates_to_match_track(self):
        gal = TrackGallery()
        vec = _make_feature(49)
        gal.add('front', vec, _BBOX, 1000.0)
        tid, sim = gal.match(vec, threshold=0.5)
        assert tid == 1
        assert sim == pytest.approx(1.0, rel=1e-5)
# ── Re-id integration ─────────────────────────────────────────────────────────
class TestReIdIntegration:
    """End-to-end gallery scenarios exercising the full re-id loop."""

    def test_same_appearance_two_cameras_same_track(self):
        """Same feature from two cameras → same track_id."""
        g = TrackGallery()
        feat = _make_feature(50)
        now = 1000.0
        # First sighting: front camera creates track 1
        t1 = g.add('front', feat, _BBOX, now)
        assert t1 == 1
        # Second sighting: right camera matches → updates same track
        tid, sim = g.match(feat, threshold=0.75)
        assert tid == 1
        g.update(tid, 'right', feat, _BBOX, float(sim), now + 1.0)
        assert len(g.tracks) == 1

    def test_different_appearance_different_tracks(self):
        """Two orthogonal features must not match → separate track IDs."""
        g = TrackGallery()
        now = 1000.0
        # Construct two exactly orthogonal unit feature vectors
        feat_a = np.zeros(FEAT_DIM, dtype=np.float32)
        feat_b = np.zeros(FEAT_DIM, dtype=np.float32)
        feat_a[0] = 1.0  # basis vector e0
        feat_b[1] = 1.0  # basis vector e1 — cosine sim = 0.0
        g.add('front', feat_a, _BBOX, now)
        tid, sim = g.match(feat_b, threshold=0.75)
        assert tid is None
        g.add('front', feat_b, _BBOX, now + 0.1)
        assert len(g.tracks) == 2

    def test_stale_prune_readd_new_id(self):
        g = TrackGallery()
        feat = _make_feature(53)
        now = 1000.0
        t1 = g.add('front', feat, _BBOX, now)
        assert t1 == 1
        g.mark_stale(now + 31.0, timeout_s=30.0)
        g.prune_stale()
        assert len(g.tracks) == 0
        # Re-add same person gets new ID
        t2 = g.add('front', feat, _BBOX, now + 40.0)
        assert t2 == 2  # next_id incremented past 1

    def test_ema_feature_norm_stays_unit_after_many_updates(self):
        g = TrackGallery()
        feat = _make_feature(54)
        g.add('front', feat, _BBOX, 0.0)
        for i in range(50):
            new_feat = _make_feature(55 + i)
            g.update(1, 'front', new_feat, _BBOX, 0.9, float(i))
        norm = np.linalg.norm(g.tracks[1].feature)
        assert norm == pytest.approx(1.0, rel=1e-4)

    def test_max_tracks_not_exceeded_in_gallery(self):
        """Gallery itself is unbounded; max_tracks enforcement is in the node.
        Confirm gallery can hold many entries without errors."""
        g = TrackGallery()
        for i in range(25):
            g.add('front', _make_feature(100 + i), _BBOX, float(i))
        assert len(g.tracks) == 25

    def test_hsv_same_image_produces_high_similarity(self):
        """Same-crop HSV histogram should have cosine sim ≈ 1."""
        # (review) removed an unused method-local `import cv2`; nothing in
        # this test touches cv2 directly.
        img = _solid_bgr(50, 120, 200)
        h1 = extract_hsv_histogram(img)
        h2 = extract_hsv_histogram(img.copy())
        assert cosine_similarity(h1, h2) == pytest.approx(1.0, rel=1e-5)

    def test_hsv_different_colours_low_similarity(self):
        """Red vs cyan BGR should be dissimilar."""
        red = extract_hsv_histogram(_solid_bgr(0, 0, 200))
        cyan = extract_hsv_histogram(_solid_bgr(200, 200, 0))
        assert cosine_similarity(red, cyan) < 0.5
# Allow direct execution (`python test_person_reid.py`) as verbose pytest run.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@ -19,6 +19,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
# Issue #274 HSV color segmentation
"msg/ColorDetection.msg"
"msg/ColorDetectionArray.msg"
# Issue #322 cross-camera person re-identification
"msg/PersonTrack.msg"
"msg/PersonTrackArray.msg"
DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
)

View File

@ -0,0 +1,19 @@
# PersonTrack.msg — stable cross-camera person identity track (Issue #322)
#
# track_id : monotonically-increasing track ID, stable across camera views
# camera_id : source camera label ("front", "right", "rear", "left")
# bbox : last-known bounding box in the source image (centre + size, pixels)
# confidence : cosine-similarity of the matched appearance feature (0–1);
#              1.0 for tracks created from a first-time observation
# first_seen : wall-clock time when this track was first created
# last_seen : wall-clock time of the most recent appearance observation
# is_stale    : true when (now − last_seen) > stale_timeout_s
#
std_msgs/Header header
uint32 track_id
string camera_id
vision_msgs/BoundingBox2D bbox
float32 confidence
builtin_interfaces/Time first_seen
builtin_interfaces/Time last_seen
bool is_stale

View File

@ -0,0 +1,4 @@
# PersonTrackArray.msg — all active person tracks from one camera view (Issue #322)
#
std_msgs/Header header
PersonTrack[] tracks