Merge pull request 'feat: Issue #363 — P0 person tracking for follow-me mode' (#367) from sl-perception/issue-363-person-tracking into main

This commit is contained in:
sl-jetson 2026-03-03 15:20:26 -05:00
commit eac203ecf4
6 changed files with 1716 additions and 0 deletions

View File

@ -0,0 +1,729 @@
"""
_person_tracker.py Multi-person tracker for follow-me mode (no ROS2 deps).
Pipeline (called once per colour frame)
-----------------------------------------
1. Person detections (bounding boxes + detector confidence) arrive from an
external detector (YOLOv8n, MobileNetSSD, etc.) not this module's concern.
2. Active tracks are predicted one step forward via a constant-velocity Kalman.
3. Detections are matched to predicted tracks with greedy IoU iou_threshold.
4. Unmatched detections that survive re-ID histogram matching against LOST
tracks are reattached to those tracks (brief-occlusion recovery).
5. Remaining unmatched detections start new TENTATIVE tracks.
6. Tracks not updated for max_lost_frames are removed permanently.
7. Bearing and range to a designated follow target are derived from camera
intrinsics + an aligned depth image.
Re-identification
-----------------
Each track stores an HSV colour histogram of the person's torso region
(middle 50 % of bbox height, centre 80 % width). After occlusion, new
detections whose histogram Bhattacharyya similarity exceeds reid_threshold
*and* whose predicted position is within reid_max_dist pixels are candidates
for re-identification. Closest histogram match wins.
Kalman state (8-D, one per track)
----------------------------------
x = [cx, cy, w, h, vcx, vcy, vw, vh]
Measurement z = [cx, cy, w, h]
dt = 1 frame (constant velocity model)
Public API
----------
BBox NamedTuple (x, y, w, h) pixel coordinates
Detection NamedTuple (bbox, confidence, frame_bgr)
TrackState Enum TENTATIVE / ACTIVE / LOST
PersonTrack dataclass per-track state snapshot
PersonTracker class .update() -> list[PersonTrack]
iou(a, b) -> float
bearing_from_pixel(u, cx_px, fx) -> float (degrees)
depth_at_bbox(depth_u16, bbox, ...) -> (depth_m, quality)
extract_torso_hist(bgr, bbox, ...) -> ndarray | None
hist_similarity(h1, h2) -> float (0 = different, 1 = same)
KalmanBoxFilter class
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from enum import IntEnum
from typing import List, NamedTuple, Optional, Tuple
import numpy as np
# ── Simple types ──────────────────────────────────────────────────────────────
class BBox(NamedTuple):
    """Axis-aligned bounding box in pixel coordinates (origin at top-left)."""
    x: int  # left edge (px)
    y: int  # top edge (px)
    w: int  # width in px (expected >= 1)
    h: int  # height in px (expected >= 1)
class Detection(NamedTuple):
    """One person detection from an external detector (YOLO, HOG, ...)."""
    bbox: BBox                               # detection box in colour-image px
    confidence: float                        # detector score in [0, 1]
    frame_bgr: Optional[np.ndarray] = None   # full colour frame; only used to
                                             # extract the torso colour histogram
class TrackState(IntEnum):
    """Lifecycle state of a PersonTrack."""
    TENTATIVE = 0  # seen < min_hits frames; not yet published
    ACTIVE = 1     # confirmed; published to follow-me controller
    LOST = 2       # missing; still kept for re-ID up to max_lost_frames
# ── Depth quality levels ──────────────────────────────────────────────────────
# Returned by depth_at_bbox() alongside the median depth; higher is better.
DEPTH_INVALID = 0       # no valid (non-zero) depth pixels in the sample window
DEPTH_EXTRAPOLATED = 1  # very sparse depth (valid-pixel ratio <= 25 %)
DEPTH_MARGINAL = 2      # partial depth coverage (25-60 %)
DEPTH_GOOD = 3          # dense depth coverage (> 60 %)
# ── IoU ───────────────────────────────────────────────────────────────────────
def iou(a: BBox, b: BBox) -> float:
    """Intersection-over-union of two bounding boxes, in [0, 1]."""
    left = max(a.x, b.x)
    top = max(a.y, b.y)
    right = min(a.x + a.w, b.x + b.w)
    bottom = min(a.y + a.h, b.y + b.h)
    overlap = max(0, right - left) * max(0, bottom - top)
    # Epsilon floor guards against zero-area degenerate boxes.
    union = a.w * a.h + b.w * b.h - overlap
    return float(overlap) / max(float(union), 1e-6)
# ── Kalman box filter ─────────────────────────────────────────────────────────
class KalmanBoxFilter:
    """
    Constant-velocity Kalman filter over bounding boxes (dt = 1 frame).

    State   x = [cx, cy, w, h, vcx, vcy, vw, vh]
    Measure z = [cx, cy, w, h]
    Process noise: small on position/size, large on the velocity terms.
    Measurement noise: ~2 px std-dev per box component.
    """

    def __init__(self, initial_bbox: BBox) -> None:
        # State transition: position/size components advance by their velocities.
        self._F = np.eye(8, dtype=np.float64)
        self._F[:4, 4:] = np.eye(4)
        # Observation extracts the first four state components.
        self._H = np.hstack([np.eye(4), np.zeros((4, 4))]).astype(np.float64)
        # Process noise covariance.
        self._Q = np.diag([1.0] * 4 + [100.0] * 4).astype(np.float64)
        # Measurement noise covariance (2 px std-dev -> variance 4).
        self._R = 4.0 * np.eye(4, dtype=np.float64)
        # Initial state: observed centre/size, zero velocity.
        self._x = np.array([
            initial_bbox.x + initial_bbox.w * 0.5,
            initial_bbox.y + initial_bbox.h * 0.5,
            float(initial_bbox.w),
            float(initial_bbox.h),
            0.0, 0.0, 0.0, 0.0,
        ], dtype=np.float64)
        # Initial covariance: fairly confident in position, not in velocity.
        self._P = np.diag([10.0] * 4 + [1000.0] * 4).astype(np.float64)

    # -- predict ──────────────────────────────────────────────────────────────
    def predict(self) -> BBox:
        """Advance the state one frame; return the predicted box."""
        self._x = self._F @ self._x
        self._P = self._F @ self._P @ self._F.T + self._Q
        return self._to_bbox()

    # -- update ───────────────────────────────────────────────────────────────
    def update(self, bbox: BBox) -> BBox:
        """Fuse an observed box into the state; return the corrected box."""
        z = np.array([
            bbox.x + bbox.w * 0.5,
            bbox.y + bbox.h * 0.5,
            float(bbox.w),
            float(bbox.h),
        ], dtype=np.float64)
        innovation = z - self._H @ self._x
        S = self._H @ self._P @ self._H.T + self._R
        gain = self._P @ self._H.T @ np.linalg.inv(S)
        self._x = self._x + gain @ innovation
        self._P = (np.eye(8) - gain @ self._H) @ self._P
        return self._to_bbox()

    # -- accessors ────────────────────────────────────────────────────────────
    @property
    def velocity_px(self) -> Tuple[float, float]:
        """Estimated centre velocity (vcx, vcy) in pixels / frame."""
        return float(self._x[4]), float(self._x[5])

    @property
    def bbox(self) -> BBox:
        """Current filter state rendered as a BBox."""
        return self._to_bbox()

    def _to_bbox(self) -> BBox:
        # Convert [cx, cy, w, h] back to a top-left integer box,
        # clamping size so the box never degenerates below 1 px.
        cx, cy, w, h = self._x[:4]
        w, h = max(1.0, w), max(1.0, h)
        return BBox(
            int(round(cx - w * 0.5)),
            int(round(cy - h * 0.5)),
            int(round(w)),
            int(round(h)),
        )
# ── Colour histogram (HSV torso) ──────────────────────────────────────────────
# Histogram layout: 16 hue bins × 8 saturation bins, flattened to length 128.
_HIST_H_BINS = 16
_HIST_S_BINS = 8
_HIST_SIZE = _HIST_H_BINS * _HIST_S_BINS  # 128


def extract_torso_hist(
    bgr: np.ndarray,
    bbox: BBox,
    h_bins: int = _HIST_H_BINS,
    s_bins: int = _HIST_S_BINS,
) -> Optional[np.ndarray]:
    """
    Normalised HSV colour histogram of the torso region of a person box.

    Torso region: middle 50 % of bbox height, centre 80 % of bbox width
    (skips head and legs, trims the sides).

    Parameters
    ----------
    bgr            : (H, W, 3) uint8 colour image
    bbox           : person bounding box
    h_bins, s_bins : histogram bins for the H and S channels

    Returns
    -------
    1-D histogram of length h_bins * s_bins summing to 1, or None when
    the crop is smaller than 4x4 px or bgr is None.
    """
    if bgr is None:
        return None
    img_h, img_w = bgr.shape[:2]
    # Torso crop: drop top 25 % (head), bottom 25 % (legs), 10 % each side.
    top = max(0, bbox.y + bbox.h // 4)
    bot = min(img_h, bbox.y + bbox.h * 3 // 4)
    left = max(0, bbox.x + bbox.w // 10)
    right = min(img_w, bbox.x + bbox.w * 9 // 10)
    if bot - top < 4 or right - left < 4:
        return None
    crop = bgr[top:bot, left:right].astype(np.float32) / 255.0
    # Manual BGR -> HSV conversion so tests don't need OpenCV.
    b_ch, g_ch, r_ch = crop[..., 0], crop[..., 1], crop[..., 2]
    cmax = np.maximum(np.maximum(r_ch, g_ch), b_ch)
    cmin = np.minimum(np.minimum(r_ch, g_ch), b_ch)
    delta = cmax - cmin + 1e-7  # epsilon keeps the hue division finite
    # Hue in [0, 360)
    hue = np.where(
        cmax == r_ch, 60.0 * (((g_ch - b_ch) / delta) % 6),
        np.where(cmax == g_ch, 60.0 * ((b_ch - r_ch) / delta + 2),
                 60.0 * ((r_ch - g_ch) / delta + 4)),
    )
    hue = np.clip(hue % 360.0, 0.0, 359.9999)
    # Saturation in [0, 1] — no epsilon so pure-colour pixels stay <= 1.0.
    sat = np.clip(np.where(cmax > 1e-6, (cmax - cmin) / cmax, 0.0), 0.0, 1.0)
    counts, _, _ = np.histogram2d(
        hue.ravel(), sat.ravel(),
        bins=[h_bins, s_bins],
        range=[[0, 360], [0, 1]],
    )
    hist = counts.ravel().astype(np.float32)
    total = hist.sum()
    return hist / total if total > 0 else hist
def hist_similarity(h1: np.ndarray, h2: np.ndarray) -> float:
    """
    Bhattacharyya coefficient of two normalised histograms.

    Returns a value in [0, 1]: 1 = identical distributions, 0 = disjoint.
    """
    coeff = np.sqrt(h1 * h2).sum()
    return float(np.clip(coeff, 0.0, 1.0))
# ── Camera geometry ───────────────────────────────────────────────────────────
def bearing_from_pixel(
    u: float,
    cx_px: float,
    fx: float,
) -> float:
    """
    Bearing (degrees) of an image column relative to the optical axis.

    Parameters
    ----------
    u     : pixel column (horizontal image coordinate)
    cx_px : principal point x (from CameraInfo.K[2])
    fx    : horizontal focal length in pixels (from CameraInfo.K[0])

    Returns
    -------
    Signed degrees; positive means right of the camera centre.
    """
    offset_px = float(u - cx_px)
    return math.degrees(math.atan2(offset_px, float(fx)))
def depth_at_bbox(
    depth_u16: np.ndarray,
    bbox: BBox,
    depth_scale: float = 0.001,
    window_frac: float = 0.3,
) -> Tuple[float, int]:
    """
    Median depth sampled from the central region of a bounding box.

    Parameters
    ----------
    depth_u16   : (H, W) uint16 depth image (raw sensor units; 0 = no reading)
    bbox        : person box in (assumed aligned) colour-image coordinates
    depth_scale : raw-to-metres factor (D435i default: 0.001)
    window_frac : fraction of each bbox dimension used as the sample window

    Returns
    -------
    (depth_m, quality) — depth_m is 0.0 when nothing valid was sampled;
    quality is one of the DEPTH_* levels based on valid-pixel density.
    """
    img_h, img_w = depth_u16.shape
    frac = min(1.0, max(0.1, window_frac))  # clamp to a sane window size
    centre_x = bbox.x + bbox.w * 0.5
    centre_y = bbox.y + bbox.h * 0.5
    half_w = bbox.w * frac * 0.5
    half_h = bbox.h * frac * 0.5
    row0 = int(max(0, centre_y - half_h))
    row1 = int(min(img_h, centre_y + half_h + 1))
    col0 = int(max(0, centre_x - half_w))
    col1 = int(min(img_w, centre_x + half_w + 1))
    if row1 <= row0 or col1 <= col0:
        return 0.0, DEPTH_INVALID
    window = depth_u16[row0:row1, col0:col1]
    valid = window[window > 0]
    if valid.size == 0:
        return 0.0, DEPTH_INVALID
    fill_ratio = valid.size / max(window.size, 1)
    metres = float(np.median(valid)) * depth_scale
    if fill_ratio > 0.6:
        return metres, DEPTH_GOOD
    if fill_ratio > 0.25:
        return metres, DEPTH_MARGINAL
    return metres, DEPTH_EXTRAPOLATED
# ── Per-track state ───────────────────────────────────────────────────────────
@dataclass
class PersonTrack:
    """Mutable per-person track state maintained by PersonTracker."""
    track_id: int                    # stable id, assigned at track creation
    state: TrackState                # TENTATIVE / ACTIVE / LOST
    bbox: BBox                       # smoothed Kalman bbox (colour img px)
    bearing: float = 0.0             # degrees; stays 0 until cam params set
    distance: float = 0.0            # metres; stays 0 until depth available
    depth_qual: int = DEPTH_INVALID  # DEPTH_* quality level of `distance`
    confidence: float = 0.0          # last matched detector score, [0, 1]
    vel_u: float = 0.0               # Kalman horizontal velocity (px/frame)
    vel_v: float = 0.0               # Kalman vertical velocity (px/frame)
    hits: int = 0                    # consecutive matched frames
    age: int = 0                     # total frames since creation
    lost_age: int = 0                # consecutive unmatched frames
    color_hist: Optional[np.ndarray] = None  # HSV torso histogram for re-ID
    # Internal Kalman filter — excluded from repr, not serialised.
    _kalman: Optional[KalmanBoxFilter] = field(default=None, repr=False)
# ── Camera parameters (minimal) ──────────────────────────────────────────────
@dataclass
class CamParams:
    """Minimal pinhole camera intrinsics needed for bearing calculation."""
    fx: float = 615.0   # horizontal focal length (px) — D435i 640×480 default
    fy: float = 615.0   # vertical focal length (px)
    cx: float = 320.0   # principal point x (px)
    cy: float = 240.0   # principal point y (px)
    fps: float = 30.0   # frame rate, for converting px/frame velocities to per-second
# ── PersonTracker ─────────────────────────────────────────────────────────────
class PersonTracker:
    """
    Multi-person tracker combining Kalman prediction, greedy IoU data
    association, and HSV colour-histogram re-identification.

    Parameters
    ----------
    iou_threshold   : minimum IoU to consider a detection-track match
    min_hits        : frames before a track transitions TENTATIVE -> ACTIVE
    max_lost_frames : frames a track survives without a detection before removal
    reid_threshold  : minimum histogram Bhattacharyya similarity for re-ID
    reid_max_dist   : max predicted-to-detection centre distance for re-ID (px)
    """

    def __init__(
        self,
        iou_threshold: float = 0.30,
        min_hits: int = 3,
        max_lost_frames: int = 30,
        reid_threshold: float = 0.55,
        reid_max_dist: float = 150.0,
    ) -> None:
        self.iou_threshold = iou_threshold
        self.min_hits = min_hits
        self.max_lost_frames = max_lost_frames
        self.reid_threshold = reid_threshold
        self.reid_max_dist = reid_max_dist
        self._tracks: List[PersonTrack] = []
        self._next_id: int = 0

    # ── Public API ────────────────────────────────────────────────────────────
    @property
    def tracks(self) -> List[PersonTrack]:
        """Shallow copy of every track regardless of state."""
        return list(self._tracks)

    def update(
        self,
        detections: List[Detection],
        cam: Optional[CamParams] = None,
        depth_u16: Optional[np.ndarray] = None,
        depth_scale: float = 0.001,
    ) -> List[PersonTrack]:
        """
        Process one frame of detections.

        Parameters
        ----------
        detections  : list of Detection from an external detector
        cam         : camera intrinsics (for bearing computation); None = skip
        depth_u16   : aligned uint16 depth image; None = depth unavailable
        depth_scale : mm-to-metres scale factor (D435i default 0.001)

        Returns
        -------
        list of PersonTrack (ACTIVE state only)
        """
        # ── Step 1: Predict all tracks one frame forward ─────────────────────
        for trk in self._tracks:
            if trk._kalman is not None:
                trk.bbox = trk._kalman.predict()
            trk.age += 1

        # ── Step 2: Greedy IoU matching against non-LOST tracks ─────────────
        active_tracks = [t for t in self._tracks if t.state != TrackState.LOST]
        matched_t, matched_d, unmatched_t, unmatched_d = \
            self._match_iou(active_tracks, detections)

        # ── Step 3: Fold matched detections into their tracks ───────────────
        for t_idx, d_idx in zip(matched_t, matched_d):
            trk = active_tracks[t_idx]
            det = detections[d_idx]
            if trk._kalman is not None:
                trk.bbox = trk._kalman.update(det.bbox)
                trk.vel_u, trk.vel_v = trk._kalman.velocity_px
            trk.hits += 1
            trk.lost_age = 0
            trk.confidence = float(det.confidence)
            if trk.state == TrackState.TENTATIVE and trk.hits >= self.min_hits:
                trk.state = TrackState.ACTIVE
            self._update_hist(trk, det)

        # ── Step 4: Re-ID unmatched detections against LOST tracks ──────────
        lost_pool = [t for t in self._tracks if t.state == TrackState.LOST]
        still_unmatched_d: List[int] = []
        for d_idx in unmatched_d:
            det = detections[d_idx]
            best_trk, _best_sim = self._reid_match(det, lost_pool)
            if best_trk is None:
                still_unmatched_d.append(d_idx)
                continue
            # BUGFIX: remove the re-identified track from the candidate pool
            # so a second detection in the same frame cannot claim the same
            # identity (previously the track stayed in the pool and could be
            # matched — and its Kalman state clobbered — multiple times).
            lost_pool.remove(best_trk)
            if best_trk._kalman is not None:
                best_trk.bbox = best_trk._kalman.update(det.bbox)
            else:
                best_trk.bbox = det.bbox
            best_trk.state = TrackState.ACTIVE
            best_trk.hits += 1
            best_trk.lost_age = 0
            best_trk.confidence = float(det.confidence)
            self._update_hist(best_trk, det)

        # ── Step 5: Create new tracks for still-unmatched detections ────────
        for d_idx in still_unmatched_d:
            det = detections[d_idx]
            trk = PersonTrack(
                track_id   = self._next_id,
                state      = TrackState.TENTATIVE,
                bbox       = det.bbox,
                hits       = 1,
                confidence = float(det.confidence),
                _kalman    = KalmanBoxFilter(det.bbox),
            )
            self._update_hist(trk, det)
            self._tracks.append(trk)
            self._next_id += 1

        # ── Step 6: Age lost tracks, remove stale ────────────────────────────
        # Mark newly-unmatched tracks as LOST (restart their countdown).
        for t_idx in unmatched_t:
            trk = active_tracks[t_idx]
            if trk.state != TrackState.LOST:
                trk.lost_age = 0
                trk.state = TrackState.LOST
        # Increment lost_age for every LOST track (including previously LOST).
        for trk in self._tracks:
            if trk.state == TrackState.LOST:
                trk.lost_age += 1
        self._tracks = [
            t for t in self._tracks
            if t.lost_age < self.max_lost_frames
        ]

        # ── Step 7: Update bearing / depth for all active tracks ─────────────
        for trk in self._tracks:
            if trk.state != TrackState.ACTIVE:
                continue
            u_centre = trk.bbox.x + trk.bbox.w * 0.5
            if cam is not None:
                trk.bearing = bearing_from_pixel(u_centre, cam.cx, cam.fx)
            if depth_u16 is not None:
                trk.distance, trk.depth_qual = depth_at_bbox(
                    depth_u16, trk.bbox, depth_scale=depth_scale
                )
        return [t for t in self._tracks if t.state == TrackState.ACTIVE]

    def reset(self) -> None:
        """Clear all tracks and restart id numbering."""
        self._tracks.clear()
        self._next_id = 0

    # ── Internal helpers ──────────────────────────────────────────────────────
    def _match_iou(
        self,
        tracks: List[PersonTrack],
        detections: List[Detection],
    ) -> Tuple[List[int], List[int], List[int], List[int]]:
        """
        Greedy IoU matching between tracks and detections.

        Returns (matched_t_idx, matched_d_idx, unmatched_t_idx, unmatched_d_idx).
        """
        if not tracks or not detections:
            return [], [], list(range(len(tracks))), list(range(len(detections)))
        iou_mat = np.zeros((len(tracks), len(detections)), dtype=np.float32)
        for ti, trk in enumerate(tracks):
            for di, det in enumerate(detections):
                iou_mat[ti, di] = iou(trk.bbox, det.bbox)
        matched_t: List[int] = []
        matched_d: List[int] = []
        used_t = set()
        used_d = set()
        # Greedy pass over all pairs in descending IoU order; everything
        # below the threshold (and after it in the order) is unmatched.
        flat_order = np.argsort(iou_mat.ravel())[::-1]
        for flat_idx in flat_order:
            ti, di = divmod(int(flat_idx), len(detections))
            if iou_mat[ti, di] < self.iou_threshold:
                break
            if ti not in used_t and di not in used_d:
                matched_t.append(ti)
                matched_d.append(di)
                used_t.add(ti)
                used_d.add(di)
        unmatched_t = [i for i in range(len(tracks)) if i not in used_t]
        unmatched_d = [i for i in range(len(detections)) if i not in used_d]
        return matched_t, matched_d, unmatched_t, unmatched_d

    def _reid_match(
        self,
        det: Detection,
        lost: List[PersonTrack],
    ) -> Tuple[Optional[PersonTrack], float]:
        """
        Best re-identification match for a detection among lost tracks.

        Returns (best_track, similarity), or (None, 0.0) when no candidate
        passes both the spatial gate and the histogram-similarity threshold.
        """
        if not lost:
            return None, 0.0
        det_hist = None
        if det.frame_bgr is not None:
            det_hist = extract_torso_hist(det.frame_bgr, det.bbox)
        best_trk: Optional[PersonTrack] = None
        best_sim: float = 0.0
        det_cx = det.bbox.x + det.bbox.w * 0.5
        det_cy = det.bbox.y + det.bbox.h * 0.5
        for trk in lost:
            # Spatial gate: predicted centre must be close enough.
            trk_cx = trk.bbox.x + trk.bbox.w * 0.5
            trk_cy = trk.bbox.y + trk.bbox.h * 0.5
            dist = math.sqrt((det_cx - trk_cx) ** 2 + (det_cy - trk_cy) ** 2)
            if dist > self.reid_max_dist:
                continue
            # Appearance gate: closest histogram above the threshold wins.
            if det_hist is not None and trk.color_hist is not None:
                sim = hist_similarity(det_hist, trk.color_hist)
                if sim > self.reid_threshold and sim > best_sim:
                    best_sim = sim
                    best_trk = trk
        return best_trk, best_sim

    @staticmethod
    def _update_hist(trk: PersonTrack, det: Detection) -> None:
        """Blend the detection's torso histogram into the track (EMA, α = 0.3)."""
        if det.frame_bgr is None:
            return
        new_hist = extract_torso_hist(det.frame_bgr, det.bbox)
        if new_hist is None:
            return
        if trk.color_hist is None:
            trk.color_hist = new_hist
        else:
            # Running average — the new frame contributes 30 %.
            trk.color_hist = 0.7 * trk.color_hist + 0.3 * new_hist
# ── Follow-target selector ────────────────────────────────────────────────────
class FollowTargetSelector:
    """
    Selects and locks onto a single PersonTrack as the follow target.

    Strategy
    --------
    When no target is locked (after start() or after losing the lock),
    the nearest active track is chosen — by depth distance when available,
    otherwise by proximity to the image centre. The same track_id is then
    preferred every frame. When no active tracks remain at all, the last
    known target is held for up to `hold_frames` frames before giving up.
    """

    def __init__(self, hold_frames: int = 15) -> None:
        self.hold_frames = hold_frames
        self._target_id: Optional[int] = None
        self._last_target: Optional[PersonTrack] = None
        self._held_frames: int = 0
        self._active: bool = False

    # -- control ──────────────────────────────────────────────────────────────
    def start(self) -> None:
        """(Re-)enable follow mode; the target is re-selected on next update."""
        self._active = True
        self._target_id = None
        self._held_frames = 0

    def stop(self) -> None:
        """Disable follow mode and forget the current target."""
        self._active = False
        self._target_id = None
        self._last_target = None

    # -- update ───────────────────────────────────────────────────────────────
    def update(
        self,
        active_tracks: List[PersonTrack],
        img_cx: float = 320.0,  # image centre x (px)
    ) -> Optional[PersonTrack]:
        """
        Select the follow target from a list of active tracks.

        Returns the locked PersonTrack, or None when follow mode is inactive
        or no candidate is found.
        """
        if not self._active:
            return None
        if not active_tracks:
            # Nothing visible: hold the stale target for a grace period.
            if self._last_target is None or self._held_frames >= self.hold_frames:
                self._last_target = None
                return None
            self._held_frames += 1
            return self._last_target
        self._held_frames = 0
        if self._target_id is not None:
            # Prefer the already-locked id if it is still among the actives.
            locked = next(
                (t for t in active_tracks if t.track_id == self._target_id),
                None,
            )
            if locked is not None:
                self._last_target = locked
                return locked
            self._target_id = None  # lock broken — fall through to re-select
        # Re-select: smallest depth distance, else closest to image centre.
        chosen = min(
            active_tracks,
            key=lambda t: t.distance if t.distance > 0
            else abs((t.bbox.x + t.bbox.w * 0.5) - img_cx),
        )
        self._target_id = chosen.track_id
        self._last_target = chosen
        return chosen

View File

@ -0,0 +1,371 @@
"""
person_tracking_node.py P0 follow-me person tracking node (Issue #363).
Runs a real-time person detection + tracking pipeline on the D435i colour and
depth streams and publishes a single TargetTrack message for the follow-me
motion controller.
Detection backend
-----------------
Attempts YOLOv8n via ultralytics (auto-converted to TensorRT FP16 on first
run for 15 fps on Orin Nano). Falls back to a simple HOG+SVM detector
when ultralytics is unavailable.
Subscribes
----------
/camera/color/image_raw sensor_msgs/Image (BEST_EFFORT)
/camera/depth/image_rect_raw sensor_msgs/Image (BEST_EFFORT)
/camera/depth/camera_info sensor_msgs/CameraInfo (RELIABLE)
/saltybot/follow_start std_msgs/Empty start/resume following
/saltybot/follow_stop std_msgs/Empty stop following
Publishes
---------
/saltybot/target_track saltybot_scene_msgs/TargetTrack
Parameters
----------
detector_model str 'yolov8n.pt' Ultralytics model file or TRT engine
use_tensorrt bool True Convert to TensorRT FP16 on first run
max_fps float 30.0 Maximum processing rate (Hz)
iou_threshold float 0.30 Tracker IoU matching threshold
min_hits int 3 Frames before TENTATIVE -> ACTIVE
max_lost_frames int 30 Frames a track survives without det
reid_threshold float 0.55 HSV histogram re-ID similarity cutoff
depth_scale float 0.001 D435i raw-to-metres scale
depth_max_m float 5.0 Range beyond which depth degrades
auto_follow bool True Auto-select nearest person on start
"""
from __future__ import annotations
import threading
import time
from typing import List, Optional
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.qos import (
QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy,
)
from sensor_msgs.msg import CameraInfo, Image
from std_msgs.msg import Empty
from saltybot_scene_msgs.msg import TargetTrack
from ._person_tracker import (
BBox, CamParams, Detection,
PersonTracker, FollowTargetSelector,
DEPTH_GOOD, DEPTH_MARGINAL,
)
# Sensor streams: best-effort with a shallow queue — drop frames rather
# than build latency when the pipeline falls behind.
_SENSOR_QOS = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=2,
)
# CameraInfo: reliable + transient-local so a late-joining subscriber
# still receives the (effectively latched) intrinsics message.
_RELIABLE_QOS = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    history=HistoryPolicy.KEEP_LAST,
    depth=1,
    durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
