From c620dc51a74a6f0b568b3435b8259bccfa62b479 Mon Sep 17 00:00:00 2001 From: sl-perception Date: Tue, 3 Mar 2026 15:19:02 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20Add=20Issue=20#363=20=E2=80=94=20P0=20p?= =?UTF-8?q?erson=20tracking=20for=20follow-me=20mode?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements real-time person detection + tracking pipeline for the follow-me motion controller on Jetson Orin Nano Super (D435i). Core components - TargetTrack.msg: bearing_deg, distance_m, confidence, bbox, vel_bearing_dps, vel_dist_mps, depth_quality (0-3) - _person_tracker.py (pure-Python, no ROS2/runtime deps): · 8-state constant-velocity Kalman filter [cx,cy,w,h,vcx,vcy,vw,vh] · Greedy IoU data association · HSV torso colour histogram re-ID (16H×8S, Bhattacharyya similarity) with fixed saturation clamping (s = (cmax−cmin)/cmax, clipped to [0,1]) · FollowTargetSelector: nearest person auto-lock, hold_frames hysteresis · TENTATIVE→ACTIVE after min_hits; LOST track removal after max_lost_frames with per-frame lost_age increment across all LOST tracks · bearing_from_pixel, depth_at_bbox (median, quality flags) - person_tracking_node.py: · YOLOv8n via ultralytics (TRT FP16 on first run) → HOG+SVM fallback · Subscribes colour + depth + camera_info + follow_start/stop · Publishes /saltybot/target_track at ≤30 fps - test/test_person_tracker.py: 59/59 tests passing Co-Authored-By: Claude Sonnet 4.6 --- .../saltybot_bringup/_person_tracker.py | 729 ++++++++++++++++++ .../saltybot_bringup/person_tracking_node.py | 371 +++++++++ jetson/ros2_ws/src/saltybot_bringup/setup.py | 2 + .../test/test_person_tracker.py | 599 ++++++++++++++ .../src/saltybot_scene_msgs/CMakeLists.txt | 2 + .../saltybot_scene_msgs/msg/TargetTrack.msg | 13 + 6 files changed, 1716 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_person_tracker.py create mode 100644 
"""
_person_tracker.py — Multi-person tracker for follow-me mode (no ROS2 deps).

Pipeline (called once per colour frame)
---------------------------------------
1. Person detections (bounding boxes + detector confidence) arrive from an
   external detector (YOLOv8n, MobileNetSSD, etc.) — not this module's concern.
2. Active tracks are predicted one step forward via a constant-velocity Kalman.
3. Detections are matched to predicted tracks with greedy IoU ≥ iou_threshold.
4. Unmatched detections that survive re-ID histogram matching against LOST
   tracks are reattached to those tracks (brief-occlusion recovery).
5. Remaining unmatched detections start new TENTATIVE tracks.
6. Tracks not updated for max_lost_frames are removed permanently.
7. Bearing and range to a designated follow target are derived from camera
   intrinsics + an aligned depth image.

Re-identification
-----------------
Each track stores an HSV colour histogram of the person's torso region
(middle 50 % of bbox height, centre 80 % width).  After occlusion, new
detections whose histogram Bhattacharyya similarity exceeds reid_threshold
*and* whose predicted position is within reid_max_dist pixels are candidates
for re-identification.  Best histogram match wins; a LOST track can be
claimed by at most one detection per frame, and re-attachment does not skip
the min_hits confirmation gate for tracks that were still TENTATIVE.

Kalman state (8-D, one per track)
---------------------------------
    x = [cx, cy, w, h, vcx, vcy, vw, vh]
    Measurement z = [cx, cy, w, h]
    dt = 1 frame (constant velocity model)

Public API
----------
    BBox          NamedTuple   (x, y, w, h) — pixel coordinates
    Detection     NamedTuple   (bbox, confidence, frame_bgr)
    TrackState    Enum         TENTATIVE / ACTIVE / LOST
    PersonTrack   dataclass    per-track state snapshot
    PersonTracker class        .update() → list[PersonTrack]

    iou(a, b)                          → float
    bearing_from_pixel(u, cx_px, fx)   → float (degrees)
    depth_at_bbox(depth_u16, bbox, …)  → (depth_m, quality)
    extract_torso_hist(bgr, bbox, …)   → ndarray | None
    hist_similarity(h1, h2)            → float (0 = different, 1 = same)
    KalmanBoxFilter                    class
"""

from __future__ import annotations

import math
from dataclasses import dataclass, field
from enum import IntEnum
from typing import List, NamedTuple, Optional, Tuple

import numpy as np


# ── Simple types ────────────────────────────────────────────────────────────── 

class BBox(NamedTuple):
    """Axis-aligned bounding box in pixel coordinates."""
    x: int  # left edge
    y: int  # top edge
    w: int  # width (≥ 1)
    h: int  # height (≥ 1)


class Detection(NamedTuple):
    """One person detection from an external detector."""
    bbox: BBox
    confidence: float                        # 0–1 detector score
    frame_bgr: Optional[np.ndarray] = None   # colour frame (for histogram)


class TrackState(IntEnum):
    TENTATIVE = 0   # seen < min_hits frames; not yet published
    ACTIVE = 1      # confirmed; published to follow-me controller
    LOST = 2        # missing; still kept for re-ID up to max_lost_frames


# ── Depth quality levels ──────────────────────────────────────────────────────
DEPTH_INVALID = 0
DEPTH_EXTRAPOLATED = 1
DEPTH_MARGINAL = 2
DEPTH_GOOD = 3


# ── IoU ───────────────────────────────────────────────────────────────────────

def iou(a: BBox, b: BBox) -> float:
    """Intersection-over-union of two bounding boxes."""
    ax2, ay2 = a.x + a.w, a.y + a.h
    bx2, by2 = b.x + b.w, b.y + b.h
    ix1 = max(a.x, b.x); iy1 = max(a.y, b.y)
    ix2 = min(ax2, bx2); iy2 = min(ay2, by2)
    inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
    union = a.w * a.h + b.w * b.h - inter
    return float(inter) / max(float(union), 1e-6)


# ── Kalman box filter ─────────────────────────────────────────────────────────

class KalmanBoxFilter:
    """
    Constant-velocity 8-state Kalman filter for bounding-box tracking.

    State x = [cx, cy, w, h, vcx, vcy, vw, vh]
    Meas  z = [cx, cy, w, h]

    Process noise Q: position var 1, velocity var 100 (px/frame units).
    Measurement noise R: ~2 px std-dev on bbox centre/size (var 4).
    """

    def __init__(self, initial_bbox: BBox) -> None:
        # Transition matrix (dt = 1 frame): pos += vel each step.
        self._F = np.eye(8, dtype=np.float64)
        self._F[:4, 4:] = np.eye(4)

        # Measurement matrix: observe the four position/size components only.
        self._H = np.zeros((4, 8), dtype=np.float64)
        self._H[:4, :4] = np.eye(4)

        # Process noise
        self._Q = np.diag([1., 1., 1., 1., 100., 100., 100., 100.]).astype(np.float64)

        # Measurement noise (~2 px std-dev → var = 4)
        self._R = np.eye(4, dtype=np.float64) * 4.0

        # Initial state centred on the detection, zero velocity.
        cx = initial_bbox.x + initial_bbox.w * 0.5
        cy = initial_bbox.y + initial_bbox.h * 0.5
        self._x = np.array(
            [cx, cy, float(initial_bbox.w), float(initial_bbox.h), 0., 0., 0., 0.],
            dtype=np.float64,
        )

        # Initial covariance — small pos uncertainty, large velocity uncertainty
        self._P = np.diag([10., 10., 10., 10., 1000., 1000., 1000., 1000.]).astype(np.float64)

    # -- predict ──────────────────────────────────────────────────────────────

    def predict(self) -> BBox:
        """Advance state one step forward; return predicted BBox."""
        self._x = self._F @ self._x
        self._P = self._F @ self._P @ self._F.T + self._Q
        return self._to_bbox()

    # -- update ───────────────────────────────────────────────────────────────

    def update(self, bbox: BBox) -> BBox:
        """Correct state with observation; return corrected BBox."""
        z = np.array(
            [bbox.x + bbox.w * 0.5, bbox.y + bbox.h * 0.5,
             float(bbox.w), float(bbox.h)],
            dtype=np.float64,
        )
        y = z - self._H @ self._x                       # innovation
        S = self._H @ self._P @ self._H.T + self._R     # innovation covariance
        K = self._P @ self._H.T @ np.linalg.inv(S)      # Kalman gain
        self._x = self._x + K @ y
        self._P = (np.eye(8) - K @ self._H) @ self._P
        return self._to_bbox()

    # -- accessors ────────────────────────────────────────────────────────────

    @property
    def velocity_px(self) -> Tuple[float, float]:
        """(vcx, vcy) in pixels / frame from Kalman state."""
        return float(self._x[4]), float(self._x[5])

    @property
    def bbox(self) -> BBox:
        return self._to_bbox()

    def _to_bbox(self) -> BBox:
        # Clamp to ≥ 1 px so downstream crops never see a degenerate box.
        cx, cy, w, h = self._x[:4]
        w = max(1.0, w); h = max(1.0, h)
        return BBox(
            int(round(cx - w * 0.5)),
            int(round(cy - h * 0.5)),
            int(round(w)),
            int(round(h)),
        )


