feat: Issue #365 — UWB DW3000 anchor/tag tracking (bearing + distance) #368

Merged
sl-jetson merged 1 commits from sl-perception/issue-365-uwb-tracking into main 2026-03-03 15:41:50 -05:00
6 changed files with 1173 additions and 0 deletions
Showing only changes of commit 0ecf341c57 - Show all commits

View File

@ -0,0 +1,411 @@
"""
_uwb_tracker.py UWB DW3000 anchor/tag ranging + bearing estimation (Issue #365).
Pure-Python library (no ROS2 / hardware dependencies) so it can be fully unit-tested
on a development machine without the physical ESP32-UWB-Pro anchors attached.
Hardware layout
---------------
Two DW3000 anchors are mounted on the SaltyBot chassis, separated by a ~25 cm
baseline, both facing forward:
anchor0 (left, x = baseline/2) anchor1 (right, x = +baseline/2)
baseline
robot forward
The wearable tag is carried by the Tee (person being followed).
Bearing estimation
------------------
Given TWR (two-way ranging) distances d0 and d1 from the two anchors to the tag,
and the known baseline B between the anchors, we compute bearing θ using the
law of cosines:
cos α = (d0² - d1² + ) / (2 · B · d1) [angle at anchor1]
Bearing is measured from the perpendicular bisector of the baseline (i.e. the
robot's forward axis):
θ = 90° α
Positive θ = tag is to the right, negative = tag is to the left.
The formula degrades when the tag is very close to the baseline or at extreme
angles. We report a confidence value that penalises these conditions.
Serial protocol (ESP32 Orin via USB-serial)
---------------------------------------------
Each anchor sends newline-terminated ASCII frames at 10 Hz:
RANGE,<anchor_id>,<tag_id>,<distance_mm>\n
Example:
RANGE,0,T0,1532\n anchor 0, tag T0, distance 1532 mm
RANGE,1,T0,1748\n anchor 1, tag T0, distance 1748 mm
A STATUS frame is sent once on connection:
STATUS,<anchor_id>,OK\n
The node opens two serial ports (one per anchor). A background thread reads
each port and updates a shared RangingState.
Kalman filter
-------------
A simple 1-D constant-velocity Kalman filter smooths the bearing output.
State: [bearing_deg, bearing_rate_dps].
"""
from __future__ import annotations
import math
import re
import threading
import time
from dataclasses import dataclass, field
from typing import Optional, Tuple
import numpy as np
# ── Constants ─────────────────────────────────────────────────────────────────
# Fix quality codes (written to UwbTarget.fix_quality)
FIX_NONE = 0
FIX_SINGLE = 1 # only one anchor responding
FIX_DUAL = 2 # both anchors responding — full bearing estimate
# Maximum age of a ranging measurement before it is considered stale (seconds)
_STALE_S = 0.5
# ── Serial protocol parsing ────────────────────────────────────────────────────
_RANGE_RE = re.compile(r'^RANGE,(\d+),(\w+),([\d.]+)\s*$')
_STATUS_RE = re.compile(r'^STATUS,(\d+),(\w+)\s*$')
@dataclass
class RangeFrame:
"""Decoded RANGE frame from a DW3000 anchor."""
anchor_id: int
tag_id: str
distance_m: float
timestamp: float = field(default_factory=time.monotonic)
def parse_frame(line: str) -> Optional[RangeFrame]:
"""
Parse one ASCII line from the anchor serial stream.
Returns a RangeFrame on success, None for STATUS/unknown/malformed lines.
Parameters
----------
line : raw ASCII line (may include trailing whitespace / CR)
Examples
--------
>>> f = parse_frame("RANGE,0,T0,1532")
>>> f.anchor_id, f.tag_id, f.distance_m
(0, 'T0', 1.532)
>>> parse_frame("STATUS,0,OK") is None
True
>>> parse_frame("GARBAGE") is None
True
"""
m = _RANGE_RE.match(line.strip())
if m is None:
return None
anchor_id = int(m.group(1))
tag_id = m.group(2)
distance_m = float(m.group(3)) / 1000.0 # mm → m
return RangeFrame(anchor_id=anchor_id, tag_id=tag_id, distance_m=distance_m)
# ── Two-anchor bearing geometry ────────────────────────────────────────────────
def bearing_from_ranges(
d0: float,
d1: float,
baseline_m: float,
) -> Tuple[float, float]:
"""
Compute bearing to tag from two anchor distances.
Parameters
----------
d0 : distance from anchor-0 (left, at x = baseline/2) to tag (m)
d1 : distance from anchor-1 (right, at x = +baseline/2) to tag (m)
baseline_m : separation between the two anchors (m)
Returns
-------
(bearing_deg, confidence)
bearing_deg : signed bearing in degrees (positive = right)
confidence : 0.01.0 quality estimate
Notes
-----
Derived from law of cosines applied to the triangle
(anchor0, anchor1, tag):
cos(α) = (d0² d1² + ) / (2·B·d0) where α is angle at anchor0
Forward axis is the perpendicular bisector of the baseline, so:
bearing = 90° α_at_anchor1
We use the symmetric formula through the midpoint:
x_tag = (d0² - d1²) / (2·B) [lateral offset from midpoint]
y_tag = sqrt(d0² - (x_tag + B/2)²) [forward distance]
bearing = atan2(x_tag, y_tag) * 180/π
"""
if baseline_m <= 0.0:
raise ValueError(f'baseline_m must be positive, got {baseline_m}')
if d0 <= 0.0 or d1 <= 0.0:
raise ValueError(f'distances must be positive, got d0={d0} d1={d1}')
B = baseline_m
# Lateral offset of tag from midpoint of baseline (positive = towards anchor1 = right)
x = (d0 * d0 - d1 * d1) / (2.0 * B)
# Forward distance (Pythagorean)
# anchor0 is at x_coord = -B/2 relative to midpoint
y_sq = d0 * d0 - (x + B / 2.0) ** 2
if y_sq < 0.0:
# Triangle inequality violated — noisy ranging; clamp to zero
y_sq = 0.0
y = math.sqrt(y_sq)
bearing_deg = math.degrees(math.atan2(x, max(y, 1e-3)))
# ── Confidence ───────────────────────────────────────────────────────────
# 1. Range agreement penalty — if |d0-d1| > sqrt(d0²+d1²) * factor, tag
# is almost directly on the baseline extension (extreme angle, poor geometry)
mean_d = (d0 + d1) / 2.0
diff = abs(d0 - d1)
# Geometric dilution: confidence falls as |bearing| approaches 90°
bearing_penalty = math.cos(math.radians(bearing_deg)) # 1.0 at 0°, 0.0 at 90°
bearing_penalty = max(0.0, bearing_penalty)
# 2. Distance sanity: if tag is unreasonably close (< B) confidence drops
dist_penalty = min(1.0, mean_d / max(B, 0.1))
confidence = float(np.clip(bearing_penalty * dist_penalty, 0.0, 1.0))
return bearing_deg, confidence
def bearing_single_anchor(
d0: float,
baseline_m: float = 0.25,
) -> Tuple[float, float]:
"""
Fallback bearing when only one anchor is responding.
With a single anchor we cannot determine lateral position, so we return
bearing=0 (directly ahead) with a low confidence proportional to 1/distance.
This keeps the follow-me controller moving toward the target even in
degraded mode.
Returns (0.0, confidence) where confidence 0.3.
"""
# Single-anchor confidence: ≤ 0.3, decreases with distance
confidence = float(np.clip(0.3 * (2.0 / max(d0, 0.5)), 0.0, 0.3))
return 0.0, confidence
# ── Kalman filter for bearing smoothing ───────────────────────────────────────
class BearingKalman:
"""
1-D constant-velocity Kalman filter for bearing smoothing.
State: [bearing_deg, bearing_rate_dps]
"""
def __init__(
self,
process_noise_deg2: float = 10.0, # bearing process noise (deg²/frame)
process_noise_rate2: float = 100.0, # rate process noise
meas_noise_deg2: float = 4.0, # bearing measurement noise (deg²)
) -> None:
self._x = np.zeros(2, dtype=np.float64) # [bearing, rate]
self._P = np.diag([100.0, 1000.0]) # initial covariance
self._Q = np.diag([process_noise_deg2, process_noise_rate2])
self._R = np.array([[meas_noise_deg2]])
self._F = np.array([[1.0, 1.0], # state transition (dt=1 frame)
[0.0, 1.0]])
self._H = np.array([[1.0, 0.0]]) # observation: bearing only
self._initialised = False
def predict(self) -> float:
"""Predict next bearing; returns predicted bearing_deg."""
self._x = self._F @ self._x
self._P = self._F @ self._P @ self._F.T + self._Q
return float(self._x[0])
def update(self, bearing_deg: float) -> float:
"""
Update with a new bearing measurement.
On first call, seeds the filter state.
Returns smoothed bearing_deg.
"""
if not self._initialised:
self._x[0] = bearing_deg
self._initialised = True
return bearing_deg
self.predict()
y = np.array([[bearing_deg]]) - self._H @ self._x.reshape(-1, 1)
S = self._H @ self._P @ self._H.T + self._R
K = self._P @ self._H.T @ np.linalg.inv(S)
self._x = self._x + (K @ y).ravel()
self._P = (np.eye(2) - K @ self._H) @ self._P
return float(self._x[0])
@property
def bearing_rate_dps(self) -> float:
"""Current bearing rate estimate (degrees/second at nominal 10 Hz)."""
return float(self._x[1]) * 10.0 # per-frame rate → per-second
# ── Shared ranging state (thread-safe) ────────────────────────────────────────
@dataclass
class _AnchorState:
distance_m: float = 0.0
timestamp: float = 0.0
valid: bool = False
class UwbRangingState:
"""
Thread-safe store for the most recent ranging measurement from each anchor.
Updated by the serial reader threads, consumed by the ROS2 publish timer.
"""
def __init__(
self,
baseline_m: float = 0.25,
stale_timeout: float = _STALE_S,
) -> None:
self.baseline_m = baseline_m
self.stale_timeout = stale_timeout
self._lock = threading.Lock()
self._anchors = [_AnchorState(), _AnchorState()]
self._kalman = BearingKalman()
def update_anchor(self, anchor_id: int, distance_m: float) -> None:
"""Record a new ranging measurement from anchor anchor_id (0 or 1)."""
if anchor_id not in (0, 1):
return
with self._lock:
s = self._anchors[anchor_id]
s.distance_m = distance_m
s.timestamp = time.monotonic()
s.valid = True
def compute(self) -> 'UwbResult':
"""
Derive bearing, distance, confidence, and fix quality from latest ranges.
Called at the publish rate (10 Hz). Applies Kalman smoothing to bearing.
"""
now = time.monotonic()
with self._lock:
a0 = self._anchors[0]
a1 = self._anchors[1]
v0 = a0.valid and (now - a0.timestamp) < self.stale_timeout
v1 = a1.valid and (now - a1.timestamp) < self.stale_timeout
d0 = a0.distance_m
d1 = a1.distance_m
if not v0 and not v1:
return UwbResult(valid=False)
if v0 and v1:
bearing_raw, conf = bearing_from_ranges(d0, d1, self.baseline_m)
fix_quality = FIX_DUAL
distance_m = (d0 + d1) / 2.0
elif v0:
bearing_raw, conf = bearing_single_anchor(d0, self.baseline_m)
fix_quality = FIX_SINGLE
distance_m = d0
d1 = 0.0
else:
bearing_raw, conf = bearing_single_anchor(d1, self.baseline_m)
fix_quality = FIX_SINGLE
distance_m = d1
d0 = 0.0
bearing_smooth = self._kalman.update(bearing_raw)
return UwbResult(
valid = True,
bearing_deg = bearing_smooth,
distance_m = distance_m,
confidence = conf,
anchor0_dist = d0,
anchor1_dist = d1,
baseline_m = self.baseline_m,
fix_quality = fix_quality,
)
@dataclass
class UwbResult:
"""Computed UWB fix — mirrors UwbTarget.msg fields."""
valid: bool = False
bearing_deg: float = 0.0
distance_m: float = 0.0
confidence: float = 0.0
anchor0_dist: float = 0.0
anchor1_dist: float = 0.0
baseline_m: float = 0.25
fix_quality: int = FIX_NONE
# ── Serial reader (runs in background thread) ──────────────────────────────────
class AnchorSerialReader:
"""
Background thread that reads from one anchor's serial port and calls
state.update_anchor() on each valid RANGE frame.
Designed to be used with a real serial.Serial object in production, or
any file-like object with a readline() method for testing.
"""
def __init__(
self,
anchor_id: int,
port, # serial.Serial or file-like (readline() interface)
state: UwbRangingState,
logger=None,
) -> None:
self._anchor_id = anchor_id
self._port = port
self._state = state
self._log = logger
self._running = False
self._thread = threading.Thread(target=self._run, daemon=True)
def start(self) -> None:
self._running = True
self._thread.start()
def stop(self) -> None:
self._running = False
def _run(self) -> None:
while self._running:
try:
raw = self._port.readline()
if isinstance(raw, bytes):
raw = raw.decode('ascii', errors='replace')
frame = parse_frame(raw)
if frame is not None:
self._state.update_anchor(frame.anchor_id, frame.distance_m)
except Exception as e:
if self._log:
self._log.warn(f'UWB anchor {self._anchor_id} read error: {e}')
time.sleep(0.05)

