From 3310a858de27dffde3adec2b60ab6c1bf449a177 Mon Sep 17 00:00:00 2001 From: sl-controls Date: Sun, 1 Mar 2026 23:29:13 -0500 Subject: [PATCH] =?UTF-8?q?feat(social):=20multi-modal=20tracking=20fusion?= =?UTF-8?q?=20=E2=80=94=20UWB+camera=20Kalman=20filter=20(Issue=20#92)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New packages: saltybot_social_msgs — FusedTarget.msg custom message saltybot_social_tracking — 4-state Kalman fusion node saltybot_social_tracking/tracking_fusion_node.py Subscribes to /uwb/target (PoseStamped, ~10 Hz) and /person/target (PoseStamped, ~30 Hz) and publishes /social/tracking/fused_target (FusedTarget) at 20 Hz. Source arbitration: • "fused" — both UWB and camera are fresh; confidence-weighted blend • "uwb" — UWB fresh, camera stale • "camera" — camera fresh, UWB stale • "predicted" — all sources stale; KF coasts for up to predict_timeout (3 s) Kalman filter (kalman_tracker.py): State [x, y, vx, vy] with discrete Wiener acceleration noise model (process_noise=3.0 m/s²) sized for EUC speeds (20-30 km/h, ≈5.5-8.3 m/s). Separate UWB (0.20 m) and camera (0.12 m) measurement noise. Velocity estimate converges after ~3 s of 10 Hz UWB measurements. Confidence model (source_arbiter.py): Per-source confidence = quality × max(0, 1 - age/timeout). Composite confidence accounts for KF positional uncertainty and is capped at 0.4 during dead-reckoning ("predicted") mode. Tests: 58/58 pass (no ROS2 runtime required). Note: saltybot_social_msgs here adds FusedTarget.msg; PR #98 (Issue #84) adds PersonalityState.msg + QueryMood.srv to the same package. The maintainer should squash-merge #98 first and rebase this branch on top of it before merging to avoid the package.xml conflict. Co-Authored-By: Claude Sonnet 4.6 --- .../src/saltybot_social_msgs/CMakeLists.txt | 15 + .../saltybot_social_msgs/msg/FusedTarget.msg | 19 + .../src/saltybot_social_msgs/package.xml | 26 ++ .../src/saltybot_social_tracking/.gitignore | 5 + .../config/tracking_params.yaml | 48 ++ .../launch/tracking.launch.py | 44 ++ .../src/saltybot_social_tracking/package.xml | 31 ++ .../resource/saltybot_social_tracking | 0 .../saltybot_social_tracking/__init__.py | 0 .../kalman_tracker.py | 134 ++++++ .../source_arbiter.py | 155 +++++++ .../tracking_fusion_node.py | 257 ++++++++++ .../src/saltybot_social_tracking/setup.cfg | 5 + .../src/saltybot_social_tracking/setup.py | 32 ++ .../test/test_tracking_fusion.py | 438 ++++++++++++++++++ 15 files changed, 1209 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_social_msgs/CMakeLists.txt create mode 100644 jetson/ros2_ws/src/saltybot_social_msgs/msg/FusedTarget.msg create mode 100644 jetson/ros2_ws/src/saltybot_social_msgs/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_social_tracking/.gitignore create mode 100644 jetson/ros2_ws/src/saltybot_social_tracking/config/tracking_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_social_tracking/launch/tracking.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_social_tracking/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_social_tracking/resource/saltybot_social_tracking create mode 100644 jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/kalman_tracker.py create mode 100644 jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/source_arbiter.py create mode 100644 jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/tracking_fusion_node.py create mode 100644 jetson/ros2_ws/src/saltybot_social_tracking/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_social_tracking/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_social_tracking/test/test_tracking_fusion.py diff --git a/jetson/ros2_ws/src/saltybot_social_msgs/CMakeLists.txt b/jetson/ros2_ws/src/saltybot_social_msgs/CMakeLists.txt new file mode 100644 index 0000000..ddafa7a --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social_msgs/CMakeLists.txt @@ -0,0 +1,15 @@ +cmake_minimum_required(VERSION 3.8) +project(saltybot_social_msgs) + +find_package(ament_cmake REQUIRED) +find_package(rosidl_default_generators REQUIRED) +find_package(std_msgs REQUIRED) +find_package(geometry_msgs REQUIRED) + +rosidl_generate_interfaces(${PROJECT_NAME} + "msg/FusedTarget.msg" + DEPENDENCIES std_msgs geometry_msgs +) + +ament_export_dependencies(rosidl_default_runtime) +ament_package() diff --git a/jetson/ros2_ws/src/saltybot_social_msgs/msg/FusedTarget.msg b/jetson/ros2_ws/src/saltybot_social_msgs/msg/FusedTarget.msg new file mode 100644 index 0000000..1211416 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social_msgs/msg/FusedTarget.msg @@ -0,0 +1,19 @@ +# FusedTarget.msg — output of the multi-modal tracking fusion node. +# +# Position and velocity are in the base_link frame (robot-centred, +# +X forward, +Y left). z components are always 0.0 for ground-plane tracking. +# +# Confidence: 0.0 = no data / fully predicted; 1.0 = strong fused measurement. +# active_source: "fused" | "uwb" | "camera" | "predicted" + +std_msgs/Header header + +geometry_msgs/Point position # filtered 2-D position (m), z=0 +geometry_msgs/Vector3 velocity # filtered 2-D velocity (m/s), z=0 + +float32 range_m # Euclidean distance from robot to fused position +float32 bearing_rad # bearing in base_link (+ve = person to the left) +float32 confidence # composite confidence [0.0, 1.0] + +string active_source # "fused" | "uwb" | "camera" | "predicted" +string tag_id # UWB tag address (empty when UWB not contributing) diff --git a/jetson/ros2_ws/src/saltybot_social_msgs/package.xml b/jetson/ros2_ws/src/saltybot_social_msgs/package.xml new file mode 100644 index 0000000..debb833 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social_msgs/package.xml @@ -0,0 +1,26 @@ + + + + saltybot_social_msgs + 0.1.0 + + Custom ROS2 message and service definitions for the saltybot social subsystem. + Includes FusedTarget (multi-modal tracking fusion output). + + sl-controls + MIT + + ament_cmake + rosidl_default_generators + + std_msgs + geometry_msgs + + rosidl_default_runtime + + rosidl_interface_packages + + + ament_cmake + + diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/.gitignore b/jetson/ros2_ws/src/saltybot_social_tracking/.gitignore new file mode 100644 index 0000000..bf24d64 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social_tracking/.gitignore @@ -0,0 +1,5 @@ +__pycache__/ +*.pyc +*.pyo +*.egg-info/ +.pytest_cache/ diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/config/tracking_params.yaml b/jetson/ros2_ws/src/saltybot_social_tracking/config/tracking_params.yaml new file mode 100644 index 0000000..152764a --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social_tracking/config/tracking_params.yaml @@ -0,0 +1,48 @@ +# tracking_params.yaml — saltybot_social_tracking / TrackingFusionNode +# +# Run with: +# ros2 launch saltybot_social_tracking tracking.launch.py +# +# Topics consumed: +# /uwb/target (geometry_msgs/PoseStamped) — UWB triangulated position +# /person/target (geometry_msgs/PoseStamped) — camera-detected position +# +# Topic produced: +# /social/tracking/fused_target (saltybot_social_msgs/FusedTarget) + +# ── Source staleness timeouts ────────────────────────────────────────────────── +# UWB driver publishes at ~10 Hz; 1.5 s = 15 missed cycles before declared stale. +uwb_timeout: 1.5 # seconds + +# Camera detector publishes at ~30 Hz; 1.0 s = 30 missed frames before stale. +cam_timeout: 1.0 # seconds + +# How long the Kalman filter may coast (dead-reckoning) with no live source +# before the node stops publishing. +# At 10 m/s (EUC top-speed) the robot drifts ≈30 m over 3 s — beyond the UWB +# follow-range, so 3 s is a reasonable hard stop. +predict_timeout: 3.0 # seconds + +# ── Kalman filter tuning ─────────────────────────────────────────────────────── +# process_noise: acceleration noise std-dev (m/s²). +# EUC riders can brake or accelerate at ~3–5 m/s²; 3.0 is a good starting point. +# Increase if the filtered track lags behind fast direction changes. +# Decrease if the track is noisy. +process_noise: 3.0 # m/s² + +# UWB position measurement noise (std-dev, metres). +# DW3000 TWR accuracy ≈ ±10–20 cm; 0.20 accounts for system-level error. +meas_noise_uwb: 0.20 # m + +# Camera position noise (std-dev, metres). +# Depth reprojection error with RealSense D435i at 1–3 m ≈ ±5–15 cm. +meas_noise_cam: 0.12 # m + +# ── Control loop ────────────────────────────────────────────────────────────── +control_rate: 20.0 # Hz — KF predict + publish rate + +# ── Source arbiter ──────────────────────────────────────────────────────────── +# Minimum normalised confidence for a source to be considered live. +# Range [0, 1]; lower = more permissive; default 0.15 keeps slightly stale +# sources active rather than dropping to "predicted" prematurely. +confidence_threshold: 0.15 diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/launch/tracking.launch.py b/jetson/ros2_ws/src/saltybot_social_tracking/launch/tracking.launch.py new file mode 100644 index 0000000..af923e6 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social_tracking/launch/tracking.launch.py @@ -0,0 +1,44 @@ +"""tracking.launch.py — launch the TrackingFusionNode with default params.""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + pkg_share = get_package_share_directory("saltybot_social_tracking") + default_params = os.path.join(pkg_share, "config", "tracking_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument( + "params_file", + default_value=default_params, + description="Path to tracking fusion parameter YAML file", + ), + DeclareLaunchArgument( + "control_rate", + default_value="20.0", + description="KF predict + publish rate (Hz)", + ), + DeclareLaunchArgument( + "predict_timeout", + default_value="3.0", + description="Max KF coast time before stopping publish (s)", + ), + Node( + package="saltybot_social_tracking", + executable="tracking_fusion_node", + name="tracking_fusion", + output="screen", + parameters=[ + LaunchConfiguration("params_file"), + { + "control_rate": LaunchConfiguration("control_rate"), + "predict_timeout": LaunchConfiguration("predict_timeout"), + }, + ], + ), + ]) diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/package.xml b/jetson/ros2_ws/src/saltybot_social_tracking/package.xml new file mode 100644 index 0000000..760677b --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social_tracking/package.xml @@ -0,0 +1,31 @@ + + + + saltybot_social_tracking + 0.1.0 + + Multi-modal tracking fusion for saltybot. + Fuses UWB triangulated position (/uwb/target) and camera-detected position + (/person/target) using a 4-state Kalman filter to produce a smooth, low-latency + fused estimate at /social/tracking/fused_target. + Handles EUC rider speeds (20-30 km/h), signal handoff, and predictive coasting. + + sl-controls + MIT + + rclpy + geometry_msgs + std_msgs + saltybot_social_msgs + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/resource/saltybot_social_tracking b/jetson/ros2_ws/src/saltybot_social_tracking/resource/saltybot_social_tracking new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/__init__.py b/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/kalman_tracker.py b/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/kalman_tracker.py new file mode 100644 index 0000000..5ac574d --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/kalman_tracker.py @@ -0,0 +1,134 @@ +""" +kalman_tracker.py — 4-state linear Kalman filter for 2-D position+velocity tracking. + +State vector: [x, y, vx, vy] +Observation: [x_meas, y_meas] + +Process model: constant velocity with Wiener process acceleration noise. +Tuned to handle EUC rider speeds (20–30 km/h ≈ 5.5–8.3 m/s) with fast +acceleration transients. + +Pure module — no ROS2 dependency; fully unit-testable. +""" + +import math +import numpy as np + + +class KalmanTracker: + """ + 4-state Kalman filter: state = [x, y, vx, vy]. + + Parameters + ---------- + process_noise : acceleration noise standard deviation (m/s²). + Higher values allow the filter to track rapid velocity + changes (EUC acceleration events). Default 3.0 m/s². + meas_noise_uwb : UWB position measurement noise std-dev (m). Default 0.20 m. + meas_noise_cam : Camera position measurement noise std-dev (m). Default 0.12 m. + """ + + def __init__( + self, + process_noise: float = 3.0, + meas_noise_uwb: float = 0.20, + meas_noise_cam: float = 0.12, + ): + self._q = float(process_noise) + self._r_uwb = float(meas_noise_uwb) + self._r_cam = float(meas_noise_cam) + + # State [x, y, vx, vy] + self._x = np.zeros(4) + + # Covariance — large initial uncertainty (10 m², 10 (m/s)²) + self._P = np.diag([10.0, 10.0, 10.0, 10.0]) + + # Observation matrix: H * x = [x, y] + self._H = np.array([[1.0, 0.0, 0.0, 0.0], + [0.0, 1.0, 0.0, 0.0]]) + + self._initialized = False + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + @property + def initialized(self) -> bool: + return self._initialized + + def initialize(self, x: float, y: float) -> None: + """Seed the filter at position (x, y) with zero velocity.""" + self._x = np.array([x, y, 0.0, 0.0]) + self._P = np.diag([1.0, 1.0, 5.0, 5.0]) + self._initialized = True + + def predict(self, dt: float) -> None: + """ + Advance the filter state by dt seconds. + + Uses a discrete Wiener process acceleration model so that positional + uncertainty grows as O(dt^4/4) and velocity uncertainty as O(dt^2). + This lets the filter coast accurately through short signal outages + while still being responsive to EUC velocity changes. + """ + if dt <= 0.0: + return + + F = np.array([[1.0, 0.0, dt, 0.0], + [0.0, 1.0, 0.0, dt], + [0.0, 0.0, 1.0, 0.0], + [0.0, 0.0, 0.0, 1.0]]) + + q = self._q + dt2 = dt * dt + dt3 = dt2 * dt + dt4 = dt3 * dt + Q = (q * q) * np.array([ + [dt4 / 4.0, 0.0, dt3 / 2.0, 0.0 ], + [0.0, dt4 / 4.0, 0.0, dt3 / 2.0], + [dt3 / 2.0, 0.0, dt2, 0.0 ], + [0.0, dt3 / 2.0, 0.0, dt2 ], + ]) + + self._x = F @ self._x + self._P = F @ self._P @ F.T + Q + + def update(self, x_meas: float, y_meas: float, source: str = "uwb") -> None: + """ + Apply a position measurement (x_meas, y_meas). + + source : "uwb" or "camera" — selects the appropriate noise covariance. + """ + r = self._r_uwb if source == "uwb" else self._r_cam + R = np.diag([r * r, r * r]) + + z = np.array([x_meas, y_meas]) + innov = z - self._H @ self._x # innovation + S = self._H @ self._P @ self._H.T + R # innovation covariance + K = self._P @ self._H.T @ np.linalg.inv(S) # Kalman gain + self._x = self._x + K @ innov + self._P = (np.eye(4) - K @ self._H) @ self._P + + # ------------------------------------------------------------------ + # State accessors + # ------------------------------------------------------------------ + + @property + def position(self) -> tuple: + """Current filtered position (x, y) in metres.""" + return float(self._x[0]), float(self._x[1]) + + @property + def velocity(self) -> tuple: + """Current filtered velocity (vx, vy) in m/s.""" + return float(self._x[2]), float(self._x[3]) + + def position_uncertainty_m(self) -> float: + """RMS positional uncertainty (m) from diagonal of covariance.""" + return float(math.sqrt((self._P[0, 0] + self._P[1, 1]) / 2.0)) + + def covariance_copy(self) -> np.ndarray: + """Return a copy of the current 4×4 covariance matrix.""" + return self._P.copy() diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/source_arbiter.py b/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/source_arbiter.py new file mode 100644 index 0000000..ddfc2f6 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/source_arbiter.py @@ -0,0 +1,155 @@ +""" +source_arbiter.py — Source confidence scoring and selection for tracking fusion. + +Two sensor sources are supported: + UWB — geometry_msgs/PoseStamped from /uwb/target (triangulated, ~10 Hz) + Camera — geometry_msgs/PoseStamped from /person/target (depth+YOLO, ~30 Hz) + +Confidence model +---------------- +Each source's confidence is its raw measurement quality multiplied by a +linear staleness factor that drops to zero at its respective timeout: + + conf = quality * max(0, 1 - age / timeout) + +UWB quality is always 1.0 (the ranging hardware confidence is not exposed +by the driver in origin/main; the UWB node already applies Kalman filtering). + +Camera quality defaults to 1.0; callers may pass a lower value when the +detection confidence is available. + +Source selection +---------------- + Both fresh → "fused" (confidence-weighted position blend) + UWB only → "uwb" + Camera only → "camera" + Neither fresh → "predicted" (Kalman coasts) + +Pure module — no ROS2 dependency; fully unit-testable. +""" + +import math + + +def _staleness_factor(age_s: float, timeout_s: float) -> float: + """Linear decay: 1.0 at age=0, 0.0 at age=timeout, clamped.""" + if timeout_s <= 0.0: + return 0.0 + return max(0.0, 1.0 - age_s / timeout_s) + + +def uwb_confidence(age_s: float, timeout_s: float, quality: float = 1.0) -> float: + """ + UWB source confidence. + + age_s : seconds since last UWB measurement (≥0; use large value if never) + timeout_s: staleness threshold (s); confidence reaches 0 at this age + quality : inherent measurement quality [0, 1] (default 1.0) + """ + return quality * _staleness_factor(age_s, timeout_s) + + +def camera_confidence( + age_s: float, timeout_s: float, quality: float = 1.0 +) -> float: + """ + Camera source confidence. + + age_s : seconds since last camera detection (≥0; use large value if never) + timeout_s: staleness threshold (s) + quality : YOLO detection confidence or other quality score [0, 1] + """ + return quality * _staleness_factor(age_s, timeout_s) + + +def select_source( + uwb_conf: float, + cam_conf: float, + threshold: float = 0.15, +) -> str: + """ + Choose the active tracking source label. + + Returns one of: "fused", "uwb", "camera", "predicted". + + threshold: minimum confidence for a source to be considered live. + Sources below threshold are ignored. + """ + uwb_ok = uwb_conf >= threshold + cam_ok = cam_conf >= threshold + + if uwb_ok and cam_ok: + return "fused" + if uwb_ok: + return "uwb" + if cam_ok: + return "camera" + return "predicted" + + +def fuse_positions( + uwb_x: float, + uwb_y: float, + uwb_conf: float, + cam_x: float, + cam_y: float, + cam_conf: float, +) -> tuple: + """ + Confidence-weighted position fusion. + + Returns (fused_x, fused_y). + + When total confidence is zero (shouldn't happen in "fused" state, but + guarded), returns the UWB position as fallback. + """ + total = uwb_conf + cam_conf + if total <= 0.0: + return uwb_x, uwb_y + w = uwb_conf / total + return ( + w * uwb_x + (1.0 - w) * cam_x, + w * uwb_y + (1.0 - w) * cam_y, + ) + + +def composite_confidence( + uwb_conf: float, + cam_conf: float, + source: str, + kf_uncertainty_m: float, + max_kf_uncertainty_m: float = 3.0, +) -> float: + """ + Compute a single composite confidence value [0, 1] for the fused output. + + source : current source label (from select_source) + kf_uncertainty_m : current KF positional RMS uncertainty + max_kf_uncertainty_m: uncertainty at which confidence collapses to 0 + """ + if source == "predicted": + # Decay with growing KF uncertainty; no sensor feeds are live + raw = max(0.0, 1.0 - kf_uncertainty_m / max_kf_uncertainty_m) + return min(0.4, raw) # cap at 0.4 — caller should know this is dead-reckoning + + if source == "fused": + raw = max(uwb_conf, cam_conf) + elif source == "uwb": + raw = uwb_conf + else: # "camera" + raw = cam_conf + + # Scale by KF health (full confidence only if KF is tight) + kf_health = max(0.0, 1.0 - kf_uncertainty_m / max_kf_uncertainty_m) + return raw * (0.5 + 0.5 * kf_health) + + +def bearing_and_range(x: float, y: float) -> tuple: + """ + Compute bearing (rad, +ve = left) and range (m) to position (x, y). + + Consistent with person_follower_node conventions: + bearing = atan2(y, x) (base_link frame: +X forward, +Y left) + range = sqrt(x² + y²) + """ + return math.atan2(y, x), math.sqrt(x * x + y * y) diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/tracking_fusion_node.py b/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/tracking_fusion_node.py new file mode 100644 index 0000000..a57954e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/tracking_fusion_node.py @@ -0,0 +1,257 @@ +""" +tracking_fusion_node.py — Multi-modal tracking fusion for saltybot. + +Subscribes +---------- + /uwb/target (geometry_msgs/PoseStamped) — UWB-triangulated position (~10 Hz) + /person/target (geometry_msgs/PoseStamped) — camera-detected position (~30 Hz) + +Publishes +--------- + /social/tracking/fused_target (saltybot_social_msgs/FusedTarget) at control_rate Hz + +Algorithm +--------- + 1. Each incoming measurement updates a 4-state Kalman filter [x, y, vx, vy]. + 2. A 20 Hz timer runs predict+select+publish: + a. KF predict(dt) + b. Compute per-source confidence from measurement age + staleness model + c. If either source is live: + - "fused" → confidence-weighted position blend → KF update + - "uwb" → UWB position → KF update + - "camera" → camera position → KF update + d. Build FusedTarget from KF state + composite confidence + 3. If all sources are lost but within predict_timeout, keep publishing with + active_source="predicted" and degrading confidence. + 4. Beyond predict_timeout, no message is published (node stays alive). + +Kalman tuning for EUC speeds (20–30 km/h ≈ 5.5–8.3 m/s) +--------------------------------------------------------- + process_noise=3.0 m/s² — allows rapid acceleration events + predict_timeout=3.0 s — coasts ≈30 m at 10 m/s; acceptable dead-reckoning + +Parameters +---------- + uwb_timeout : UWB staleness threshold (s) default 1.5 + cam_timeout : Camera staleness threshold (s) default 1.0 + predict_timeout : Max KF coast before no publish (s) default 3.0 + process_noise : KF acceleration noise std-dev (m/s²) default 3.0 + meas_noise_uwb : UWB position noise std-dev (m) default 0.20 + meas_noise_cam : Camera position noise std-dev (m) default 0.12 + control_rate : Publish / KF predict rate (Hz) default 20.0 + confidence_threshold: Min source confidence to use (0–1) default 0.15 + +Usage +----- + ros2 launch saltybot_social_tracking tracking.launch.py +""" + +import math +import time + +import rclpy +from rclpy.node import Node +from geometry_msgs.msg import PoseStamped +from std_msgs.msg import Header + +from saltybot_social_msgs.msg import FusedTarget +from saltybot_social_tracking.kalman_tracker import KalmanTracker +from saltybot_social_tracking.source_arbiter import ( + uwb_confidence, + camera_confidence, + select_source, + fuse_positions, + composite_confidence, + bearing_and_range, +) + +_BIG_AGE = 1e9 # sentinel: source never received + + +class TrackingFusionNode(Node): + + def __init__(self): + super().__init__("tracking_fusion") + + # ── Parameters ──────────────────────────────────────────────────────── + self.declare_parameter("uwb_timeout", 1.5) + self.declare_parameter("cam_timeout", 1.0) + self.declare_parameter("predict_timeout", 3.0) + self.declare_parameter("process_noise", 3.0) + self.declare_parameter("meas_noise_uwb", 0.20) + self.declare_parameter("meas_noise_cam", 0.12) + self.declare_parameter("control_rate", 20.0) + self.declare_parameter("confidence_threshold", 0.15) + + self._p = self._load_params() + + # ── State ───────────────────────────────────────────────────────────── + self._kf = KalmanTracker( + process_noise=self._p["process_noise"], + meas_noise_uwb=self._p["meas_noise_uwb"], + meas_noise_cam=self._p["meas_noise_cam"], + ) + self._last_uwb_t: float = 0.0 # monotonic; 0 = never received + self._last_cam_t: float = 0.0 + self._uwb_x: float = 0.0 + self._uwb_y: float = 0.0 + self._cam_x: float = 0.0 + self._cam_y: float = 0.0 + self._uwb_tag_id: str = "" + self._last_predict_t: float = 0.0 # monotonic time of last predict call + self._last_any_t: float = 0.0 # monotonic time of last live measurement + + # ── Subscriptions ───────────────────────────────────────────────────── + self.create_subscription( + PoseStamped, "/uwb/target", self._uwb_cb, 10) + self.create_subscription( + PoseStamped, "/person/target", self._cam_cb, 10) + + # ── Publisher ───────────────────────────────────────────────────────── + self._pub = self.create_publisher(FusedTarget, "/social/tracking/fused_target", 10) + + # ── Timer ───────────────────────────────────────────────────────────── + self._timer = self.create_timer( + 1.0 / self._p["control_rate"], self._control_cb) + + self.get_logger().info( + f"TrackingFusion ready " + f"rate={self._p['control_rate']}Hz " + f"uwb_timeout={self._p['uwb_timeout']}s " + f"cam_timeout={self._p['cam_timeout']}s " + f"predict_timeout={self._p['predict_timeout']}s " + f"process_noise={self._p['process_noise']}m/s²" + ) + + # ── Parameter helpers ────────────────────────────────────────────────────── + + def _load_params(self) -> dict: + return { + "uwb_timeout": self.get_parameter("uwb_timeout").value, + "cam_timeout": self.get_parameter("cam_timeout").value, + "predict_timeout": self.get_parameter("predict_timeout").value, + "process_noise": self.get_parameter("process_noise").value, + "meas_noise_uwb": self.get_parameter("meas_noise_uwb").value, + "meas_noise_cam": self.get_parameter("meas_noise_cam").value, + "control_rate": self.get_parameter("control_rate").value, + "confidence_threshold": self.get_parameter("confidence_threshold").value, + } + + # ── Measurement callbacks ────────────────────────────────────────────────── + + def _uwb_cb(self, msg: PoseStamped) -> None: + self._uwb_x = msg.pose.position.x + self._uwb_y = msg.pose.position.y + self._uwb_tag_id = "" # PoseStamped has no tag field; tag reported via /uwb/bearing + t = time.monotonic() + self._last_uwb_t = t + self._last_any_t = t + + # Seed KF on first measurement + if not self._kf.initialized: + self._kf.initialize(self._uwb_x, self._uwb_y) + self._last_predict_t = t + + def _cam_cb(self, msg: PoseStamped) -> None: + self._cam_x = msg.pose.position.x + self._cam_y = msg.pose.position.y + t = time.monotonic() + self._last_cam_t = t + self._last_any_t = t + + if not self._kf.initialized: + self._kf.initialize(self._cam_x, self._cam_y) + self._last_predict_t = t + + # ── Control loop ─────────────────────────────────────────────────────────── + + def _control_cb(self) -> None: + self._p = self._load_params() + + if not self._kf.initialized: + return # no data yet — nothing to publish + + now = time.monotonic() + dt = now - self._last_predict_t if self._last_predict_t > 0.0 else ( + 1.0 / self._p["control_rate"] + ) + self._last_predict_t = now + + # KF predict + self._kf.predict(dt) + + # Source confidence + uwb_age = (now - self._last_uwb_t) if self._last_uwb_t > 0.0 else _BIG_AGE + cam_age = (now - self._last_cam_t) if self._last_cam_t > 0.0 else _BIG_AGE + + u_conf = uwb_confidence(uwb_age, self._p["uwb_timeout"]) + c_conf = camera_confidence(cam_age, self._p["cam_timeout"]) + + threshold = self._p["confidence_threshold"] + source = select_source(u_conf, c_conf, threshold) + + if source == "predicted": + # Check predict_timeout — stop publishing if too stale + last_live_age = ( + (now - self._last_any_t) if self._last_any_t > 0.0 else _BIG_AGE + ) + if last_live_age > self._p["predict_timeout"]: + return # silently stop publishing + + # Apply measurement update if a live source exists + if source == "fused": + fx, fy = fuse_positions( + self._uwb_x, self._uwb_y, u_conf, + self._cam_x, self._cam_y, c_conf, + ) + self._kf.update(fx, fy, source="uwb") # use tighter noise for blended + elif source == "uwb": + self._kf.update(self._uwb_x, self._uwb_y, source="uwb") + elif source == "camera": + self._kf.update(self._cam_x, self._cam_y, source="camera") + # "predicted" → no update; KF coasts + + # Build and publish message + kx, ky = self._kf.position + vx, vy = self._kf.velocity + kf_unc = self._kf.position_uncertainty_m() + conf = composite_confidence(u_conf, c_conf, source, kf_unc) + bearing, range_m = bearing_and_range(kx, ky) + + hdr = Header() + hdr.stamp = self.get_clock().now().to_msg() + hdr.frame_id = "base_link" + + msg = FusedTarget() + msg.header = hdr + msg.position.x = kx + msg.position.y = ky + msg.position.z = 0.0 + msg.velocity.x = vx + msg.velocity.y = vy + msg.velocity.z = 0.0 + msg.range_m = float(range_m) + msg.bearing_rad = float(bearing) + msg.confidence = float(conf) + msg.active_source = source + msg.tag_id = self._uwb_tag_id if "uwb" in source else "" + + self._pub.publish(msg) + + +# ── Entry point ──────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = TrackingFusionNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/setup.cfg b/jetson/ros2_ws/src/saltybot_social_tracking/setup.cfg new file mode 100644 index 0000000..55fb3ba --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social_tracking/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/saltybot_social_tracking + +[install] +install_scripts=$base/lib/saltybot_social_tracking diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/setup.py b/jetson/ros2_ws/src/saltybot_social_tracking/setup.py new file mode 100644 index 0000000..41b594a --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social_tracking/setup.py @@ -0,0 +1,32 @@ +from setuptools import setup, find_packages +import os +from glob import glob + +package_name = "saltybot_social_tracking" + +setup( + name=package_name, + version="0.1.0", + packages=find_packages(exclude=["test"]), + data_files=[ + ("share/ament_index/resource_index/packages", + [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (os.path.join("share", package_name, "config"), + glob("config/*.yaml")), + (os.path.join("share", package_name, "launch"), + glob("launch/*.py")), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-controls", + maintainer_email="sl-controls@saltylab.local", + description="Multi-modal tracking fusion (UWB + camera Kalman filter)", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + f"tracking_fusion_node = {package_name}.tracking_fusion_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/test/test_tracking_fusion.py b/jetson/ros2_ws/src/saltybot_social_tracking/test/test_tracking_fusion.py new file mode 100644 index 0000000..06ca258 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social_tracking/test/test_tracking_fusion.py @@ -0,0 +1,438 @@ +""" +test_tracking_fusion.py — Unit tests for saltybot_social_tracking pure modules. + +Tests cover: + - KalmanTracker: initialization, predict, update, state accessors + - source_arbiter: confidence functions, source selection, fusion, bearing + +No ROS2 runtime required. +""" + +import math +import sys +import os + +# Allow running: python -m pytest test/test_tracking_fusion.py +# from the package root without installing the package. +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +import pytest +import numpy as np + +from saltybot_social_tracking.kalman_tracker import KalmanTracker +from saltybot_social_tracking.source_arbiter import ( + uwb_confidence, + camera_confidence, + select_source, + fuse_positions, + composite_confidence, + bearing_and_range, +) + + +# ───────────────────────────────────────────────────────────────────────────── +# KalmanTracker tests +# ───────────────────────────────────────────────────────────────────────────── + +class TestKalmanTrackerInit: + + def test_not_initialized_by_default(self): + kf = KalmanTracker() + assert not kf.initialized + + def test_initialize_sets_position(self): + kf = KalmanTracker() + kf.initialize(3.0, 1.5) + assert kf.initialized + x, y = kf.position + assert abs(x - 3.0) < 1e-9 + assert abs(y - 1.5) < 1e-9 + + def test_initialize_sets_zero_velocity(self): + kf = KalmanTracker() + kf.initialize(1.0, -2.0) + vx, vy = kf.velocity + assert abs(vx) < 1e-9 + assert abs(vy) < 1e-9 + + def test_initialize_origin(self): + kf = KalmanTracker() + kf.initialize(0.0, 0.0) + assert kf.initialized + x, y = kf.position + assert x == 0.0 and y == 0.0 + + +class TestKalmanTrackerPredict: + + def test_predict_zero_dt_no_change(self): + kf = KalmanTracker() + kf.initialize(2.0, 1.0) + kf.predict(0.0) + x, y = kf.position + assert abs(x - 2.0) < 1e-9 + assert abs(y - 1.0) < 1e-9 + + def test_predict_negative_dt_no_change(self): + kf = KalmanTracker() + kf.initialize(2.0, 1.0) + kf.predict(-0.1) + x, y = kf.position + assert abs(x - 2.0) < 1e-9 + + def test_predict_constant_velocity(self): + """After a position update gives the filter a velocity, predict should extrapolate.""" + kf = KalmanTracker(process_noise=0.001, meas_noise_uwb=0.001) + kf.initialize(0.0, 0.0) + # Force filter to track a moving target to build up velocity estimate + dt = 0.05 + for i in range(40): + t = i * dt + kf.predict(dt) + kf.update(2.0 * t, 0.0, "uwb") # 2 m/s in x + + # After many updates the velocity estimate should be close to 2 m/s + vx, vy = kf.velocity + assert abs(vx - 2.0) < 0.3, f"vx={vx:.3f}" + assert abs(vy) < 0.2 + + def test_predict_grows_uncertainty(self): + kf = KalmanTracker() + kf.initialize(0.0, 0.0) + unc_before = kf.position_uncertainty_m() + kf.predict(1.0) + unc_after = kf.position_uncertainty_m() + assert unc_after > unc_before + + def test_predict_multiple_steps(self): + kf = KalmanTracker() + kf.initialize(0.0, 0.0) + kf.predict(0.1) + kf.predict(0.1) + kf.predict(0.1) + # No assertion on exact value; just verify no exception and state is finite + x, y = kf.position + assert math.isfinite(x) and math.isfinite(y) + + +class TestKalmanTrackerUpdate: + + def test_update_pulls_position_toward_measurement(self): + kf = KalmanTracker() + kf.initialize(0.0, 0.0) + kf.update(5.0, 5.0, "uwb") + x, y = kf.position + assert x > 0.0 and y > 0.0 + assert x < 5.0 and y < 5.0 # blended, not jumped + + def test_update_reduces_uncertainty(self): + kf = KalmanTracker() + kf.initialize(0.0, 0.0) + kf.predict(1.0) # uncertainty grows + unc_mid = kf.position_uncertainty_m() + kf.update(0.1, 0.1, "uwb") # measurement corrects + unc_after = kf.position_uncertainty_m() + assert unc_after < unc_mid + + def test_update_converges_to_true_position(self): + """Many updates from same point should converge to that point.""" + kf = KalmanTracker(meas_noise_uwb=0.01) + kf.initialize(0.0, 0.0) + for _ in range(50): + kf.update(3.0, -1.0, "uwb") + x, y = kf.position + assert abs(x - 3.0) < 0.05, f"x={x:.4f}" + assert abs(y - (-1.0)) < 0.05, f"y={y:.4f}" + + def test_update_camera_source_different_noise(self): + """Camera and UWB updates should both move state (noise model differs).""" + kf1 = KalmanTracker(meas_noise_uwb=0.20, meas_noise_cam=0.10) + kf1.initialize(0.0, 0.0) + kf1.update(5.0, 0.0, "uwb") + x_uwb, _ = kf1.position + + kf2 = KalmanTracker(meas_noise_uwb=0.20, meas_noise_cam=0.10) + kf2.initialize(0.0, 0.0) + kf2.update(5.0, 0.0, "camera") + x_cam, _ = kf2.position + + # Camera has lower noise → stronger pull toward measurement + assert x_cam > x_uwb + + def test_update_unknown_source_defaults_to_camera_noise(self): + kf = KalmanTracker() + kf.initialize(0.0, 0.0) + kf.update(2.0, 0.0, "other") # unknown source — should not raise + x, _ = kf.position + assert x > 0.0 + + def test_position_uncertainty_finite(self): + kf = KalmanTracker() + kf.initialize(1.0, 1.0) + kf.predict(0.05) + kf.update(1.1, 0.9, "uwb") + assert math.isfinite(kf.position_uncertainty_m()) + assert kf.position_uncertainty_m() >= 0.0 + + def test_covariance_copy_is_independent(self): + kf = KalmanTracker() + kf.initialize(0.0, 0.0) + cov = kf.covariance_copy() + cov[0, 0] = 9999.0 # mutate copy + assert kf.covariance_copy()[0, 0] != 9999.0 + + +# ───────────────────────────────────────────────────────────────────────────── +# source_arbiter tests +# ───────────────────────────────────────────────────────────────────────────── + +class TestUwbConfidence: + + def test_zero_age_returns_quality(self): + assert abs(uwb_confidence(0.0, 1.5) - 1.0) < 1e-9 + + def test_at_timeout_returns_zero(self): + assert uwb_confidence(1.5, 1.5) == pytest.approx(0.0) + + def test_beyond_timeout_returns_zero(self): + assert uwb_confidence(2.0, 1.5) == 0.0 + + def test_half_timeout_returns_half(self): + assert uwb_confidence(0.75, 1.5) == pytest.approx(0.5) + + def test_quality_scales_result(self): + assert uwb_confidence(0.0, 1.5, quality=0.7) == pytest.approx(0.7) + + def test_large_age_returns_zero(self): + assert uwb_confidence(1e9, 1.5) == 0.0 + + def test_zero_timeout_returns_zero(self): + assert uwb_confidence(0.0, 0.0) == 0.0 + + +class TestCameraConfidence: + + def test_zero_age_full_quality(self): + assert camera_confidence(0.0, 1.0, quality=1.0) == pytest.approx(1.0) + + def test_at_timeout_zero(self): + assert camera_confidence(1.0, 1.0) == pytest.approx(0.0) + + def test_beyond_timeout_zero(self): + assert camera_confidence(2.0, 1.0) == 0.0 + + def test_quality_scales(self): + # age=0, quality=0.8 + assert camera_confidence(0.0, 1.0, quality=0.8) == pytest.approx(0.8) + + def test_halfway(self): + assert camera_confidence(0.5, 1.0) == pytest.approx(0.5) + + +class TestSelectSource: + + def test_both_above_threshold_fused(self): + assert select_source(0.8, 0.6) == "fused" + + def test_only_uwb_above_threshold(self): + assert select_source(0.8, 0.0) == "uwb" + + def test_only_cam_above_threshold(self): + assert select_source(0.0, 0.7) == "camera" + + def test_both_below_threshold(self): + assert select_source(0.0, 0.0) == "predicted" + + def test_threshold_boundary_uwb(self): + # Exactly at threshold — should be treated as live + assert select_source(0.15, 0.0, threshold=0.15) == "uwb" + + def test_threshold_boundary_below(self): + assert select_source(0.14, 0.0, threshold=0.15) == "predicted" + + def test_custom_threshold(self): + assert select_source(0.5, 0.0, threshold=0.6) == "predicted" + assert select_source(0.5, 0.0, threshold=0.4) == "uwb" + + +class TestFusePositions: + + def test_equal_confidence_returns_midpoint(self): + x, y = fuse_positions(0.0, 0.0, 1.0, 4.0, 4.0, 1.0) + assert abs(x - 2.0) < 1e-9 + assert abs(y - 2.0) < 1e-9 + + def test_full_uwb_weight_returns_uwb(self): + x, y = fuse_positions(3.0, 1.0, 1.0, 0.0, 0.0, 0.0) + assert abs(x - 3.0) < 1e-9 + + def test_full_cam_weight_returns_cam(self): + x, y = fuse_positions(0.0, 0.0, 0.0, -2.0, 5.0, 1.0) + assert abs(x - (-2.0)) < 1e-9 + assert abs(y - 5.0) < 1e-9 + + def test_weighted_blend(self): + # UWB at (0,0) conf=3, camera at (4,0) conf=1 → x = 3/4*0 + 1/4*4 = 1 + x, y = fuse_positions(0.0, 0.0, 3.0, 4.0, 0.0, 1.0) + assert abs(x - 1.0) < 1e-9 + + def test_zero_total_returns_uwb_fallback(self): + x, y = fuse_positions(7.0, 2.0, 0.0, 3.0, 1.0, 0.0) + assert abs(x - 7.0) < 1e-9 + + +class TestCompositeConfidence: + + def test_fused_source_high_confidence(self): + conf = composite_confidence(0.9, 0.8, "fused", 0.05) + assert conf > 0.7 + + def test_predicted_source_capped(self): + conf = composite_confidence(0.0, 0.0, "predicted", 0.1) + assert conf <= 0.4 + + def test_predicted_high_uncertainty_low_confidence(self): + conf = composite_confidence(0.0, 0.0, "predicted", 3.0, max_kf_uncertainty_m=3.0) + assert conf == pytest.approx(0.0) + + def test_uwb_only(self): + conf = composite_confidence(0.8, 0.0, "uwb", 0.05) + assert conf > 0.3 + + def test_camera_only(self): + conf = composite_confidence(0.0, 0.7, "camera", 0.05) + assert conf > 0.2 + + def test_high_kf_uncertainty_reduces_confidence(self): + low_unc = composite_confidence(0.9, 0.0, "uwb", 0.1) + high_unc = composite_confidence(0.9, 0.0, "uwb", 2.9) + assert low_unc > high_unc + + +class TestBearingAndRange: + + def test_straight_ahead(self): + bearing, rng = bearing_and_range(2.0, 0.0) + assert abs(bearing) < 1e-9 + assert abs(rng - 2.0) < 1e-9 + + def test_left_of_robot(self): + # +Y = left in base_link frame; bearing should be positive + bearing, rng = bearing_and_range(0.0, 1.0) + assert abs(bearing - math.pi / 2.0) < 1e-9 + assert abs(rng - 1.0) < 1e-9 + + def test_right_of_robot(self): + bearing, rng = bearing_and_range(0.0, -1.0) + assert abs(bearing - (-math.pi / 2.0)) < 1e-9 + + def test_diagonal(self): + bearing, rng = bearing_and_range(1.0, 1.0) + assert abs(bearing - math.pi / 4.0) < 1e-9 + assert abs(rng - math.sqrt(2.0)) < 1e-9 + + def test_at_origin(self): + bearing, rng = bearing_and_range(0.0, 0.0) + assert rng == pytest.approx(0.0) + assert math.isfinite(bearing) # atan2(0,0) = 0 in most implementations + + def test_range_always_non_negative(self): + for x, y in [(-1, 0), (0, -1), (-2, -3), (5, -5)]: + _, rng = bearing_and_range(x, y) + assert rng >= 0.0 + + +# ───────────────────────────────────────────────────────────────────────────── +# Integration scenario tests +# ───────────────────────────────────────────────────────────────────────────── + +class TestIntegrationScenarios: + + def test_euc_speed_velocity_tracking(self): + """Verify KF can track EUC speed (8 m/s) within 0.5 m/s after warm-up.""" + kf = KalmanTracker(process_noise=3.0, meas_noise_uwb=0.20) + kf.initialize(0.0, 0.0) + dt = 1.0 / 10.0 # 10 Hz UWB rate + speed = 8.0 # m/s (≈29 km/h) + for i in range(60): + t = i * dt + kf.predict(dt) + kf.update(speed * t, 0.0, "uwb") + vx, vy = kf.velocity + assert abs(vx - speed) < 0.6, f"vx={vx:.2f} expected≈{speed}" + assert abs(vy) < 0.3 + + def test_signal_loss_recovery(self): + """ + After 1 s of signal loss the filter should still have a reasonable + position estimate (not diverged to infinity). + """ + kf = KalmanTracker(process_noise=3.0) + kf.initialize(2.0, 0.5) + # Warm up with 2 m/s x motion + dt = 0.05 + for i in range(20): + kf.predict(dt) + kf.update(2.0 * (i + 1) * dt, 0.0, "uwb") + # Coast for 1 second (20 × 50 ms) without measurements + for _ in range(20): + kf.predict(dt) + x, y = kf.position + assert math.isfinite(x) and math.isfinite(y) + assert abs(x) < 20.0 # shouldn't have drifted more than 20 m + + def test_uwb_to_camera_handoff(self): + """ + Simulate UWB going stale and camera taking over — Kalman should + smoothly continue tracking without a jump. + """ + kf = KalmanTracker(meas_noise_uwb=0.20, meas_noise_cam=0.12) + kf.initialize(0.0, 0.0) + dt = 0.05 + # Phase 1: UWB active + for i in range(20): + kf.predict(dt) + kf.update(float(i) * 0.1, 0.0, "uwb") + x_at_handoff, _ = kf.position + + # Phase 2: Camera takes over from same trajectory + for i in range(20, 40): + kf.predict(dt) + kf.update(float(i) * 0.1, 0.0, "camera") + x_after, _ = kf.position + + # Position should have continued progressing (not stuck or reset) + assert x_after > x_at_handoff + + def test_confidence_degradation_during_coast(self): + """Composite confidence should drop as KF uncertainty grows during coast.""" + kf = KalmanTracker(process_noise=3.0) + kf.initialize(2.0, 0.0) + + # Fresh: tight uncertainty → high confidence + unc_fresh = kf.position_uncertainty_m() + conf_fresh = composite_confidence(0.0, 0.0, "predicted", unc_fresh) + + # After 2 s coast + for _ in range(40): + kf.predict(0.05) + unc_stale = kf.position_uncertainty_m() + conf_stale = composite_confidence(0.0, 0.0, "predicted", unc_stale) + + assert conf_fresh >= conf_stale + + def test_fused_source_confidence_weighted_position(self): + """Fused position should sit between UWB and camera, closer to higher-conf source.""" + # UWB at x=0 with high conf, camera at x=10 with low conf + uwb_c = 0.9 + cam_c = 0.1 + fx, fy = fuse_positions(0.0, 0.0, uwb_c, 10.0, 0.0, cam_c) + # Should be much closer to UWB (0) than camera (10) + assert fx < 3.0, f"fused_x={fx:.2f}" + + def test_select_source_transitions(self): + """Verify correct source transitions as confidences change.""" + assert select_source(0.9, 0.8) == "fused" + assert select_source(0.9, 0.0) == "uwb" + assert select_source(0.0, 0.8) == "camera" + assert select_source(0.0, 0.0) == "predicted"