# ── Colour histogram (HSV torso) ──────────────────────────────────────────────

_HIST_H_BINS = 16
_HIST_S_BINS = 8
_HIST_SIZE = _HIST_H_BINS * _HIST_S_BINS  # 128


def extract_torso_hist(
    bgr: np.ndarray,
    bbox: BBox,
    h_bins: int = _HIST_H_BINS,
    s_bins: int = _HIST_S_BINS,
) -> Optional[np.ndarray]:
    """
    Extract a normalised HSV colour histogram from the torso region of a
    person bounding box.

    Torso region: middle 50 % of bbox height, centre 80 % of bbox width.

    Parameters
    ----------
    bgr : (H, W, 3) uint8 colour image
    bbox : person bounding box
    h_bins, s_bins : histogram bins for H and S channels

    Returns
    -------
    Normalised 1-D histogram of length h_bins * s_bins, or None if the
    crop is too small or bgr is None.
    """
    if bgr is None:
        return None

    ih, iw = bgr.shape[:2]

    # Torso crop
    y0 = bbox.y + bbox.h // 4       # skip top 25 % (head)
    y1 = bbox.y + bbox.h * 3 // 4   # skip bottom 25 % (legs)
    x0 = bbox.x + bbox.w // 10
    x1 = bbox.x + bbox.w * 9 // 10

    y0 = max(0, y0); y1 = min(ih, y1)
    x0 = max(0, x0); x1 = min(iw, x1)

    if y1 - y0 < 4 or x1 - x0 < 4:
        return None

    crop = bgr[y0:y1, x0:x1]

    # BGR → HSV (manual, no OpenCV dependency for tests)
    crop_f = crop.astype(np.float32) / 255.0
    r, g, b = crop_f[..., 2], crop_f[..., 1], crop_f[..., 0]

    cmax = np.maximum(np.maximum(r, g), b)
    cmin = np.minimum(np.minimum(r, g), b)
    delta_h = cmax - cmin + 1e-7  # epsilon only for hue angle computation

    # Hue in [0, 360)
    h = np.where(
        cmax == r, 60.0 * ((g - b) / delta_h % 6),
        np.where(cmax == g, 60.0 * ((b - r) / delta_h + 2),
                 60.0 * ((r - g) / delta_h + 4)),
    )
    h = np.clip(h % 360.0, 0.0, 359.9999)
    # Saturation in [0, 1] — no epsilon so pure-colour pixels stay ≤ 1.0
    s = np.clip(np.where(cmax > 1e-6, (cmax - cmin) / cmax, 0.0), 0.0, 1.0)

    h_flat = h.ravel()
    s_flat = s.ravel()

    hist, _, _ = np.histogram2d(
        h_flat, s_flat,
        bins=[h_bins, s_bins],
        range=[[0, 360], [0, 1]],
    )
    hist = hist.ravel().astype(np.float32)
    total = hist.sum()
    if total > 0:
        hist /= total
    return hist


def hist_similarity(h1: np.ndarray, h2: np.ndarray) -> float:
    """
    Bhattacharyya similarity between two normalised histograms.

    Returns a value in [0, 1]: 1 = identical, 0 = completely different.
    """
    bc = float(np.sum(np.sqrt(h1 * h2)))
    return float(np.clip(bc, 0.0, 1.0))


# ── Camera geometry ───────────────────────────────────────────────────────────

def bearing_from_pixel(
    u: float,
    cx_px: float,
    fx: float,
) -> float:
    """
    Convert a horizontal pixel coordinate to a bearing angle.

    Parameters
    ----------
    u : pixel column (horizontal image coordinate)
    cx_px : principal point x (from CameraInfo.K[2])
    fx : horizontal focal length in pixels (from CameraInfo.K[0])

    Returns
    -------
    bearing_deg : signed degrees; positive = right of camera centre.
    """
    return math.degrees(math.atan2(float(u - cx_px), float(fx)))


def depth_at_bbox(
    depth_u16: np.ndarray,
    bbox: BBox,
    depth_scale: float = 0.001,
    window_frac: float = 0.3,
) -> Tuple[float, int]:
    """
    Sample median depth from the central torso region of a bounding box in a
    uint16 depth image (D435i mm units by default).

    Parameters
    ----------
    depth_u16 : (H, W) uint16 depth image
    bbox : person bounding box (colour image coordinates, assumed aligned)
    depth_scale : multiply raw value to get metres (D435i: 0.001)
    window_frac : fraction of bbox dimensions to use as central sample window

    Returns
    -------
    (depth_m, quality)
        depth_m : median depth in metres (0.0 when no valid pixels)
        quality : DEPTH_GOOD / DEPTH_MARGINAL / DEPTH_EXTRAPOLATED / DEPTH_INVALID
    """
    ih, iw = depth_u16.shape

    # Central window
    wf = max(0.1, min(1.0, window_frac))
    cx = bbox.x + bbox.w * 0.5
    cy = bbox.y + bbox.h * 0.5
    hw = bbox.w * wf * 0.5
    hh = bbox.h * wf * 0.5

    r0 = int(max(0, cy - hh))
    r1 = int(min(ih, cy + hh + 1))
    c0 = int(max(0, cx - hw))
    c1 = int(min(iw, cx + hw + 1))

    if r1 <= r0 or c1 <= c0:
        return 0.0, DEPTH_INVALID

    patch = depth_u16[r0:r1, c0:c1]
    valid = patch[patch > 0]          # zero = no-return pixel on D435i
    n_total = patch.size

    if len(valid) == 0:
        return 0.0, DEPTH_INVALID

    fill_ratio = len(valid) / max(n_total, 1)
    depth_m = float(np.median(valid)) * depth_scale

    # Quality tiers by fraction of valid depth pixels in the window.
    if fill_ratio > 0.6:
        quality = DEPTH_GOOD
    elif fill_ratio > 0.25:
        quality = DEPTH_MARGINAL
    else:
        quality = DEPTH_EXTRAPOLATED

    return depth_m, quality


# ── Per-track state ───────────────────────────────────────────────────────────

@dataclass
class PersonTrack:
    """Current state of one tracked person."""
    track_id: int
    state: TrackState
    bbox: BBox                     # smoothed Kalman bbox (colour img px)
    bearing: float = 0.0           # degrees; 0 until cam params set
    distance: float = 0.0          # metres; 0 until depth available
    depth_qual: int = DEPTH_INVALID
    confidence: float = 0.0        # 0–1 combined score
    vel_u: float = 0.0             # Kalman horizontal velocity (px/frame)
    vel_v: float = 0.0             # Kalman vertical velocity (px/frame)

    hits: int = 0                  # total matched frames (never reset on miss)
    age: int = 0                   # total frames since creation
    lost_age: int = 0              # consecutive unmatched frames while LOST

    color_hist: Optional[np.ndarray] = None  # HSV torso histogram

    # Internal — not serialised
    _kalman: Optional[KalmanBoxFilter] = field(default=None, repr=False)


# ── Camera parameters (minimal) ──────────────────────────────────────────────

