Compare commits
No commits in common. "eac203ecf4f65fced7f0ba07fcb185e6cdbacd95" and "bcf848109b2b89f90ae2679a5a03cc18688e0d53" have entirely different histories.
eac203ecf4
...
bcf848109b
@ -1,729 +0,0 @@
|
|||||||
"""
|
|
||||||
_person_tracker.py — Multi-person tracker for follow-me mode (no ROS2 deps).
|
|
||||||
|
|
||||||
Pipeline (called once per colour frame)
|
|
||||||
-----------------------------------------
|
|
||||||
1. Person detections (bounding boxes + detector confidence) arrive from an
|
|
||||||
external detector (YOLOv8n, MobileNetSSD, etc.) — not this module's concern.
|
|
||||||
2. Active tracks are predicted one step forward via a constant-velocity Kalman.
|
|
||||||
3. Detections are matched to predicted tracks with greedy IoU ≥ iou_threshold.
|
|
||||||
4. Unmatched detections that survive re-ID histogram matching against LOST
|
|
||||||
tracks are reattached to those tracks (brief-occlusion recovery).
|
|
||||||
5. Remaining unmatched detections start new TENTATIVE tracks.
|
|
||||||
6. Tracks not updated for max_lost_frames are removed permanently.
|
|
||||||
7. Bearing and range to a designated follow target are derived from camera
|
|
||||||
intrinsics + an aligned depth image.
|
|
||||||
|
|
||||||
Re-identification
|
|
||||||
-----------------
|
|
||||||
Each track stores an HSV colour histogram of the person's torso region
|
|
||||||
(middle 50 % of bbox height, centre 80 % width). After occlusion, new
|
|
||||||
detections whose histogram Bhattacharyya similarity exceeds reid_threshold
|
|
||||||
*and* whose predicted position is within reid_max_dist pixels are candidates
|
|
||||||
for re-identification. Closest histogram match wins.
|
|
||||||
|
|
||||||
Kalman state (8-D, one per track)
|
|
||||||
----------------------------------
|
|
||||||
x = [cx, cy, w, h, vcx, vcy, vw, vh]
|
|
||||||
Measurement z = [cx, cy, w, h]
|
|
||||||
dt = 1 frame (constant velocity model)
|
|
||||||
|
|
||||||
Public API
|
|
||||||
----------
|
|
||||||
BBox NamedTuple (x, y, w, h) — pixel coordinates
|
|
||||||
Detection NamedTuple (bbox, confidence, frame_bgr)
|
|
||||||
TrackState Enum TENTATIVE / ACTIVE / LOST
|
|
||||||
PersonTrack dataclass per-track state snapshot
|
|
||||||
PersonTracker class .update() → list[PersonTrack]
|
|
||||||
|
|
||||||
iou(a, b) → float
|
|
||||||
bearing_from_pixel(u, cx_px, fx) → float (degrees)
|
|
||||||
depth_at_bbox(depth_u16, bbox, …) → (depth_m, quality)
|
|
||||||
extract_torso_hist(bgr, bbox, …) → ndarray | None
|
|
||||||
hist_similarity(h1, h2) → float (0 = different, 1 = same)
|
|
||||||
KalmanBoxFilter class
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import math
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from enum import IntEnum
|
|
||||||
from typing import List, NamedTuple, Optional, Tuple
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
|
|
||||||
# ── Simple types ──────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class BBox(NamedTuple):
    """Axis-aligned bounding box, pixel units, top-left image origin."""
    x: int  # column of the left edge
    y: int  # row of the top edge
    w: int  # box width in pixels (expected ≥ 1)
    h: int  # box height in pixels (expected ≥ 1)
|
|
||||||
|
|
||||||
|
|
||||||
class Detection(NamedTuple):
    """A single person detection handed over by the external detector."""
    bbox: BBox                              # detector bounding box (px)
    confidence: float                       # detector score in [0, 1]
    frame_bgr: Optional[np.ndarray] = None  # source colour frame (re-ID histogram)
|
|
||||||
|
|
||||||
|
|
||||||
class TrackState(IntEnum):
    """Lifecycle state of a tracked person."""
    TENTATIVE = 0  # fewer than min_hits matches; withheld from output
    ACTIVE = 1     # confirmed; published to the follow-me controller
    LOST = 2       # currently unmatched; kept for re-ID up to max_lost_frames
|
|
||||||
|
|
||||||
|
|
||||||
# ── Depth quality levels ──────────────────────────────────────────────────────
|
|
||||||
# Ordered worst → best; higher value = more reliable depth estimate.
DEPTH_INVALID = 0       # no valid depth pixels in the sample window
DEPTH_EXTRAPOLATED = 1  # very sparse coverage (≤ 25 % valid pixels)
DEPTH_MARGINAL = 2      # partial coverage (25–60 % valid pixels)
DEPTH_GOOD = 3          # dense coverage (> 60 % valid pixels)
|
|
||||||
|
|
||||||
|
|
||||||
# ── IoU ───────────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def iou(a: BBox, b: BBox) -> float:
    """Intersection-over-union of two axis-aligned boxes (0 when disjoint)."""
    left = max(a.x, b.x)
    right = min(a.x + a.w, b.x + b.w)
    top = max(a.y, b.y)
    bottom = min(a.y + a.h, b.y + b.h)
    overlap = max(0, right - left) * max(0, bottom - top)
    combined = a.w * a.h + b.w * b.h - overlap
    # Clamp the denominator to avoid division by zero on degenerate boxes.
    return float(overlap) / max(float(combined), 1e-6)
|
|
||||||
|
|
||||||
|
|
||||||
# ── Kalman box filter ─────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class KalmanBoxFilter:
    """
    Constant-velocity 8-state Kalman filter over a bounding box.

    State  x = [cx, cy, w, h, vcx, vcy, vw, vh]   (dt = 1 frame)
    Meas   z = [cx, cy, w, h]

    Process noise Q: position var 1 px², velocity var 100 px²/frame².
    Measurement noise R: var 4 px² (≈ ±2 px std-dev on bbox edges).
    """

    def __init__(self, initial_bbox: BBox) -> None:
        # State transition: each position component gains its velocity per frame.
        self._A = np.eye(8, dtype=np.float64)
        self._A[:4, 4:] = np.eye(4)

        # Observation model: only the four position components are measured.
        self._C = np.zeros((4, 8), dtype=np.float64)
        self._C[:4, :4] = np.eye(4)

        # Noise covariances.
        self._Q = np.diag([1.0] * 4 + [100.0] * 4)
        self._R = 4.0 * np.eye(4, dtype=np.float64)

        # Initial state: bbox centre/size, zero velocity.
        half_w = initial_bbox.w * 0.5
        half_h = initial_bbox.h * 0.5
        self._state = np.array(
            [initial_bbox.x + half_w, initial_bbox.y + half_h,
             float(initial_bbox.w), float(initial_bbox.h),
             0.0, 0.0, 0.0, 0.0],
            dtype=np.float64,
        )

        # Confident about position, very uncertain about velocity.
        self._cov = np.diag([10.0] * 4 + [1000.0] * 4)

    # -- predict ──────────────────────────────────────────────────────────────

    def predict(self) -> BBox:
        """Advance the state one frame forward; return the predicted box."""
        self._state = self._A @ self._state
        self._cov = self._A @ self._cov @ self._A.T + self._Q
        return self._to_bbox()

    # -- update ───────────────────────────────────────────────────────────────

    def update(self, bbox: BBox) -> BBox:
        """Fuse one bbox measurement; return the corrected box."""
        z = np.array(
            [bbox.x + bbox.w * 0.5, bbox.y + bbox.h * 0.5,
             float(bbox.w), float(bbox.h)],
            dtype=np.float64,
        )
        innovation = z - self._C @ self._state
        S = self._C @ self._cov @ self._C.T + self._R
        gain = self._cov @ self._C.T @ np.linalg.inv(S)
        self._state = self._state + gain @ innovation
        self._cov = (np.eye(8) - gain @ self._C) @ self._cov
        return self._to_bbox()

    # -- accessors ────────────────────────────────────────────────────────────

    @property
    def velocity_px(self) -> Tuple[float, float]:
        """(vcx, vcy) centre velocity in pixels per frame."""
        return float(self._state[4]), float(self._state[5])

    @property
    def bbox(self) -> BBox:
        """Current state as a BBox (no prediction step applied)."""
        return self._to_bbox()

    def _to_bbox(self) -> BBox:
        # Convert centre/size state back to a top-left box, clamping the
        # size to at least 1 px so downstream code never sees a zero box.
        cx, cy, w, h = self._state[:4]
        w = max(1.0, w)
        h = max(1.0, h)
        return BBox(
            int(round(cx - w * 0.5)),
            int(round(cy - h * 0.5)),
            int(round(w)),
            int(round(h)),
        )
|
|
||||||
|
|
||||||
|
|
||||||
# ── Colour histogram (HSV torso) ──────────────────────────────────────────────
|
|
||||||
|
|
||||||
_HIST_H_BINS = 16  # hue bins over [0, 360) degrees
_HIST_S_BINS = 8   # saturation bins over [0, 1]
_HIST_SIZE = _HIST_H_BINS * _HIST_S_BINS  # flattened histogram length (128)
|
|
||||||
|
|
||||||
|
|
||||||
def extract_torso_hist(
    bgr: np.ndarray,
    bbox: BBox,
    h_bins: int = _HIST_H_BINS,
    s_bins: int = _HIST_S_BINS,
) -> Optional[np.ndarray]:
    """
    Extract a normalised HSV colour histogram from the torso region of a
    person bounding box.

    Torso region: middle 50 % of bbox height, centre 80 % of bbox width.

    Parameters
    ----------
    bgr : (H, W, 3) uint8 colour image
    bbox : person bounding box
    h_bins, s_bins : histogram bins for H and S channels

    Returns
    -------
    Normalised 1-D histogram of length h_bins * s_bins, or None if the
    crop is too small or bgr is None.
    """
    if bgr is None:
        return None

    img_h, img_w = bgr.shape[:2]

    # Torso window: drop the top quarter (head) and bottom quarter (legs),
    # keep the central 80 % of the width; clamp to the image.
    top = max(0, bbox.y + bbox.h // 4)
    bot = min(img_h, bbox.y + bbox.h * 3 // 4)
    left = max(0, bbox.x + bbox.w // 10)
    right = min(img_w, bbox.x + bbox.w * 9 // 10)

    if bot - top < 4 or right - left < 4:
        return None

    # BGR → HSV done manually so tests need no OpenCV dependency.
    patch = bgr[top:bot, left:right].astype(np.float32) / 255.0
    b, g, r = patch[..., 0], patch[..., 1], patch[..., 2]

    hi = np.maximum(np.maximum(r, g), b)
    lo = np.minimum(np.minimum(r, g), b)
    span = hi - lo + 1e-7  # epsilon keeps the hue angle defined on grey pixels

    # Hue in [0, 360) via the piecewise formula on the dominant channel.
    hue = np.select(
        [hi == r, hi == g],
        [60.0 * ((g - b) / span % 6),
         60.0 * ((b - r) / span + 2)],
        default=60.0 * ((r - g) / span + 4),
    )
    hue = np.clip(hue % 360.0, 0.0, 359.9999)

    # Saturation in [0, 1] — no epsilon so pure-colour pixels stay ≤ 1.0.
    sat = np.clip(np.where(hi > 1e-6, (hi - lo) / hi, 0.0), 0.0, 1.0)

    counts, _, _ = np.histogram2d(
        hue.ravel(), sat.ravel(),
        bins=[h_bins, s_bins],
        range=[[0, 360], [0, 1]],
    )
    flat = counts.ravel().astype(np.float32)
    mass = flat.sum()
    if mass > 0:
        flat /= mass
    return flat
|
|
||||||
|
|
||||||
|
|
||||||
def hist_similarity(h1: np.ndarray, h2: np.ndarray) -> float:
    """
    Bhattacharyya coefficient between two normalised histograms.

    Returns a value in [0, 1]: 1 = identical distributions, 0 = disjoint.
    """
    coeff = np.sqrt(h1 * h2).sum()
    return float(np.clip(float(coeff), 0.0, 1.0))
|
|
||||||
|
|
||||||
|
|
||||||
# ── Camera geometry ───────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def bearing_from_pixel(
    u: float,
    cx_px: float,
    fx: float,
) -> float:
    """
    Convert a horizontal pixel coordinate to a signed bearing angle.

    Parameters
    ----------
    u : pixel column (horizontal image coordinate)
    cx_px : principal point x (from CameraInfo.K[2])
    fx : horizontal focal length in pixels (from CameraInfo.K[0])

    Returns
    -------
    bearing_deg : signed degrees; positive = right of camera centre.
    """
    offset_px = float(u - cx_px)
    return math.degrees(math.atan2(offset_px, float(fx)))
|
|
||||||
|
|
||||||
|
|
||||||
def depth_at_bbox(
    depth_u16: np.ndarray,
    bbox: BBox,
    depth_scale: float = 0.001,
    window_frac: float = 0.3,
) -> Tuple[float, int]:
    """
    Sample median depth from the central torso region of a bounding box in a
    uint16 depth image (D435i mm units by default).

    Parameters
    ----------
    depth_u16 : (H, W) uint16 depth image
    bbox : person bounding box (colour image coordinates, assumed aligned)
    depth_scale : multiply raw value to get metres (D435i: 0.001)
    window_frac : fraction of bbox dimensions to use as central sample window

    Returns
    -------
    (depth_m, quality)
      depth_m : median depth in metres (0.0 when no valid pixels)
      quality : DEPTH_GOOD / DEPTH_MARGINAL / DEPTH_EXTRAPOLATED / DEPTH_INVALID
    """
    rows, cols = depth_u16.shape

    # Central sample window around the bbox centre, clamped to the image.
    frac = min(1.0, max(0.1, window_frac))
    centre_x = bbox.x + bbox.w * 0.5
    centre_y = bbox.y + bbox.h * 0.5
    half_w = bbox.w * frac * 0.5
    half_h = bbox.h * frac * 0.5

    top = int(max(0, centre_y - half_h))
    bot = int(min(rows, centre_y + half_h + 1))
    left = int(max(0, centre_x - half_w))
    right = int(min(cols, centre_x + half_w + 1))

    if bot <= top or right <= left:
        return 0.0, DEPTH_INVALID

    window = depth_u16[top:bot, left:right]
    samples = window[window > 0]  # raw 0 means "no depth" on RealSense
    if samples.size == 0:
        return 0.0, DEPTH_INVALID

    coverage = samples.size / max(window.size, 1)
    distance = float(np.median(samples)) * depth_scale

    # Quality graded by how much of the window held valid depth pixels.
    if coverage > 0.6:
        return distance, DEPTH_GOOD
    if coverage > 0.25:
        return distance, DEPTH_MARGINAL
    return distance, DEPTH_EXTRAPOLATED
|
|
||||||
|
|
||||||
|
|
||||||
# ── Per-track state ───────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
@dataclass
class PersonTrack:
    """Current state of one tracked person."""
    track_id: int                    # unique id, assigned once at creation
    state: TrackState                # TENTATIVE / ACTIVE / LOST
    bbox: BBox                       # smoothed Kalman bbox (colour img px)
    bearing: float = 0.0             # degrees; 0 until cam params set
    distance: float = 0.0            # metres; 0 until depth available
    depth_qual: int = DEPTH_INVALID  # DEPTH_* quality of `distance`
    confidence: float = 0.0          # last matched detector score (0–1)
    vel_u: float = 0.0               # Kalman horizontal velocity (px/frame)
    vel_v: float = 0.0               # Kalman vertical velocity (px/frame)

    hits: int = 0                    # consecutive matched frames
    age: int = 0                     # total frames since creation
    lost_age: int = 0                # consecutive unmatched frames

    color_hist: Optional[np.ndarray] = None  # HSV torso histogram (re-ID key)

    # Internal Kalman filter — excluded from repr, not serialised.
    _kalman: Optional[KalmanBoxFilter] = field(default=None, repr=False)
|
|
||||||
|
|
||||||
|
|
||||||
# ── Camera parameters (minimal) ──────────────────────────────────────────────
|
|
||||||
|
|
||||||
@dataclass
class CamParams:
    """Minimal camera intrinsics needed for bearing calculation."""
    fx: float = 615.0   # horizontal focal length, px (D435i 640×480 default)
    fy: float = 615.0   # vertical focal length, px
    cx: float = 320.0   # principal point x, px
    cy: float = 240.0   # principal point y, px
    fps: float = 30.0   # frame rate (for velocity conversion px → m/s)
|
|
||||||
|
|
||||||
|
|
||||||
# ── PersonTracker ─────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class PersonTracker:
    """
    Multi-person tracker combining Kalman prediction, greedy IoU data
    association, and HSV colour histogram re-identification.

    Parameters
    ----------
    iou_threshold : minimum IoU to consider a detection-track match
    min_hits : frames before a track transitions TENTATIVE → ACTIVE
    max_lost_frames : frames a track survives without a detection before removal
    reid_threshold : minimum histogram Bhattacharyya similarity for re-ID
    reid_max_dist : max predicted-to-detection centre distance for re-ID (px)
    """

    def __init__(
        self,
        iou_threshold: float = 0.30,
        min_hits: int = 3,
        max_lost_frames: int = 30,
        reid_threshold: float = 0.55,
        reid_max_dist: float = 150.0,
    ) -> None:
        self.iou_threshold = iou_threshold
        self.min_hits = min_hits
        self.max_lost_frames = max_lost_frames
        self.reid_threshold = reid_threshold
        self.reid_max_dist = reid_max_dist

        self._tracks: List[PersonTrack] = []
        self._next_id: int = 0

    # ── Public API ────────────────────────────────────────────────────────────

    @property
    def tracks(self) -> List[PersonTrack]:
        """Snapshot copy of all current tracks (any state)."""
        return list(self._tracks)

    def update(
        self,
        detections: List[Detection],
        cam: Optional[CamParams] = None,
        depth_u16: Optional[np.ndarray] = None,
        depth_scale: float = 0.001,
    ) -> List[PersonTrack]:
        """
        Process one frame of detections.

        Parameters
        ----------
        detections : list of Detection from an external detector
        cam : camera intrinsics (for bearing computation); None = skip
        depth_u16 : aligned uint16 depth image; None = depth unavailable
        depth_scale : mm-to-metres scale factor (D435i default 0.001)

        Returns
        -------
        list of PersonTrack (ACTIVE state only)
        """
        # ── Step 1: Predict all tracks one frame forward ──────────────────────
        for trk in self._tracks:
            if trk._kalman is not None:
                trk.bbox = trk._kalman.predict()
            trk.age += 1

        # ── Step 2: IoU matching (LOST tracks are handled via re-ID below) ───
        active_tracks = [t for t in self._tracks if t.state != TrackState.LOST]
        matched_t, matched_d, unmatched_t, unmatched_d = \
            self._match_iou(active_tracks, detections)

        # ── Step 3: Update matched tracks ────────────────────────────────────
        for t_idx, d_idx in zip(matched_t, matched_d):
            trk = active_tracks[t_idx]
            det = detections[d_idx]
            if trk._kalman is not None:
                trk.bbox = trk._kalman.update(det.bbox)
                trk.vel_u, trk.vel_v = trk._kalman.velocity_px
            trk.hits += 1
            trk.lost_age = 0
            trk.confidence = float(det.confidence)
            if trk.state == TrackState.TENTATIVE and trk.hits >= self.min_hits:
                trk.state = TrackState.ACTIVE
            self._update_hist(trk, det)

        # ── Step 4: Re-ID for unmatched detections vs LOST tracks ────────────
        lost_tracks = [t for t in self._tracks if t.state == TrackState.LOST]
        still_unmatched_d = list(unmatched_d)

        for d_idx in list(still_unmatched_d):
            det = detections[d_idx]
            best_trk, best_sim = self._reid_match(det, lost_tracks)
            if best_trk is not None:
                if best_trk._kalman is not None:
                    best_trk.bbox = best_trk._kalman.update(det.bbox)
                else:
                    best_trk.bbox = det.bbox
                best_trk.state = TrackState.ACTIVE
                best_trk.hits += 1
                best_trk.lost_age = 0
                best_trk.confidence = float(det.confidence)
                self._update_hist(best_trk, det)
                # BUG FIX: remove the re-identified track from the candidate
                # pool so a second detection in the same frame cannot claim
                # (and double-update) the same track instead of spawning its
                # own new track.
                lost_tracks.remove(best_trk)
                still_unmatched_d.remove(d_idx)

        # ── Step 5: Create new tracks for still-unmatched detections ─────────
        for d_idx in still_unmatched_d:
            det = detections[d_idx]
            trk = PersonTrack(
                track_id = self._next_id,
                state = TrackState.TENTATIVE,
                bbox = det.bbox,
                hits = 1,
                confidence= float(det.confidence),
                _kalman = KalmanBoxFilter(det.bbox),
            )
            self._update_hist(trk, det)
            self._tracks.append(trk)
            self._next_id += 1

        # ── Step 6: Age lost tracks, remove stale ────────────────────────────
        # Mark newly-unmatched tracks as LOST with lost_age reset, so the
        # increment below starts their count at 1 this frame.
        for t_idx in unmatched_t:
            trk = active_tracks[t_idx]
            if trk.state != TrackState.LOST:
                trk.lost_age = 0
                trk.state = TrackState.LOST

        # Increment lost_age for every LOST track (including previously LOST)
        for trk in self._tracks:
            if trk.state == TrackState.LOST:
                trk.lost_age += 1

        self._tracks = [
            t for t in self._tracks
            if t.lost_age < self.max_lost_frames
        ]

        # ── Step 7: Update bearing / depth for all ACTIVE tracks ─────────────
        for trk in self._tracks:
            if trk.state != TrackState.ACTIVE:
                continue
            u_centre = trk.bbox.x + trk.bbox.w * 0.5
            if cam is not None:
                trk.bearing = bearing_from_pixel(u_centre, cam.cx, cam.fx)
            if depth_u16 is not None:
                trk.distance, trk.depth_qual = depth_at_bbox(
                    depth_u16, trk.bbox, depth_scale=depth_scale
                )

        return [t for t in self._tracks if t.state == TrackState.ACTIVE]

    def reset(self) -> None:
        """Remove all tracks and restart id assignment from zero."""
        self._tracks.clear()
        self._next_id = 0

    # ── Internal helpers ──────────────────────────────────────────────────────

    def _match_iou(
        self,
        tracks: List[PersonTrack],
        detections: List[Detection],
    ) -> Tuple[List[int], List[int], List[int], List[int]]:
        """
        Greedy IoU matching between tracks and detections.

        Returns (matched_t_idx, matched_d_idx, unmatched_t_idx, unmatched_d_idx).
        """
        if not tracks or not detections:
            return [], [], list(range(len(tracks))), list(range(len(detections)))

        iou_mat = np.zeros((len(tracks), len(detections)), dtype=np.float32)
        for ti, trk in enumerate(tracks):
            for di, det in enumerate(detections):
                iou_mat[ti, di] = iou(trk.bbox, det.bbox)

        matched_t: List[int] = []
        matched_d: List[int] = []
        used_t = set()
        used_d = set()

        # Greedy: take pairs in order of descending IoU; stop once the best
        # remaining pair falls below the threshold.
        flat_order = np.argsort(iou_mat.ravel())[::-1]
        for flat_idx in flat_order:
            ti, di = divmod(int(flat_idx), len(detections))
            if iou_mat[ti, di] < self.iou_threshold:
                break
            if ti not in used_t and di not in used_d:
                matched_t.append(ti)
                matched_d.append(di)
                used_t.add(ti)
                used_d.add(di)

        unmatched_t = [i for i in range(len(tracks)) if i not in used_t]
        unmatched_d = [i for i in range(len(detections)) if i not in used_d]
        return matched_t, matched_d, unmatched_t, unmatched_d

    def _reid_match(
        self,
        det: Detection,
        lost: List[PersonTrack],
    ) -> Tuple[Optional[PersonTrack], float]:
        """
        Find the best re-identification match for a detection among lost tracks.

        Returns (best_track, similarity) or (None, 0.0) if no match found.
        """
        if not lost:
            return None, 0.0

        det_hist = None
        if det.frame_bgr is not None:
            det_hist = extract_torso_hist(det.frame_bgr, det.bbox)

        best_trk: Optional[PersonTrack] = None
        best_sim: float = 0.0

        det_cx = det.bbox.x + det.bbox.w * 0.5
        det_cy = det.bbox.y + det.bbox.h * 0.5

        for trk in lost:
            # Spatial gate: the track's predicted centre must be close enough.
            trk_cx = trk.bbox.x + trk.bbox.w * 0.5
            trk_cy = trk.bbox.y + trk.bbox.h * 0.5
            dist = math.sqrt((det_cx - trk_cx) ** 2 + (det_cy - trk_cy) ** 2)
            if dist > self.reid_max_dist:
                continue

            # Appearance gate: histogram similarity above the re-ID threshold.
            if det_hist is not None and trk.color_hist is not None:
                sim = hist_similarity(det_hist, trk.color_hist)
                if sim > self.reid_threshold and sim > best_sim:
                    best_sim = sim
                    best_trk = trk

        return best_trk, best_sim

    @staticmethod
    def _update_hist(trk: PersonTrack, det: Detection) -> None:
        """Update the track's colour histogram with exponential decay."""
        if det.frame_bgr is None:
            return
        new_hist = extract_torso_hist(det.frame_bgr, det.bbox)
        if new_hist is None:
            return
        if trk.color_hist is None:
            trk.color_hist = new_hist
        else:
            # Running average (α = 0.3 — new frame contributes 30 %).
            trk.color_hist = 0.7 * trk.color_hist + 0.3 * new_hist