# ── Detector wrappers ─────────────────────────────────────────────────────────
class _YoloDetector:
    """
    Lazy-initialised YOLOv8n person detector (ultralytics / TensorRT).

    Model loading (and the optional TensorRT export) runs on a background
    thread so node start-up is not blocked; detect() returns [] until the
    load finishes, and permanently if ultralytics is unavailable.
    """
    def __init__(
        self,
        model_path: str = 'yolov8n.pt',
        use_tensorrt: bool = True,
        logger=None,
    ) -> None:
        self._model_path = model_path
        self._use_trt = use_tensorrt
        self._log = logger               # rclpy-style logger, or None for silence
        self._model = None
        self._available = False
        self._ready = threading.Event()  # set once _load() finishes, success or not
        threading.Thread(target=self._load, daemon=True).start()
    def _load(self) -> None:
        # Runs on the background thread; swallows every failure and records
        # availability instead of raising.
        try:
            from ultralytics import YOLO
            model = YOLO(self._model_path)
            if self._use_trt:
                try:
                    # export() returns the engine path; reload it as the model.
                    model = YOLO(model.export(format='engine', half=True, device=0))
                except Exception as e:
                    if self._log:
                        self._log.warn(f'TRT export failed ({e}); using PyTorch')
            self._model = model
            self._available = True
            if self._log:
                self._log.info(f'YOLO detector loaded: {self._model_path}')
        except Exception as e:
            self._available = False
            if self._log:
                self._log.warn(f'ultralytics not available ({e}); using HOG fallback')
        finally:
            self._ready.set()
    def detect(self, bgr: np.ndarray, conf_thresh: float = 0.40) -> List[Detection]:
        """Detect people (COCO class 0) in a BGR frame; [] while still loading."""
        # Short wait keeps the colour callback non-blocking during model load.
        if not self._ready.wait(timeout=0.01) or not self._available:
            return []
        results = self._model(bgr, classes=[0], conf=conf_thresh, verbose=False)
        dets: List[Detection] = []
        for r in results:
            for box in r.boxes:
                x1, y1, x2, y2 = (int(v) for v in box.xyxy[0].cpu().numpy())
                w, h = max(1, x2 - x1), max(1, y2 - y1)
                dets.append(Detection(
                    bbox       = BBox(x1, y1, w, h),
                    confidence = float(box.conf[0]),
                    frame_bgr  = bgr,
                ))
        return dets