@dataclass
class CamParams:
    """Minimal camera intrinsics needed for bearing calculation."""
    fx: float = 615.0   # D435i 640×480 depth defaults
    fy: float = 615.0
    cx: float = 320.0
    cy: float = 240.0
    fps: float = 30.0   # frame rate (for velocity conversion px→m/s)


# ── PersonTracker ─────────────────────────────────────────────────────────────

class PersonTracker:
    """
    Multi-person tracker combining Kalman prediction, IoU data association,
    and HSV colour histogram re-identification.

    Parameters
    ----------
    iou_threshold : minimum IoU to consider a detection-track match
    min_hits : frames before a track transitions TENTATIVE → ACTIVE
    max_lost_frames : frames a track survives without a detection before removal
    reid_threshold : minimum histogram Bhattacharyya similarity for re-ID
    reid_max_dist : max predicted-to-detection centre distance for re-ID (px)
    """

    def __init__(
        self,
        iou_threshold: float = 0.30,
        min_hits: int = 3,
        max_lost_frames: int = 30,
        reid_threshold: float = 0.55,
        reid_max_dist: float = 150.0,
    ) -> None:
        self.iou_threshold = iou_threshold
        self.min_hits = min_hits
        self.max_lost_frames = max_lost_frames
        self.reid_threshold = reid_threshold
        self.reid_max_dist = reid_max_dist

        self._tracks: List[PersonTrack] = []
        self._next_id: int = 0

    # ── Public API ────────────────────────────────────────────────────────────

    @property
    def tracks(self) -> List[PersonTrack]:
        return list(self._tracks)

    def update(
        self,
        detections: List[Detection],
        cam: Optional[CamParams] = None,
        depth_u16: Optional[np.ndarray] = None,
        depth_scale: float = 0.001,
    ) -> List[PersonTrack]:
        """
        Process one frame of detections.

        Parameters
        ----------
        detections : list of Detection from an external detector
        cam : camera intrinsics (for bearing computation); None = skip
        depth_u16 : aligned uint16 depth image; None = depth unavailable
        depth_scale : mm-to-metres scale factor (D435i default 0.001)

        Returns
        -------
        list of PersonTrack (ACTIVE state only)
        """
        # ── Step 1: Predict all tracks (LOST included — keeps the re-ID
        #    spatial gate tracking the person's extrapolated position) ───────
        for trk in self._tracks:
            if trk._kalman is not None:
                trk.bbox = trk._kalman.predict()
            trk.age += 1

        # ── Step 2: IoU matching against non-LOST tracks ─────────────────────
        active_tracks = [t for t in self._tracks if t.state != TrackState.LOST]
        matched_t, matched_d, unmatched_t, unmatched_d = \
            self._match_iou(active_tracks, detections)

        # ── Step 3: Update matched tracks ────────────────────────────────────
        for t_idx, d_idx in zip(matched_t, matched_d):
            trk = active_tracks[t_idx]
            det = detections[d_idx]
            if trk._kalman is not None:
                trk.bbox = trk._kalman.update(det.bbox)
                trk.vel_u, trk.vel_v = trk._kalman.velocity_px
            trk.hits += 1
            trk.lost_age = 0
            trk.confidence = float(det.confidence)
            if trk.state == TrackState.TENTATIVE and trk.hits >= self.min_hits:
                trk.state = TrackState.ACTIVE
            self._update_hist(trk, det)

        # ── Step 4: Re-ID for unmatched detections vs LOST tracks ────────────
        lost_tracks = [t for t in self._tracks if t.state == TrackState.LOST]
        still_unmatched_d = list(unmatched_d)

        for d_idx in list(still_unmatched_d):
            det = detections[d_idx]
            best_trk, _best_sim = self._reid_match(det, lost_tracks)
            if best_trk is not None:
                if best_trk._kalman is not None:
                    best_trk.bbox = best_trk._kalman.update(det.bbox)
                else:
                    best_trk.bbox = det.bbox
                best_trk.hits += 1
                # BUGFIX: a track that went LOST while still TENTATIVE must
                # pass the same min_hits gate as everyone else — do not jump
                # straight to ACTIVE on re-identification.
                best_trk.state = (
                    TrackState.ACTIVE if best_trk.hits >= self.min_hits
                    else TrackState.TENTATIVE
                )
                best_trk.lost_age = 0
                best_trk.confidence = float(det.confidence)
                self._update_hist(best_trk, det)
                # BUGFIX: a LOST track may be claimed by at most one detection
                # per frame — remove it from the candidate pool once matched.
                lost_tracks.remove(best_trk)
                still_unmatched_d.remove(d_idx)

        # ── Step 5: Create new tracks for still-unmatched detections ─────────
        for d_idx in still_unmatched_d:
            det = detections[d_idx]
            trk = PersonTrack(
                track_id  = self._next_id,
                state     = TrackState.TENTATIVE,
                bbox      = det.bbox,
                hits      = 1,
                confidence= float(det.confidence),
                _kalman   = KalmanBoxFilter(det.bbox),
            )
            self._update_hist(trk, det)
            self._tracks.append(trk)
            self._next_id += 1

        # ── Step 6: Age lost tracks, remove stale ────────────────────────────
        # Mark newly-unmatched tracks as LOST (reset lost_age to 0)
        for t_idx in unmatched_t:
            trk = active_tracks[t_idx]
            if trk.state != TrackState.LOST:
                trk.lost_age = 0
                trk.state = TrackState.LOST

        # Increment lost_age for every LOST track (including previously LOST)
        for trk in self._tracks:
            if trk.state == TrackState.LOST:
                trk.lost_age += 1

        self._tracks = [
            t for t in self._tracks
            if t.lost_age < self.max_lost_frames
        ]

        # ── Step 7: Update bearing / depth for all active tracks ─────────────
        for trk in self._tracks:
            if trk.state != TrackState.ACTIVE:
                continue
            u_centre = trk.bbox.x + trk.bbox.w * 0.5
            if cam is not None:
                trk.bearing = bearing_from_pixel(u_centre, cam.cx, cam.fx)
            if depth_u16 is not None:
                trk.distance, trk.depth_qual = depth_at_bbox(
                    depth_u16, trk.bbox, depth_scale=depth_scale
                )

        return [t for t in self._tracks if t.state == TrackState.ACTIVE]

    def reset(self) -> None:
        """Clear all tracks."""
        self._tracks.clear()
        self._next_id = 0

    # ── Internal helpers ──────────────────────────────────────────────────────

    def _match_iou(
        self,
        tracks: List[PersonTrack],
        detections: List[Detection],
    ) -> Tuple[List[int], List[int], List[int], List[int]]:
        """
        Greedy IoU matching between tracks and detections.

        Returns (matched_t_idx, matched_d_idx, unmatched_t_idx, unmatched_d_idx).
        """
        if not tracks or not detections:
            return [], [], list(range(len(tracks))), list(range(len(detections)))

        iou_mat = np.zeros((len(tracks), len(detections)), dtype=np.float32)
        for ti, trk in enumerate(tracks):
            for di, det in enumerate(detections):
                iou_mat[ti, di] = iou(trk.bbox, det.bbox)

        matched_t: List[int] = []
        matched_d: List[int] = []
        used_t = set()
        used_d = set()

        # Greedy: highest IoU first; stop once below threshold (sorted order).
        flat_order = np.argsort(iou_mat.ravel())[::-1]
        for flat_idx in flat_order:
            ti, di = divmod(int(flat_idx), len(detections))
            if iou_mat[ti, di] < self.iou_threshold:
                break
            if ti not in used_t and di not in used_d:
                matched_t.append(ti)
                matched_d.append(di)
                used_t.add(ti)
                used_d.add(di)

        unmatched_t = [i for i in range(len(tracks)) if i not in used_t]
        unmatched_d = [i for i in range(len(detections)) if i not in used_d]
        return matched_t, matched_d, unmatched_t, unmatched_d

    def _reid_match(
        self,
        det: Detection,
        lost: List[PersonTrack],
    ) -> Tuple[Optional[PersonTrack], float]:
        """
        Find the best re-identification match for a detection among lost tracks.

        Returns (best_track, similarity) or (None, 0.0) if no match found.
        """
        if not lost:
            return None, 0.0

        det_hist = None
        if det.frame_bgr is not None:
            det_hist = extract_torso_hist(det.frame_bgr, det.bbox)

        best_trk: Optional[PersonTrack] = None
        best_sim: float = 0.0

        det_cx = det.bbox.x + det.bbox.w * 0.5
        det_cy = det.bbox.y + det.bbox.h * 0.5

        for trk in lost:
            # Defensive: only genuinely LOST tracks are re-ID candidates.
            if trk.state != TrackState.LOST:
                continue
            # Spatial gate: predicted centre must be close enough
            trk_cx = trk.bbox.x + trk.bbox.w * 0.5
            trk_cy = trk.bbox.y + trk.bbox.h * 0.5
            dist = math.sqrt((det_cx - trk_cx) ** 2 + (det_cy - trk_cy) ** 2)
            if dist > self.reid_max_dist:
                continue

            # Histogram similarity
            if det_hist is not None and trk.color_hist is not None:
                sim = hist_similarity(det_hist, trk.color_hist)
                if sim > self.reid_threshold and sim > best_sim:
                    best_sim = sim
                    best_trk = trk

        return best_trk, best_sim

    @staticmethod
    def _update_hist(trk: PersonTrack, det: Detection) -> None:
        """Update track's colour histogram with exponential decay."""
        if det.frame_bgr is None:
            return
        new_hist = extract_torso_hist(det.frame_bgr, det.bbox)
        if new_hist is None:
            return
        if trk.color_hist is None:
            trk.color_hist = new_hist
        else:
            # Running average (α = 0.3 — new frame contributes 30 %)
            trk.color_hist = 0.7 * trk.color_hist + 0.3 * new_hist