|
|
||||||
|
|
||||||
|
|
||||||
# ── Follow-target selector ────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class FollowTargetSelector:
    """
    Locks onto a single PersonTrack and keeps following it frame-to-frame.

    Selection policy
    ----------------
    • When no lock exists (after start()): pick the nearest ACTIVE track —
      by depth distance when available, otherwise by horizontal proximity
      to the image centre.
    • While locked: keep returning the track with the locked track_id.
    • When no candidates remain: return the last known target for up to
      `hold_frames` frames, then report no target.
    """

    def __init__(self, hold_frames: int = 15) -> None:
        self.hold_frames = hold_frames
        self._target_id: Optional[int] = None
        self._last_target: Optional[PersonTrack] = None
        self._held_frames: int = 0
        self._active: bool = False

    # -- control ──────────────────────────────────────────────────────────────

    def start(self) -> None:
        """(Re-)enable follow mode; the target is re-selected on next update."""
        self._active = True
        self._target_id = None
        self._held_frames = 0

    def stop(self) -> None:
        """Disable follow mode and drop any lock."""
        self._active = False
        self._target_id = None
        self._last_target = None

    # -- update ───────────────────────────────────────────────────────────────

    def update(
        self,
        active_tracks: List[PersonTrack],
        img_cx: float = 320.0,  # image centre x (px)
    ) -> Optional[PersonTrack]:
        """
        Select the follow target from a list of active tracks.

        Returns the locked PersonTrack, or None if follow mode is inactive or
        no candidate found.
        """
        if not self._active:
            return None

        if not active_tracks:
            # No candidates: coast on the previous target for a bounded
            # number of frames, then give up.
            can_hold = (
                self._last_target is not None
                and self._held_frames < self.hold_frames
            )
            if can_hold:
                self._held_frames += 1
                return self._last_target
            self._last_target = None
            return None

        self._held_frames = 0

        # Continuous lock: keep following the same track_id while visible.
        if self._target_id is not None:
            locked = next(
                (t for t in active_tracks if t.track_id == self._target_id),
                None,
            )
            if locked is not None:
                self._last_target = locked
                return locked
            self._target_id = None  # lock broken — fall through to re-select

        def _rank(candidate: PersonTrack) -> float:
            # Prefer metric distance when depth is known; otherwise fall
            # back to horizontal proximity to the image centre.
            if candidate.distance > 0:
                return candidate.distance
            centre_u = candidate.bbox.x + candidate.bbox.w * 0.5
            return abs(centre_u - img_cx)

        chosen = min(active_tracks, key=_rank)
        self._target_id = chosen.track_id
        self._last_target = chosen
        return chosen