class _HogDetector:
    """OpenCV HOG+SVM pedestrian detector — CPU-only fallback for YOLO."""

    def __init__(self) -> None:
        import cv2
        self._hog = cv2.HOGDescriptor()
        self._hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())

    def detect(self, bgr: np.ndarray, conf_thresh: float = 0.40) -> List[Detection]:
        """Detect people in a BGR frame; boxes are mapped back to full resolution."""
        import cv2
        # Downscale wide frames to 320×240 to keep HOG tractable on CPU.
        if bgr.shape[1] > 320:
            small = cv2.resize(bgr, (320, 240))
        else:
            small = bgr
        scale = bgr.shape[1] / small.shape[1]
        rects, weights = self._hog.detectMultiScale(
            small, winStride=(8, 8), padding=(4, 4), scale=1.05,
        )
        detections: List[Detection] = []
        for (x, y, rw, rh), weight in zip(rects, weights):
            score = float(np.clip(weight, 0.0, 1.0))
            if score < conf_thresh:
                continue
            full_res_box = BBox(
                int(x * scale), int(y * scale),
                int(rw * scale), int(rh * scale),
            )
            detections.append(Detection(
                bbox       = full_res_box,
                confidence = score,
                frame_bgr  = bgr,
            ))
        return detections
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
class PersonTrackingNode(Node):
    """Detects and tracks people, publishing one TargetTrack per colour frame."""

    def __init__(self) -> None:
        super().__init__('person_tracking_node')
        # ── Parameters ──────────────────────────────────────────────────────
        self.declare_parameter('detector_model', 'yolov8n.pt')
        self.declare_parameter('use_tensorrt', True)
        self.declare_parameter('max_fps', 30.0)
        self.declare_parameter('iou_threshold', 0.30)
        self.declare_parameter('min_hits', 3)
        self.declare_parameter('max_lost_frames', 30)
        self.declare_parameter('reid_threshold', 0.55)
        self.declare_parameter('depth_scale', 0.001)
        self.declare_parameter('depth_max_m', 5.0)
        self.declare_parameter('auto_follow', True)
        p = self.get_parameter
        # Clamp to >= 1 Hz so a bad max_fps parameter cannot disable processing.
        self._min_period = 1.0 / max(float(p('max_fps').value), 1.0)
        self._depth_scale = float(p('depth_scale').value)
        self._depth_max_m = float(p('depth_max_m').value)
        self._auto_follow = bool(p('auto_follow').value)
        # ── Tracker + selector ───────────────────────────────────────────────
        self._tracker = PersonTracker(
            iou_threshold   = float(p('iou_threshold').value),
            min_hits        = int(p('min_hits').value),
            max_lost_frames = int(p('max_lost_frames').value),
            reid_threshold  = float(p('reid_threshold').value),
        )
        self._selector = FollowTargetSelector(hold_frames=15)
        if self._auto_follow:
            self._selector.start()
        # ── Detector (lazy, loads on a background thread) ────────────────────
        self._detector: Optional[_YoloDetector] = _YoloDetector(
            model_path   = str(p('detector_model').value),
            use_tensorrt = bool(p('use_tensorrt').value),
            logger       = self.get_logger(),
        )
        self._hog_fallback: Optional[_HogDetector] = None
        # ── Camera state ─────────────────────────────────────────────────────
        self._cam = CamParams()
        self._cam_received = False
        self._depth_image: Optional[np.ndarray] = None
        self._last_proc = 0.0
        # ── Subscribers ──────────────────────────────────────────────────────
        self._color_sub = self.create_subscription(
            Image, '/camera/color/image_raw',
            self._on_color, _SENSOR_QOS,
        )
        self._depth_sub = self.create_subscription(
            Image, '/camera/depth/image_rect_raw',
            self._on_depth, _SENSOR_QOS,
        )
        self._info_sub = self.create_subscription(
            CameraInfo, '/camera/depth/camera_info',
            self._on_cam_info, _RELIABLE_QOS,
        )
        self._start_sub = self.create_subscription(
            Empty, '/saltybot/follow_start',
            lambda _: self._selector.start(), 10,
        )
        self._stop_sub = self.create_subscription(
            Empty, '/saltybot/follow_stop',
            lambda _: self._selector.stop(), 10,
        )
        # ── Publisher ────────────────────────────────────────────────────────
        self._pub = self.create_publisher(
            TargetTrack, '/saltybot/target_track', 10
        )
        self.get_logger().info(
            'person_tracking_node ready — '
            f'auto_follow={self._auto_follow}, '
            f'max_fps={1.0/self._min_period:.0f}'
        )

    # ── Callbacks ─────────────────────────────────────────────────────────────
    def _on_cam_info(self, msg: CameraInfo) -> None:
        # Intrinsics are static for a running camera; latch the first message.
        if self._cam_received:
            return
        self._cam = CamParams(
            fx=float(msg.k[0]), fy=float(msg.k[4]),
            cx=float(msg.k[2]), cy=float(msg.k[5]),
        )
        self._cam_received = True
        self.get_logger().info(
            f'Camera intrinsics: fx={self._cam.fx:.1f} cx={self._cam.cx:.1f}'
        )

    def _on_depth(self, msg: Image) -> None:
        # Store the latest aligned depth frame; silently drop other encodings
        # and frames whose payload size doesn't match their header.
        if msg.encoding.lower() not in ('16uc1', 'mono16'):
            return
        data = np.frombuffer(msg.data, dtype=np.uint16)
        try:
            self._depth_image = data.reshape((msg.height, msg.width))
        except ValueError:
            pass

    def _detect(self, frame: np.ndarray) -> List[Detection]:
        """Run YOLO when loaded; otherwise fall back to a lazily built HOG."""
        det = self._detector
        if det is not None and det._ready.is_set() and det._available:
            return det.detect(frame)
        # YOLO unavailable or still loading — try the CPU HOG fallback.
        if self._hog_fallback is None:
            try:
                self._hog_fallback = _HogDetector()
            except Exception:
                pass  # cv2 unavailable: best-effort, keep returning []
        if self._hog_fallback is not None:
            return self._hog_fallback.detect(frame)
        return []

    def _on_color(self, msg: Image) -> None:
        # Rate-limit the whole pipeline to max_fps.
        now = time.monotonic()
        if now - self._last_proc < self._min_period:
            return
        self._last_proc = now
        enc = msg.encoding.lower()
        if enc not in ('bgr8', 'rgb8'):
            return
        data = np.frombuffer(msg.data, dtype=np.uint8)
        try:
            frame = data.reshape((msg.height, msg.width, 3))
        except ValueError:
            return  # truncated / malformed frame
        if enc == 'rgb8':
            import cv2
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        # ── Detect → track → select → publish ─────────────────────────────
        dets = self._detect(frame)
        active = self._tracker.update(
            dets,
            cam         = self._cam if self._cam_received else None,
            depth_u16   = self._depth_image,
            depth_scale = self._depth_scale,
        )
        target = self._selector.update(active, img_cx=self._cam.cx)
        self._publish_target(msg, target)

    def _publish_target(self, msg: Image, target) -> None:
        """Fill and publish one TargetTrack message for this frame."""
        out = TargetTrack()
        out.header.stamp = msg.header.stamp
        out.header.frame_id = msg.header.frame_id or 'camera_color_optical_frame'
        if target is None:
            out.tracking_active = False
            self._pub.publish(out)
            return
        out.tracking_active = True
        out.track_id = target.track_id
        out.bearing_deg = float(target.bearing)
        out.distance_m = float(target.distance)
        out.confidence = float(target.confidence)
        out.bbox_x = int(target.bbox.x)
        out.bbox_y = int(target.bbox.y)
        out.bbox_w = int(target.bbox.w)
        out.bbox_h = int(target.bbox.h)
        out.depth_quality = int(target.depth_qual)
        # Convert Kalman pixel velocity → bearing rate.
        if self._cam_received and target.vel_u != 0.0:
            u_c = target.bbox.x + target.bbox.w * 0.5
            # d(bearing)/du = fx / (fx² + (u-cx)²) rad/px for the atan2 bearing.
            denom = self._cam.fx ** 2 + (u_c - self._cam.cx) ** 2
            # BUGFIX: use np.pi instead of the hard-coded 3.14159 literal.
            d_bear_du = (self._cam.fx / denom) * (180.0 / np.pi)  # deg per px
            out.vel_bearing_dps = float(d_bear_du * target.vel_u * self._cam.fps)
        # Range rate not computed per-frame yet (would need depth history).
        out.vel_dist_mps = 0.0
        self._pub.publish(out)