# ── Follow-target selector ────────────────────────────────────────────────────

class FollowTargetSelector:
    """
    Selects and locks onto a single PersonTrack as the follow target.

    Strategy
    --------
    • On start() or when no target is locked: choose the nearest active track
      (by depth distance, or by image-centre proximity when depth unavailable).
    • Re-lock onto the same track_id each frame (continuous tracking).
    • If the locked track disappears: hold the last known state for
      `hold_frames` frames, then go inactive.
    """

    def __init__(self, hold_frames: int = 15) -> None:
        self.hold_frames = hold_frames
        self._target_id: Optional[int] = None
        self._last_target: Optional[PersonTrack] = None
        self._held_frames: int = 0
        self._active: bool = False

    # -- control ──────────────────────────────────────────────────────────────

    def start(self) -> None:
        """(Re-)enable follow mode; re-select target on next update."""
        self._active = True
        self._target_id = None
        self._held_frames = 0

    def stop(self) -> None:
        """Disable follow mode."""
        self._active = False
        self._target_id = None
        self._last_target = None

    # -- update ───────────────────────────────────────────────────────────────

    def update(
        self,
        active_tracks: List[PersonTrack],
        img_cx: float = 320.0,   # image centre x (px)
    ) -> Optional[PersonTrack]:
        """
        Select the follow target from a list of active tracks.

        Returns the locked PersonTrack, or None if follow mode is inactive or
        no candidate found.
        """
        if not self._active:
            return None

        if not active_tracks:
            # Hold the last known target briefly so the controller does not
            # twitch on single-frame detector dropouts.
            if self._last_target is not None and self._held_frames < self.hold_frames:
                self._held_frames += 1
                return self._last_target
            self._last_target = None
            return None

        self._held_frames = 0

        # Re-find locked track
        if self._target_id is not None:
            for t in active_tracks:
                if t.track_id == self._target_id:
                    self._last_target = t
                    return t
            # Locked track lost — re-select
            self._target_id = None

        # Select: prefer by smallest distance, then by image-centre proximity
        def _score(t: PersonTrack) -> float:
            if t.distance > 0:
                return t.distance
            return abs((t.bbox.x + t.bbox.w * 0.5) - img_cx)

        chosen = min(active_tracks, key=_score)
        self._target_id = chosen.track_id
        self._last_target = chosen
        return chosen