|
|
||||||
@ -1,371 +0,0 @@
|
|||||||
"""
|
|
||||||
person_tracking_node.py — P0 follow-me person tracking node (Issue #363).
|
|
||||||
|
|
||||||
Runs a real-time person detection + tracking pipeline on the D435i colour and
|
|
||||||
depth streams and publishes a single TargetTrack message for the follow-me
|
|
||||||
motion controller.
|
|
||||||
|
|
||||||
Detection backend
|
|
||||||
-----------------
|
|
||||||
Attempts YOLOv8n via ultralytics (auto-converted to TensorRT FP16 on first
|
|
||||||
run for ≥ 15 fps on Orin Nano). Falls back to a simple HOG+SVM detector
|
|
||||||
when ultralytics is unavailable.
|
|
||||||
|
|
||||||
Subscribes
|
|
||||||
----------
|
|
||||||
/camera/color/image_raw sensor_msgs/Image (BEST_EFFORT)
|
|
||||||
/camera/depth/image_rect_raw sensor_msgs/Image (BEST_EFFORT)
|
|
||||||
/camera/depth/camera_info sensor_msgs/CameraInfo (RELIABLE)
|
|
||||||
/saltybot/follow_start std_msgs/Empty — start/resume following
|
|
||||||
/saltybot/follow_stop std_msgs/Empty — stop following
|
|
||||||
|
|
||||||
Publishes
|
|
||||||
---------
|
|
||||||
/saltybot/target_track saltybot_scene_msgs/TargetTrack
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
detector_model str 'yolov8n.pt' Ultralytics model file or TRT engine
|
|
||||||
use_tensorrt bool True Convert to TensorRT FP16 on first run
|
|
||||||
max_fps float 30.0 Maximum processing rate (Hz)
|
|
||||||
iou_threshold float 0.30 Tracker IoU matching threshold
|
|
||||||
min_hits int 3 Frames before TENTATIVE → ACTIVE
|
|
||||||
max_lost_frames int 30 Frames a track survives without det
|
|
||||||
reid_threshold float 0.55 HSV histogram re-ID similarity cutoff
|
|
||||||
depth_scale float 0.001 D435i raw-to-metres scale
|
|
||||||
depth_max_m float 5.0 Range beyond which depth degrades
|
|
||||||
auto_follow bool True Auto-select nearest person on start
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import threading
|
|
||||||
import time
|
|
||||||
from typing import List, Optional
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
import rclpy
|
|
||||||
from rclpy.node import Node
|
|
||||||
from rclpy.qos import (
|
|
||||||
QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy,
|
|
||||||
)
|
|
||||||
|
|
||||||
from sensor_msgs.msg import CameraInfo, Image
|
|
||||||
from std_msgs.msg import Empty
|
|
||||||
from saltybot_scene_msgs.msg import TargetTrack
|
|
||||||
from ._person_tracker import (
|
|
||||||
BBox, CamParams, Detection,
|
|
||||||
PersonTracker, FollowTargetSelector,
|
|
||||||
DEPTH_GOOD, DEPTH_MARGINAL,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# QoS for high-rate camera streams: best-effort with a shallow queue, so
# stale frames are dropped rather than queued behind the sensor.
_SENSOR_QOS = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=2,
)
# QoS for latched, low-rate topics: reliable + transient-local durability so
# late-joining subscribers still receive the last published message.
_RELIABLE_QOS = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    history=HistoryPolicy.KEEP_LAST,
    depth=1,
    durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
|
|
||||||
|
|
||||||
|
|
||||||
# ── Detector wrappers ─────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class _YoloDetector:
|
|
||||||
"""Lazy-initialised YOLOv8n person detector (ultralytics / TensorRT)."""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
model_path: str = 'yolov8n.pt',
|
|
||||||
use_tensorrt: bool = True,
|
|
||||||
logger=None,
|
|
||||||
) -> None:
|
|
||||||
self._model_path = model_path
|
|
||||||
self._use_trt = use_tensorrt
|
|
||||||
self._log = logger
|
|
||||||
self._model = None
|
|
||||||
self._available = False
|
|
||||||
self._ready = threading.Event()
|
|
||||||
threading.Thread(target=self._load, daemon=True).start()
|
|
||||||
|
|
||||||
def _load(self) -> None:
|
|
||||||
try:
|
|
||||||
from ultralytics import YOLO
|
|
||||||
model = YOLO(self._model_path)
|
|
||||||
if self._use_trt:
|
|
||||||
try:
|
|
||||||
model = YOLO(model.export(format='engine', half=True, device=0))
|
|
||||||
except Exception as e:
|
|
||||||
if self._log:
|
|
||||||
self._log.warn(f'TRT export failed ({e}); using PyTorch')
|
|
||||||
self._model = model
|
|
||||||
self._available = True
|
|
||||||
if self._log:
|
|
||||||
self._log.info(f'YOLO detector loaded: {self._model_path}')
|
|
||||||
except Exception as e:
|
|
||||||
self._available = False
|
|
||||||
if self._log:
|
|
||||||
self._log.warn(f'ultralytics not available ({e}); using HOG fallback')
|
|
||||||
finally:
|
|
||||||
self._ready.set()
|
|
||||||
|
|
||||||
def detect(self, bgr: np.ndarray, conf_thresh: float = 0.40) -> List[Detection]:
|
|
||||||
if not self._ready.wait(timeout=0.01) or not self._available:
|
|
||||||
return []
|
|
||||||
results = self._model(bgr, classes=[0], conf=conf_thresh, verbose=False)
|
|
||||||
dets: List[Detection] = []
|
|
||||||
for r in results:
|
|
||||||
for box in r.boxes:
|
|
||||||
x1, y1, x2, y2 = (int(v) for v in box.xyxy[0].cpu().numpy())
|
|
||||||
w, h = max(1, x2 - x1), max(1, y2 - y1)
|
|
||||||
dets.append(Detection(
|
|
||||||
bbox = BBox(x1, y1, w, h),
|
|
||||||
confidence = float(box.conf[0]),
|
|
||||||
frame_bgr = bgr,
|
|
||||||
))
|
|
||||||
return dets
|
|
||||||
|
|
||||||
|
|
||||||
class _HogDetector:
    """OpenCV HOG+SVM person detector — CPU fallback, ~5–10 fps."""

    def __init__(self) -> None:
        import cv2
        self._hog = cv2.HOGDescriptor()
        self._hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())

    def detect(self, bgr: np.ndarray, conf_thresh: float = 0.40) -> List[Detection]:
        """Detect people in a BGR frame; boxes are rescaled to full resolution."""
        import cv2
        # Downscale wide frames to 320x240 to keep HOG tractable on CPU.
        if bgr.shape[1] > 320:
            shrunk = cv2.resize(bgr, (320, 240))
        else:
            shrunk = bgr
        factor = bgr.shape[1] / shrunk.shape[1]
        boxes, scores = self._hog.detectMultiScale(
            shrunk, winStride=(8, 8), padding=(4, 4), scale=1.05,
        )
        out: List[Detection] = []
        for box, score in zip(boxes, scores):
            confidence = float(np.clip(score, 0.0, 1.0))
            if confidence < conf_thresh:
                continue
            bx, by, bw, bh = box
            out.append(Detection(
                bbox=BBox(
                    int(bx * factor), int(by * factor),
                    int(bw * factor), int(bh * factor),
                ),
                confidence=confidence,
                frame_bgr=bgr,
            ))
        return out
|
|
||||||
|
|
||||||
|
|
||||||
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class PersonTrackingNode(Node):
    """ROS2 node: detect people, track them, and publish the follow target.

    Per colour frame (rate-limited to ``max_fps``): decode the image, run
    the YOLO detector (or the HOG fallback while YOLO is loading or if it
    failed), update the multi-person tracker with the latest depth frame,
    pick the follow target, and publish a TargetTrack message on
    /saltybot/target_track.
    """

    def __init__(self) -> None:
        super().__init__('person_tracking_node')

        # ── Parameters ──────────────────────────────────────────────────────
        self.declare_parameter('detector_model', 'yolov8n.pt')
        self.declare_parameter('use_tensorrt', True)
        self.declare_parameter('max_fps', 30.0)
        self.declare_parameter('iou_threshold', 0.30)
        self.declare_parameter('min_hits', 3)
        self.declare_parameter('max_lost_frames', 30)
        self.declare_parameter('reid_threshold', 0.55)
        self.declare_parameter('depth_scale', 0.001)
        self.declare_parameter('depth_max_m', 5.0)
        self.declare_parameter('auto_follow', True)

        p = self.get_parameter
        # max_fps is clamped to >= 1 Hz so the frame period stays finite.
        self._min_period = 1.0 / max(float(p('max_fps').value), 1.0)
        self._depth_scale = float(p('depth_scale').value)
        self._depth_max_m = float(p('depth_max_m').value)
        self._auto_follow = bool(p('auto_follow').value)

        # ── Tracker + selector ──────────────────────────────────────────────
        self._tracker = PersonTracker(
            iou_threshold=float(p('iou_threshold').value),
            min_hits=int(p('min_hits').value),
            max_lost_frames=int(p('max_lost_frames').value),
            reid_threshold=float(p('reid_threshold').value),
        )
        self._selector = FollowTargetSelector(hold_frames=15)
        if self._auto_follow:
            self._selector.start()

        # ── Detector (loads asynchronously on a background thread) ──────────
        self._detector: Optional[_YoloDetector] = _YoloDetector(
            model_path=str(p('detector_model').value),
            use_tensorrt=bool(p('use_tensorrt').value),
            logger=self.get_logger(),
        )
        self._hog_fallback: Optional[_HogDetector] = None

        # ── Camera state ────────────────────────────────────────────────────
        self._cam = CamParams()          # default intrinsics until info arrives
        self._cam_received = False
        self._depth_image: Optional[np.ndarray] = None
        self._last_proc = 0.0            # monotonic time of last processed frame

        # ── Subscribers ─────────────────────────────────────────────────────
        self._color_sub = self.create_subscription(
            Image, '/camera/color/image_raw',
            self._on_color, _SENSOR_QOS,
        )
        self._depth_sub = self.create_subscription(
            Image, '/camera/depth/image_rect_raw',
            self._on_depth, _SENSOR_QOS,
        )
        self._info_sub = self.create_subscription(
            CameraInfo, '/camera/depth/camera_info',
            self._on_cam_info, _RELIABLE_QOS,
        )
        self._start_sub = self.create_subscription(
            Empty, '/saltybot/follow_start',
            lambda _: self._selector.start(), 10,
        )
        self._stop_sub = self.create_subscription(
            Empty, '/saltybot/follow_stop',
            lambda _: self._selector.stop(), 10,
        )

        # ── Publisher ───────────────────────────────────────────────────────
        self._pub = self.create_publisher(
            TargetTrack, '/saltybot/target_track', 10
        )

        self.get_logger().info(
            'person_tracking_node ready — '
            f'auto_follow={self._auto_follow}, '
            f'max_fps={1.0/self._min_period:.0f}'
        )

    # ── Callbacks ─────────────────────────────────────────────────────────────

    def _on_cam_info(self, msg: CameraInfo) -> None:
        """Latch camera intrinsics from the first CameraInfo message only."""
        if self._cam_received:
            return
        self._cam = CamParams(
            fx=float(msg.k[0]), fy=float(msg.k[4]),
            cx=float(msg.k[2]), cy=float(msg.k[5]),
        )
        self._cam_received = True
        self.get_logger().info(
            f'Camera intrinsics: fx={self._cam.fx:.1f} cx={self._cam.cx:.1f}'
        )

    def _on_depth(self, msg: Image) -> None:
        """Cache the latest aligned depth frame (uint16, raw sensor units)."""
        if msg.encoding.lower() not in ('16uc1', 'mono16'):
            return
        data = np.frombuffer(msg.data, dtype=np.uint16)
        try:
            self._depth_image = data.reshape((msg.height, msg.width))
        except ValueError:
            # Truncated message — keep the previous depth frame.
            pass

    def _decode_color(self, msg: Image) -> Optional[np.ndarray]:
        """Return the colour frame as a BGR ndarray, or None if undecodable."""
        enc = msg.encoding.lower()
        if enc not in ('bgr8', 'rgb8'):
            return None
        data = np.frombuffer(msg.data, dtype=np.uint8)
        try:
            frame = data.reshape((msg.height, msg.width, 3))
        except ValueError:
            return None
        if enc == 'rgb8':
            import cv2
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        return frame

    def _ensure_hog(self) -> None:
        """Lazily construct the HOG fallback detector (best effort)."""
        if self._hog_fallback is None:
            try:
                self._hog_fallback = _HogDetector()
            except Exception:
                # cv2 missing or HOG init failed — run with no detections.
                pass

    def _run_detector(self, frame: np.ndarray) -> List[Detection]:
        """Run YOLO when it is ready; otherwise the HOG fallback if available."""
        if self._detector and self._detector._ready.is_set():
            if self._detector._available:
                return self._detector.detect(frame)
            # YOLO failed to load — use HOG from now on.
            self._ensure_hog()
        else:
            # YOLO still loading — bridge the gap with HOG.
            self._ensure_hog()
        if self._hog_fallback:
            return self._hog_fallback.detect(frame)
        return []

    def _on_color(self, msg: Image) -> None:
        """Colour-frame callback: rate-limit → decode → detect → track → publish."""
        now = time.monotonic()
        if now - self._last_proc < self._min_period:
            return
        self._last_proc = now

        frame = self._decode_color(msg)
        if frame is None:
            return

        dets = self._run_detector(frame)

        active = self._tracker.update(
            dets,
            cam=self._cam if self._cam_received else None,
            depth_u16=self._depth_image,
            depth_scale=self._depth_scale,
        )
        # Before camera info arrives, img_cx falls back to CamParams() default.
        target = self._selector.update(active, img_cx=self._cam.cx)
        self._publish_target(msg, target)

    def _publish_target(self, msg: Image, target) -> None:
        """Fill and publish a TargetTrack message for the current frame."""
        out = TargetTrack()
        out.header.stamp = msg.header.stamp
        out.header.frame_id = msg.header.frame_id or 'camera_color_optical_frame'

        if target is not None:
            out.tracking_active = True
            out.track_id = target.track_id
            out.bearing_deg = float(target.bearing)
            out.distance_m = float(target.distance)
            out.confidence = float(target.confidence)
            out.bbox_x = int(target.bbox.x)
            out.bbox_y = int(target.bbox.y)
            out.bbox_w = int(target.bbox.w)
            out.bbox_h = int(target.bbox.h)
            out.depth_quality = int(target.depth_qual)

            # Convert Kalman pixel velocity → bearing rate.
            if self._cam_received and target.vel_u != 0.0:
                u_c = target.bbox.x + target.bbox.w * 0.5
                # d(bearing)/du = fx / (fx² + (u-cx)²), converted to degrees.
                denom = self._cam.fx ** 2 + (u_c - self._cam.cx) ** 2
                # math.degrees(1.0) == 180/π exactly (previously a truncated
                # 3.14159 literal introduced a small systematic error).
                d_bear_du = (self._cam.fx / denom) * math.degrees(1.0)
                # NOTE(review): relies on CamParams.fps being defined — confirm
                # against _person_tracker.CamParams.
                out.vel_bearing_dps = float(d_bear_du * target.vel_u * self._cam.fps)

            # Distance velocity from depth (placeholder: not computed per-frame).
            out.vel_dist_mps = 0.0
        else:
            out.tracking_active = False

        self._pub.publish(out)
|
|
||||||
|
|
||||||
|
|
||||||
def main(args=None) -> None:
    """CLI entry point: initialise rclpy and spin the node until shutdown."""
    rclpy.init(args=args)
    node = PersonTrackingNode()
    try:
        rclpy.spin(node)
    finally:
        # Tear down the node and the rclpy context even on Ctrl-C.
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
|
|
||||||
@ -61,8 +61,6 @@ setup(
|
|||||||
'audio_scene = saltybot_bringup.audio_scene_node:main',
|
'audio_scene = saltybot_bringup.audio_scene_node:main',
|
||||||
# Face emotion classifier (Issue #359)
|
# Face emotion classifier (Issue #359)
|
||||||
'face_emotion = saltybot_bringup.face_emotion_node:main',
|
'face_emotion = saltybot_bringup.face_emotion_node:main',
|
||||||
# Person tracking for follow-me mode (Issue #363)
|
|
||||||
'person_tracking = saltybot_bringup.person_tracking_node:main',
|
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@ -1,599 +0,0 @@
|
|||||||
"""
|
|
||||||
test_person_tracker.py — Unit tests for the P0 person tracking pipeline.
|
|
||||||
|
|
||||||
Tests cover: IoU, Kalman filter, colour histogram, bearing geometry,
|
|
||||||
depth sampling, tracker state machine, and follow-target selection.
|
|
||||||
No camera, no detector, no ROS2 needed.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import math
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from saltybot_bringup._person_tracker import (
|
|
||||||
BBox,
|
|
||||||
CamParams,
|
|
||||||
Detection,
|
|
||||||
FollowTargetSelector,
|
|
||||||
KalmanBoxFilter,
|
|
||||||
PersonTrack,
|
|
||||||
PersonTracker,
|
|
||||||
TrackState,
|
|
||||||
DEPTH_GOOD, DEPTH_MARGINAL, DEPTH_EXTRAPOLATED, DEPTH_INVALID,
|
|
||||||
bearing_from_pixel,
|
|
||||||
depth_at_bbox,
|
|
||||||
extract_torso_hist,
|
|
||||||
hist_similarity,
|
|
||||||
iou,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
# Helpers
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _det(x=10, y=20, w=60, h=150, conf=0.85, frame=None) -> Detection:
|
|
||||||
return Detection(BBox(x, y, w, h), conf, frame)
|
|
||||||
|
|
||||||
|
|
||||||
def _solid_bgr(h=200, w=100, b=128, g=64, r=32) -> np.ndarray:
|
|
||||||
"""Uniform colour BGR image."""
|
|
||||||
return np.full((h, w, 3), (b, g, r), dtype=np.uint8)
|
|
||||||
|
|
||||||
|
|
||||||
def _depth_image(h=480, w=640, val_mm=2000) -> np.ndarray:
|
|
||||||
"""Uniform uint16 depth image at val_mm mm."""
|
|
||||||
return np.full((h, w), val_mm, dtype=np.uint16)
|
|
||||||
|
|
||||||
|
|
||||||
def _run_tracker_n(tracker, bbox, n=5, cam=None):
|
|
||||||
"""Feed the same detection to a tracker n times."""
|
|
||||||
for _ in range(n):
|
|
||||||
tracker.update([_det(*bbox)], cam=cam)
|
|
||||||
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
# BBox
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_bbox_fields():
    box = BBox(10, 20, 60, 150)
    assert (box.x, box.y, box.w, box.h) == (10, 20, 60, 150)


def test_bbox_is_named_tuple():
    # BBox must be tuple-compatible (NamedTuple) for unpacking elsewhere.
    assert isinstance(BBox(0, 0, 1, 1), tuple)
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
# IoU
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_iou_identical():
    box = BBox(0, 0, 100, 100)
    assert iou(box, box) == pytest.approx(1.0, abs=1e-6)


def test_iou_no_overlap():
    left = BBox(0, 0, 50, 50)
    right = BBox(100, 0, 50, 50)
    assert iou(left, right) == pytest.approx(0.0, abs=1e-6)


def test_iou_partial_overlap():
    base = BBox(0, 0, 100, 100)
    shifted = BBox(50, 0, 100, 100)  # 50 % horizontal overlap
    assert 0.0 < iou(base, shifted) < 1.0


def test_iou_exact_overlap_value():
    # inter = 5000, union = 10000 → IoU = 0.5
    outer = BBox(0, 0, 100, 100)
    half = BBox(0, 0, 50, 100)
    assert iou(outer, half) == pytest.approx(0.5, abs=1e-6)


def test_iou_contained():
    # inter = 2500, union = 10000 + 2500 - 2500 = 10000 → IoU = 0.25
    outer = BBox(0, 0, 100, 100)
    inner = BBox(25, 25, 50, 50)
    assert iou(outer, inner) == pytest.approx(0.25, abs=1e-6)


def test_iou_symmetric():
    p = BBox(10, 10, 80, 80)
    q = BBox(50, 50, 80, 80)
    assert iou(p, q) == pytest.approx(iou(q, p), abs=1e-9)


def test_iou_touching_edges():
    # Boxes that only share an edge at x=50 have zero intersection area.
    left = BBox(0, 0, 50, 50)
    right = BBox(50, 0, 50, 50)
    assert iou(left, right) == pytest.approx(0.0, abs=1e-6)
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
# KalmanBoxFilter
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_kalman_predict_returns_bbox():
    flt = KalmanBoxFilter(BBox(10, 20, 60, 150))
    predicted = flt.predict()
    assert isinstance(predicted, BBox)
    assert predicted.w >= 1 and predicted.h >= 1


def test_kalman_update_returns_bbox():
    flt = KalmanBoxFilter(BBox(10, 20, 60, 150))
    assert isinstance(flt.update(BBox(10, 20, 60, 150)), BBox)


def test_kalman_converges_to_stationary():
    """After many identical measurements the state approaches them."""
    flt = KalmanBoxFilter(BBox(100, 100, 80, 160))
    target = BBox(102, 98, 80, 160)
    result = None
    for _ in range(20):
        flt.predict()
        result = flt.update(target)
    # Should settle within ~10 px of the measurement.
    assert abs(result.x - target.x) < 10
    assert abs(result.y - target.y) < 10


def test_kalman_predict_advances_with_velocity():
    """A few frames of rightward motion → prediction keeps moving right."""
    flt = KalmanBoxFilter(BBox(100, 100, 60, 150))
    for step in range(5):
        flt.predict()
        flt.update(BBox(100 + step * 10, 100, 60, 150))
    # At least near the last observed position after the motion.
    assert flt.predict().x > 100 + 4 * 10 - 5


def test_kalman_velocity_zero_initially():
    assert KalmanBoxFilter(BBox(100, 100, 60, 150)).velocity_px == (0.0, 0.0)


def test_kalman_bbox_property():
    start = BBox(50, 50, 80, 120)
    flt = KalmanBoxFilter(start)
    state = flt.bbox
    assert isinstance(state, BBox)
    # The reported state should still be near the initial box.
    assert abs(state.x - start.x) <= 2
    assert abs(state.y - start.y) <= 2


def test_kalman_width_height_stay_positive():
    flt = KalmanBoxFilter(BBox(10, 10, 5, 5))
    for _ in range(30):
        flt.predict()
    assert flt.bbox.w >= 1 and flt.bbox.h >= 1
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
# extract_torso_hist
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_torso_hist_shape():
    hist = extract_torso_hist(_solid_bgr(), BBox(0, 0, 100, 200))
    assert hist is not None
    assert hist.shape == (128,)  # 16 hue × 8 saturation bins


def test_torso_hist_normalised():
    hist = extract_torso_hist(_solid_bgr(), BBox(0, 0, 100, 200))
    assert hist is not None
    assert hist.sum() == pytest.approx(1.0, abs=1e-5)


def test_torso_hist_none_for_none_frame():
    assert extract_torso_hist(None, BBox(0, 0, 100, 200)) is None


def test_torso_hist_none_for_tiny_bbox():
    assert extract_torso_hist(_solid_bgr(), BBox(0, 0, 2, 2)) is None


def test_torso_hist_different_colours():
    """Distinct crop colours must yield distinct histograms."""
    red = np.full((200, 100, 3), (0, 0, 255), dtype=np.uint8)
    blue = np.full((200, 100, 3), (255, 0, 0), dtype=np.uint8)
    h_red = extract_torso_hist(red, BBox(0, 0, 100, 200))
    h_blue = extract_torso_hist(blue, BBox(0, 0, 100, 200))
    assert h_red is not None and h_blue is not None
    assert not np.allclose(h_red, h_blue)
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
# hist_similarity
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_hist_similarity_identical():
    flat = np.ones(128, dtype=np.float32) / 128.0
    assert hist_similarity(flat, flat) == pytest.approx(1.0, abs=1e-6)


def test_hist_similarity_orthogonal():
    """Histograms with disjoint support have zero similarity."""
    lower = np.zeros(128, dtype=np.float32)
    upper = np.zeros(128, dtype=np.float32)
    lower[:64] = 1.0 / 64
    upper[64:] = 1.0 / 64
    assert hist_similarity(lower, upper) == pytest.approx(0.0, abs=1e-6)


def test_hist_similarity_range():
    rng = np.random.RandomState(0)
    for _ in range(20):
        a = rng.rand(128).astype(np.float32)
        b = rng.rand(128).astype(np.float32)
        a /= a.sum()
        b /= b.sum()
        assert 0.0 <= hist_similarity(a, b) <= 1.0 + 1e-6


def test_hist_similarity_symmetric():
    a = np.random.RandomState(1).rand(128).astype(np.float32)
    b = np.random.RandomState(2).rand(128).astype(np.float32)
    a /= a.sum()
    b /= b.sum()
    assert hist_similarity(a, b) == pytest.approx(hist_similarity(b, a), abs=1e-6)
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
# bearing_from_pixel
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_bearing_centre_is_zero():
    """A pixel at the principal point maps to a bearing of 0°."""
    assert abs(bearing_from_pixel(320.0, 320.0, 615.0)) < 1e-9


def test_bearing_right_is_positive():
    assert bearing_from_pixel(400.0, 320.0, 615.0) > 0.0


def test_bearing_left_is_negative():
    assert bearing_from_pixel(200.0, 320.0, 615.0) < 0.0


def test_bearing_symmetric():
    """Equal pixel offsets left/right give equal-and-opposite bearings."""
    right = bearing_from_pixel(400.0, 320.0, 615.0)
    left = bearing_from_pixel(240.0, 320.0, 615.0)
    assert abs(right + left) < 1e-9


def test_bearing_approx_at_45_deg():
    """u - cx = fx → atan(1) = 45°."""
    assert abs(bearing_from_pixel(935.0, 320.0, 615.0) - 45.0) < 0.1


def test_bearing_degrees_not_radians():
    """atan(80/615) ≈ 7.4° in degrees, but only ≈ 0.13 in radians."""
    assert abs(bearing_from_pixel(400.0, 320.0, 615.0)) > 0.5
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
# depth_at_bbox
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_depth_uniform_field():
    metres, quality = depth_at_bbox(_depth_image(val_mm=2000),
                                    BBox(200, 150, 80, 200))
    assert abs(metres - 2.0) < 0.01
    assert quality == DEPTH_GOOD


def test_depth_zero_image_invalid():
    blank = np.zeros((480, 640), dtype=np.uint16)
    metres, quality = depth_at_bbox(blank, BBox(200, 150, 80, 200))
    assert metres == 0.0
    assert quality == DEPTH_INVALID


def test_depth_partial_fill_marginal():
    sparse = np.zeros((480, 640), dtype=np.uint16)
    # Only ~40 % of the central window carries valid readings.
    sparse[200:260, 280:360] = 1500
    _, quality = depth_at_bbox(sparse, BBox(200, 150, 160, 200), window_frac=1.0)
    assert quality in (DEPTH_MARGINAL, DEPTH_EXTRAPOLATED, DEPTH_GOOD)


def test_depth_scale_applied():
    metres, _ = depth_at_bbox(_depth_image(val_mm=3000),
                              BBox(100, 100, 80, 150), depth_scale=0.001)
    assert abs(metres - 3.0) < 0.01


def test_depth_out_of_bounds_bbox():
    """A bbox entirely outside the image yields DEPTH_INVALID."""
    _, quality = depth_at_bbox(_depth_image(), BBox(700, 500, 80, 100))
    assert quality == DEPTH_INVALID


def test_depth_at_5m():
    metres, _ = depth_at_bbox(_depth_image(val_mm=5000),
                              BBox(200, 100, 80, 200))
    assert abs(metres - 5.0) < 0.05
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
# PersonTracker — state machine
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_tracker_new_track_tentative():
    trk = PersonTracker(min_hits=3)
    visible = trk.update([_det()])
    # min_hits=3: the first frame yields a TENTATIVE track that is
    # excluded from the active output.
    assert len(visible) == 0
    assert len(trk.tracks) == 1
    assert trk.tracks[0].state == TrackState.TENTATIVE


def test_tracker_track_becomes_active():
    trk = PersonTracker(min_hits=3)
    visible = []
    for _ in range(3):
        visible = trk.update([_det()])
    assert len(visible) == 1
    assert visible[0].state == TrackState.ACTIVE


def test_tracker_persistent_id():
    trk = PersonTracker(min_hits=2)
    _run_tracker_n(trk, (10, 20, 60, 150), n=4)
    assert len(trk.tracks) == 1
    assert trk.tracks[0].track_id == 0


def test_tracker_two_detections_two_tracks():
    # Two widely separated boxes must spawn two independent tracks.
    trk = PersonTracker(min_hits=2)
    pair = [
        Detection(BBox(10, 20, 60, 150), 0.9),
        Detection(BBox(400, 20, 60, 150), 0.9),
    ]
    for _ in range(3):
        trk.update(pair)
    assert len(trk.tracks) == 2


def test_tracker_ids_increment():
    trk = PersonTracker(min_hits=1)
    trk.update([_det(x=10)])
    trk.update([_det(x=10), _det(x=300)])
    assert len({t.track_id for t in trk.tracks}) == 2


def test_tracker_track_goes_lost():
    trk = PersonTracker(min_hits=2, max_lost_frames=5)
    _run_tracker_n(trk, (10, 20, 60, 150), n=3)
    trk.update([])  # one empty frame → the track transitions to LOST
    assert sum(t.state == TrackState.LOST for t in trk.tracks) == 1


def test_tracker_track_removed_after_max_lost():
    trk = PersonTracker(min_hits=2, max_lost_frames=3)
    _run_tracker_n(trk, (10, 20, 60, 150), n=3)
    for _ in range(5):
        trk.update([])
    assert len(trk.tracks) == 0


def test_tracker_iou_matching_same_track():
    """A slightly shifted detection matches the existing track, no new one."""
    trk = PersonTracker(min_hits=2, iou_threshold=0.3)
    _run_tracker_n(trk, (10, 20, 60, 150), n=3)
    count_before = len(trk.tracks)
    trk.update([_det(x=15, y=20, w=60, h=150)])  # small shift, high IoU
    assert len(trk.tracks) == count_before


def test_tracker_no_overlap_creates_new_track():
    trk = PersonTracker(min_hits=2, iou_threshold=0.3)
    _run_tracker_n(trk, (10, 20, 60, 150), n=3)
    trk.update([_det(x=500, y=20, w=60, h=150)])  # far away → new track
    assert len(trk.tracks) == 2  # old (lost) + new (tentative)


def test_tracker_reset():
    trk = PersonTracker(min_hits=2)
    _run_tracker_n(trk, (10, 20, 60, 150), n=3)
    trk.reset()
    assert len(trk.tracks) == 0
    assert trk._next_id == 0


def test_tracker_bearing_set_with_cam():
    trk = PersonTracker(min_hits=2)
    _run_tracker_n(trk, (10, 20, 60, 150), n=3, cam=CamParams(fx=615.0, cx=320.0))
    live = [t for t in trk.tracks if t.state == TrackState.ACTIVE]
    assert len(live) > 0
    # Box spans x=10..70 (centre ≈ 40) left of cx=320 → negative bearing.
    assert live[0].bearing < 0.0


def test_tracker_distance_set_with_depth():
    trk = PersonTracker(min_hits=2)
    depth = _depth_image(val_mm=2000)
    for _ in range(3):
        trk.update([_det(x=200, y=100, w=80, h=200)],
                   cam=CamParams(fx=615.0, cx=320.0), depth_u16=depth)
    live = [t for t in trk.tracks if t.state == TrackState.ACTIVE]
    assert len(live) > 0
    assert abs(live[0].distance - 2.0) < 0.1


def test_tracker_no_depth_distance_zero():
    trk = PersonTracker(min_hits=2)
    for _ in range(3):
        trk.update([_det()], cam=CamParams(), depth_u16=None)
    live = [t for t in trk.tracks if t.state == TrackState.ACTIVE]
    assert len(live) > 0
    assert live[0].distance == 0.0
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
# PersonTracker — re-identification
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_reid_restores_track_id():
    """A person who disappears briefly keeps their track_id on return."""
    blue_frame = np.full((480, 640, 3), (200, 50, 0), dtype=np.uint8)
    box = (250, 50, 80, 200)
    trk = PersonTracker(
        min_hits=2, max_lost_frames=10,
        reid_threshold=0.4, reid_max_dist=200.0,
    )

    # Establish the track.
    for _ in range(3):
        trk.update([Detection(BBox(*box), 0.9, blue_frame)])

    original_id = trk.tracks[0].track_id

    # Occlusion: three empty frames.
    for _ in range(3):
        trk.update([])

    # Re-appear at the same position with the same colour.
    trk.update([Detection(BBox(*box), 0.9, blue_frame)])

    # The histogram re-ID should restore the original ID.
    assert any(t.track_id == original_id for t in trk.tracks)


def test_reid_different_colour_creates_new_track():
    """A differently coloured person at the same spot gets a new track ID."""
    blue_frame = np.full((480, 640, 3), (200, 50, 0), dtype=np.uint8)
    red_frame = np.full((480, 640, 3), (0, 50, 200), dtype=np.uint8)
    box = (250, 50, 80, 200)
    trk = PersonTracker(
        min_hits=2, max_lost_frames=5,
        reid_threshold=0.85,  # strict threshold → red won't match blue
        reid_max_dist=200.0,
    )

    for _ in range(3):
        trk.update([Detection(BBox(*box), 0.9, blue_frame)])

    original_id = trk.tracks[0].track_id

    for _ in range(2):
        trk.update([])

    # A red (differently coloured) person appears at the same place.
    for _ in range(3):
        trk.update([Detection(BBox(*box), 0.9, red_frame)])

    current_ids = {t.track_id for t in trk.tracks}
    # A new ID must be present (not only the original).
    assert original_id not in current_ids or len(current_ids) > 1
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
# FollowTargetSelector
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _make_track(track_id, x, y, w, h, dist=0.0) -> PersonTrack:
    """Build an ACTIVE PersonTrack fixture with the given bbox and distance."""
    return PersonTrack(
        track_id=track_id,
        state=TrackState.ACTIVE,
        bbox=BBox(x, y, w, h),
        distance=dist,
    )
|
|
||||||
|
|
||||||
|
|
||||||
def test_selector_inactive_returns_none():
    """A stopped selector must ignore incoming tracks and yield None."""
    selector = FollowTargetSelector()
    selector.stop()
    candidate = _make_track(0, 100, 50, 80, 200, dist=3.0)
    assert selector.update([candidate]) is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_selector_active_selects_nearest_by_distance():
    """With valid depth on both tracks, the smaller distance wins."""
    selector = FollowTargetSelector()
    selector.start()
    far = _make_track(0, 100, 50, 80, 200, dist=4.0)
    near = _make_track(1, 300, 50, 80, 200, dist=1.5)
    chosen = selector.update([far, near], img_cx=320.0)
    assert chosen.track_id == 1  # closer
|
|
||||||
|
|
||||||
|
|
||||||
def test_selector_locks_on_same_track():
    """Once a target is locked, subsequent frames keep the same ID."""
    selector = FollowTargetSelector()
    selector.start()
    tracks = [
        _make_track(0, 100, 50, 80, 200, dist=2.0),
        _make_track(1, 300, 50, 80, 200, dist=5.0),
    ]
    locked = selector.update(tracks, img_cx=320.0)

    # Second call — switch distance but should keep locked ID
    tracks[0] = _make_track(0, 100, 50, 80, 200, dist=2.0)
    followup = selector.update(tracks, img_cx=320.0)
    assert followup.track_id == locked.track_id
|
|
||||||
|
|
||||||
|
|
||||||
def test_selector_holds_last_when_tracks_empty():
    """Within hold_frames, a vanished target is still reported."""
    selector = FollowTargetSelector(hold_frames=5)
    selector.start()
    selector.update([_make_track(0, 100, 50, 80, 200, dist=2.0)])

    # Target gone this frame — selector should keep holding it.
    held = selector.update([], img_cx=320.0)
    assert held is not None
    assert held.track_id == 0
|
|
||||||
|
|
||||||
|
|
||||||
def test_selector_stops_holding_after_hold_frames():
    """After hold_frames empty updates, the held target is dropped."""
    selector = FollowTargetSelector(hold_frames=2)
    selector.start()
    selector.update([_make_track(0, 100, 50, 80, 200)])

    outcome = None
    for _ in range(5):  # well past the 2-frame hold window
        outcome = selector.update([])
    assert outcome is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_selector_selects_by_centre_when_no_depth():
    """With distance unknown (0.0), fall back to proximity to image centre."""
    selector = FollowTargetSelector()
    selector.start()
    # Track 0: x=0..80 → centre x=40 (far left of image)
    # Track 1: x=280..360 → centre x=320 (near image centre)
    left_track = _make_track(0, 0, 50, 80, 200, dist=0.0)
    centre_track = _make_track(1, 280, 50, 80, 200, dist=0.0)
    chosen = selector.update([left_track, centre_track], img_cx=320.0)
    assert chosen.track_id == 1  # nearer to image centre
|
|
||||||
|
|
||||||
|
|
||||||
def test_selector_restart_reselects():
    """stop()/start() clears the lock so the next update reselects."""
    selector = FollowTargetSelector()
    selector.start()
    old_target = _make_track(0, 100, 50, 80, 200, dist=2.0)
    selector.update([old_target])

    # Cycle the selector: lock on track 0 must be forgotten.
    selector.stop()
    selector.start()

    nearer = _make_track(1, 300, 50, 80, 200, dist=1.0)
    chosen = selector.update([old_target, nearer])
    assert chosen.track_id == 1  # re-selected nearest
|
|
||||||
@ -35,8 +35,6 @@ rosidl_generate_interfaces(${PROJECT_NAME}
|
|||||||
# Issue #359 — face emotion classifier
|
# Issue #359 — face emotion classifier
|
||||||
"msg/FaceEmotion.msg"
|
"msg/FaceEmotion.msg"
|
||||||
"msg/FaceEmotionArray.msg"
|
"msg/FaceEmotionArray.msg"
|
||||||
# Issue #363 — person tracking for follow-me mode
|
|
||||||
"msg/TargetTrack.msg"
|
|
||||||
DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
|
DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -1,13 +0,0 @@
|
|||||||
std_msgs/Header header
|
|
||||||
bool tracking_active # false when no target is locked
|
|
||||||
uint32 track_id # persistent ID across frames
|
|
||||||
float32 bearing_deg # horizontal bearing to target (°, right=+, left=−)
|
|
||||||
float32 distance_m # range to target (m); 0 = unknown / depth invalid
|
|
||||||
float32 confidence # 0.0–1.0 overall track quality
|
|
||||||
int32 bbox_x # colour-image bounding box (pixels, top-left origin)
|
|
||||||
int32 bbox_y
|
|
||||||
int32 bbox_w
|
|
||||||
int32 bbox_h
|
|
||||||
float32 vel_bearing_dps # bearing rate (°/s, from Kalman velocity state)
|
|
||||||
float32 vel_dist_mps # distance rate (m/s, + = moving away)
|
|
||||||
uint8 depth_quality # 0=invalid 1=extrapolated 2=marginal 3=good
|
|
||||||
Loading…
x
Reference in New Issue
Block a user