def main(args=None) -> None:
    """CLI entry point (registered in setup.py): spin until shutdown."""
    rclpy.init(args=args)
    tracking_node = PersonTrackingNode()
    try:
        rclpy.spin(tracking_node)
    finally:
        # Always release node resources, even on Ctrl-C / spin errors.
        tracking_node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

View File

@ -61,6 +61,8 @@ setup(
'audio_scene = saltybot_bringup.audio_scene_node:main', 'audio_scene = saltybot_bringup.audio_scene_node:main',
# Face emotion classifier (Issue #359) # Face emotion classifier (Issue #359)
'face_emotion = saltybot_bringup.face_emotion_node:main', 'face_emotion = saltybot_bringup.face_emotion_node:main',
# Person tracking for follow-me mode (Issue #363)
'person_tracking = saltybot_bringup.person_tracking_node:main',
], ],
}, },
) )

View File

@ -0,0 +1,599 @@
"""
test_person_tracker.py — Unit tests for the P0 person tracking pipeline.
Tests cover: IoU, Kalman filter, colour histogram, bearing geometry,
depth sampling, tracker state machine, and follow-target selection.
No camera, no detector, no ROS2 needed.
"""
import math
import numpy as np
import pytest
from saltybot_bringup._person_tracker import (
BBox,
CamParams,
Detection,
FollowTargetSelector,
KalmanBoxFilter,
PersonTrack,
PersonTracker,
TrackState,
DEPTH_GOOD, DEPTH_MARGINAL, DEPTH_EXTRAPOLATED, DEPTH_INVALID,
bearing_from_pixel,
depth_at_bbox,
extract_torso_hist,
hist_similarity,
iou,
)
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
def _det(x=10, y=20, w=60, h=150, conf=0.85, frame=None) -> Detection:
    """Build a Detection from loose bbox fields (test convenience)."""
    box = BBox(x, y, w, h)
    return Detection(box, conf, frame)