index 0000000..dd00215 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/person_tracking_node.py @@ -0,0 +1,371 @@ +""" +person_tracking_node.py — P0 follow-me person tracking node (Issue #363). + +Runs a real-time person detection + tracking pipeline on the D435i colour and +depth streams and publishes a single TargetTrack message for the follow-me +motion controller. + +Detection backend +----------------- + Attempts YOLOv8n via ultralytics (auto-converted to TensorRT FP16 on first + run for ≥ 15 fps on Orin Nano). Falls back to a simple HOG+SVM detector + when ultralytics is unavailable. + +Subscribes +---------- + /camera/color/image_raw sensor_msgs/Image (BEST_EFFORT) + /camera/depth/image_rect_raw sensor_msgs/Image (BEST_EFFORT) + /camera/depth/camera_info sensor_msgs/CameraInfo (RELIABLE) + /saltybot/follow_start std_msgs/Empty — start/resume following + /saltybot/follow_stop std_msgs/Empty — stop following + +Publishes +--------- + /saltybot/target_track saltybot_scene_msgs/TargetTrack + +Parameters +---------- + detector_model str 'yolov8n.pt' Ultralytics model file or TRT engine + use_tensorrt bool True Convert to TensorRT FP16 on first run + max_fps float 30.0 Maximum processing rate (Hz) + iou_threshold float 0.30 Tracker IoU matching threshold + min_hits int 3 Frames before TENTATIVE → ACTIVE + max_lost_frames int 30 Frames a track survives without det + reid_threshold float 0.55 HSV histogram re-ID similarity cutoff + depth_scale float 0.001 D435i raw-to-metres scale + depth_max_m float 5.0 Range beyond which depth degrades + auto_follow bool True Auto-select nearest person on start +""" + +from __future__ import annotations + +import threading +import time +from typing import List, Optional + +import numpy as np + +import rclpy +from rclpy.node import Node +from rclpy.qos import ( + QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy, +) + +from sensor_msgs.msg import CameraInfo, Image +from std_msgs.msg import Empty 
+from saltybot_scene_msgs.msg import TargetTrack +from ._person_tracker import ( + BBox, CamParams, Detection, + PersonTracker, FollowTargetSelector, + DEPTH_GOOD, DEPTH_MARGINAL, +) + + +_SENSOR_QOS = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=2, +) +_RELIABLE_QOS = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=1, + durability=DurabilityPolicy.TRANSIENT_LOCAL, +) + + +# ── Detector wrappers ───────────────────────────────────────────────────────── + +class _YoloDetector: + """Lazy-initialised YOLOv8n person detector (ultralytics / TensorRT).""" + + def __init__( + self, + model_path: str = 'yolov8n.pt', + use_tensorrt: bool = True, + logger=None, + ) -> None: + self._model_path = model_path + self._use_trt = use_tensorrt + self._log = logger + self._model = None + self._available = False + self._ready = threading.Event() + threading.Thread(target=self._load, daemon=True).start() + + def _load(self) -> None: + try: + from ultralytics import YOLO + model = YOLO(self._model_path) + if self._use_trt: + try: + model = YOLO(model.export(format='engine', half=True, device=0)) + except Exception as e: + if self._log: + self._log.warn(f'TRT export failed ({e}); using PyTorch') + self._model = model + self._available = True + if self._log: + self._log.info(f'YOLO detector loaded: {self._model_path}') + except Exception as e: + self._available = False + if self._log: + self._log.warn(f'ultralytics not available ({e}); using HOG fallback') + finally: + self._ready.set() + + def detect(self, bgr: np.ndarray, conf_thresh: float = 0.40) -> List[Detection]: + if not self._ready.wait(timeout=0.01) or not self._available: + return [] + results = self._model(bgr, classes=[0], conf=conf_thresh, verbose=False) + dets: List[Detection] = [] + for r in results: + for box in r.boxes: + x1, y1, x2, y2 = (int(v) for v in box.xyxy[0].cpu().numpy()) + w, h = max(1, x2 - x1), max(1, y2 - 
y1) + dets.append(Detection( + bbox = BBox(x1, y1, w, h), + confidence = float(box.conf[0]), + frame_bgr = bgr, + )) + return dets + + +class _HogDetector: + """OpenCV HOG+SVM person detector — CPU fallback, ~5–10 fps.""" + + def __init__(self) -> None: + import cv2 + self._hog = cv2.HOGDescriptor() + self._hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector()) + + def detect(self, bgr: np.ndarray, conf_thresh: float = 0.40) -> List[Detection]: + import cv2 + small = cv2.resize(bgr, (320, 240)) if bgr.shape[1] > 320 else bgr + scale = bgr.shape[1] / small.shape[1] + rects, weights = self._hog.detectMultiScale( + small, winStride=(8, 8), padding=(4, 4), scale=1.05, + ) + dets: List[Detection] = [] + for rect, w in zip(rects, weights): + conf = float(np.clip(w, 0.0, 1.0)) + if conf < conf_thresh: + continue + x, y, rw, rh = rect + dets.append(Detection( + bbox = BBox( + int(x * scale), int(y * scale), + int(rw * scale), int(rh * scale), + ), + confidence = conf, + frame_bgr = bgr, + )) + return dets + + +# ── ROS2 Node ───────────────────────────────────────────────────────────────── + +class PersonTrackingNode(Node): + + def __init__(self) -> None: + super().__init__('person_tracking_node') + + # ── Parameters ────────────────────────────────────────────────────── + self.declare_parameter('detector_model', 'yolov8n.pt') + self.declare_parameter('use_tensorrt', True) + self.declare_parameter('max_fps', 30.0) + self.declare_parameter('iou_threshold', 0.30) + self.declare_parameter('min_hits', 3) + self.declare_parameter('max_lost_frames', 30) + self.declare_parameter('reid_threshold', 0.55) + self.declare_parameter('depth_scale', 0.001) + self.declare_parameter('depth_max_m', 5.0) + self.declare_parameter('auto_follow', True) + + p = self.get_parameter + self._min_period = 1.0 / max(float(p('max_fps').value), 1.0) + self._depth_scale = float(p('depth_scale').value) + self._depth_max_m = float(p('depth_max_m').value) + self._auto_follow = 
bool(p('auto_follow').value) + + # ── Tracker + selector ─────────────────────────────────────────────── + self._tracker = PersonTracker( + iou_threshold = float(p('iou_threshold').value), + min_hits = int(p('min_hits').value), + max_lost_frames = int(p('max_lost_frames').value), + reid_threshold = float(p('reid_threshold').value), + ) + self._selector = FollowTargetSelector(hold_frames=15) + if self._auto_follow: + self._selector.start() + + # ── Detector (lazy) ────────────────────────────────────────────────── + self._detector: Optional[_YoloDetector] = _YoloDetector( + model_path = str(p('detector_model').value), + use_tensorrt = bool(p('use_tensorrt').value), + logger = self.get_logger(), + ) + self._hog_fallback: Optional[_HogDetector] = None + + # ── Camera state ───────────────────────────────────────────────────── + self._cam = CamParams() + self._cam_received = False + self._depth_image: Optional[np.ndarray] = None + self._last_proc = 0.0 + + # ── Subscribers ────────────────────────────────────────────────────── + self._color_sub = self.create_subscription( + Image, '/camera/color/image_raw', + self._on_color, _SENSOR_QOS, + ) + self._depth_sub = self.create_subscription( + Image, '/camera/depth/image_rect_raw', + self._on_depth, _SENSOR_QOS, + ) + self._info_sub = self.create_subscription( + CameraInfo, '/camera/depth/camera_info', + self._on_cam_info, _RELIABLE_QOS, + ) + self._start_sub = self.create_subscription( + Empty, '/saltybot/follow_start', + lambda _: self._selector.start(), 10, + ) + self._stop_sub = self.create_subscription( + Empty, '/saltybot/follow_stop', + lambda _: self._selector.stop(), 10, + ) + + # ── Publisher ──────────────────────────────────────────────────────── + self._pub = self.create_publisher( + TargetTrack, '/saltybot/target_track', 10 + ) + + self.get_logger().info( + 'person_tracking_node ready — ' + f'auto_follow={self._auto_follow}, ' + f'max_fps={1.0/self._min_period:.0f}' + ) + + # ── Callbacks 
─────────────────────────────────────────────────────────────

    def _on_cam_info(self, msg: CameraInfo) -> None:
        # Latch intrinsics from the first CameraInfo only; later messages are
        # ignored (intrinsics are static for a fixed camera).
        if self._cam_received:
            return
        # k is the row-major 3×3 intrinsic matrix: k[0]=fx, k[4]=fy,
        # k[2]=cx, k[5]=cy.
        self._cam = CamParams(
            fx=float(msg.k[0]), fy=float(msg.k[4]),
            cx=float(msg.k[2]), cy=float(msg.k[5]),
        )
        self._cam_received = True
        self.get_logger().info(
            f'Camera intrinsics: fx={self._cam.fx:.1f} cx={self._cam.cx:.1f}'
        )

    def _on_depth(self, msg: Image) -> None:
        # Accept only 16-bit depth encodings; anything else is dropped.
        if msg.encoding.lower() not in ('16uc1', 'mono16'):
            return
        # NOTE(review): assumes native-endian, tightly packed rows
        # (msg.step == 2 * msg.width, msg.is_bigendian false) — confirm
        # against the camera driver.
        data = np.frombuffer(msg.data, dtype=np.uint16)
        try:
            self._depth_image = data.reshape((msg.height, msg.width))
        except ValueError:
            pass  # malformed buffer: keep the previous depth frame

    def _on_color(self, msg: Image) -> None:
        # Throttle processing to max_fps using a monotonic clock.
        now = time.monotonic()
        if now - self._last_proc < self._min_period:
            return
        self._last_proc = now

        enc = msg.encoding.lower()
        if enc not in ('bgr8', 'rgb8'):
            return
        data = np.frombuffer(msg.data, dtype=np.uint8)
        try:
            frame = data.reshape((msg.height, msg.width, 3))
        except ValueError:
            return
        if enc == 'rgb8':
            import cv2
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        # ── Detect ────────────────────────────────────────────────────────
        # NOTE(review): reaches into _YoloDetector's private _ready /
        # _available flags — consider exposing a public readiness API.
        dets: List[Detection] = []
        if self._detector and self._detector._ready.is_set():
            if self._detector._available:
                dets = self._detector.detect(frame)
            else:
                # Init HOG fallback on first YOLO failure
                if self._hog_fallback is None:
                    try:
                        self._hog_fallback = _HogDetector()
                    except Exception:
                        pass  # best-effort: no detector this frame
                if self._hog_fallback:
                    dets = self._hog_fallback.detect(frame)
        else:
            # YOLO still loading — run HOG if available
            if self._hog_fallback is None:
                try:
                    self._hog_fallback = _HogDetector()
                except Exception:
                    pass  # best-effort: no detector this frame
            if self._hog_fallback:
                dets = self._hog_fallback.detect(frame)

        # ── Track ─────────────────────────────────────────────────────────
        # cam is withheld until intrinsics arrive so the tracker does not
        # compute bearings from default (zero) parameters.
        active = self._tracker.update(
            dets,
            cam         = self._cam if self._cam_received else None,
            depth_u16   = self._depth_image,
            depth_scale = self._depth_scale,
        )
