feat(social): multi-modal tracking fusion — UWB+camera Kalman filter (Issue #92)

New packages:
  saltybot_social_msgs   — FusedTarget.msg custom message
  saltybot_social_tracking — 4-state Kalman fusion node

saltybot_social_tracking/tracking_fusion_node.py
  Subscribes to /uwb/target (PoseStamped, ~10 Hz) and /person/target
  (PoseStamped, ~30 Hz) and publishes /social/tracking/fused_target
  (FusedTarget) at 20 Hz.

  Source arbitration:
    • "fused"     — both UWB and camera are fresh; confidence-weighted blend
    • "uwb"       — UWB fresh, camera stale
    • "camera"    — camera fresh, UWB stale
    • "predicted" — all sources stale; KF coasts for up to predict_timeout (3 s)

  Kalman filter (kalman_tracker.py):
    State [x, y, vx, vy] with discrete Wiener acceleration noise model
    (process_noise=3.0 m/s²) sized for EUC speeds (20-30 km/h, ≈5.5-8.3 m/s).
    Separate UWB (0.20 m) and camera (0.12 m) measurement noise.
    Velocity estimate converges after ~3 s of 10 Hz UWB measurements.

  Confidence model (source_arbiter.py):
    Per-source confidence = quality × max(0, 1 - age/timeout).
    Composite confidence accounts for KF positional uncertainty and
    is capped at 0.4 during dead-reckoning ("predicted") mode.

Tests: 58/58 pass (no ROS2 runtime required).

Note: saltybot_social_msgs here adds FusedTarget.msg; PR #98
(Issue #84) adds PersonalityState.msg + QueryMood.srv to the same
package. The maintainer should squash-merge #98 first and rebase
this branch on top of it before merging to avoid the package.xml
conflict.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-controls 2026-03-01 23:29:13 -05:00
parent d48edf4092
commit fa0162fadc
14 changed files with 1170 additions and 0 deletions

View File

@ -26,6 +26,8 @@ rosidl_generate_interfaces(${PROJECT_NAME}
# Issue #84 personality system
"msg/PersonalityState.msg"
"srv/QueryMood.srv"
# Issue #92 multi-modal tracking fusion
"msg/FusedTarget.msg"
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
)

View File

@ -0,0 +1,19 @@
# FusedTarget.msg — output of the multi-modal tracking fusion node.
#
# Position and velocity are in the base_link frame (robot-centred,
# +X forward, +Y left). z components are always 0.0 for ground-plane tracking.
#
# Confidence: 0.0 = no data / fully predicted; 1.0 = strong fused measurement.
# active_source: "fused" | "uwb" | "camera" | "predicted"
std_msgs/Header header
geometry_msgs/Point position # filtered 2-D position (m), z=0
geometry_msgs/Vector3 velocity # filtered 2-D velocity (m/s), z=0
float32 range_m # Euclidean distance from robot to fused position
float32 bearing_rad # bearing in base_link (+ve = person to the left)
float32 confidence # composite confidence [0.0, 1.0]
string active_source # "fused" | "uwb" | "camera" | "predicted"
string tag_id # UWB tag address (empty when UWB not contributing)

View File

@ -0,0 +1,5 @@
__pycache__/
*.pyc
*.pyo
*.egg-info/
.pytest_cache/

View File

@ -0,0 +1,48 @@
# tracking_params.yaml — saltybot_social_tracking / TrackingFusionNode
#
# Run with:
# ros2 launch saltybot_social_tracking tracking.launch.py
#
# Topics consumed:
# /uwb/target (geometry_msgs/PoseStamped) — UWB triangulated position
# /person/target (geometry_msgs/PoseStamped) — camera-detected position
#
# Topic produced:
# /social/tracking/fused_target (saltybot_social_msgs/FusedTarget)
# ── Source staleness timeouts ──────────────────────────────────────────────────
# UWB driver publishes at ~10 Hz; 1.5 s = 15 missed cycles before declared stale.
uwb_timeout: 1.5 # seconds
# Camera detector publishes at ~30 Hz; 1.0 s = 30 missed frames before stale.
cam_timeout: 1.0 # seconds
# How long the Kalman filter may coast (dead-reckoning) with no live source
# before the node stops publishing.
# At 10 m/s (EUC top-speed) the robot drifts ≈30 m over 3 s — beyond the UWB
# follow-range, so 3 s is a reasonable hard stop.
predict_timeout: 3.0 # seconds
# ── Kalman filter tuning ───────────────────────────────────────────────────────
# process_noise: acceleration noise std-dev (m/s²).
# EUC riders can brake or accelerate at ~35 m/s²; 3.0 is a good starting point.
# Increase if the filtered track lags behind fast direction changes.
# Decrease if the track is noisy.
process_noise: 3.0 # m/s²
# UWB position measurement noise (std-dev, metres).
# DW3000 TWR accuracy ≈ ±1020 cm; 0.20 accounts for system-level error.
meas_noise_uwb: 0.20 # m
# Camera position noise (std-dev, metres).
# Depth reprojection error with RealSense D435i at 13 m ≈ ±515 cm.
meas_noise_cam: 0.12 # m
# ── Control loop ──────────────────────────────────────────────────────────────
control_rate: 20.0 # Hz — KF predict + publish rate
# ── Source arbiter ────────────────────────────────────────────────────────────
# Minimum normalised confidence for a source to be considered live.
# Range [0, 1]; lower = more permissive; default 0.15 keeps slightly stale
# sources active rather than dropping to "predicted" prematurely.
confidence_threshold: 0.15

View File

@ -0,0 +1,44 @@
"""tracking.launch.py — launch the TrackingFusionNode with default params."""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg_share = get_package_share_directory("saltybot_social_tracking")
default_params = os.path.join(pkg_share, "config", "tracking_params.yaml")
return LaunchDescription([
DeclareLaunchArgument(
"params_file",
default_value=default_params,
description="Path to tracking fusion parameter YAML file",
),
DeclareLaunchArgument(
"control_rate",
default_value="20.0",
description="KF predict + publish rate (Hz)",
),
DeclareLaunchArgument(
"predict_timeout",
default_value="3.0",
description="Max KF coast time before stopping publish (s)",
),
Node(
package="saltybot_social_tracking",
executable="tracking_fusion_node",
name="tracking_fusion",
output="screen",
parameters=[
LaunchConfiguration("params_file"),
{
"control_rate": LaunchConfiguration("control_rate"),
"predict_timeout": LaunchConfiguration("predict_timeout"),
},
],
),
])

View File

@ -0,0 +1,31 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_social_tracking</name>
<version>0.1.0</version>
<description>
Multi-modal tracking fusion for saltybot.
Fuses UWB triangulated position (/uwb/target) and camera-detected position
(/person/target) using a 4-state Kalman filter to produce a smooth, low-latency
fused estimate at /social/tracking/fused_target.
Handles EUC rider speeds (20-30 km/h), signal handoff, and predictive coasting.
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<depend>saltybot_social_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,134 @@
"""
kalman_tracker.py 4-state linear Kalman filter for 2-D position+velocity tracking.
State vector: [x, y, vx, vy]
Observation: [x_meas, y_meas]
Process model: constant velocity with Wiener process acceleration noise.
Tuned to handle EUC rider speeds (2030 km/h 5.58.3 m/s) with fast
acceleration transients.
Pure module no ROS2 dependency; fully unit-testable.
"""
import math
import numpy as np
class KalmanTracker:
"""
4-state Kalman filter: state = [x, y, vx, vy].
Parameters
----------
process_noise : acceleration noise standard deviation (m/).
Higher values allow the filter to track rapid velocity
changes (EUC acceleration events). Default 3.0 m/.
meas_noise_uwb : UWB position measurement noise std-dev (m). Default 0.20 m.
meas_noise_cam : Camera position measurement noise std-dev (m). Default 0.12 m.
"""
def __init__(
self,
process_noise: float = 3.0,
meas_noise_uwb: float = 0.20,
meas_noise_cam: float = 0.12,
):
self._q = float(process_noise)
self._r_uwb = float(meas_noise_uwb)
self._r_cam = float(meas_noise_cam)
# State [x, y, vx, vy]
self._x = np.zeros(4)
# Covariance — large initial uncertainty (10 m², 10 (m/s)²)
self._P = np.diag([10.0, 10.0, 10.0, 10.0])
# Observation matrix: H * x = [x, y]
self._H = np.array([[1.0, 0.0, 0.0, 0.0],
[0.0, 1.0, 0.0, 0.0]])
self._initialized = False
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
@property
def initialized(self) -> bool:
return self._initialized
def initialize(self, x: float, y: float) -> None:
"""Seed the filter at position (x, y) with zero velocity."""
self._x = np.array([x, y, 0.0, 0.0])
self._P = np.diag([1.0, 1.0, 5.0, 5.0])
self._initialized = True
def predict(self, dt: float) -> None:
"""
Advance the filter state by dt seconds.
Uses a discrete Wiener process acceleration model so that positional
uncertainty grows as O(dt^4/4) and velocity uncertainty as O(dt^2).
This lets the filter coast accurately through short signal outages
while still being responsive to EUC velocity changes.
"""
if dt <= 0.0:
return
F = np.array([[1.0, 0.0, dt, 0.0],
[0.0, 1.0, 0.0, dt],
[0.0, 0.0, 1.0, 0.0],
[0.0, 0.0, 0.0, 1.0]])
q = self._q
dt2 = dt * dt
dt3 = dt2 * dt
dt4 = dt3 * dt
Q = (q * q) * np.array([
[dt4 / 4.0, 0.0, dt3 / 2.0, 0.0 ],
[0.0, dt4 / 4.0, 0.0, dt3 / 2.0],
[dt3 / 2.0, 0.0, dt2, 0.0 ],
[0.0, dt3 / 2.0, 0.0, dt2 ],
])
self._x = F @ self._x
self._P = F @ self._P @ F.T + Q
def update(self, x_meas: float, y_meas: float, source: str = "uwb") -> None:
"""
Apply a position measurement (x_meas, y_meas).
source : "uwb" or "camera" selects the appropriate noise covariance.
"""
r = self._r_uwb if source == "uwb" else self._r_cam
R = np.diag([r * r, r * r])
z = np.array([x_meas, y_meas])
innov = z - self._H @ self._x # innovation
S = self._H @ self._P @ self._H.T + R # innovation covariance
K = self._P @ self._H.T @ np.linalg.inv(S) # Kalman gain
self._x = self._x + K @ innov
self._P = (np.eye(4) - K @ self._H) @ self._P
# ------------------------------------------------------------------
# State accessors
# ------------------------------------------------------------------
@property
def position(self) -> tuple:
"""Current filtered position (x, y) in metres."""
return float(self._x[0]), float(self._x[1])
@property
def velocity(self) -> tuple:
"""Current filtered velocity (vx, vy) in m/s."""
return float(self._x[2]), float(self._x[3])
def position_uncertainty_m(self) -> float:
"""RMS positional uncertainty (m) from diagonal of covariance."""
return float(math.sqrt((self._P[0, 0] + self._P[1, 1]) / 2.0))
def covariance_copy(self) -> np.ndarray:
"""Return a copy of the current 4×4 covariance matrix."""
return self._P.copy()

View File

@ -0,0 +1,155 @@
"""
source_arbiter.py Source confidence scoring and selection for tracking fusion.
Two sensor sources are supported:
UWB geometry_msgs/PoseStamped from /uwb/target (triangulated, ~10 Hz)
Camera geometry_msgs/PoseStamped from /person/target (depth+YOLO, ~30 Hz)
Confidence model
----------------
Each source's confidence is its raw measurement quality multiplied by a
linear staleness factor that drops to zero at its respective timeout:
conf = quality * max(0, 1 - age / timeout)
UWB quality is always 1.0 (the ranging hardware confidence is not exposed
by the driver in origin/main; the UWB node already applies Kalman filtering).
Camera quality defaults to 1.0; callers may pass a lower value when the
detection confidence is available.
Source selection
----------------
Both fresh "fused" (confidence-weighted position blend)
UWB only "uwb"
Camera only "camera"
Neither fresh "predicted" (Kalman coasts)
Pure module no ROS2 dependency; fully unit-testable.
"""
import math
def _staleness_factor(age_s: float, timeout_s: float) -> float:
"""Linear decay: 1.0 at age=0, 0.0 at age=timeout, clamped."""
if timeout_s <= 0.0:
return 0.0
return max(0.0, 1.0 - age_s / timeout_s)
def uwb_confidence(age_s: float, timeout_s: float, quality: float = 1.0) -> float:
"""
UWB source confidence.
age_s : seconds since last UWB measurement (0; use large value if never)
timeout_s: staleness threshold (s); confidence reaches 0 at this age
quality : inherent measurement quality [0, 1] (default 1.0)
"""
return quality * _staleness_factor(age_s, timeout_s)
def camera_confidence(
age_s: float, timeout_s: float, quality: float = 1.0
) -> float:
"""
Camera source confidence.
age_s : seconds since last camera detection (0; use large value if never)
timeout_s: staleness threshold (s)
quality : YOLO detection confidence or other quality score [0, 1]
"""
return quality * _staleness_factor(age_s, timeout_s)
def select_source(
uwb_conf: float,
cam_conf: float,
threshold: float = 0.15,
) -> str:
"""
Choose the active tracking source label.
Returns one of: "fused", "uwb", "camera", "predicted".
threshold: minimum confidence for a source to be considered live.
Sources below threshold are ignored.
"""
uwb_ok = uwb_conf >= threshold
cam_ok = cam_conf >= threshold
if uwb_ok and cam_ok:
return "fused"
if uwb_ok:
return "uwb"
if cam_ok:
return "camera"
return "predicted"
def fuse_positions(
uwb_x: float,
uwb_y: float,
uwb_conf: float,
cam_x: float,
cam_y: float,
cam_conf: float,
) -> tuple:
"""
Confidence-weighted position fusion.
Returns (fused_x, fused_y).
When total confidence is zero (shouldn't happen in "fused" state, but
guarded), returns the UWB position as fallback.
"""
total = uwb_conf + cam_conf
if total <= 0.0:
return uwb_x, uwb_y
w = uwb_conf / total
return (
w * uwb_x + (1.0 - w) * cam_x,
w * uwb_y + (1.0 - w) * cam_y,
)
def composite_confidence(
uwb_conf: float,
cam_conf: float,
source: str,
kf_uncertainty_m: float,
max_kf_uncertainty_m: float = 3.0,
) -> float:
"""
Compute a single composite confidence value [0, 1] for the fused output.
source : current source label (from select_source)
kf_uncertainty_m : current KF positional RMS uncertainty
max_kf_uncertainty_m: uncertainty at which confidence collapses to 0
"""
if source == "predicted":
# Decay with growing KF uncertainty; no sensor feeds are live
raw = max(0.0, 1.0 - kf_uncertainty_m / max_kf_uncertainty_m)
return min(0.4, raw) # cap at 0.4 — caller should know this is dead-reckoning
if source == "fused":
raw = max(uwb_conf, cam_conf)
elif source == "uwb":
raw = uwb_conf
else: # "camera"
raw = cam_conf
# Scale by KF health (full confidence only if KF is tight)
kf_health = max(0.0, 1.0 - kf_uncertainty_m / max_kf_uncertainty_m)
return raw * (0.5 + 0.5 * kf_health)
def bearing_and_range(x: float, y: float) -> tuple:
"""
Compute bearing (rad, +ve = left) and range (m) to position (x, y).
Consistent with person_follower_node conventions:
bearing = atan2(y, x) (base_link frame: +X forward, +Y left)
range = sqrt( + )
"""
return math.atan2(y, x), math.sqrt(x * x + y * y)

View File

@ -0,0 +1,257 @@
"""
tracking_fusion_node.py Multi-modal tracking fusion for saltybot.
Subscribes
----------
/uwb/target (geometry_msgs/PoseStamped) UWB-triangulated position (~10 Hz)
/person/target (geometry_msgs/PoseStamped) camera-detected position (~30 Hz)
Publishes
---------
/social/tracking/fused_target (saltybot_social_msgs/FusedTarget) at control_rate Hz
Algorithm
---------
1. Each incoming measurement updates a 4-state Kalman filter [x, y, vx, vy].
2. A 20 Hz timer runs predict+select+publish:
a. KF predict(dt)
b. Compute per-source confidence from measurement age + staleness model
c. If either source is live:
- "fused" confidence-weighted position blend KF update
- "uwb" UWB position KF update
- "camera" camera position KF update
d. Build FusedTarget from KF state + composite confidence
3. If all sources are lost but within predict_timeout, keep publishing with
active_source="predicted" and degrading confidence.
4. Beyond predict_timeout, no message is published (node stays alive).
Kalman tuning for EUC speeds (2030 km/h 5.58.3 m/s)
---------------------------------------------------------
process_noise=3.0 m/ allows rapid acceleration events
predict_timeout=3.0 s coasts 30 m at 10 m/s; acceptable dead-reckoning
Parameters
----------
uwb_timeout : UWB staleness threshold (s) default 1.5
cam_timeout : Camera staleness threshold (s) default 1.0
predict_timeout : Max KF coast before no publish (s) default 3.0
process_noise : KF acceleration noise std-dev (m/) default 3.0
meas_noise_uwb : UWB position noise std-dev (m) default 0.20
meas_noise_cam : Camera position noise std-dev (m) default 0.12
control_rate : Publish / KF predict rate (Hz) default 20.0
confidence_threshold: Min source confidence to use (01) default 0.15
Usage
-----
ros2 launch saltybot_social_tracking tracking.launch.py
"""
import math
import time
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped
from std_msgs.msg import Header
from saltybot_social_msgs.msg import FusedTarget
from saltybot_social_tracking.kalman_tracker import KalmanTracker
from saltybot_social_tracking.source_arbiter import (
uwb_confidence,
camera_confidence,
select_source,
fuse_positions,
composite_confidence,
bearing_and_range,
)
_BIG_AGE = 1e9 # sentinel: source never received
class TrackingFusionNode(Node):
def __init__(self):
super().__init__("tracking_fusion")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("uwb_timeout", 1.5)
self.declare_parameter("cam_timeout", 1.0)
self.declare_parameter("predict_timeout", 3.0)
self.declare_parameter("process_noise", 3.0)
self.declare_parameter("meas_noise_uwb", 0.20)
self.declare_parameter("meas_noise_cam", 0.12)
self.declare_parameter("control_rate", 20.0)
self.declare_parameter("confidence_threshold", 0.15)
self._p = self._load_params()
# ── State ─────────────────────────────────────────────────────────────
self._kf = KalmanTracker(
process_noise=self._p["process_noise"],
meas_noise_uwb=self._p["meas_noise_uwb"],
meas_noise_cam=self._p["meas_noise_cam"],
)
self._last_uwb_t: float = 0.0 # monotonic; 0 = never received
self._last_cam_t: float = 0.0
self._uwb_x: float = 0.0
self._uwb_y: float = 0.0
self._cam_x: float = 0.0
self._cam_y: float = 0.0
self._uwb_tag_id: str = ""
self._last_predict_t: float = 0.0 # monotonic time of last predict call
self._last_any_t: float = 0.0 # monotonic time of last live measurement
# ── Subscriptions ─────────────────────────────────────────────────────
self.create_subscription(
PoseStamped, "/uwb/target", self._uwb_cb, 10)
self.create_subscription(
PoseStamped, "/person/target", self._cam_cb, 10)
# ── Publisher ─────────────────────────────────────────────────────────
self._pub = self.create_publisher(FusedTarget, "/social/tracking/fused_target", 10)
# ── Timer ─────────────────────────────────────────────────────────────
self._timer = self.create_timer(
1.0 / self._p["control_rate"], self._control_cb)
self.get_logger().info(
f"TrackingFusion ready "
f"rate={self._p['control_rate']}Hz "
f"uwb_timeout={self._p['uwb_timeout']}s "
f"cam_timeout={self._p['cam_timeout']}s "
f"predict_timeout={self._p['predict_timeout']}s "
f"process_noise={self._p['process_noise']}m/s²"
)
# ── Parameter helpers ──────────────────────────────────────────────────────
def _load_params(self) -> dict:
return {
"uwb_timeout": self.get_parameter("uwb_timeout").value,
"cam_timeout": self.get_parameter("cam_timeout").value,
"predict_timeout": self.get_parameter("predict_timeout").value,
"process_noise": self.get_parameter("process_noise").value,
"meas_noise_uwb": self.get_parameter("meas_noise_uwb").value,
"meas_noise_cam": self.get_parameter("meas_noise_cam").value,
"control_rate": self.get_parameter("control_rate").value,
"confidence_threshold": self.get_parameter("confidence_threshold").value,
}
# ── Measurement callbacks ──────────────────────────────────────────────────
def _uwb_cb(self, msg: PoseStamped) -> None:
self._uwb_x = msg.pose.position.x
self._uwb_y = msg.pose.position.y
self._uwb_tag_id = "" # PoseStamped has no tag field; tag reported via /uwb/bearing
t = time.monotonic()
self._last_uwb_t = t
self._last_any_t = t
# Seed KF on first measurement
if not self._kf.initialized:
self._kf.initialize(self._uwb_x, self._uwb_y)
self._last_predict_t = t
def _cam_cb(self, msg: PoseStamped) -> None:
self._cam_x = msg.pose.position.x
self._cam_y = msg.pose.position.y
t = time.monotonic()
self._last_cam_t = t
self._last_any_t = t
if not self._kf.initialized:
self._kf.initialize(self._cam_x, self._cam_y)
self._last_predict_t = t
# ── Control loop ───────────────────────────────────────────────────────────
def _control_cb(self) -> None:
self._p = self._load_params()
if not self._kf.initialized:
return # no data yet — nothing to publish
now = time.monotonic()
dt = now - self._last_predict_t if self._last_predict_t > 0.0 else (
1.0 / self._p["control_rate"]
)
self._last_predict_t = now
# KF predict
self._kf.predict(dt)
# Source confidence
uwb_age = (now - self._last_uwb_t) if self._last_uwb_t > 0.0 else _BIG_AGE
cam_age = (now - self._last_cam_t) if self._last_cam_t > 0.0 else _BIG_AGE
u_conf = uwb_confidence(uwb_age, self._p["uwb_timeout"])
c_conf = camera_confidence(cam_age, self._p["cam_timeout"])
threshold = self._p["confidence_threshold"]
source = select_source(u_conf, c_conf, threshold)
if source == "predicted":
# Check predict_timeout — stop publishing if too stale
last_live_age = (
(now - self._last_any_t) if self._last_any_t > 0.0 else _BIG_AGE
)
if last_live_age > self._p["predict_timeout"]:
return # silently stop publishing
# Apply measurement update if a live source exists
if source == "fused":
fx, fy = fuse_positions(
self._uwb_x, self._uwb_y, u_conf,
self._cam_x, self._cam_y, c_conf,
)
self._kf.update(fx, fy, source="uwb") # use tighter noise for blended
elif source == "uwb":
self._kf.update(self._uwb_x, self._uwb_y, source="uwb")
elif source == "camera":
self._kf.update(self._cam_x, self._cam_y, source="camera")
# "predicted" → no update; KF coasts
# Build and publish message
kx, ky = self._kf.position
vx, vy = self._kf.velocity
kf_unc = self._kf.position_uncertainty_m()
conf = composite_confidence(u_conf, c_conf, source, kf_unc)
bearing, range_m = bearing_and_range(kx, ky)
hdr = Header()
hdr.stamp = self.get_clock().now().to_msg()
hdr.frame_id = "base_link"
msg = FusedTarget()
msg.header = hdr
msg.position.x = kx
msg.position.y = ky
msg.position.z = 0.0
msg.velocity.x = vx
msg.velocity.y = vy
msg.velocity.z = 0.0
msg.range_m = float(range_m)
msg.bearing_rad = float(bearing)
msg.confidence = float(conf)
msg.active_source = source
msg.tag_id = self._uwb_tag_id if "uwb" in source else ""
self._pub.publish(msg)
# ── Entry point ────────────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = TrackingFusionNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_social_tracking
[install]
install_scripts=$base/lib/saltybot_social_tracking

View File

@ -0,0 +1,32 @@
from setuptools import setup, find_packages
import os
from glob import glob
package_name = "saltybot_social_tracking"
setup(
name=package_name,
version="0.1.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages",
[f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(os.path.join("share", package_name, "config"),
glob("config/*.yaml")),
(os.path.join("share", package_name, "launch"),
glob("launch/*.py")),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="Multi-modal tracking fusion (UWB + camera Kalman filter)",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
f"tracking_fusion_node = {package_name}.tracking_fusion_node:main",
],
},
)

View File

@ -0,0 +1,438 @@
"""
test_tracking_fusion.py Unit tests for saltybot_social_tracking pure modules.
Tests cover:
- KalmanTracker: initialization, predict, update, state accessors
- source_arbiter: confidence functions, source selection, fusion, bearing
No ROS2 runtime required.
"""
import math
import sys
import os
# Allow running: python -m pytest test/test_tracking_fusion.py
# from the package root without installing the package.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
import numpy as np
from saltybot_social_tracking.kalman_tracker import KalmanTracker
from saltybot_social_tracking.source_arbiter import (
uwb_confidence,
camera_confidence,
select_source,
fuse_positions,
composite_confidence,
bearing_and_range,
)
# ─────────────────────────────────────────────────────────────────────────────
# KalmanTracker tests
# ─────────────────────────────────────────────────────────────────────────────
class TestKalmanTrackerInit:
def test_not_initialized_by_default(self):
kf = KalmanTracker()
assert not kf.initialized
def test_initialize_sets_position(self):
kf = KalmanTracker()
kf.initialize(3.0, 1.5)
assert kf.initialized
x, y = kf.position
assert abs(x - 3.0) < 1e-9
assert abs(y - 1.5) < 1e-9
def test_initialize_sets_zero_velocity(self):
kf = KalmanTracker()
kf.initialize(1.0, -2.0)
vx, vy = kf.velocity
assert abs(vx) < 1e-9
assert abs(vy) < 1e-9
def test_initialize_origin(self):
kf = KalmanTracker()
kf.initialize(0.0, 0.0)
assert kf.initialized
x, y = kf.position
assert x == 0.0 and y == 0.0
class TestKalmanTrackerPredict:
def test_predict_zero_dt_no_change(self):
kf = KalmanTracker()
kf.initialize(2.0, 1.0)
kf.predict(0.0)
x, y = kf.position
assert abs(x - 2.0) < 1e-9
assert abs(y - 1.0) < 1e-9
def test_predict_negative_dt_no_change(self):
kf = KalmanTracker()
kf.initialize(2.0, 1.0)
kf.predict(-0.1)
x, y = kf.position
assert abs(x - 2.0) < 1e-9
def test_predict_constant_velocity(self):
"""After a position update gives the filter a velocity, predict should extrapolate."""
kf = KalmanTracker(process_noise=0.001, meas_noise_uwb=0.001)
kf.initialize(0.0, 0.0)
# Force filter to track a moving target to build up velocity estimate
dt = 0.05
for i in range(40):
t = i * dt
kf.predict(dt)
kf.update(2.0 * t, 0.0, "uwb") # 2 m/s in x
# After many updates the velocity estimate should be close to 2 m/s
vx, vy = kf.velocity
assert abs(vx - 2.0) < 0.3, f"vx={vx:.3f}"
assert abs(vy) < 0.2
def test_predict_grows_uncertainty(self):
kf = KalmanTracker()
kf.initialize(0.0, 0.0)
unc_before = kf.position_uncertainty_m()
kf.predict(1.0)
unc_after = kf.position_uncertainty_m()
assert unc_after > unc_before
def test_predict_multiple_steps(self):
kf = KalmanTracker()
kf.initialize(0.0, 0.0)
kf.predict(0.1)
kf.predict(0.1)
kf.predict(0.1)
# No assertion on exact value; just verify no exception and state is finite
x, y = kf.position
assert math.isfinite(x) and math.isfinite(y)
class TestKalmanTrackerUpdate:
def test_update_pulls_position_toward_measurement(self):
kf = KalmanTracker()
kf.initialize(0.0, 0.0)
kf.update(5.0, 5.0, "uwb")
x, y = kf.position
assert x > 0.0 and y > 0.0
assert x < 5.0 and y < 5.0 # blended, not jumped
def test_update_reduces_uncertainty(self):
kf = KalmanTracker()
kf.initialize(0.0, 0.0)
kf.predict(1.0) # uncertainty grows
unc_mid = kf.position_uncertainty_m()
kf.update(0.1, 0.1, "uwb") # measurement corrects
unc_after = kf.position_uncertainty_m()
assert unc_after < unc_mid
def test_update_converges_to_true_position(self):
"""Many updates from same point should converge to that point."""
kf = KalmanTracker(meas_noise_uwb=0.01)
kf.initialize(0.0, 0.0)
for _ in range(50):
kf.update(3.0, -1.0, "uwb")
x, y = kf.position
assert abs(x - 3.0) < 0.05, f"x={x:.4f}"
assert abs(y - (-1.0)) < 0.05, f"y={y:.4f}"
def test_update_camera_source_different_noise(self):
"""Camera and UWB updates should both move state (noise model differs)."""
kf1 = KalmanTracker(meas_noise_uwb=0.20, meas_noise_cam=0.10)
kf1.initialize(0.0, 0.0)
kf1.update(5.0, 0.0, "uwb")
x_uwb, _ = kf1.position
kf2 = KalmanTracker(meas_noise_uwb=0.20, meas_noise_cam=0.10)
kf2.initialize(0.0, 0.0)
kf2.update(5.0, 0.0, "camera")
x_cam, _ = kf2.position
# Camera has lower noise → stronger pull toward measurement
assert x_cam > x_uwb
def test_update_unknown_source_defaults_to_camera_noise(self):
kf = KalmanTracker()
kf.initialize(0.0, 0.0)
kf.update(2.0, 0.0, "other") # unknown source — should not raise
x, _ = kf.position
assert x > 0.0
def test_position_uncertainty_finite(self):
kf = KalmanTracker()
kf.initialize(1.0, 1.0)
kf.predict(0.05)
kf.update(1.1, 0.9, "uwb")
assert math.isfinite(kf.position_uncertainty_m())
assert kf.position_uncertainty_m() >= 0.0
def test_covariance_copy_is_independent(self):
kf = KalmanTracker()
kf.initialize(0.0, 0.0)
cov = kf.covariance_copy()
cov[0, 0] = 9999.0 # mutate copy
assert kf.covariance_copy()[0, 0] != 9999.0
# ─────────────────────────────────────────────────────────────────────────────
# source_arbiter tests
# ─────────────────────────────────────────────────────────────────────────────
class TestUwbConfidence:
def test_zero_age_returns_quality(self):
assert abs(uwb_confidence(0.0, 1.5) - 1.0) < 1e-9
def test_at_timeout_returns_zero(self):
assert uwb_confidence(1.5, 1.5) == pytest.approx(0.0)
def test_beyond_timeout_returns_zero(self):
assert uwb_confidence(2.0, 1.5) == 0.0
def test_half_timeout_returns_half(self):
assert uwb_confidence(0.75, 1.5) == pytest.approx(0.5)
def test_quality_scales_result(self):
assert uwb_confidence(0.0, 1.5, quality=0.7) == pytest.approx(0.7)
def test_large_age_returns_zero(self):
assert uwb_confidence(1e9, 1.5) == 0.0
def test_zero_timeout_returns_zero(self):
assert uwb_confidence(0.0, 0.0) == 0.0
class TestCameraConfidence:
def test_zero_age_full_quality(self):
assert camera_confidence(0.0, 1.0, quality=1.0) == pytest.approx(1.0)
def test_at_timeout_zero(self):
assert camera_confidence(1.0, 1.0) == pytest.approx(0.0)
def test_beyond_timeout_zero(self):
assert camera_confidence(2.0, 1.0) == 0.0
def test_quality_scales(self):
# age=0, quality=0.8
assert camera_confidence(0.0, 1.0, quality=0.8) == pytest.approx(0.8)
def test_halfway(self):
assert camera_confidence(0.5, 1.0) == pytest.approx(0.5)
class TestSelectSource:
def test_both_above_threshold_fused(self):
assert select_source(0.8, 0.6) == "fused"
def test_only_uwb_above_threshold(self):
assert select_source(0.8, 0.0) == "uwb"
def test_only_cam_above_threshold(self):
assert select_source(0.0, 0.7) == "camera"
def test_both_below_threshold(self):
assert select_source(0.0, 0.0) == "predicted"
def test_threshold_boundary_uwb(self):
# Exactly at threshold — should be treated as live
assert select_source(0.15, 0.0, threshold=0.15) == "uwb"
def test_threshold_boundary_below(self):
assert select_source(0.14, 0.0, threshold=0.15) == "predicted"
def test_custom_threshold(self):
assert select_source(0.5, 0.0, threshold=0.6) == "predicted"
assert select_source(0.5, 0.0, threshold=0.4) == "uwb"
class TestFusePositions:
def test_equal_confidence_returns_midpoint(self):
x, y = fuse_positions(0.0, 0.0, 1.0, 4.0, 4.0, 1.0)
assert abs(x - 2.0) < 1e-9
assert abs(y - 2.0) < 1e-9
def test_full_uwb_weight_returns_uwb(self):
x, y = fuse_positions(3.0, 1.0, 1.0, 0.0, 0.0, 0.0)
assert abs(x - 3.0) < 1e-9
def test_full_cam_weight_returns_cam(self):
x, y = fuse_positions(0.0, 0.0, 0.0, -2.0, 5.0, 1.0)
assert abs(x - (-2.0)) < 1e-9
assert abs(y - 5.0) < 1e-9
def test_weighted_blend(self):
# UWB at (0,0) conf=3, camera at (4,0) conf=1 → x = 3/4*0 + 1/4*4 = 1
x, y = fuse_positions(0.0, 0.0, 3.0, 4.0, 0.0, 1.0)
assert abs(x - 1.0) < 1e-9
def test_zero_total_returns_uwb_fallback(self):
x, y = fuse_positions(7.0, 2.0, 0.0, 3.0, 1.0, 0.0)
assert abs(x - 7.0) < 1e-9
class TestCompositeConfidence:
def test_fused_source_high_confidence(self):
conf = composite_confidence(0.9, 0.8, "fused", 0.05)
assert conf > 0.7
def test_predicted_source_capped(self):
conf = composite_confidence(0.0, 0.0, "predicted", 0.1)
assert conf <= 0.4
def test_predicted_high_uncertainty_low_confidence(self):
conf = composite_confidence(0.0, 0.0, "predicted", 3.0, max_kf_uncertainty_m=3.0)
assert conf == pytest.approx(0.0)
def test_uwb_only(self):
conf = composite_confidence(0.8, 0.0, "uwb", 0.05)
assert conf > 0.3
def test_camera_only(self):
conf = composite_confidence(0.0, 0.7, "camera", 0.05)
assert conf > 0.2
def test_high_kf_uncertainty_reduces_confidence(self):
low_unc = composite_confidence(0.9, 0.0, "uwb", 0.1)
high_unc = composite_confidence(0.9, 0.0, "uwb", 2.9)
assert low_unc > high_unc
class TestBearingAndRange:
def test_straight_ahead(self):
bearing, rng = bearing_and_range(2.0, 0.0)
assert abs(bearing) < 1e-9
assert abs(rng - 2.0) < 1e-9
def test_left_of_robot(self):
# +Y = left in base_link frame; bearing should be positive
bearing, rng = bearing_and_range(0.0, 1.0)
assert abs(bearing - math.pi / 2.0) < 1e-9
assert abs(rng - 1.0) < 1e-9
def test_right_of_robot(self):
bearing, rng = bearing_and_range(0.0, -1.0)
assert abs(bearing - (-math.pi / 2.0)) < 1e-9
def test_diagonal(self):
bearing, rng = bearing_and_range(1.0, 1.0)
assert abs(bearing - math.pi / 4.0) < 1e-9
assert abs(rng - math.sqrt(2.0)) < 1e-9
def test_at_origin(self):
bearing, rng = bearing_and_range(0.0, 0.0)
assert rng == pytest.approx(0.0)
assert math.isfinite(bearing) # atan2(0,0) = 0 in most implementations
def test_range_always_non_negative(self):
for x, y in [(-1, 0), (0, -1), (-2, -3), (5, -5)]:
_, rng = bearing_and_range(x, y)
assert rng >= 0.0
# ─────────────────────────────────────────────────────────────────────────────
# Integration scenario tests
# ─────────────────────────────────────────────────────────────────────────────
class TestIntegrationScenarios:
def test_euc_speed_velocity_tracking(self):
"""Verify KF can track EUC speed (8 m/s) within 0.5 m/s after warm-up."""
kf = KalmanTracker(process_noise=3.0, meas_noise_uwb=0.20)
kf.initialize(0.0, 0.0)
dt = 1.0 / 10.0 # 10 Hz UWB rate
speed = 8.0 # m/s (≈29 km/h)
for i in range(60):
t = i * dt
kf.predict(dt)
kf.update(speed * t, 0.0, "uwb")
vx, vy = kf.velocity
assert abs(vx - speed) < 0.6, f"vx={vx:.2f} expected≈{speed}"
assert abs(vy) < 0.3
def test_signal_loss_recovery(self):
"""
After 1 s of signal loss the filter should still have a reasonable
position estimate (not diverged to infinity).
"""
kf = KalmanTracker(process_noise=3.0)
kf.initialize(2.0, 0.5)
# Warm up with 2 m/s x motion
dt = 0.05
for i in range(20):
kf.predict(dt)
kf.update(2.0 * (i + 1) * dt, 0.0, "uwb")
# Coast for 1 second (20 × 50 ms) without measurements
for _ in range(20):
kf.predict(dt)
x, y = kf.position
assert math.isfinite(x) and math.isfinite(y)
assert abs(x) < 20.0 # shouldn't have drifted more than 20 m
def test_uwb_to_camera_handoff(self):
"""
Simulate UWB going stale and camera taking over Kalman should
smoothly continue tracking without a jump.
"""
kf = KalmanTracker(meas_noise_uwb=0.20, meas_noise_cam=0.12)
kf.initialize(0.0, 0.0)
dt = 0.05
# Phase 1: UWB active
for i in range(20):
kf.predict(dt)
kf.update(float(i) * 0.1, 0.0, "uwb")
x_at_handoff, _ = kf.position
# Phase 2: Camera takes over from same trajectory
for i in range(20, 40):
kf.predict(dt)
kf.update(float(i) * 0.1, 0.0, "camera")
x_after, _ = kf.position
# Position should have continued progressing (not stuck or reset)
assert x_after > x_at_handoff
def test_confidence_degradation_during_coast(self):
"""Composite confidence should drop as KF uncertainty grows during coast."""
kf = KalmanTracker(process_noise=3.0)
kf.initialize(2.0, 0.0)
# Fresh: tight uncertainty → high confidence
unc_fresh = kf.position_uncertainty_m()
conf_fresh = composite_confidence(0.0, 0.0, "predicted", unc_fresh)
# After 2 s coast
for _ in range(40):
kf.predict(0.05)
unc_stale = kf.position_uncertainty_m()
conf_stale = composite_confidence(0.0, 0.0, "predicted", unc_stale)
assert conf_fresh >= conf_stale
def test_fused_source_confidence_weighted_position(self):
"""Fused position should sit between UWB and camera, closer to higher-conf source."""
# UWB at x=0 with high conf, camera at x=10 with low conf
uwb_c = 0.9
cam_c = 0.1
fx, fy = fuse_positions(0.0, 0.0, uwb_c, 10.0, 0.0, cam_c)
# Should be much closer to UWB (0) than camera (10)
assert fx < 3.0, f"fused_x={fx:.2f}"
def test_select_source_transitions(self):
"""Verify correct source transitions as confidences change."""
assert select_source(0.9, 0.8) == "fused"
assert select_source(0.9, 0.0) == "uwb"
assert select_source(0.0, 0.8) == "camera"
assert select_source(0.0, 0.0) == "predicted"