def _solid_bgr(h=200, w=100, b=128, g=64, r=32) -> np.ndarray:
    """Uniform-colour BGR image of shape (h, w, 3), dtype uint8."""
    img = np.empty((h, w, 3), dtype=np.uint8)
    img[..., 0] = b
    img[..., 1] = g
    img[..., 2] = r
    return img
def _depth_image(h=480, w=640, val_mm=2000) -> np.ndarray:
    """Constant uint16 depth image: every pixel reads val_mm millimetres."""
    img = np.empty((h, w), dtype=np.uint16)
    img.fill(val_mm)
    return img
def _run_tracker_n(tracker, bbox, n=5, cam=None):
    """Drive *tracker* with the identical detection for n consecutive frames."""
    for _frame in range(n):
        detections = [_det(*bbox)]
        tracker.update(detections, cam=cam)
# ─────────────────────────────────────────────────────────────────────────────
# BBox
# ─────────────────────────────────────────────────────────────────────────────
def test_bbox_fields():
    box = BBox(10, 20, 60, 150)
    assert (box.x, box.y, box.w, box.h) == (10, 20, 60, 150)


def test_bbox_is_named_tuple():
    # BBox must be a NamedTuple, hence a plain tuple subclass.
    assert isinstance(BBox(0, 0, 1, 1), tuple)