+ + # ── Select target ───────────────────────────────────────────────── + target = self._selector.update( + active, img_cx=self._cam.cx + ) + + # ── Publish ─────────────────────────────────────────────────────── + out = TargetTrack() + out.header.stamp = msg.header.stamp + out.header.frame_id = msg.header.frame_id or 'camera_color_optical_frame' + + if target is not None: + out.tracking_active = True + out.track_id = target.track_id + out.bearing_deg = float(target.bearing) + out.distance_m = float(target.distance) + out.confidence = float(target.confidence) + out.bbox_x = int(target.bbox.x) + out.bbox_y = int(target.bbox.y) + out.bbox_w = int(target.bbox.w) + out.bbox_h = int(target.bbox.h) + out.depth_quality = int(target.depth_qual) + + # Convert Kalman pixel velocity → bearing rate + if self._cam_received and target.vel_u != 0.0: + u_c = target.bbox.x + target.bbox.w * 0.5 + # d(bearing)/du ≈ fx / (fx² + (u-cx)²) * (180/π) + denom = self._cam.fx ** 2 + (u_c - self._cam.cx) ** 2 + d_bear_du = (self._cam.fx / denom) * (180.0 / 3.14159) + out.vel_bearing_dps = float(d_bear_du * target.vel_u * self._cam.fps) + + # Distance velocity from depth (placeholder: not computed per-frame here) + out.vel_dist_mps = 0.0 + else: + out.tracking_active = False + + self._pub.publish(out) + + +def main(args=None) -> None: + rclpy.init(args=args) + node = PersonTrackingNode() + try: + rclpy.spin(node) + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/jetson/ros2_ws/src/saltybot_bringup/setup.py b/jetson/ros2_ws/src/saltybot_bringup/setup.py index 1bc13d5..257cb11 100644 --- a/jetson/ros2_ws/src/saltybot_bringup/setup.py +++ b/jetson/ros2_ws/src/saltybot_bringup/setup.py @@ -61,6 +61,8 @@ setup( 'audio_scene = saltybot_bringup.audio_scene_node:main', # Face emotion classifier (Issue #359) 'face_emotion = saltybot_bringup.face_emotion_node:main', + # Person tracking for follow-me mode (Issue #363) + 'person_tracking = 
saltybot_bringup.person_tracking_node:main',
        ],
    },
)
diff --git a/jetson/ros2_ws/src/saltybot_bringup/test/test_person_tracker.py b/jetson/ros2_ws/src/saltybot_bringup/test/test_person_tracker.py
new file mode 100644
index 0000000..21044a2
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_bringup/test/test_person_tracker.py
@@ -0,0 +1,599 @@
"""
test_person_tracker.py — Unit tests for the P0 person tracking pipeline.

Tests cover: IoU, Kalman filter, colour histogram, bearing geometry,
depth sampling, tracker state machine, and follow-target selection.
No camera, no detector, no ROS2 needed.
"""

import math

import numpy as np
import pytest

from saltybot_bringup._person_tracker import (
    BBox,
    CamParams,
    Detection,
    FollowTargetSelector,
    KalmanBoxFilter,
    PersonTrack,
    PersonTracker,
    TrackState,
    DEPTH_GOOD, DEPTH_MARGINAL, DEPTH_EXTRAPOLATED, DEPTH_INVALID,
    bearing_from_pixel,
    depth_at_bbox,
    extract_torso_hist,
    hist_similarity,
    iou,
)


# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────

def _det(x=10, y=20, w=60, h=150, conf=0.85, frame=None) -> Detection:
    """Build a Detection with sensible person-box defaults."""
    return Detection(BBox(x, y, w, h), conf, frame)


def _solid_bgr(h=200, w=100, b=128, g=64, r=32) -> np.ndarray:
    """Uniform colour BGR image."""
    return np.full((h, w, 3), (b, g, r), dtype=np.uint8)


def _depth_image(h=480, w=640, val_mm=2000) -> np.ndarray:
    """Uniform uint16 depth image at val_mm mm."""
    return np.full((h, w), val_mm, dtype=np.uint16)


def _run_tracker_n(tracker, bbox, n=5, cam=None):
    """Feed the same detection to a tracker n times."""
    for _ in range(n):
        tracker.update([_det(*bbox)], cam=cam)


# ─────────────────────────────────────────────────────────────────────────────
# BBox
# ─────────────────────────────────────────────────────────────────────────────

def test_bbox_fields():
    b = BBox(10, 20, 60, 150)
    assert b.x == 10 and b.y == 20 and b.w == 60 and b.h == 150


def test_bbox_is_named_tuple():
    b = BBox(0, 0, 1, 1)
    assert isinstance(b, tuple)


# ─────────────────────────────────────────────────────────────────────────────
# IoU
# ─────────────────────────────────────────────────────────────────────────────

def test_iou_identical():
    b = BBox(0, 0, 100, 100)
    assert abs(iou(b, b) - 1.0) < 1e-6


def test_iou_no_overlap():
    a = BBox(0, 0, 50, 50)
    b = BBox(100, 0, 50, 50)
    assert iou(a, b) == pytest.approx(0.0, abs=1e-6)


def test_iou_partial_overlap():
    a = BBox(0, 0, 100, 100)
    b = BBox(50, 0, 100, 100)  # 50 % horizontal overlap
    result = iou(a, b)
    assert 0.0 < result < 1.0


def test_iou_exact_overlap_value():
    a = BBox(0, 0, 100, 100)  # area 10000
    b = BBox(0, 0, 50, 100)   # area 5000, inter=5000, union=10000
    assert abs(iou(a, b) - 0.5) < 1e-6


def test_iou_contained():
    a = BBox(0, 0, 100, 100)
    b = BBox(25, 25, 50, 50)  # b inside a
    result = iou(a, b)
    # inter = 2500, union = 10000+2500-2500 = 10000
    assert abs(result - 0.25) < 1e-6


def test_iou_symmetric():
    a = BBox(10, 10, 80, 80)
    b = BBox(50, 50, 80, 80)
    assert abs(iou(a, b) - iou(b, a)) < 1e-9


def test_iou_touching_edges():
    a = BBox(0, 0, 50, 50)
    b = BBox(50, 0, 50, 50)  # touch at x=50 → zero overlap
    assert iou(a, b) == pytest.approx(0.0, abs=1e-6)


# ─────────────────────────────────────────────────────────────────────────────
# KalmanBoxFilter
# ─────────────────────────────────────────────────────────────────────────────

def test_kalman_predict_returns_bbox():
    kf = KalmanBoxFilter(BBox(10, 20, 60, 150))
    pred = kf.predict()
    assert isinstance(pred, BBox)
    assert pred.w >= 1 and pred.h >= 1


def test_kalman_update_returns_bbox():
    kf = KalmanBoxFilter(BBox(10, 20, 60, 150))
    updated = kf.update(BBox(10, 20, 60, 150))
    assert isinstance(updated, BBox)


def test_kalman_converges_to_stationary():
"""After many identical measurements, Kalman should converge near them.""" + init = BBox(100, 100, 80, 160) + meas = BBox(102, 98, 80, 160) + kf = KalmanBoxFilter(init) + for _ in range(20): + kf.predict() + result = kf.update(meas) + # Should be within ~10 px of measurement + assert abs(result.x - meas.x) < 10 + assert abs(result.y - meas.y) < 10 + + +def test_kalman_predict_advances_with_velocity(): + """Give the filter a few frames of rightward motion; prediction overshoots.""" + kf = KalmanBoxFilter(BBox(100, 100, 60, 150)) + for i in range(5): + kf.predict() + kf.update(BBox(100 + i * 10, 100, 60, 150)) + pred = kf.predict() + # After motion right, predicted cx should be further right + assert pred.x > 100 + 4 * 10 - 5 # at least near last position + + +def test_kalman_velocity_zero_initially(): + kf = KalmanBoxFilter(BBox(100, 100, 60, 150)) + assert kf.velocity_px == (0.0, 0.0) + + +def test_kalman_bbox_property(): + b = BBox(50, 50, 80, 120) + kf = KalmanBoxFilter(b) + out = kf.bbox + assert isinstance(out, BBox) + # Should be near initial + assert abs(out.x - b.x) <= 2 + assert abs(out.y - b.y) <= 2 + + +def test_kalman_width_height_stay_positive(): + kf = KalmanBoxFilter(BBox(10, 10, 5, 5)) + for _ in range(30): + kf.predict() + assert kf.bbox.w >= 1 and kf.bbox.h >= 1 + + +# ───────────────────────────────────────────────────────────────────────────── +# extract_torso_hist +# ───────────────────────────────────────────────────────────────────────────── + +def test_torso_hist_shape(): + bgr = _solid_bgr() + h = extract_torso_hist(bgr, BBox(0, 0, 100, 200)) + assert h is not None + assert h.shape == (128,) # 16 * 8 + + +def test_torso_hist_normalised(): + bgr = _solid_bgr() + h = extract_torso_hist(bgr, BBox(0, 0, 100, 200)) + assert h is not None + assert abs(h.sum() - 1.0) < 1e-5 + + +def test_torso_hist_none_for_none_frame(): + assert extract_torso_hist(None, BBox(0, 0, 100, 200)) is None + + +def test_torso_hist_none_for_tiny_bbox(): + bgr = 
_solid_bgr() + assert extract_torso_hist(bgr, BBox(0, 0, 2, 2)) is None + + +def test_torso_hist_different_colours(): + """Two differently coloured crops should produce different histograms.""" + bgr_red = np.full((200, 100, 3), (0, 0, 255), dtype=np.uint8) + bgr_blue = np.full((200, 100, 3), (255, 0, 0), dtype=np.uint8) + h_red = extract_torso_hist(bgr_red, BBox(0, 0, 100, 200)) + h_blue = extract_torso_hist(bgr_blue, BBox(0, 0, 100, 200)) + assert h_red is not None and h_blue is not None + assert not np.allclose(h_red, h_blue) + + +# ───────────────────────────────────────────────────────────────────────────── +# hist_similarity +# ───────────────────────────────────────────────────────────────────────────── + +def test_hist_similarity_identical(): + h = np.ones(128, dtype=np.float32) / 128.0 + assert abs(hist_similarity(h, h) - 1.0) < 1e-6 + + +def test_hist_similarity_orthogonal(): + """Non-overlapping histograms → 0 similarity.""" + h1 = np.zeros(128, dtype=np.float32) + h2 = np.zeros(128, dtype=np.float32) + h1[:64] = 1.0 / 64 + h2[64:] = 1.0 / 64 + assert abs(hist_similarity(h1, h2)) < 1e-6 + + +def test_hist_similarity_range(): + rng = np.random.RandomState(0) + for _ in range(20): + h1 = rng.rand(128).astype(np.float32) + h2 = rng.rand(128).astype(np.float32) + h1 /= h1.sum(); h2 /= h2.sum() + s = hist_similarity(h1, h2) + assert 0.0 <= s <= 1.0 + 1e-6 + + +def test_hist_similarity_symmetric(): + h1 = np.random.RandomState(1).rand(128).astype(np.float32) + h2 = np.random.RandomState(2).rand(128).astype(np.float32) + h1 /= h1.sum(); h2 /= h2.sum() + assert abs(hist_similarity(h1, h2) - hist_similarity(h2, h1)) < 1e-6 + + +# ───────────────────────────────────────────────────────────────────────────── +# bearing_from_pixel +# ───────────────────────────────────────────────────────────────────────────── + +def test_bearing_centre_is_zero(): + """Pixel at principal point → bearing = 0°.""" + assert abs(bearing_from_pixel(320.0, 320.0, 615.0)) < 1e-9 + + +def 
test_bearing_right_is_positive():
    b = bearing_from_pixel(400.0, 320.0, 615.0)
    assert b > 0.0


