diff --git a/jetson/ros2_ws/src/saltybot_social_msgs/CMakeLists.txt b/jetson/ros2_ws/src/saltybot_social_msgs/CMakeLists.txt
new file mode 100644
index 0000000..ddafa7a
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_msgs/CMakeLists.txt
@@ -0,0 +1,15 @@
+cmake_minimum_required(VERSION 3.8)
+project(saltybot_social_msgs)
+
+find_package(ament_cmake REQUIRED)
+find_package(rosidl_default_generators REQUIRED)
+find_package(std_msgs REQUIRED)
+find_package(geometry_msgs REQUIRED)
+
+rosidl_generate_interfaces(${PROJECT_NAME}
+ "msg/FusedTarget.msg"
+ DEPENDENCIES std_msgs geometry_msgs
+)
+
+ament_export_dependencies(rosidl_default_runtime)
+ament_package()
diff --git a/jetson/ros2_ws/src/saltybot_social_msgs/msg/FusedTarget.msg b/jetson/ros2_ws/src/saltybot_social_msgs/msg/FusedTarget.msg
new file mode 100644
index 0000000..1211416
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_msgs/msg/FusedTarget.msg
@@ -0,0 +1,19 @@
+# FusedTarget.msg — output of the multi-modal tracking fusion node.
+#
+# Position and velocity are in the base_link frame (robot-centred,
+# +X forward, +Y left). z components are always 0.0 for ground-plane tracking.
+#
+# Confidence: 0.0 = no data / fully predicted; 1.0 = strong fused measurement.
+# active_source: "fused" | "uwb" | "camera" | "predicted"
+
+std_msgs/Header header
+
+geometry_msgs/Point position # filtered 2-D position (m), z=0
+geometry_msgs/Vector3 velocity # filtered 2-D velocity (m/s), z=0
+
+float32 range_m # Euclidean distance from robot to fused position
+float32 bearing_rad # bearing in base_link (+ve = person to the left)
+float32 confidence # composite confidence [0.0, 1.0]
+
+string active_source # "fused" | "uwb" | "camera" | "predicted"
+string tag_id # UWB tag address (empty when UWB not contributing)
diff --git a/jetson/ros2_ws/src/saltybot_social_msgs/package.xml b/jetson/ros2_ws/src/saltybot_social_msgs/package.xml
new file mode 100644
index 0000000..debb833
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_msgs/package.xml
@@ -0,0 +1,26 @@
+
+
+
+ saltybot_social_msgs
+ 0.1.0
+
+ Custom ROS2 message and service definitions for the saltybot social subsystem.
+ Includes FusedTarget (multi-modal tracking fusion output).
+
+ sl-controls
+ MIT
+
+ ament_cmake
+ rosidl_default_generators
+
+ std_msgs
+ geometry_msgs
+
+ rosidl_default_runtime
+
+ rosidl_interface_packages
+
+
+ ament_cmake
+
+
diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/.gitignore b/jetson/ros2_ws/src/saltybot_social_tracking/.gitignore
new file mode 100644
index 0000000..bf24d64
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tracking/.gitignore
@@ -0,0 +1,5 @@
+__pycache__/
+*.pyc
+*.pyo
+*.egg-info/
+.pytest_cache/
diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/config/tracking_params.yaml b/jetson/ros2_ws/src/saltybot_social_tracking/config/tracking_params.yaml
new file mode 100644
index 0000000..152764a
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tracking/config/tracking_params.yaml
@@ -0,0 +1,48 @@
+# tracking_params.yaml — saltybot_social_tracking / TrackingFusionNode
+#
+# Run with:
+# ros2 launch saltybot_social_tracking tracking.launch.py
+#
+# Topics consumed:
+# /uwb/target (geometry_msgs/PoseStamped) — UWB triangulated position
+# /person/target (geometry_msgs/PoseStamped) — camera-detected position
+#
+# Topic produced:
+# /social/tracking/fused_target (saltybot_social_msgs/FusedTarget)
+
+# ── Source staleness timeouts ──────────────────────────────────────────────────
+# UWB driver publishes at ~10 Hz; 1.5 s = 15 missed cycles before declared stale.
+uwb_timeout: 1.5 # seconds
+
+# Camera detector publishes at ~30 Hz; 1.0 s = 30 missed frames before stale.
+cam_timeout: 1.0 # seconds
+
+# How long the Kalman filter may coast (dead-reckoning) with no live source
+# before the node stops publishing.
+# At 10 m/s (EUC top-speed) the robot drifts ≈30 m over 3 s — beyond the UWB
+# follow-range, so 3 s is a reasonable hard stop.
+predict_timeout: 3.0 # seconds
+
+# ── Kalman filter tuning ───────────────────────────────────────────────────────
+# process_noise: acceleration noise std-dev (m/s²).
+# EUC riders can brake or accelerate at ~3–5 m/s²; 3.0 is a good starting point.
+# Increase if the filtered track lags behind fast direction changes.
+# Decrease if the track is noisy.
+process_noise: 3.0 # m/s²
+
+# UWB position measurement noise (std-dev, metres).
+# DW3000 TWR accuracy ≈ ±10–20 cm; 0.20 accounts for system-level error.
+meas_noise_uwb: 0.20 # m
+
+# Camera position noise (std-dev, metres).
+# Depth reprojection error with RealSense D435i at 1–3 m ≈ ±5–15 cm.
+meas_noise_cam: 0.12 # m
+
+# ── Control loop ──────────────────────────────────────────────────────────────
+control_rate: 20.0 # Hz — KF predict + publish rate
+
+# ── Source arbiter ────────────────────────────────────────────────────────────
+# Minimum normalised confidence for a source to be considered live.
+# Range [0, 1]; lower = more permissive; default 0.15 keeps slightly stale
+# sources active rather than dropping to "predicted" prematurely.
+confidence_threshold: 0.15
diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/launch/tracking.launch.py b/jetson/ros2_ws/src/saltybot_social_tracking/launch/tracking.launch.py
new file mode 100644
index 0000000..af923e6
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tracking/launch/tracking.launch.py
@@ -0,0 +1,44 @@
+"""tracking.launch.py — launch the TrackingFusionNode with default params."""
+
+import os
+from ament_index_python.packages import get_package_share_directory
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration
+from launch_ros.actions import Node
+
+
+def generate_launch_description():
+ pkg_share = get_package_share_directory("saltybot_social_tracking")
+ default_params = os.path.join(pkg_share, "config", "tracking_params.yaml")
+
+ return LaunchDescription([
+ DeclareLaunchArgument(
+ "params_file",
+ default_value=default_params,
+ description="Path to tracking fusion parameter YAML file",
+ ),
+ DeclareLaunchArgument(
+ "control_rate",
+ default_value="20.0",
+ description="KF predict + publish rate (Hz)",
+ ),
+ DeclareLaunchArgument(
+ "predict_timeout",
+ default_value="3.0",
+ description="Max KF coast time before stopping publish (s)",
+ ),
+ Node(
+ package="saltybot_social_tracking",
+ executable="tracking_fusion_node",
+ name="tracking_fusion",
+ output="screen",
+ parameters=[
+ LaunchConfiguration("params_file"),
+ {
+ "control_rate": LaunchConfiguration("control_rate"),
+ "predict_timeout": LaunchConfiguration("predict_timeout"),
+ },
+ ],
+ ),
+ ])
diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/package.xml b/jetson/ros2_ws/src/saltybot_social_tracking/package.xml
new file mode 100644
index 0000000..760677b
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tracking/package.xml
@@ -0,0 +1,31 @@
+
+
+
+ saltybot_social_tracking
+ 0.1.0
+
+ Multi-modal tracking fusion for saltybot.
+ Fuses UWB triangulated position (/uwb/target) and camera-detected position
+ (/person/target) using a 4-state Kalman filter to produce a smooth, low-latency
+ fused estimate at /social/tracking/fused_target.
+ Handles EUC rider speeds (20-30 km/h), signal handoff, and predictive coasting.
+
+ sl-controls
+ MIT
+
+ rclpy
+ geometry_msgs
+ std_msgs
+ saltybot_social_msgs
+
+ ament_python
+
+ ament_copyright
+ ament_flake8
+ ament_pep257
+ python3-pytest
+
+
+ ament_python
+
+
diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/resource/saltybot_social_tracking b/jetson/ros2_ws/src/saltybot_social_tracking/resource/saltybot_social_tracking
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/__init__.py b/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/kalman_tracker.py b/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/kalman_tracker.py
new file mode 100644
index 0000000..5ac574d
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/kalman_tracker.py
@@ -0,0 +1,134 @@
+"""
+kalman_tracker.py — 4-state linear Kalman filter for 2-D position+velocity tracking.
+
+State vector: [x, y, vx, vy]
+Observation: [x_meas, y_meas]
+
+Process model: constant velocity with Wiener process acceleration noise.
+Tuned to handle EUC rider speeds (20–30 km/h ≈ 5.5–8.3 m/s) with fast
+acceleration transients.
+
+Pure module — no ROS2 dependency; fully unit-testable.
+"""
+
+import math
+import numpy as np
+
+
+class KalmanTracker:
+ """
+ 4-state Kalman filter: state = [x, y, vx, vy].
+
+ Parameters
+ ----------
+ process_noise : acceleration noise standard deviation (m/s²).
+ Higher values allow the filter to track rapid velocity
+ changes (EUC acceleration events). Default 3.0 m/s².
+ meas_noise_uwb : UWB position measurement noise std-dev (m). Default 0.20 m.
+ meas_noise_cam : Camera position measurement noise std-dev (m). Default 0.12 m.
+ """
+
+ def __init__(
+ self,
+ process_noise: float = 3.0,
+ meas_noise_uwb: float = 0.20,
+ meas_noise_cam: float = 0.12,
+ ):
+ self._q = float(process_noise)
+ self._r_uwb = float(meas_noise_uwb)
+ self._r_cam = float(meas_noise_cam)
+
+ # State [x, y, vx, vy]
+ self._x = np.zeros(4)
+
+ # Covariance — large initial uncertainty (10 m², 10 (m/s)²)
+ self._P = np.diag([10.0, 10.0, 10.0, 10.0])
+
+ # Observation matrix: H * x = [x, y]
+ self._H = np.array([[1.0, 0.0, 0.0, 0.0],
+ [0.0, 1.0, 0.0, 0.0]])
+
+ self._initialized = False
+
+ # ------------------------------------------------------------------
+ # Public API
+ # ------------------------------------------------------------------
+
+ @property
+ def initialized(self) -> bool:
+ return self._initialized
+
+ def initialize(self, x: float, y: float) -> None:
+ """Seed the filter at position (x, y) with zero velocity."""
+ self._x = np.array([x, y, 0.0, 0.0])
+ self._P = np.diag([1.0, 1.0, 5.0, 5.0])
+ self._initialized = True
+
+ def predict(self, dt: float) -> None:
+ """
+ Advance the filter state by dt seconds.
+
+ Uses a discrete Wiener process acceleration model so that positional
+ uncertainty grows as O(dt^4/4) and velocity uncertainty as O(dt^2).
+ This lets the filter coast accurately through short signal outages
+ while still being responsive to EUC velocity changes.
+ """
+ if dt <= 0.0:
+ return
+
+ F = np.array([[1.0, 0.0, dt, 0.0],
+ [0.0, 1.0, 0.0, dt],
+ [0.0, 0.0, 1.0, 0.0],
+ [0.0, 0.0, 0.0, 1.0]])
+
+ q = self._q
+ dt2 = dt * dt
+ dt3 = dt2 * dt
+ dt4 = dt3 * dt
+ Q = (q * q) * np.array([
+ [dt4 / 4.0, 0.0, dt3 / 2.0, 0.0 ],
+ [0.0, dt4 / 4.0, 0.0, dt3 / 2.0],
+ [dt3 / 2.0, 0.0, dt2, 0.0 ],
+ [0.0, dt3 / 2.0, 0.0, dt2 ],
+ ])
+
+ self._x = F @ self._x
+ self._P = F @ self._P @ F.T + Q
+
+ def update(self, x_meas: float, y_meas: float, source: str = "uwb") -> None:
+ """
+ Apply a position measurement (x_meas, y_meas).
+
+ source : "uwb" or "camera" — selects the appropriate noise covariance.
+ """
+ r = self._r_uwb if source == "uwb" else self._r_cam
+ R = np.diag([r * r, r * r])
+
+ z = np.array([x_meas, y_meas])
+ innov = z - self._H @ self._x # innovation
+ S = self._H @ self._P @ self._H.T + R # innovation covariance
+ K = self._P @ self._H.T @ np.linalg.inv(S) # Kalman gain
+ self._x = self._x + K @ innov
+ self._P = (np.eye(4) - K @ self._H) @ self._P
+
+ # ------------------------------------------------------------------
+ # State accessors
+ # ------------------------------------------------------------------
+
+ @property
+ def position(self) -> tuple:
+ """Current filtered position (x, y) in metres."""
+ return float(self._x[0]), float(self._x[1])
+
+ @property
+ def velocity(self) -> tuple:
+ """Current filtered velocity (vx, vy) in m/s."""
+ return float(self._x[2]), float(self._x[3])
+
+ def position_uncertainty_m(self) -> float:
+ """RMS positional uncertainty (m) from diagonal of covariance."""
+ return float(math.sqrt((self._P[0, 0] + self._P[1, 1]) / 2.0))
+
+ def covariance_copy(self) -> np.ndarray:
+ """Return a copy of the current 4×4 covariance matrix."""
+ return self._P.copy()
diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/source_arbiter.py b/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/source_arbiter.py
new file mode 100644
index 0000000..ddfc2f6
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/source_arbiter.py
@@ -0,0 +1,155 @@
+"""
+source_arbiter.py — Source confidence scoring and selection for tracking fusion.
+
+Two sensor sources are supported:
+ UWB — geometry_msgs/PoseStamped from /uwb/target (triangulated, ~10 Hz)
+ Camera — geometry_msgs/PoseStamped from /person/target (depth+YOLO, ~30 Hz)
+
+Confidence model
+----------------
+Each source's confidence is its raw measurement quality multiplied by a
+linear staleness factor that drops to zero at its respective timeout:
+
+ conf = quality * max(0, 1 - age / timeout)
+
+UWB quality is always 1.0 (the ranging hardware confidence is not exposed
+by the driver in origin/main; the UWB node already applies Kalman filtering).
+
+Camera quality defaults to 1.0; callers may pass a lower value when the
+detection confidence is available.
+
+Source selection
+----------------
+ Both fresh → "fused" (confidence-weighted position blend)
+ UWB only → "uwb"
+ Camera only → "camera"
+ Neither fresh → "predicted" (Kalman coasts)
+
+Pure module — no ROS2 dependency; fully unit-testable.
+"""
+
+import math
+
+
+def _staleness_factor(age_s: float, timeout_s: float) -> float:
+ """Linear decay: 1.0 at age=0, 0.0 at age=timeout, clamped."""
+ if timeout_s <= 0.0:
+ return 0.0
+ return max(0.0, 1.0 - age_s / timeout_s)
+
+
+def uwb_confidence(age_s: float, timeout_s: float, quality: float = 1.0) -> float:
+ """
+ UWB source confidence.
+
+ age_s : seconds since last UWB measurement (≥0; use large value if never)
+ timeout_s: staleness threshold (s); confidence reaches 0 at this age
+ quality : inherent measurement quality [0, 1] (default 1.0)
+ """
+ return quality * _staleness_factor(age_s, timeout_s)
+
+
+def camera_confidence(
+ age_s: float, timeout_s: float, quality: float = 1.0
+) -> float:
+ """
+ Camera source confidence.
+
+ age_s : seconds since last camera detection (≥0; use large value if never)
+ timeout_s: staleness threshold (s)
+ quality : YOLO detection confidence or other quality score [0, 1]
+ """
+ return quality * _staleness_factor(age_s, timeout_s)
+
+
+def select_source(
+ uwb_conf: float,
+ cam_conf: float,
+ threshold: float = 0.15,
+) -> str:
+ """
+ Choose the active tracking source label.
+
+ Returns one of: "fused", "uwb", "camera", "predicted".
+
+ threshold: minimum confidence for a source to be considered live.
+ Sources below threshold are ignored.
+ """
+ uwb_ok = uwb_conf >= threshold
+ cam_ok = cam_conf >= threshold
+
+ if uwb_ok and cam_ok:
+ return "fused"
+ if uwb_ok:
+ return "uwb"
+ if cam_ok:
+ return "camera"
+ return "predicted"
+
+
+def fuse_positions(
+ uwb_x: float,
+ uwb_y: float,
+ uwb_conf: float,
+ cam_x: float,
+ cam_y: float,
+ cam_conf: float,
+) -> tuple:
+ """
+ Confidence-weighted position fusion.
+
+ Returns (fused_x, fused_y).
+
+ When total confidence is zero (shouldn't happen in "fused" state, but
+ guarded), returns the UWB position as fallback.
+ """
+ total = uwb_conf + cam_conf
+ if total <= 0.0:
+ return uwb_x, uwb_y
+ w = uwb_conf / total
+ return (
+ w * uwb_x + (1.0 - w) * cam_x,
+ w * uwb_y + (1.0 - w) * cam_y,
+ )
+
+
+def composite_confidence(
+ uwb_conf: float,
+ cam_conf: float,
+ source: str,
+ kf_uncertainty_m: float,
+ max_kf_uncertainty_m: float = 3.0,
+) -> float:
+ """
+ Compute a single composite confidence value [0, 1] for the fused output.
+
+ source : current source label (from select_source)
+ kf_uncertainty_m : current KF positional RMS uncertainty
+ max_kf_uncertainty_m: uncertainty at which confidence collapses to 0
+ """
+ if source == "predicted":
+ # Decay with growing KF uncertainty; no sensor feeds are live
+ raw = max(0.0, 1.0 - kf_uncertainty_m / max_kf_uncertainty_m)
+ return min(0.4, raw) # cap at 0.4 — caller should know this is dead-reckoning
+
+ if source == "fused":
+ raw = max(uwb_conf, cam_conf)
+ elif source == "uwb":
+ raw = uwb_conf
+ else: # "camera"
+ raw = cam_conf
+
+ # Scale by KF health (full confidence only if KF is tight)
+ kf_health = max(0.0, 1.0 - kf_uncertainty_m / max_kf_uncertainty_m)
+ return raw * (0.5 + 0.5 * kf_health)
+
+
+def bearing_and_range(x: float, y: float) -> tuple:
+ """
+ Compute bearing (rad, +ve = left) and range (m) to position (x, y).
+
+ Consistent with person_follower_node conventions:
+ bearing = atan2(y, x) (base_link frame: +X forward, +Y left)
+ range = sqrt(x² + y²)
+ """
+ return math.atan2(y, x), math.sqrt(x * x + y * y)
diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/tracking_fusion_node.py b/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/tracking_fusion_node.py
new file mode 100644
index 0000000..a57954e
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tracking/saltybot_social_tracking/tracking_fusion_node.py
@@ -0,0 +1,257 @@
+"""
+tracking_fusion_node.py — Multi-modal tracking fusion for saltybot.
+
+Subscribes
+----------
+ /uwb/target (geometry_msgs/PoseStamped) — UWB-triangulated position (~10 Hz)
+ /person/target (geometry_msgs/PoseStamped) — camera-detected position (~30 Hz)
+
+Publishes
+---------
+ /social/tracking/fused_target (saltybot_social_msgs/FusedTarget) at control_rate Hz
+
+Algorithm
+---------
+ 1. Each incoming measurement updates a 4-state Kalman filter [x, y, vx, vy].
+ 2. A 20 Hz timer runs predict+select+publish:
+ a. KF predict(dt)
+ b. Compute per-source confidence from measurement age + staleness model
+ c. If either source is live:
+ - "fused" → confidence-weighted position blend → KF update
+ - "uwb" → UWB position → KF update
+ - "camera" → camera position → KF update
+ d. Build FusedTarget from KF state + composite confidence
+ 3. If all sources are lost but within predict_timeout, keep publishing with
+ active_source="predicted" and degrading confidence.
+ 4. Beyond predict_timeout, no message is published (node stays alive).
+
+Kalman tuning for EUC speeds (20–30 km/h ≈ 5.5–8.3 m/s)
+---------------------------------------------------------
+ process_noise=3.0 m/s² — allows rapid acceleration events
+ predict_timeout=3.0 s — coasts ≈30 m at 10 m/s; acceptable dead-reckoning
+
+Parameters
+----------
+ uwb_timeout : UWB staleness threshold (s) default 1.5
+ cam_timeout : Camera staleness threshold (s) default 1.0
+ predict_timeout : Max KF coast before no publish (s) default 3.0
+ process_noise : KF acceleration noise std-dev (m/s²) default 3.0
+ meas_noise_uwb : UWB position noise std-dev (m) default 0.20
+ meas_noise_cam : Camera position noise std-dev (m) default 0.12
+ control_rate : Publish / KF predict rate (Hz) default 20.0
+ confidence_threshold: Min source confidence to use (0–1) default 0.15
+
+Usage
+-----
+ ros2 launch saltybot_social_tracking tracking.launch.py
+"""
+
+import math
+import time
+
+import rclpy
+from rclpy.node import Node
+from geometry_msgs.msg import PoseStamped
+from std_msgs.msg import Header
+
+from saltybot_social_msgs.msg import FusedTarget
+from saltybot_social_tracking.kalman_tracker import KalmanTracker
+from saltybot_social_tracking.source_arbiter import (
+ uwb_confidence,
+ camera_confidence,
+ select_source,
+ fuse_positions,
+ composite_confidence,
+ bearing_and_range,
+)
+
+_BIG_AGE = 1e9 # sentinel: source never received
+
+
+class TrackingFusionNode(Node):
+
+ def __init__(self):
+ super().__init__("tracking_fusion")
+
+ # ── Parameters ────────────────────────────────────────────────────────
+ self.declare_parameter("uwb_timeout", 1.5)
+ self.declare_parameter("cam_timeout", 1.0)
+ self.declare_parameter("predict_timeout", 3.0)
+ self.declare_parameter("process_noise", 3.0)
+ self.declare_parameter("meas_noise_uwb", 0.20)
+ self.declare_parameter("meas_noise_cam", 0.12)
+ self.declare_parameter("control_rate", 20.0)
+ self.declare_parameter("confidence_threshold", 0.15)
+
+ self._p = self._load_params()
+
+ # ── State ─────────────────────────────────────────────────────────────
+ self._kf = KalmanTracker(
+ process_noise=self._p["process_noise"],
+ meas_noise_uwb=self._p["meas_noise_uwb"],
+ meas_noise_cam=self._p["meas_noise_cam"],
+ )
+ self._last_uwb_t: float = 0.0 # monotonic; 0 = never received
+ self._last_cam_t: float = 0.0
+ self._uwb_x: float = 0.0
+ self._uwb_y: float = 0.0
+ self._cam_x: float = 0.0
+ self._cam_y: float = 0.0
+ self._uwb_tag_id: str = ""
+ self._last_predict_t: float = 0.0 # monotonic time of last predict call
+ self._last_any_t: float = 0.0 # monotonic time of last live measurement
+
+ # ── Subscriptions ─────────────────────────────────────────────────────
+ self.create_subscription(
+ PoseStamped, "/uwb/target", self._uwb_cb, 10)
+ self.create_subscription(
+ PoseStamped, "/person/target", self._cam_cb, 10)
+
+ # ── Publisher ─────────────────────────────────────────────────────────
+ self._pub = self.create_publisher(FusedTarget, "/social/tracking/fused_target", 10)
+
+ # ── Timer ─────────────────────────────────────────────────────────────
+ self._timer = self.create_timer(
+ 1.0 / self._p["control_rate"], self._control_cb)
+
+ self.get_logger().info(
+ f"TrackingFusion ready "
+ f"rate={self._p['control_rate']}Hz "
+ f"uwb_timeout={self._p['uwb_timeout']}s "
+ f"cam_timeout={self._p['cam_timeout']}s "
+ f"predict_timeout={self._p['predict_timeout']}s "
+ f"process_noise={self._p['process_noise']}m/s²"
+ )
+
+ # ── Parameter helpers ──────────────────────────────────────────────────────
+
+ def _load_params(self) -> dict:
+ return {
+ "uwb_timeout": self.get_parameter("uwb_timeout").value,
+ "cam_timeout": self.get_parameter("cam_timeout").value,
+ "predict_timeout": self.get_parameter("predict_timeout").value,
+ "process_noise": self.get_parameter("process_noise").value,
+ "meas_noise_uwb": self.get_parameter("meas_noise_uwb").value,
+ "meas_noise_cam": self.get_parameter("meas_noise_cam").value,
+ "control_rate": self.get_parameter("control_rate").value,
+ "confidence_threshold": self.get_parameter("confidence_threshold").value,
+ }
+
+ # ── Measurement callbacks ──────────────────────────────────────────────────
+
+ def _uwb_cb(self, msg: PoseStamped) -> None:
+ self._uwb_x = msg.pose.position.x
+ self._uwb_y = msg.pose.position.y
+ self._uwb_tag_id = "" # PoseStamped has no tag field; tag reported via /uwb/bearing
+ t = time.monotonic()
+ self._last_uwb_t = t
+ self._last_any_t = t
+
+ # Seed KF on first measurement
+ if not self._kf.initialized:
+ self._kf.initialize(self._uwb_x, self._uwb_y)
+ self._last_predict_t = t
+
+ def _cam_cb(self, msg: PoseStamped) -> None:
+ self._cam_x = msg.pose.position.x
+ self._cam_y = msg.pose.position.y
+ t = time.monotonic()
+ self._last_cam_t = t
+ self._last_any_t = t
+
+ if not self._kf.initialized:
+ self._kf.initialize(self._cam_x, self._cam_y)
+ self._last_predict_t = t
+
+ # ── Control loop ───────────────────────────────────────────────────────────
+
+ def _control_cb(self) -> None:
+ self._p = self._load_params()
+
+ if not self._kf.initialized:
+ return # no data yet — nothing to publish
+
+ now = time.monotonic()
+ dt = now - self._last_predict_t if self._last_predict_t > 0.0 else (
+ 1.0 / self._p["control_rate"]
+ )
+ self._last_predict_t = now
+
+ # KF predict
+ self._kf.predict(dt)
+
+ # Source confidence
+ uwb_age = (now - self._last_uwb_t) if self._last_uwb_t > 0.0 else _BIG_AGE
+ cam_age = (now - self._last_cam_t) if self._last_cam_t > 0.0 else _BIG_AGE
+
+ u_conf = uwb_confidence(uwb_age, self._p["uwb_timeout"])
+ c_conf = camera_confidence(cam_age, self._p["cam_timeout"])
+
+ threshold = self._p["confidence_threshold"]
+ source = select_source(u_conf, c_conf, threshold)
+
+ if source == "predicted":
+ # Check predict_timeout — stop publishing if too stale
+ last_live_age = (
+ (now - self._last_any_t) if self._last_any_t > 0.0 else _BIG_AGE
+ )
+ if last_live_age > self._p["predict_timeout"]:
+ return # silently stop publishing
+
+ # Apply measurement update if a live source exists
+ if source == "fused":
+ fx, fy = fuse_positions(
+ self._uwb_x, self._uwb_y, u_conf,
+ self._cam_x, self._cam_y, c_conf,
+ )
+ self._kf.update(fx, fy, source="uwb") # use tighter noise for blended
+ elif source == "uwb":
+ self._kf.update(self._uwb_x, self._uwb_y, source="uwb")
+ elif source == "camera":
+ self._kf.update(self._cam_x, self._cam_y, source="camera")
+ # "predicted" → no update; KF coasts
+
+ # Build and publish message
+ kx, ky = self._kf.position
+ vx, vy = self._kf.velocity
+ kf_unc = self._kf.position_uncertainty_m()
+ conf = composite_confidence(u_conf, c_conf, source, kf_unc)
+ bearing, range_m = bearing_and_range(kx, ky)
+
+ hdr = Header()
+ hdr.stamp = self.get_clock().now().to_msg()
+ hdr.frame_id = "base_link"
+
+ msg = FusedTarget()
+ msg.header = hdr
+ msg.position.x = kx
+ msg.position.y = ky
+ msg.position.z = 0.0
+ msg.velocity.x = vx
+ msg.velocity.y = vy
+ msg.velocity.z = 0.0
+ msg.range_m = float(range_m)
+ msg.bearing_rad = float(bearing)
+ msg.confidence = float(conf)
+ msg.active_source = source
+ msg.tag_id = self._uwb_tag_id if "uwb" in source else ""
+
+ self._pub.publish(msg)
+
+
+# ── Entry point ────────────────────────────────────────────────────────────────
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = TrackingFusionNode()
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ node.destroy_node()
+ rclpy.try_shutdown()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/setup.cfg b/jetson/ros2_ws/src/saltybot_social_tracking/setup.cfg
new file mode 100644
index 0000000..55fb3ba
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tracking/setup.cfg
@@ -0,0 +1,5 @@
+[develop]
+script_dir=$base/lib/saltybot_social_tracking
+
+[install]
+install_scripts=$base/lib/saltybot_social_tracking
diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/setup.py b/jetson/ros2_ws/src/saltybot_social_tracking/setup.py
new file mode 100644
index 0000000..41b594a
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tracking/setup.py
@@ -0,0 +1,32 @@
+from setuptools import setup, find_packages
+import os
+from glob import glob
+
+package_name = "saltybot_social_tracking"
+
+setup(
+ name=package_name,
+ version="0.1.0",
+ packages=find_packages(exclude=["test"]),
+ data_files=[
+ ("share/ament_index/resource_index/packages",
+ [f"resource/{package_name}"]),
+ (f"share/{package_name}", ["package.xml"]),
+ (os.path.join("share", package_name, "config"),
+ glob("config/*.yaml")),
+ (os.path.join("share", package_name, "launch"),
+ glob("launch/*.py")),
+ ],
+ install_requires=["setuptools"],
+ zip_safe=True,
+ maintainer="sl-controls",
+ maintainer_email="sl-controls@saltylab.local",
+ description="Multi-modal tracking fusion (UWB + camera Kalman filter)",
+ license="MIT",
+ tests_require=["pytest"],
+ entry_points={
+ "console_scripts": [
+ f"tracking_fusion_node = {package_name}.tracking_fusion_node:main",
+ ],
+ },
+)
diff --git a/jetson/ros2_ws/src/saltybot_social_tracking/test/test_tracking_fusion.py b/jetson/ros2_ws/src/saltybot_social_tracking/test/test_tracking_fusion.py
new file mode 100644
index 0000000..06ca258
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_tracking/test/test_tracking_fusion.py
@@ -0,0 +1,438 @@
+"""
+test_tracking_fusion.py — Unit tests for saltybot_social_tracking pure modules.
+
+Tests cover:
+ - KalmanTracker: initialization, predict, update, state accessors
+ - source_arbiter: confidence functions, source selection, fusion, bearing
+
+No ROS2 runtime required.
+"""
+
+import math
+import sys
+import os
+
+# Allow running: python -m pytest test/test_tracking_fusion.py
+# from the package root without installing the package.
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+
+import pytest
+import numpy as np
+
+from saltybot_social_tracking.kalman_tracker import KalmanTracker
+from saltybot_social_tracking.source_arbiter import (
+ uwb_confidence,
+ camera_confidence,
+ select_source,
+ fuse_positions,
+ composite_confidence,
+ bearing_and_range,
+)
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# KalmanTracker tests
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestKalmanTrackerInit:
+
+ def test_not_initialized_by_default(self):
+ kf = KalmanTracker()
+ assert not kf.initialized
+
+ def test_initialize_sets_position(self):
+ kf = KalmanTracker()
+ kf.initialize(3.0, 1.5)
+ assert kf.initialized
+ x, y = kf.position
+ assert abs(x - 3.0) < 1e-9
+ assert abs(y - 1.5) < 1e-9
+
+ def test_initialize_sets_zero_velocity(self):
+ kf = KalmanTracker()
+ kf.initialize(1.0, -2.0)
+ vx, vy = kf.velocity
+ assert abs(vx) < 1e-9
+ assert abs(vy) < 1e-9
+
+ def test_initialize_origin(self):
+ kf = KalmanTracker()
+ kf.initialize(0.0, 0.0)
+ assert kf.initialized
+ x, y = kf.position
+ assert x == 0.0 and y == 0.0
+
+
+class TestKalmanTrackerPredict:
+
+ def test_predict_zero_dt_no_change(self):
+ kf = KalmanTracker()
+ kf.initialize(2.0, 1.0)
+ kf.predict(0.0)
+ x, y = kf.position
+ assert abs(x - 2.0) < 1e-9
+ assert abs(y - 1.0) < 1e-9
+
+ def test_predict_negative_dt_no_change(self):
+ kf = KalmanTracker()
+ kf.initialize(2.0, 1.0)
+ kf.predict(-0.1)
+ x, y = kf.position
+ assert abs(x - 2.0) < 1e-9
+
+ def test_predict_constant_velocity(self):
+ """After a position update gives the filter a velocity, predict should extrapolate."""
+ kf = KalmanTracker(process_noise=0.001, meas_noise_uwb=0.001)
+ kf.initialize(0.0, 0.0)
+ # Force filter to track a moving target to build up velocity estimate
+ dt = 0.05
+ for i in range(40):
+ t = i * dt
+ kf.predict(dt)
+ kf.update(2.0 * t, 0.0, "uwb") # 2 m/s in x
+
+ # After many updates the velocity estimate should be close to 2 m/s
+ vx, vy = kf.velocity
+ assert abs(vx - 2.0) < 0.3, f"vx={vx:.3f}"
+ assert abs(vy) < 0.2
+
+ def test_predict_grows_uncertainty(self):
+ kf = KalmanTracker()
+ kf.initialize(0.0, 0.0)
+ unc_before = kf.position_uncertainty_m()
+ kf.predict(1.0)
+ unc_after = kf.position_uncertainty_m()
+ assert unc_after > unc_before
+
+ def test_predict_multiple_steps(self):
+ kf = KalmanTracker()
+ kf.initialize(0.0, 0.0)
+ kf.predict(0.1)
+ kf.predict(0.1)
+ kf.predict(0.1)
+ # No assertion on exact value; just verify no exception and state is finite
+ x, y = kf.position
+ assert math.isfinite(x) and math.isfinite(y)
+
+
+class TestKalmanTrackerUpdate:
+
+ def test_update_pulls_position_toward_measurement(self):
+ kf = KalmanTracker()
+ kf.initialize(0.0, 0.0)
+ kf.update(5.0, 5.0, "uwb")
+ x, y = kf.position
+ assert x > 0.0 and y > 0.0
+ assert x < 5.0 and y < 5.0 # blended, not jumped
+
+ def test_update_reduces_uncertainty(self):
+ kf = KalmanTracker()
+ kf.initialize(0.0, 0.0)
+ kf.predict(1.0) # uncertainty grows
+ unc_mid = kf.position_uncertainty_m()
+ kf.update(0.1, 0.1, "uwb") # measurement corrects
+ unc_after = kf.position_uncertainty_m()
+ assert unc_after < unc_mid
+
+ def test_update_converges_to_true_position(self):
+ """Many updates from same point should converge to that point."""
+ kf = KalmanTracker(meas_noise_uwb=0.01)
+ kf.initialize(0.0, 0.0)
+ for _ in range(50):
+ kf.update(3.0, -1.0, "uwb")
+ x, y = kf.position
+ assert abs(x - 3.0) < 0.05, f"x={x:.4f}"
+ assert abs(y - (-1.0)) < 0.05, f"y={y:.4f}"
+
+ def test_update_camera_source_different_noise(self):
+ """Camera and UWB updates should both move state (noise model differs)."""
+ kf1 = KalmanTracker(meas_noise_uwb=0.20, meas_noise_cam=0.10)
+ kf1.initialize(0.0, 0.0)
+ kf1.update(5.0, 0.0, "uwb")
+ x_uwb, _ = kf1.position
+
+ kf2 = KalmanTracker(meas_noise_uwb=0.20, meas_noise_cam=0.10)
+ kf2.initialize(0.0, 0.0)
+ kf2.update(5.0, 0.0, "camera")
+ x_cam, _ = kf2.position
+
+ # Camera has lower noise → stronger pull toward measurement
+ assert x_cam > x_uwb
+
+ def test_update_unknown_source_defaults_to_camera_noise(self):
+ kf = KalmanTracker()
+ kf.initialize(0.0, 0.0)
+ kf.update(2.0, 0.0, "other") # unknown source — should not raise
+ x, _ = kf.position
+ assert x > 0.0
+
+ def test_position_uncertainty_finite(self):
+ kf = KalmanTracker()
+ kf.initialize(1.0, 1.0)
+ kf.predict(0.05)
+ kf.update(1.1, 0.9, "uwb")
+ assert math.isfinite(kf.position_uncertainty_m())
+ assert kf.position_uncertainty_m() >= 0.0
+
+ def test_covariance_copy_is_independent(self):
+ kf = KalmanTracker()
+ kf.initialize(0.0, 0.0)
+ cov = kf.covariance_copy()
+ cov[0, 0] = 9999.0 # mutate copy
+ assert kf.covariance_copy()[0, 0] != 9999.0
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# source_arbiter tests
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestUwbConfidence:
+
+ def test_zero_age_returns_quality(self):
+ assert abs(uwb_confidence(0.0, 1.5) - 1.0) < 1e-9
+
+ def test_at_timeout_returns_zero(self):
+ assert uwb_confidence(1.5, 1.5) == pytest.approx(0.0)
+
+ def test_beyond_timeout_returns_zero(self):
+ assert uwb_confidence(2.0, 1.5) == 0.0
+
+ def test_half_timeout_returns_half(self):
+ assert uwb_confidence(0.75, 1.5) == pytest.approx(0.5)
+
+ def test_quality_scales_result(self):
+ assert uwb_confidence(0.0, 1.5, quality=0.7) == pytest.approx(0.7)
+
+ def test_large_age_returns_zero(self):
+ assert uwb_confidence(1e9, 1.5) == 0.0
+
+ def test_zero_timeout_returns_zero(self):
+ assert uwb_confidence(0.0, 0.0) == 0.0
+
+
+class TestCameraConfidence:
+
+ def test_zero_age_full_quality(self):
+ assert camera_confidence(0.0, 1.0, quality=1.0) == pytest.approx(1.0)
+
+ def test_at_timeout_zero(self):
+ assert camera_confidence(1.0, 1.0) == pytest.approx(0.0)
+
+ def test_beyond_timeout_zero(self):
+ assert camera_confidence(2.0, 1.0) == 0.0
+
+ def test_quality_scales(self):
+ # age=0, quality=0.8
+ assert camera_confidence(0.0, 1.0, quality=0.8) == pytest.approx(0.8)
+
+ def test_halfway(self):
+ assert camera_confidence(0.5, 1.0) == pytest.approx(0.5)
+
+
+class TestSelectSource:
+
+ def test_both_above_threshold_fused(self):
+ assert select_source(0.8, 0.6) == "fused"
+
+ def test_only_uwb_above_threshold(self):
+ assert select_source(0.8, 0.0) == "uwb"
+
+ def test_only_cam_above_threshold(self):
+ assert select_source(0.0, 0.7) == "camera"
+
+ def test_both_below_threshold(self):
+ assert select_source(0.0, 0.0) == "predicted"
+
+ def test_threshold_boundary_uwb(self):
+ # Exactly at threshold — should be treated as live
+ assert select_source(0.15, 0.0, threshold=0.15) == "uwb"
+
+ def test_threshold_boundary_below(self):
+ assert select_source(0.14, 0.0, threshold=0.15) == "predicted"
+
+ def test_custom_threshold(self):
+ assert select_source(0.5, 0.0, threshold=0.6) == "predicted"
+ assert select_source(0.5, 0.0, threshold=0.4) == "uwb"
+
+
+class TestFusePositions:
+
+ def test_equal_confidence_returns_midpoint(self):
+ x, y = fuse_positions(0.0, 0.0, 1.0, 4.0, 4.0, 1.0)
+ assert abs(x - 2.0) < 1e-9
+ assert abs(y - 2.0) < 1e-9
+
+ def test_full_uwb_weight_returns_uwb(self):
+ x, y = fuse_positions(3.0, 1.0, 1.0, 0.0, 0.0, 0.0)
+ assert abs(x - 3.0) < 1e-9
+
+ def test_full_cam_weight_returns_cam(self):
+ x, y = fuse_positions(0.0, 0.0, 0.0, -2.0, 5.0, 1.0)
+ assert abs(x - (-2.0)) < 1e-9
+ assert abs(y - 5.0) < 1e-9
+
+ def test_weighted_blend(self):
+ # UWB at (0,0) conf=3, camera at (4,0) conf=1 → x = 3/4*0 + 1/4*4 = 1
+ x, y = fuse_positions(0.0, 0.0, 3.0, 4.0, 0.0, 1.0)
+ assert abs(x - 1.0) < 1e-9
+
+ def test_zero_total_returns_uwb_fallback(self):
+ x, y = fuse_positions(7.0, 2.0, 0.0, 3.0, 1.0, 0.0)
+ assert abs(x - 7.0) < 1e-9
+
+
+class TestCompositeConfidence:
+
+ def test_fused_source_high_confidence(self):
+ conf = composite_confidence(0.9, 0.8, "fused", 0.05)
+ assert conf > 0.7
+
+ def test_predicted_source_capped(self):
+ conf = composite_confidence(0.0, 0.0, "predicted", 0.1)
+ assert conf <= 0.4
+
+ def test_predicted_high_uncertainty_low_confidence(self):
+ conf = composite_confidence(0.0, 0.0, "predicted", 3.0, max_kf_uncertainty_m=3.0)
+ assert conf == pytest.approx(0.0)
+
+ def test_uwb_only(self):
+ conf = composite_confidence(0.8, 0.0, "uwb", 0.05)
+ assert conf > 0.3
+
+ def test_camera_only(self):
+ conf = composite_confidence(0.0, 0.7, "camera", 0.05)
+ assert conf > 0.2
+
+ def test_high_kf_uncertainty_reduces_confidence(self):
+ low_unc = composite_confidence(0.9, 0.0, "uwb", 0.1)
+ high_unc = composite_confidence(0.9, 0.0, "uwb", 2.9)
+ assert low_unc > high_unc
+
+
+class TestBearingAndRange:
+
+ def test_straight_ahead(self):
+ bearing, rng = bearing_and_range(2.0, 0.0)
+ assert abs(bearing) < 1e-9
+ assert abs(rng - 2.0) < 1e-9
+
+ def test_left_of_robot(self):
+ # +Y = left in base_link frame; bearing should be positive
+ bearing, rng = bearing_and_range(0.0, 1.0)
+ assert abs(bearing - math.pi / 2.0) < 1e-9
+ assert abs(rng - 1.0) < 1e-9
+
+ def test_right_of_robot(self):
+ bearing, rng = bearing_and_range(0.0, -1.0)
+ assert abs(bearing - (-math.pi / 2.0)) < 1e-9
+
+ def test_diagonal(self):
+ bearing, rng = bearing_and_range(1.0, 1.0)
+ assert abs(bearing - math.pi / 4.0) < 1e-9
+ assert abs(rng - math.sqrt(2.0)) < 1e-9
+
+ def test_at_origin(self):
+ bearing, rng = bearing_and_range(0.0, 0.0)
+ assert rng == pytest.approx(0.0)
+ assert math.isfinite(bearing) # atan2(0,0) = 0 in most implementations
+
+ def test_range_always_non_negative(self):
+ for x, y in [(-1, 0), (0, -1), (-2, -3), (5, -5)]:
+ _, rng = bearing_and_range(x, y)
+ assert rng >= 0.0
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Integration scenario tests
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestIntegrationScenarios:
+
+ def test_euc_speed_velocity_tracking(self):
+ """Verify KF can track EUC speed (8 m/s) within 0.5 m/s after warm-up."""
+ kf = KalmanTracker(process_noise=3.0, meas_noise_uwb=0.20)
+ kf.initialize(0.0, 0.0)
+ dt = 1.0 / 10.0 # 10 Hz UWB rate
+ speed = 8.0 # m/s (≈29 km/h)
+ for i in range(60):
+ t = i * dt
+ kf.predict(dt)
+ kf.update(speed * t, 0.0, "uwb")
+ vx, vy = kf.velocity
+ assert abs(vx - speed) < 0.6, f"vx={vx:.2f} expected≈{speed}"
+ assert abs(vy) < 0.3
+
+ def test_signal_loss_recovery(self):
+ """
+ After 1 s of signal loss the filter should still have a reasonable
+ position estimate (not diverged to infinity).
+ """
+ kf = KalmanTracker(process_noise=3.0)
+ kf.initialize(2.0, 0.5)
+ # Warm up with 2 m/s x motion
+ dt = 0.05
+ for i in range(20):
+ kf.predict(dt)
+ kf.update(2.0 * (i + 1) * dt, 0.0, "uwb")
+ # Coast for 1 second (20 × 50 ms) without measurements
+ for _ in range(20):
+ kf.predict(dt)
+ x, y = kf.position
+ assert math.isfinite(x) and math.isfinite(y)
+ assert abs(x) < 20.0 # shouldn't have drifted more than 20 m
+
+ def test_uwb_to_camera_handoff(self):
+ """
+ Simulate UWB going stale and camera taking over — Kalman should
+ smoothly continue tracking without a jump.
+ """
+ kf = KalmanTracker(meas_noise_uwb=0.20, meas_noise_cam=0.12)
+ kf.initialize(0.0, 0.0)
+ dt = 0.05
+ # Phase 1: UWB active
+ for i in range(20):
+ kf.predict(dt)
+ kf.update(float(i) * 0.1, 0.0, "uwb")
+ x_at_handoff, _ = kf.position
+
+ # Phase 2: Camera takes over from same trajectory
+ for i in range(20, 40):
+ kf.predict(dt)
+ kf.update(float(i) * 0.1, 0.0, "camera")
+ x_after, _ = kf.position
+
+ # Position should have continued progressing (not stuck or reset)
+ assert x_after > x_at_handoff
+
+ def test_confidence_degradation_during_coast(self):
+ """Composite confidence should drop as KF uncertainty grows during coast."""
+ kf = KalmanTracker(process_noise=3.0)
+ kf.initialize(2.0, 0.0)
+
+ # Fresh: tight uncertainty → high confidence
+ unc_fresh = kf.position_uncertainty_m()
+ conf_fresh = composite_confidence(0.0, 0.0, "predicted", unc_fresh)
+
+ # After 2 s coast
+ for _ in range(40):
+ kf.predict(0.05)
+ unc_stale = kf.position_uncertainty_m()
+ conf_stale = composite_confidence(0.0, 0.0, "predicted", unc_stale)
+
+ assert conf_fresh >= conf_stale
+
+ def test_fused_source_confidence_weighted_position(self):
+ """Fused position should sit between UWB and camera, closer to higher-conf source."""
+ # UWB at x=0 with high conf, camera at x=10 with low conf
+ uwb_c = 0.9
+ cam_c = 0.1
+ fx, fy = fuse_positions(0.0, 0.0, uwb_c, 10.0, 0.0, cam_c)
+ # Should be much closer to UWB (0) than camera (10)
+ assert fx < 3.0, f"fused_x={fx:.2f}"
+
+ def test_select_source_transitions(self):
+ """Verify correct source transitions as confidences change."""
+ assert select_source(0.9, 0.8) == "fused"
+ assert select_source(0.9, 0.0) == "uwb"
+ assert select_source(0.0, 0.8) == "camera"
+ assert select_source(0.0, 0.0) == "predicted"