# ─────────────────────────────────────────────────────────────────────────────
# IoU
# ─────────────────────────────────────────────────────────────────────────────
def test_iou_identical():
    box = BBox(0, 0, 100, 100)
    assert abs(iou(box, box) - 1.0) < 1e-6


def test_iou_no_overlap():
    left = BBox(0, 0, 50, 50)
    right = BBox(100, 0, 50, 50)
    assert iou(left, right) == pytest.approx(0.0, abs=1e-6)


def test_iou_partial_overlap():
    base = BBox(0, 0, 100, 100)
    shifted = BBox(50, 0, 100, 100)  # half-width horizontal shift
    overlap = iou(base, shifted)
    assert 0.0 < overlap < 1.0


def test_iou_exact_overlap_value():
    outer = BBox(0, 0, 100, 100)  # area 10000
    half = BBox(0, 0, 50, 100)    # area 5000; inter 5000, union 10000
    assert abs(iou(outer, half) - 0.5) < 1e-6


def test_iou_contained():
    outer = BBox(0, 0, 100, 100)
    inner = BBox(25, 25, 50, 50)  # fully inside outer
    # inter = 2500, union = 10000 + 2500 - 2500 = 10000 → 0.25
    assert abs(iou(outer, inner) - 0.25) < 1e-6


def test_iou_symmetric():
    a = BBox(10, 10, 80, 80)
    b = BBox(50, 50, 80, 80)
    assert abs(iou(a, b) - iou(b, a)) < 1e-9


def test_iou_touching_edges():
    left = BBox(0, 0, 50, 50)
    right = BBox(50, 0, 50, 50)  # shared edge at x=50 → zero-area overlap
    assert iou(left, right) == pytest.approx(0.0, abs=1e-6)
# ─────────────────────────────────────────────────────────────────────────────
# KalmanBoxFilter
# ─────────────────────────────────────────────────────────────────────────────
def test_kalman_predict_returns_bbox():
    filt = KalmanBoxFilter(BBox(10, 20, 60, 150))
    predicted = filt.predict()
    assert isinstance(predicted, BBox)
    assert predicted.w >= 1
    assert predicted.h >= 1


def test_kalman_update_returns_bbox():
    filt = KalmanBoxFilter(BBox(10, 20, 60, 150))
    corrected = filt.update(BBox(10, 20, 60, 150))
    assert isinstance(corrected, BBox)


def test_kalman_converges_to_stationary():
    """Repeated identical measurements pull the estimate close to them."""
    filt = KalmanBoxFilter(BBox(100, 100, 80, 160))
    measurement = BBox(102, 98, 80, 160)
    result = None
    for _ in range(20):
        filt.predict()
        result = filt.update(measurement)
    # Within ~10 px of the measurement after 20 corrections.
    assert abs(result.x - measurement.x) < 10
    assert abs(result.y - measurement.y) < 10


def test_kalman_predict_advances_with_velocity():
    """A few frames of rightward motion give a rightward-biased prediction."""
    filt = KalmanBoxFilter(BBox(100, 100, 60, 150))
    for step in range(5):
        filt.predict()
        filt.update(BBox(100 + step * 10, 100, 60, 150))
    predicted = filt.predict()
    # Prediction should land at least near the last observed position.
    assert predicted.x > 100 + 4 * 10 - 5


def test_kalman_velocity_zero_initially():
    filt = KalmanBoxFilter(BBox(100, 100, 60, 150))
    assert filt.velocity_px == (0.0, 0.0)


def test_kalman_bbox_property():
    init = BBox(50, 50, 80, 120)
    estimate = KalmanBoxFilter(init).bbox
    assert isinstance(estimate, BBox)
    # The estimate should sit on (or very near) the initial box.
    assert abs(estimate.x - init.x) <= 2
    assert abs(estimate.y - init.y) <= 2


def test_kalman_width_height_stay_positive():
    filt = KalmanBoxFilter(BBox(10, 10, 5, 5))
    for _ in range(30):
        filt.predict()
    box = filt.bbox
    assert box.w >= 1
    assert box.h >= 1
# ─────────────────────────────────────────────────────────────────────────────
# extract_torso_hist
# ─────────────────────────────────────────────────────────────────────────────
def test_torso_hist_shape():
    hist = extract_torso_hist(_solid_bgr(), BBox(0, 0, 100, 200))
    assert hist is not None
    assert hist.shape == (128,)  # 16 hue bins x 8 saturation bins


def test_torso_hist_normalised():
    hist = extract_torso_hist(_solid_bgr(), BBox(0, 0, 100, 200))
    assert hist is not None
    assert abs(hist.sum() - 1.0) < 1e-5


def test_torso_hist_none_for_none_frame():
    assert extract_torso_hist(None, BBox(0, 0, 100, 200)) is None


def test_torso_hist_none_for_tiny_bbox():
    assert extract_torso_hist(_solid_bgr(), BBox(0, 0, 2, 2)) is None