def test_bearing_left_is_negative():
    b = bearing_from_pixel(200.0, 320.0, 615.0)
    assert b < 0.0


def test_bearing_symmetric():
    # +80 px and −80 px off the principal point must mirror exactly.
    b_right = bearing_from_pixel(400.0, 320.0, 615.0)
    b_left = bearing_from_pixel(240.0, 320.0, 615.0)
    assert abs(b_right + b_left) < 1e-9


def test_bearing_approx_at_45_deg():
    """u - cx = fx → atan(1) = 45°."""
    bearing = bearing_from_pixel(935.0, 320.0, 615.0)
    assert abs(bearing - 45.0) < 0.1


def test_bearing_degrees_not_radians():
    """Result must be in degrees (much larger than a radian value)."""
    b = bearing_from_pixel(400.0, 320.0, 615.0)
    assert abs(b) > 0.5  # atan2(80/615) ≈ 7.4°


# ─────────────────────────────────────────────────────────────────────────────
# depth_at_bbox
# ─────────────────────────────────────────────────────────────────────────────

def test_depth_uniform_field():
    d = _depth_image(val_mm=2000)
    depth_m, quality = depth_at_bbox(d, BBox(200, 150, 80, 200))
    assert abs(depth_m - 2.0) < 0.01
    assert quality == DEPTH_GOOD


def test_depth_zero_image_invalid():
    d = np.zeros((480, 640), dtype=np.uint16)
    depth_m, quality = depth_at_bbox(d, BBox(200, 150, 80, 200))
    assert depth_m == 0.0
    assert quality == DEPTH_INVALID


def test_depth_partial_fill_marginal():
    d = np.zeros((480, 640), dtype=np.uint16)
    # Fill only 40 % of the central window with valid readings
    d[200:260, 280:360] = 1500
    _, quality = depth_at_bbox(d, BBox(200, 150, 160, 200), window_frac=1.0)
    assert quality in (DEPTH_MARGINAL, DEPTH_EXTRAPOLATED, DEPTH_GOOD)


def test_depth_scale_applied():
    d = _depth_image(val_mm=3000)
    depth_m, _ = depth_at_bbox(d, BBox(100, 100, 80, 150), depth_scale=0.001)
    assert abs(depth_m - 3.0) < 0.01


def test_depth_out_of_bounds_bbox():
    """Bbox outside image should return DEPTH_INVALID."""
    d = _depth_image()
    _, quality = depth_at_bbox(d, BBox(700, 500, 80, 100))  # off-screen
    assert quality == DEPTH_INVALID


def test_depth_at_5m():
    d = _depth_image(val_mm=5000)
    depth_m, _ = depth_at_bbox(d, BBox(200, 100, 80, 200))
    assert abs(depth_m - 5.0) < 0.05


# ─────────────────────────────────────────────────────────────────────────────
# PersonTracker — state machine
# ─────────────────────────────────────────────────────────────────────────────

def test_tracker_new_track_tentative():
    t = PersonTracker(min_hits=3)
    active = t.update([_det()])
    # min_hits=3: first frame → TENTATIVE, not in active output
    assert len(active) == 0
    assert len(t.tracks) == 1
    assert t.tracks[0].state == TrackState.TENTATIVE


def test_tracker_track_becomes_active():
    t = PersonTracker(min_hits=3)
    for _ in range(3):
        active = t.update([_det()])
    assert len(active) == 1
    assert active[0].state == TrackState.ACTIVE


def test_tracker_persistent_id():
    t = PersonTracker(min_hits=2)
    _run_tracker_n(t, (10, 20, 60, 150), n=4)
    assert len(t.tracks) == 1
    assert t.tracks[0].track_id == 0


def test_tracker_two_detections_two_tracks():
    t = PersonTracker(min_hits=2)
    # Two widely-separated boxes → two tracks
    dets = [
        Detection(BBox(10, 20, 60, 150), 0.9),
        Detection(BBox(400, 20, 60, 150), 0.9),
    ]
    for _ in range(3):
        t.update(dets)
    assert len(t.tracks) == 2


def test_tracker_ids_increment():
    t = PersonTracker(min_hits=1)
    t.update([_det(x=10)])
    t.update([_det(x=10), _det(x=300)])
    ids = {trk.track_id for trk in t.tracks}
    assert len(ids) == 2


def test_tracker_track_goes_lost():
    t = PersonTracker(min_hits=2, max_lost_frames=5)
    _run_tracker_n(t, (10, 20, 60, 150), n=3)
    # Now send no detections
    t.update([])
    lost = [tr for tr in t.tracks if tr.state == TrackState.LOST]
    assert len(lost) == 1


def test_tracker_track_removed_after_max_lost():
    t = PersonTracker(min_hits=2, max_lost_frames=3)
    _run_tracker_n(t, (10, 20, 60, 150), n=3)
    for _ in range(5):
        t.update([])
    assert len(t.tracks) == 0


def test_tracker_iou_matching_same_track():
    """Slightly moved detection should match the existing track (not create new)."""
    t = PersonTracker(min_hits=2, iou_threshold=0.3)
    _run_tracker_n(t, (10, 20, 60, 150), n=3)
    n_before = len(t.tracks)
    t.update([_det(x=15, y=20, w=60, h=150)])  # small shift, high IoU
    assert len(t.tracks) == n_before


def test_tracker_no_overlap_creates_new_track():
    t = PersonTracker(min_hits=2, iou_threshold=0.3)
    _run_tracker_n(t, (10, 20, 60, 150), n=3)
    t.update([_det(x=500, y=20, w=60, h=150)])  # far away → new track
    assert len(t.tracks) == 2  # old (lost) + new (tentative)


def test_tracker_reset():
    t = PersonTracker(min_hits=2)
    _run_tracker_n(t, (10, 20, 60, 150), n=3)
    t.reset()
    assert len(t.tracks) == 0
    assert t._next_id == 0