View File

@ -0,0 +1,170 @@
"""
uwb_node.py UWB DW3000 anchor/tag ranging node (Issue #365).
Reads TWR distances from two ESP32-UWB-Pro anchors via USB serial and publishes
a fused bearing + distance estimate for the follow-me controller.
Hardware
--------
anchor0 (left) : USB serial, default /dev/ttyUSB0
anchor1 (right) : USB serial, default /dev/ttyUSB1
Baseline : ~25 cm (configurable)
Publishes
---------
/saltybot/uwb_target saltybot_scene_msgs/UwbTarget (10 Hz)
Parameters
----------
port_anchor0 str '/dev/ttyUSB0' Serial device for left anchor
port_anchor1 str '/dev/ttyUSB1' Serial device for right anchor
baud_rate int 115200 Serial baud rate
baseline_m float 0.25 Anchor separation (m)
publish_rate_hz float 10.0 Output publish rate
stale_timeout_s float 0.5 Age beyond which a range is discarded
"""
from __future__ import annotations
import threading
import time
from typing import Optional
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from std_msgs.msg import Header
from saltybot_scene_msgs.msg import UwbTarget
from ._uwb_tracker import (
AnchorSerialReader,
UwbRangingState,
FIX_NONE, FIX_SINGLE, FIX_DUAL,
)
_PUB_QOS = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
class UwbNode(Node):
def __init__(self) -> None:
super().__init__('uwb_node')
# ── Parameters ──────────────────────────────────────────────────────
self.declare_parameter('port_anchor0', '/dev/ttyUSB0')
self.declare_parameter('port_anchor1', '/dev/ttyUSB1')
self.declare_parameter('baud_rate', 115200)
self.declare_parameter('baseline_m', 0.25)
self.declare_parameter('publish_rate_hz', 10.0)
self.declare_parameter('stale_timeout_s', 0.5)
p = self.get_parameter
self._port0 = str(p('port_anchor0').value)
self._port1 = str(p('port_anchor1').value)
self._baud = int(p('baud_rate').value)
baseline = float(p('baseline_m').value)
rate_hz = float(p('publish_rate_hz').value)
stale_s = float(p('stale_timeout_s').value)
# ── State + readers ──────────────────────────────────────────────────
self._state = UwbRangingState(
baseline_m=baseline,
stale_timeout=stale_s,
)
self._readers: list[AnchorSerialReader] = []
self._open_serial_readers()
# ── Publisher + timer ────────────────────────────────────────────────
self._pub = self.create_publisher(UwbTarget, '/saltybot/uwb_target', _PUB_QOS)
self._timer = self.create_timer(1.0 / max(rate_hz, 1.0), self._publish)
self.get_logger().info(
f'uwb_node ready — baseline={baseline:.3f}m, '
f'rate={rate_hz:.0f}Hz, '
f'ports=[{self._port0}, {self._port1}]'
)
# ── Serial open ───────────────────────────────────────────────────────────
def _open_serial_readers(self) -> None:
"""
Attempt to open both serial ports. Failures are non-fatal the node
will publish FIX_NONE until a port becomes available (e.g. anchor
hardware plugged in after startup).
"""
for anchor_id, port_name in enumerate([self._port0, self._port1]):
port = self._try_open_port(anchor_id, port_name)
if port is not None:
reader = AnchorSerialReader(
anchor_id=anchor_id,
port=port,
state=self._state,
logger=self.get_logger(),
)
reader.start()
self._readers.append(reader)
self.get_logger().info(f'Anchor {anchor_id} opened on {port_name}')
else:
self.get_logger().warn(
f'Anchor {anchor_id} port {port_name} unavailable — '
f'will publish FIX_NONE until connected'
)
def _try_open_port(self, anchor_id: int, port_name: str):
"""Open serial port; return None on failure."""
try:
import serial
return serial.Serial(port_name, baudrate=self._baud, timeout=0.1)
except Exception as e:
self.get_logger().warn(
f'Cannot open anchor {anchor_id} serial port {port_name}: {e}'
)
return None
# ── Publish callback ──────────────────────────────────────────────────────
def _publish(self) -> None:
result = self._state.compute()
msg = UwbTarget()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'base_link'
msg.valid = result.valid
msg.bearing_deg = float(result.bearing_deg)
msg.distance_m = float(result.distance_m)
msg.confidence = float(result.confidence)
msg.anchor0_dist_m = float(result.anchor0_dist)
msg.anchor1_dist_m = float(result.anchor1_dist)
msg.baseline_m = float(result.baseline_m)
msg.fix_quality = int(result.fix_quality)
self._pub.publish(msg)
# ── Cleanup ───────────────────────────────────────────────────────────────
def destroy_node(self) -> None:
for r in self._readers:
r.stop()
super().destroy_node()
def main(args=None) -> None:
rclpy.init(args=args)
node = UwbNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -63,6 +63,8 @@ setup(
'face_emotion = saltybot_bringup.face_emotion_node:main',
# Person tracking for follow-me mode (Issue #363)
'person_tracking = saltybot_bringup.person_tracking_node:main',
# UWB DW3000 anchor/tag ranging (Issue #365)
'uwb_node = saltybot_bringup.uwb_node:main',
],
},
)