def test_torso_hist_different_colours():
    """Differently coloured crops must yield different histograms."""
    red = np.full((200, 100, 3), (0, 0, 255), dtype=np.uint8)
    blue = np.full((200, 100, 3), (255, 0, 0), dtype=np.uint8)
    hist_red = extract_torso_hist(red, BBox(0, 0, 100, 200))
    hist_blue = extract_torso_hist(blue, BBox(0, 0, 100, 200))
    assert hist_red is not None
    assert hist_blue is not None
    assert not np.allclose(hist_red, hist_blue)
# ─────────────────────────────────────────────────────────────────────────────
# hist_similarity
# ─────────────────────────────────────────────────────────────────────────────
def test_hist_similarity_identical():
    uniform = np.ones(128, dtype=np.float32) / 128.0
    assert abs(hist_similarity(uniform, uniform) - 1.0) < 1e-6


def test_hist_similarity_orthogonal():
    """Histograms with disjoint support → similarity 0."""
    lower = np.zeros(128, dtype=np.float32)
    upper = np.zeros(128, dtype=np.float32)
    lower[:64] = 1.0 / 64
    upper[64:] = 1.0 / 64
    assert abs(hist_similarity(lower, upper)) < 1e-6


def test_hist_similarity_range():
    rng = np.random.RandomState(0)
    for _ in range(20):
        a = rng.rand(128).astype(np.float32)
        b = rng.rand(128).astype(np.float32)
        a /= a.sum()
        b /= b.sum()
        score = hist_similarity(a, b)
        assert 0.0 <= score <= 1.0 + 1e-6


def test_hist_similarity_symmetric():
    a = np.random.RandomState(1).rand(128).astype(np.float32)
    b = np.random.RandomState(2).rand(128).astype(np.float32)
    a /= a.sum()
    b /= b.sum()
    assert abs(hist_similarity(a, b) - hist_similarity(b, a)) < 1e-6
# ─────────────────────────────────────────────────────────────────────────────
# bearing_from_pixel
# ─────────────────────────────────────────────────────────────────────────────
def test_bearing_centre_is_zero():
    """A pixel exactly at the principal point has zero bearing."""
    assert abs(bearing_from_pixel(320.0, 320.0, 615.0)) < 1e-9


def test_bearing_right_is_positive():
    assert bearing_from_pixel(400.0, 320.0, 615.0) > 0.0


def test_bearing_left_is_negative():
    assert bearing_from_pixel(200.0, 320.0, 615.0) < 0.0


def test_bearing_symmetric():
    right = bearing_from_pixel(400.0, 320.0, 615.0)  # +80 px off-centre
    left = bearing_from_pixel(240.0, 320.0, 615.0)   # -80 px off-centre
    assert abs(right + left) < 1e-9


def test_bearing_approx_at_45_deg():
    """u - cx == fx → atan(1) == 45°."""
    assert abs(bearing_from_pixel(935.0, 320.0, 615.0) - 45.0) < 0.1


def test_bearing_degrees_not_radians():
    """atan(80/615) ≈ 7.4° — far larger than the equivalent radian value."""
    assert abs(bearing_from_pixel(400.0, 320.0, 615.0)) > 0.5
# ─────────────────────────────────────────────────────────────────────────────
# depth_at_bbox
# ─────────────────────────────────────────────────────────────────────────────
def test_depth_uniform_field():
    depth_m, quality = depth_at_bbox(_depth_image(val_mm=2000),
                                     BBox(200, 150, 80, 200))
    assert abs(depth_m - 2.0) < 0.01
    assert quality == DEPTH_GOOD


def test_depth_zero_image_invalid():
    empty = np.zeros((480, 640), dtype=np.uint16)
    depth_m, quality = depth_at_bbox(empty, BBox(200, 150, 80, 200))
    assert depth_m == 0.0
    assert quality == DEPTH_INVALID


def test_depth_partial_fill_marginal():
    sparse = np.zeros((480, 640), dtype=np.uint16)
    # Valid readings over only ~40 % of the sampling window.
    sparse[200:260, 280:360] = 1500
    _, quality = depth_at_bbox(sparse, BBox(200, 150, 160, 200),
                               window_frac=1.0)
    assert quality in (DEPTH_MARGINAL, DEPTH_EXTRAPOLATED, DEPTH_GOOD)


def test_depth_scale_applied():
    depth_m, _ = depth_at_bbox(_depth_image(val_mm=3000),
                               BBox(100, 100, 80, 150), depth_scale=0.001)
    assert abs(depth_m - 3.0) < 0.01


def test_depth_out_of_bounds_bbox():
    """A bbox entirely outside the image must come back DEPTH_INVALID."""
    _, quality = depth_at_bbox(_depth_image(), BBox(700, 500, 80, 100))
    assert quality == DEPTH_INVALID


def test_depth_at_5m():
    depth_m, _ = depth_at_bbox(_depth_image(val_mm=5000),
                               BBox(200, 100, 80, 200))
    assert abs(depth_m - 5.0) < 0.05
# ─────────────────────────────────────────────────────────────────────────────
# PersonTracker — state machine
# ─────────────────────────────────────────────────────────────────────────────
def test_tracker_new_track_tentative():
    tracker = PersonTracker(min_hits=3)
    active = tracker.update([_det()])
    # With min_hits=3 the first frame yields a TENTATIVE track only.
    assert len(active) == 0
    assert len(tracker.tracks) == 1
    assert tracker.tracks[0].state == TrackState.TENTATIVE


def test_tracker_track_becomes_active():
    tracker = PersonTracker(min_hits=3)
    active = []
    for _ in range(3):
        active = tracker.update([_det()])
    assert len(active) == 1
    assert active[0].state == TrackState.ACTIVE


def test_tracker_persistent_id():
    tracker = PersonTracker(min_hits=2)
    _run_tracker_n(tracker, (10, 20, 60, 150), n=4)
    assert len(tracker.tracks) == 1
    assert tracker.tracks[0].track_id == 0


def test_tracker_two_detections_two_tracks():
    tracker = PersonTracker(min_hits=2)
    # Widely separated boxes can never associate to a single track.
    far_apart = [
        Detection(BBox(10, 20, 60, 150), 0.9),
        Detection(BBox(400, 20, 60, 150), 0.9),
    ]
    for _ in range(3):
        tracker.update(far_apart)
    assert len(tracker.tracks) == 2


def test_tracker_ids_increment():
    tracker = PersonTracker(min_hits=1)
    tracker.update([_det(x=10)])
    tracker.update([_det(x=10), _det(x=300)])
    assert len({trk.track_id for trk in tracker.tracks}) == 2


def test_tracker_track_goes_lost():
    tracker = PersonTracker(min_hits=2, max_lost_frames=5)
    _run_tracker_n(tracker, (10, 20, 60, 150), n=3)
    tracker.update([])  # one empty frame → the track flips to LOST
    lost = [trk for trk in tracker.tracks if trk.state == TrackState.LOST]
    assert len(lost) == 1


def test_tracker_track_removed_after_max_lost():
    tracker = PersonTracker(min_hits=2, max_lost_frames=3)
    _run_tracker_n(tracker, (10, 20, 60, 150), n=3)
    for _ in range(5):
        tracker.update([])
    assert len(tracker.tracks) == 0


def test_tracker_iou_matching_same_track():
    """A slightly shifted detection re-associates instead of spawning."""
    tracker = PersonTracker(min_hits=2, iou_threshold=0.3)
    _run_tracker_n(tracker, (10, 20, 60, 150), n=3)
    count_before = len(tracker.tracks)
    tracker.update([_det(x=15, y=20, w=60, h=150)])  # high-IoU shift
    assert len(tracker.tracks) == count_before


def test_tracker_no_overlap_creates_new_track():
    tracker = PersonTracker(min_hits=2, iou_threshold=0.3)
    _run_tracker_n(tracker, (10, 20, 60, 150), n=3)
    tracker.update([_det(x=500, y=20, w=60, h=150)])  # zero IoU → spawn
    assert len(tracker.tracks) == 2  # old (now lost) + fresh tentative