def test_tracker_bearing_set_with_cam():
    cam = CamParams(fx=615.0, cx=320.0)
    t = PersonTracker(min_hits=2)
    _run_tracker_n(t, (10, 20, 60, 150), n=3, cam=cam)
    active = [tr for tr in t.tracks if tr.state == TrackState.ACTIVE]
    assert len(active) > 0
    # bearing of a box at x=10..70 (cx ≈ 40, image cx=320) → negative (left)
    assert active[0].bearing < 0.0


def test_tracker_distance_set_with_depth():
    cam = CamParams(fx=615.0, cx=320.0)
    t = PersonTracker(min_hits=2)
    d = _depth_image(val_mm=2000)
    for _ in range(3):
        t.update([_det(x=200, y=100, w=80, h=200)], cam=cam, depth_u16=d)
    active = [tr for tr in t.tracks if tr.state == TrackState.ACTIVE]
    assert len(active) > 0
    assert abs(active[0].distance - 2.0) < 0.1


def test_tracker_no_depth_distance_zero():
    cam = CamParams()
    t = PersonTracker(min_hits=2)
    for _ in range(3):
        t.update([_det()], cam=cam, depth_u16=None)
    active = [tr for tr in t.tracks if tr.state == TrackState.ACTIVE]
    assert len(active) > 0
    assert active[0].distance == 0.0


# ─────────────────────────────────────────────────────────────────────────────
# PersonTracker — re-identification
# ─────────────────────────────────────────────────────────────────────────────

def test_reid_restores_track_id():
    """Person disappears for a few frames then reappears; same track_id."""
    # Blue person in centre
    bgr_blue = np.full((480, 640, 3), (200, 50, 0), dtype=np.uint8)
    bbox = (250, 50, 80, 200)
    t = PersonTracker(
        min_hits=2, max_lost_frames=10,
        reid_threshold=0.4, reid_max_dist=200.0,
    )

    # Establish track
    for _ in range(3):
        t.update([Detection(BBox(*bbox), 0.9, bgr_blue)])

    track_id_before = t.tracks[0].track_id

    # Disappear for 3 frames
    for _ in range(3):
        t.update([])

    # Re-appear at same position with same colour
    t.update([Detection(BBox(*bbox), 0.9, bgr_blue)])

    # Track should be re-identified with the same ID
    assert any(tr.track_id == track_id_before for tr in t.tracks)


def test_reid_different_colour_creates_new_track():
    """Person disappears; different colour appears same place → new track ID."""
    bgr_blue = np.full((480, 640, 3), (200, 50, 0), dtype=np.uint8)
    bgr_red = np.full((480, 640, 3), (0, 50, 200), dtype=np.uint8)
    bbox = (250, 50, 80, 200)
    t = PersonTracker(
        min_hits=2, max_lost_frames=5,
        reid_threshold=0.85,  # strict threshold → red won't match blue
        reid_max_dist=200.0,
    )

    for _ in range(3):
        t.update([Detection(BBox(*bbox), 0.9, bgr_blue)])

    track_id_before = t.tracks[0].track_id

    for _ in range(2):
        t.update([])

    # Red person (different colour)
    for _ in range(3):
        t.update([Detection(BBox(*bbox), 0.9, bgr_red)])

    new_ids = {tr.track_id for tr in t.tracks}
    # Should contain a new ID (not only the original)
    assert track_id_before not in new_ids or len(new_ids) > 1


# ─────────────────────────────────────────────────────────────────────────────
# FollowTargetSelector
# ─────────────────────────────────────────────────────────────────────────────

def _make_track(track_id, x, y, w, h, dist=0.0) -> PersonTrack:
    # Minimal ACTIVE track for selector tests.
    trk = PersonTrack(
        track_id=track_id, state=TrackState.ACTIVE,
        bbox=BBox(x, y, w, h), distance=dist,
    )
    return trk


def test_selector_inactive_returns_none():
    sel = FollowTargetSelector()
    sel.stop()
    result = sel.update([_make_track(0, 100, 50, 80, 200, dist=3.0)])
    assert result is None


def test_selector_active_selects_nearest_by_distance():
    sel = FollowTargetSelector()
    sel.start()
    tracks = [
        _make_track(0, 100, 50, 80, 200, dist=4.0),
        _make_track(1, 300, 50, 80, 200, dist=1.5),
    ]
    result = sel.update(tracks, img_cx=320.0)
    assert result.track_id == 1  # closer


def test_selector_locks_on_same_track():
    sel = FollowTargetSelector()
    sel.start()
    tracks = [
        _make_track(0, 100, 50, 80, 200, dist=2.0),
        _make_track(1, 300, 50, 80, 200, dist=5.0),
    ]
    first = sel.update(tracks, img_cx=320.0)
    # Second call — switch distance but should keep locked ID
    tracks[0] = _make_track(0, 100, 50, 80, 200, dist=2.0)
    second = sel.update(tracks, img_cx=320.0)
    assert second.track_id == first.track_id


def test_selector_holds_last_when_tracks_empty():
    sel = FollowTargetSelector(hold_frames=5)
    sel.start()
    sel.update([_make_track(0, 100, 50, 80, 200, dist=2.0)])
    # Now empty — should hold for up to 5 frames
    result = sel.update([], img_cx=320.0)
    assert result is not None
    assert result.track_id == 0


def test_selector_stops_holding_after_hold_frames():
    sel = FollowTargetSelector(hold_frames=2)
    sel.start()
    sel.update([_make_track(0, 100, 50, 80, 200)])
    for _ in range(5):
        result = sel.update([])
    assert result is None


def test_selector_selects_by_centre_when_no_depth():
    sel = FollowTargetSelector()
    sel.start()
    # Track 0: x=0..80 → centre x=40 (far left of image)
    # Track 1: x=280..360 → centre x=320 (near image centre)
    tracks = [
        _make_track(0, 0, 50, 80, 200, dist=0.0),
        _make_track(1, 280, 50, 80, 200, dist=0.0),
    ]
    result = sel.update(tracks, img_cx=320.0)
    assert result.track_id == 1  # nearer to image centre


def test_selector_restart_reselects():
    sel = FollowTargetSelector()
    sel.start()
    t0 = _make_track(0, 100, 50, 80, 200, dist=2.0)
    sel.update([t0])
    sel.stop()
    sel.start()
    t1 = _make_track(1, 300, 50, 80, 200, dist=1.0)
    result = sel.update([t0, t1])
    assert result.track_id == 1  # re-selected nearest
diff --git a/jetson/ros2_ws/src/saltybot_scene_msgs/CMakeLists.txt b/jetson/ros2_ws/src/saltybot_scene_msgs/CMakeLists.txt
index f11f157..e386fa9 100644
--- a/jetson/ros2_ws/src/saltybot_scene_msgs/CMakeLists.txt
+++ b/jetson/ros2_ws/src/saltybot_scene_msgs/CMakeLists.txt
@@ -35,6 +35,8 @@ rosidl_generate_interfaces(${PROJECT_NAME}
   # Issue #359 — face emotion classifier
   "msg/FaceEmotion.msg"
   "msg/FaceEmotionArray.msg"
+  # Issue #363 — person tracking for follow-me mode
+  "msg/TargetTrack.msg"
   DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
 )
diff --git a/jetson/ros2_ws/src/saltybot_scene_msgs/msg/TargetTrack.msg b/jetson/ros2_ws/src/saltybot_scene_msgs/msg/TargetTrack.msg
new file mode 100644
index 0000000..29ce57e
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_scene_msgs/msg/TargetTrack.msg
@@ -0,0 +1,13 @@
std_msgs/Header header
bool tracking_active     # false when no target is locked
uint32 track_id          # persistent ID across frames
float32 bearing_deg      # horizontal bearing to target (°, right=+, left=−)
float32 distance_m       # range to target (m); 0 = unknown / depth invalid
float32 confidence       # 0.0–1.0 overall track quality
int32 bbox_x             # colour-image bounding box (pixels, top-left origin)
int32 bbox_y
int32 bbox_w
int32 bbox_h
float32 vel_bearing_dps  # bearing rate (°/s, from Kalman velocity state)
float32 vel_dist_mps     # distance rate (m/s, + = moving away)
uint8 depth_quality      # 0=invalid 1=extrapolated 2=marginal 3=good