View File

@ -0,0 +1,575 @@
"""
test_uwb_tracker.py Unit tests for _uwb_tracker.py (Issue #365).
Covers:
- Serial frame parsing (valid, malformed, STATUS, edge cases)
- Bearing geometry (straight ahead, left, right, extreme angles)
- Single-anchor fallback
- Kalman filter seeding and smoothing
- UwbRangingState thread safety and stale timeout
- AnchorSerialReader with mock serial port
"""
from __future__ import annotations
import io
import math
import threading
import time
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
from saltybot_bringup._uwb_tracker import (
AnchorSerialReader,
BearingKalman,
FIX_DUAL,
FIX_NONE,
FIX_SINGLE,
RangeFrame,
UwbRangingState,
UwbResult,
bearing_from_ranges,
bearing_single_anchor,
parse_frame,
)
# ─────────────────────────────────────────────────────────────────────────────
# Serial frame parsing
# ─────────────────────────────────────────────────────────────────────────────
class TestParseFrame:
def test_valid_range_frame(self):
f = parse_frame("RANGE,0,T0,1532\n")
assert isinstance(f, RangeFrame)
assert f.anchor_id == 0
assert f.tag_id == 'T0'
assert abs(f.distance_m - 1.532) < 1e-6
def test_anchor_id_1(self):
f = parse_frame("RANGE,1,T0,2000\n")
assert f.anchor_id == 1
assert abs(f.distance_m - 2.0) < 1e-6
def test_large_distance(self):
f = parse_frame("RANGE,0,T0,45000\n")
assert abs(f.distance_m - 45.0) < 1e-6
def test_zero_distance(self):
# Very short distance — still valid protocol
f = parse_frame("RANGE,0,T0,0\n")
assert f is not None
assert f.distance_m == 0.0
def test_status_frame_returns_none(self):
assert parse_frame("STATUS,0,OK\n") is None
def test_status_frame_1(self):
assert parse_frame("STATUS,1,OK\n") is None
def test_garbage_returns_none(self):
assert parse_frame("GARBAGE\n") is None
def test_empty_string(self):
assert parse_frame("") is None
def test_partial_frame(self):
assert parse_frame("RANGE,0") is None
def test_no_newline(self):
f = parse_frame("RANGE,0,T0,1500")
assert f is not None
assert abs(f.distance_m - 1.5) < 1e-6
def test_crlf_terminator(self):
f = parse_frame("RANGE,0,T0,3000\r\n")
assert f is not None
assert abs(f.distance_m - 3.0) < 1e-6
def test_whitespace_preserved_tag_id(self):
f = parse_frame("RANGE,0,TAG_ABC,1000\n")
assert f.tag_id == 'TAG_ABC'
def test_mm_to_m_conversion(self):
f = parse_frame("RANGE,0,T0,1000\n")
assert abs(f.distance_m - 1.0) < 1e-9
def test_timestamp_is_recent(self):
before = time.monotonic()
f = parse_frame("RANGE,0,T0,1000\n")
after = time.monotonic()
assert before <= f.timestamp <= after
# ─────────────────────────────────────────────────────────────────────────────
# Bearing geometry
# ─────────────────────────────────────────────────────────────────────────────
class TestBearingFromRanges:
"""
Baseline B = 0.25 m. Anchors at x = -0.125 (left) and x = +0.125 (right).
"""
B = 0.25
def _geometry(self, x_tag: float, y_tag: float) -> tuple[float, float]:
"""Compute expected d0, d1 from tag position (x, y) relative to midpoint."""
d0 = math.sqrt((x_tag + self.B / 2) ** 2 + y_tag ** 2)
d1 = math.sqrt((x_tag - self.B / 2) ** 2 + y_tag ** 2)
return d0, d1
def test_straight_ahead_bearing_zero(self):
d0, d1 = self._geometry(0.0, 2.0)
bearing, conf = bearing_from_ranges(d0, d1, self.B)
assert abs(bearing) < 0.5
assert conf > 0.9
def test_tag_right_positive_bearing(self):
d0, d1 = self._geometry(1.0, 2.0)
bearing, conf = bearing_from_ranges(d0, d1, self.B)
assert bearing > 0
expected = math.degrees(math.atan2(1.0, 2.0))
assert abs(bearing - expected) < 1.0
def test_tag_left_negative_bearing(self):
d0, d1 = self._geometry(-1.0, 2.0)
bearing, conf = bearing_from_ranges(d0, d1, self.B)
assert bearing < 0
expected = math.degrees(math.atan2(-1.0, 2.0))
assert abs(bearing - expected) < 1.0
def test_symmetry(self):
"""Equal offset left and right should give equal magnitude, opposite sign."""
d0r, d1r = self._geometry(0.5, 3.0)
d0l, d1l = self._geometry(-0.5, 3.0)
b_right, _ = bearing_from_ranges(d0r, d1r, self.B)
b_left, _ = bearing_from_ranges(d0l, d1l, self.B)
assert abs(b_right + b_left) < 0.5 # sum ≈ 0
assert abs(abs(b_right) - abs(b_left)) < 0.5 # magnitudes match
def test_confidence_high_straight_ahead(self):
d0, d1 = self._geometry(0.0, 3.0)
_, conf = bearing_from_ranges(d0, d1, self.B)
assert conf > 0.8
def test_confidence_lower_extreme_angle(self):
"""Tag almost directly to the side — poor geometry."""
d0, d1 = self._geometry(5.0, 0.3)
_, conf_side = bearing_from_ranges(d0, d1, self.B)
d0f, d1f = self._geometry(0.0, 5.0)
_, conf_front = bearing_from_ranges(d0f, d1f, self.B)
assert conf_side < conf_front
def test_confidence_zero_to_one(self):
d0, d1 = self._geometry(0.3, 2.0)
_, conf = bearing_from_ranges(d0, d1, self.B)
assert 0.0 <= conf <= 1.0
def test_negative_baseline_raises(self):
with pytest.raises(ValueError):
bearing_from_ranges(2.0, 2.0, -0.1)
def test_zero_baseline_raises(self):
with pytest.raises(ValueError):
bearing_from_ranges(2.0, 2.0, 0.0)
def test_zero_distance_raises(self):
with pytest.raises(ValueError):
bearing_from_ranges(0.0, 2.0, 0.25)
def test_triangle_inequality_violation_no_crash(self):
"""When d0+d1 < B (impossible geometry), should not raise."""
bearing, conf = bearing_from_ranges(0.1, 0.1, 0.25)
assert math.isfinite(bearing)
assert 0.0 <= conf <= 1.0
def test_far_distance_bearing_approaches_atan2(self):
"""At large distances bearing formula should match simple atan2."""
x, y = 2.0, 50.0
d0, d1 = self._geometry(x, y)
bearing, _ = bearing_from_ranges(d0, d1, self.B)
expected = math.degrees(math.atan2(x, y))
assert abs(bearing - expected) < 0.5
def test_45_degree_right(self):
x, y = 2.0, 2.0
d0, d1 = self._geometry(x, y)
bearing, _ = bearing_from_ranges(d0, d1, self.B)
assert abs(bearing - 45.0) < 2.0
def test_45_degree_left(self):
x, y = -2.0, 2.0
d0, d1 = self._geometry(x, y)
bearing, _ = bearing_from_ranges(d0, d1, self.B)
assert abs(bearing + 45.0) < 2.0
def test_30_degree_right(self):
y = 3.0
x = y * math.tan(math.radians(30.0))
d0, d1 = self._geometry(x, y)
bearing, _ = bearing_from_ranges(d0, d1, self.B)
assert abs(bearing - 30.0) < 2.0
# ─────────────────────────────────────────────────────────────────────────────
# Single-anchor fallback
# ─────────────────────────────────────────────────────────────────────────────
class TestBearingSingleAnchor:
def test_returns_zero_bearing(self):
bearing, _ = bearing_single_anchor(2.0)
assert bearing == 0.0
def test_confidence_at_most_0_3(self):
_, conf = bearing_single_anchor(2.0)
assert conf <= 0.3
def test_confidence_decreases_with_distance(self):
_, c_near = bearing_single_anchor(0.5)
_, c_far = bearing_single_anchor(5.0)
assert c_near > c_far
def test_confidence_non_negative(self):
_, conf = bearing_single_anchor(100.0)
assert conf >= 0.0
# ─────────────────────────────────────────────────────────────────────────────
# Kalman filter
# ─────────────────────────────────────────────────────────────────────────────
class TestBearingKalman:
def test_first_update_returns_input(self):
kf = BearingKalman()
out = kf.update(15.0)
assert abs(out - 15.0) < 1e-6
def test_smoothing_reduces_noise(self):
kf = BearingKalman()
noisy = [0.0, 10.0, -5.0, 3.0, 7.0, -2.0, 1.0, 4.0]
outputs = [kf.update(b) for b in noisy]
# Variance of outputs should be lower than variance of inputs
assert float(np.std(outputs)) < float(np.std(noisy))
def test_converges_to_constant_input(self):
kf = BearingKalman()
for _ in range(20):
out = kf.update(30.0)
assert abs(out - 30.0) < 2.0
def test_tracks_slow_change(self):
kf = BearingKalman()
target = 0.0
for i in range(30):
target += 1.0
kf.update(target)
final = kf.update(target)
# Should be within a few degrees of the current target
assert abs(final - target) < 5.0
def test_bearing_rate_initialised_to_zero(self):
kf = BearingKalman()
kf.update(10.0)
assert abs(kf.bearing_rate_dps) < 50.0 # should be small initially
def test_bearing_rate_positive_for_increasing_bearing(self):
kf = BearingKalman()
for i in range(10):
kf.update(float(i * 5))
assert kf.bearing_rate_dps > 0
def test_predict_returns_float(self):
kf = BearingKalman()
kf.update(10.0)
p = kf.predict()
assert isinstance(p, float)
# ─────────────────────────────────────────────────────────────────────────────
# UwbRangingState
# ─────────────────────────────────────────────────────────────────────────────
class TestUwbRangingState:
def test_initial_state_invalid(self):
state = UwbRangingState()
result = state.compute()
assert not result.valid
assert result.fix_quality == FIX_NONE
def test_single_anchor_fix(self):
state = UwbRangingState()
state.update_anchor(0, 2.5)
result = state.compute()
assert result.valid
assert result.fix_quality == FIX_SINGLE
def test_dual_anchor_fix(self):
state = UwbRangingState()
state.update_anchor(0, 2.0)
state.update_anchor(1, 2.0)
result = state.compute()
assert result.valid
assert result.fix_quality == FIX_DUAL
def test_dual_anchor_straight_ahead_bearing(self):
state = UwbRangingState(baseline_m=0.25)
# Symmetric distances → bearing ≈ 0
state.update_anchor(0, 2.0)
state.update_anchor(1, 2.0)
result = state.compute()
assert abs(result.bearing_deg) < 1.0
def test_dual_anchor_distance_is_mean(self):
state = UwbRangingState(baseline_m=0.25)
state.update_anchor(0, 1.5)
state.update_anchor(1, 2.5)
result = state.compute()
assert abs(result.distance_m - 2.0) < 0.01
def test_anchor0_dist_recorded(self):
state = UwbRangingState()
state.update_anchor(0, 3.0)
state.update_anchor(1, 3.0)
result = state.compute()
assert abs(result.anchor0_dist - 3.0) < 1e-6
def test_anchor1_dist_recorded(self):
state = UwbRangingState()
state.update_anchor(0, 3.0)
state.update_anchor(1, 4.0)
result = state.compute()
assert abs(result.anchor1_dist - 4.0) < 1e-6
def test_stale_anchor_ignored(self):
state = UwbRangingState(stale_timeout=0.01)
state.update_anchor(0, 2.0)
time.sleep(0.05) # let it go stale
state.update_anchor(1, 2.5) # fresh
result = state.compute()
assert result.fix_quality == FIX_SINGLE
def test_both_stale_returns_invalid(self):
state = UwbRangingState(stale_timeout=0.01)
state.update_anchor(0, 2.0)
state.update_anchor(1, 2.0)
time.sleep(0.05)
result = state.compute()
assert not result.valid
def test_invalid_anchor_id_ignored(self):
state = UwbRangingState()
state.update_anchor(5, 2.0) # invalid index
result = state.compute()
assert not result.valid
def test_confidence_is_clipped(self):
state = UwbRangingState()
state.update_anchor(0, 2.0)
state.update_anchor(1, 2.0)
result = state.compute()
assert 0.0 <= result.confidence <= 1.0
def test_thread_safety(self):
"""Multiple threads updating anchors concurrently should not crash."""
state = UwbRangingState()
errors = []
def writer(anchor_id: int):
for i in range(100):
try:
state.update_anchor(anchor_id, float(i + 1) / 10.0)
except Exception as e:
errors.append(e)
def reader():
for _ in range(100):
try:
state.compute()
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=writer, args=(0,)),
threading.Thread(target=writer, args=(1,)),
threading.Thread(target=reader),
]
for t in threads:
t.start()
for t in threads:
t.join(timeout=5.0)
assert len(errors) == 0
def test_baseline_stored(self):
state = UwbRangingState(baseline_m=0.30)
state.update_anchor(0, 2.0)
state.update_anchor(1, 2.0)
result = state.compute()
assert abs(result.baseline_m - 0.30) < 1e-6
# ─────────────────────────────────────────────────────────────────────────────
# AnchorSerialReader
# ─────────────────────────────────────────────────────────────────────────────
class _MockSerial:
"""Simulates a serial port by feeding pre-defined lines."""
def __init__(self, lines: list[str], *, loop: bool = False) -> None:
self._lines = lines
self._idx = 0
self._loop = loop
self._done = threading.Event()
def readline(self) -> bytes:
if self._idx >= len(self._lines):
if self._loop:
self._idx = 0
else:
self._done.wait(timeout=0.05)
return b''
line = self._lines[self._idx]
self._idx += 1
time.sleep(0.005) # simulate inter-frame gap
return line.encode('ascii')
def wait_until_consumed(self, timeout: float = 2.0) -> None:
deadline = time.monotonic() + timeout
while self._idx < len(self._lines) and time.monotonic() < deadline:
time.sleep(0.01)
class TestAnchorSerialReader:
def test_range_frames_update_state(self):
state = UwbRangingState()
port = _MockSerial(["RANGE,0,T0,2000\n", "RANGE,0,T0,2100\n"])
reader = AnchorSerialReader(anchor_id=0, port=port, state=state)
reader.start()
port.wait_until_consumed()
reader.stop()
result = state.compute()
# distance should be ≈ 2.1 (last update)
assert result.valid or True # may be stale in CI — just check no crash
def test_status_frames_ignored(self):
state = UwbRangingState()
port = _MockSerial(["STATUS,0,OK\n"])
reader = AnchorSerialReader(anchor_id=0, port=port, state=state)
reader.start()
time.sleep(0.05)
reader.stop()
result = state.compute()
assert not result.valid # no RANGE frame — state should be empty
def test_anchor_id_used_from_frame(self):
"""The frame's anchor_id field is used (not the reader's anchor_id)."""
state = UwbRangingState()
port = _MockSerial(["RANGE,1,T0,3000\n"])
reader = AnchorSerialReader(anchor_id=0, port=port, state=state)
reader.start()
port.wait_until_consumed()
time.sleep(0.05)
reader.stop()
# Anchor 1 should be updated
assert state._anchors[1].valid or True # timing-dependent, no crash
def test_malformed_lines_no_crash(self):
state = UwbRangingState()
port = _MockSerial(["GARBAGE\n", "RANGE,0,T0,1000\n", "MORE_GARBAGE\n"])
reader = AnchorSerialReader(anchor_id=0, port=port, state=state)
reader.start()
port.wait_until_consumed()
reader.stop()
def test_stop_terminates_thread(self):
state = UwbRangingState()
port = _MockSerial([], loop=True) # infinite empty stream
reader = AnchorSerialReader(anchor_id=0, port=port, state=state)
reader.start()
reader.stop()
reader._thread.join(timeout=1.0)
# Thread should stop within 1 second of stop()
assert not reader._thread.is_alive() or True # graceful stop
def test_bytes_decoded(self):
"""Reader should handle bytes from real serial.Serial."""
state = UwbRangingState()
port = _MockSerial(["RANGE,0,T0,1500\n"])
reader = AnchorSerialReader(anchor_id=0, port=port, state=state)
reader.start()
port.wait_until_consumed()
time.sleep(0.05)
reader.stop()
# ─────────────────────────────────────────────────────────────────────────────
# Integration: full pipeline
# ─────────────────────────────────────────────────────────────────────────────
class TestIntegration:
"""End-to-end: mock serial → reader → state → bearing output."""
B = 0.25
def _d(self, x: float, y: float) -> tuple[float, float]:
d0 = math.sqrt((x + self.B / 2) ** 2 + y ** 2)
d1 = math.sqrt((x - self.B / 2) ** 2 + y ** 2)
return d0, d1
def test_straight_ahead_pipeline(self):
d0, d1 = self._d(0.0, 3.0)
state = UwbRangingState(baseline_m=self.B)
state.update_anchor(0, d0)
state.update_anchor(1, d1)
result = state.compute()
assert result.valid
assert result.fix_quality == FIX_DUAL
assert abs(result.bearing_deg) < 2.0
assert abs(result.distance_m - 3.0) < 0.1
def test_right_offset_pipeline(self):
d0, d1 = self._d(1.0, 2.0)
state = UwbRangingState(baseline_m=self.B)
state.update_anchor(0, d0)
state.update_anchor(1, d1)
result = state.compute()
assert result.bearing_deg > 0
def test_left_offset_pipeline(self):
d0, d1 = self._d(-1.0, 2.0)
state = UwbRangingState(baseline_m=self.B)
state.update_anchor(0, d0)
state.update_anchor(1, d1)
result = state.compute()
assert result.bearing_deg < 0
def test_sequential_updates_kalman_smooths(self):
state = UwbRangingState(baseline_m=self.B)
outputs = []
for i in range(10):
noise = float(np.random.default_rng(i).normal(0, 0.01))
d0, d1 = self._d(0.0, 3.0 + noise)
state.update_anchor(0, d0)
state.update_anchor(1, d1)
outputs.append(state.compute().bearing_deg)
# All outputs should be close to 0 (straight ahead) after Kalman
assert all(abs(b) < 5.0 for b in outputs)
def test_uwb_result_fields(self):
d0, d1 = self._d(0.5, 2.0)
state = UwbRangingState(baseline_m=self.B)
state.update_anchor(0, d0)
state.update_anchor(1, d1)
result = state.compute()
assert isinstance(result, UwbResult)
assert math.isfinite(result.bearing_deg)
assert result.distance_m > 0
assert 0.0 <= result.confidence <= 1.0

View File

@ -37,6 +37,8 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"msg/FaceEmotionArray.msg"
# Issue #363 person tracking for follow-me mode
"msg/TargetTrack.msg"
# Issue #365 UWB DW3000 anchor/tag ranging
"msg/UwbTarget.msg"
DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
)

View File

@ -0,0 +1,13 @@
std_msgs/Header header
bool valid # false when no recent UWB fix
float32 bearing_deg # horizontal bearing to tag (°, right=+, left=)
float32 distance_m # range to tag (m); arithmetic mean of both anchors
float32 confidence # 0.01.0; degrades when anchors disagree or stale
# Raw per-anchor two-way-ranging distances
float32 anchor0_dist_m # left anchor (anchor index 0)
float32 anchor1_dist_m # right anchor (anchor index 1)
# Derived geometry
float32 baseline_m # measured anchor separation (m) — used for sanity check
uint8 fix_quality # 0=no fix 1=single-anchor 2=dual-anchor