def test_tracker_reset():
    tracker = PersonTracker(min_hits=2)
    _run_tracker_n(tracker, (10, 20, 60, 150), n=3)
    tracker.reset()
    assert len(tracker.tracks) == 0
    assert tracker._next_id == 0


def test_tracker_bearing_set_with_cam():
    tracker = PersonTracker(min_hits=2)
    _run_tracker_n(tracker, (10, 20, 60, 150), n=3,
                   cam=CamParams(fx=615.0, cx=320.0))
    active = [trk for trk in tracker.tracks
              if trk.state == TrackState.ACTIVE]
    assert len(active) > 0
    # Box centred near u=40 against image centre 320 → bearing to the left.
    assert active[0].bearing < 0.0


def test_tracker_distance_set_with_depth():
    tracker = PersonTracker(min_hits=2)
    cam = CamParams(fx=615.0, cx=320.0)
    depth = _depth_image(val_mm=2000)
    for _ in range(3):
        tracker.update([_det(x=200, y=100, w=80, h=200)],
                       cam=cam, depth_u16=depth)
    active = [trk for trk in tracker.tracks
              if trk.state == TrackState.ACTIVE]
    assert len(active) > 0
    assert abs(active[0].distance - 2.0) < 0.1


def test_tracker_no_depth_distance_zero():
    tracker = PersonTracker(min_hits=2)
    for _ in range(3):
        tracker.update([_det()], cam=CamParams(), depth_u16=None)
    active = [trk for trk in tracker.tracks
              if trk.state == TrackState.ACTIVE]
    assert len(active) > 0
    assert active[0].distance == 0.0
# ─────────────────────────────────────────────────────────────────────────────
# PersonTracker — re-identification
# ─────────────────────────────────────────────────────────────────────────────
def test_reid_restores_track_id():
    """A person who vanishes briefly and reappears keeps their track ID."""
    frame_blue = np.full((480, 640, 3), (200, 50, 0), dtype=np.uint8)
    box = BBox(250, 50, 80, 200)
    tracker = PersonTracker(
        min_hits=2, max_lost_frames=10,
        reid_threshold=0.4, reid_max_dist=200.0,
    )
    # Establish the track.
    for _ in range(3):
        tracker.update([Detection(box, 0.9, frame_blue)])
    original_id = tracker.tracks[0].track_id
    # Occlusion: three frames with no detections.
    for _ in range(3):
        tracker.update([])
    # Same place, same colour → histogram re-ID should reattach.
    tracker.update([Detection(box, 0.9, frame_blue)])
    assert any(trk.track_id == original_id for trk in tracker.tracks)


def test_reid_different_colour_creates_new_track():
    """A different-coloured person in the same spot gets a new ID."""
    frame_blue = np.full((480, 640, 3), (200, 50, 0), dtype=np.uint8)
    frame_red = np.full((480, 640, 3), (0, 50, 200), dtype=np.uint8)
    box = BBox(250, 50, 80, 200)
    tracker = PersonTracker(
        min_hits=2, max_lost_frames=5,
        reid_threshold=0.85,  # strict: red must not match blue
        reid_max_dist=200.0,
    )
    for _ in range(3):
        tracker.update([Detection(box, 0.9, frame_blue)])
    original_id = tracker.tracks[0].track_id
    for _ in range(2):
        tracker.update([])
    # Red person appears in the same place.
    for _ in range(3):
        tracker.update([Detection(box, 0.9, frame_red)])
    ids_now = {trk.track_id for trk in tracker.tracks}
    # Either the old ID is gone, or a second ID exists alongside it.
    assert original_id not in ids_now or len(ids_now) > 1
# ─────────────────────────────────────────────────────────────────────────────
# FollowTargetSelector
# ─────────────────────────────────────────────────────────────────────────────
def _make_track(track_id, x, y, w, h, dist=0.0) -> PersonTrack:
    """ACTIVE PersonTrack with the given bbox and distance (test helper)."""
    return PersonTrack(
        track_id=track_id,
        state=TrackState.ACTIVE,
        bbox=BBox(x, y, w, h),
        distance=dist,
    )
def test_selector_inactive_returns_none():
    sel = FollowTargetSelector()
    sel.stop()
    result = sel.update([_make_track(0, 100, 50, 80, 200, dist=3.0)])
    assert result is None


def test_selector_active_selects_nearest_by_distance():
    sel = FollowTargetSelector()
    sel.start()
    candidates = [
        _make_track(0, 100, 50, 80, 200, dist=4.0),
        _make_track(1, 300, 50, 80, 200, dist=1.5),
    ]
    chosen = sel.update(candidates, img_cx=320.0)
    assert chosen.track_id == 1  # nearest wins
def test_selector_locks_on_same_track():
    """Once locked, the selector keeps the same track even when another
    track later becomes the nearer candidate.

    The original test never changed any distance, so track 0 stayed
    nearest and the lock was never actually exercised.
    """
    sel = FollowTargetSelector()
    sel.start()
    tracks = [
        _make_track(0, 100, 50, 80, 200, dist=2.0),
        _make_track(1, 300, 50, 80, 200, dist=5.0),
    ]
    first = sel.update(tracks, img_cx=320.0)
    assert first.track_id == 0  # nearest at lock time
    # Second frame: track 1 is now the closer one, but the lock must hold.
    tracks = [
        _make_track(0, 100, 50, 80, 200, dist=2.0),
        _make_track(1, 300, 50, 80, 200, dist=1.0),
    ]
    second = sel.update(tracks, img_cx=320.0)
    assert second.track_id == first.track_id
def test_selector_holds_last_when_tracks_empty():
    sel = FollowTargetSelector(hold_frames=5)
    sel.start()
    sel.update([_make_track(0, 100, 50, 80, 200, dist=2.0)])
    # One empty frame is inside the hold window → target retained.
    held = sel.update([], img_cx=320.0)
    assert held is not None
    assert held.track_id == 0


def test_selector_stops_holding_after_hold_frames():
    sel = FollowTargetSelector(hold_frames=2)
    sel.start()
    sel.update([_make_track(0, 100, 50, 80, 200)])
    result = None
    for _ in range(5):
        result = sel.update([])
    assert result is None


def test_selector_selects_by_centre_when_no_depth():
    sel = FollowTargetSelector()
    sel.start()
    # Track 0 centred at u=40 (far left); track 1 centred at u=320.
    candidates = [
        _make_track(0, 0, 50, 80, 200, dist=0.0),
        _make_track(1, 280, 50, 80, 200, dist=0.0),
    ]
    chosen = sel.update(candidates, img_cx=320.0)
    assert chosen.track_id == 1  # closest to the image centre


def test_selector_restart_reselects():
    sel = FollowTargetSelector()
    sel.start()
    far = _make_track(0, 100, 50, 80, 200, dist=2.0)
    sel.update([far])
    sel.stop()
    sel.start()
    near = _make_track(1, 300, 50, 80, 200, dist=1.0)
    result = sel.update([far, near])
    assert result.track_id == 1  # fresh lock → nearest again

View File

@ -35,6 +35,8 @@ rosidl_generate_interfaces(${PROJECT_NAME}
# Issue #359 face emotion classifier # Issue #359 face emotion classifier
"msg/FaceEmotion.msg" "msg/FaceEmotion.msg"
"msg/FaceEmotionArray.msg" "msg/FaceEmotionArray.msg"
# Issue #363 person tracking for follow-me mode
"msg/TargetTrack.msg"
DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
) )

View File

@ -0,0 +1,13 @@
std_msgs/Header header
bool tracking_active # false when no target is locked
uint32 track_id # persistent ID across frames
float32 bearing_deg    # horizontal bearing to target (°, right=+, left=-)
float32 distance_m # range to target (m); 0 = unknown / depth invalid
float32 confidence     # 0.0-1.0 overall track quality
int32 bbox_x # colour-image bounding box (pixels, top-left origin)
int32 bbox_y
int32 bbox_w
int32 bbox_h
float32 vel_bearing_dps # bearing rate (°/s, from Kalman velocity state)
float32 vel_dist_mps # distance rate (m/s, + = moving away)
uint8 depth_quality # 0=invalid 1=extrapolated 2=marginal 3=good