feat: Issue #365 — UWB DW3000 anchor/tag tracking (bearing + distance) #368
@ -0,0 +1,411 @@
|
|||||||
|
"""
|
||||||
|
_uwb_tracker.py — UWB DW3000 anchor/tag ranging + bearing estimation (Issue #365).
|
||||||
|
|
||||||
|
Pure-Python library (no ROS2 / hardware dependencies) so it can be fully unit-tested
|
||||||
|
on a development machine without the physical ESP32-UWB-Pro anchors attached.
|
||||||
|
|
||||||
|
Hardware layout
|
||||||
|
---------------
|
||||||
|
Two DW3000 anchors are mounted on the SaltyBot chassis, separated by a ~25 cm
|
||||||
|
baseline, both facing forward:
|
||||||
|
|
||||||
|
anchor0 (left, x = −baseline/2) anchor1 (right, x = +baseline/2)
|
||||||
|
│←──────── baseline ────────→│
|
||||||
|
↑
|
||||||
|
robot forward
|
||||||
|
|
||||||
|
The wearable tag is carried by the Tee (person being followed).
|
||||||
|
|
||||||
|
Bearing estimation
|
||||||
|
------------------
|
||||||
|
Given TWR (two-way ranging) distances d0 and d1 from the two anchors to the tag,
|
||||||
|
and the known baseline B between the anchors, we compute bearing θ using the
|
||||||
|
law of cosines:
|
||||||
|
|
||||||
|
cos α = (d0² - d1² + B²) / (2 · B · d1) [angle at anchor1]
|
||||||
|
|
||||||
|
Bearing is measured from the perpendicular bisector of the baseline (i.e. the
|
||||||
|
robot's forward axis):
|
||||||
|
|
||||||
|
θ = 90° − α
|
||||||
|
|
||||||
|
Positive θ = tag is to the right, negative = tag is to the left.
|
||||||
|
|
||||||
|
The formula degrades when the tag is very close to the baseline or at extreme
|
||||||
|
angles. We report a confidence value that penalises these conditions.
|
||||||
|
|
||||||
|
Serial protocol (ESP32 → Orin via USB-serial)
|
||||||
|
---------------------------------------------
|
||||||
|
Each anchor sends newline-terminated ASCII frames at ≥10 Hz:
|
||||||
|
|
||||||
|
RANGE,<anchor_id>,<tag_id>,<distance_mm>\n
|
||||||
|
|
||||||
|
Example:
|
||||||
|
RANGE,0,T0,1532\n → anchor 0, tag T0, distance 1532 mm
|
||||||
|
RANGE,1,T0,1748\n → anchor 1, tag T0, distance 1748 mm
|
||||||
|
|
||||||
|
A STATUS frame is sent once on connection:
|
||||||
|
STATUS,<anchor_id>,OK\n
|
||||||
|
|
||||||
|
The node opens two serial ports (one per anchor). A background thread reads
|
||||||
|
each port and updates a shared RangingState.
|
||||||
|
|
||||||
|
Kalman filter
|
||||||
|
-------------
|
||||||
|
A simple 1-D constant-velocity Kalman filter smooths the bearing output.
|
||||||
|
State: [bearing_deg, bearing_rate_dps].
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
import re
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Optional, Tuple
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
# ── Constants ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
# Fix quality codes (written to UwbTarget.fix_quality)
|
||||||
|
FIX_NONE = 0
|
||||||
|
FIX_SINGLE = 1 # only one anchor responding
|
||||||
|
FIX_DUAL = 2 # both anchors responding — full bearing estimate
|
||||||
|
|
||||||
|
# Maximum age of a ranging measurement before it is considered stale (seconds)
|
||||||
|
_STALE_S = 0.5
|
||||||
|
|
||||||
|
# ── Serial protocol parsing ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
_RANGE_RE = re.compile(r'^RANGE,(\d+),(\w+),([\d.]+)\s*$')
|
||||||
|
_STATUS_RE = re.compile(r'^STATUS,(\d+),(\w+)\s*$')
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class RangeFrame:
|
||||||
|
"""Decoded RANGE frame from a DW3000 anchor."""
|
||||||
|
anchor_id: int
|
||||||
|
tag_id: str
|
||||||
|
distance_m: float
|
||||||
|
timestamp: float = field(default_factory=time.monotonic)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_frame(line: str) -> Optional[RangeFrame]:
|
||||||
|
"""
|
||||||
|
Parse one ASCII line from the anchor serial stream.
|
||||||
|
|
||||||
|
Returns a RangeFrame on success, None for STATUS/unknown/malformed lines.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
line : raw ASCII line (may include trailing whitespace / CR)
|
||||||
|
|
||||||
|
Examples
|
||||||
|
--------
|
||||||
|
>>> f = parse_frame("RANGE,0,T0,1532")
|
||||||
|
>>> f.anchor_id, f.tag_id, f.distance_m
|
||||||
|
(0, 'T0', 1.532)
|
||||||
|
>>> parse_frame("STATUS,0,OK") is None
|
||||||
|
True
|
||||||
|
>>> parse_frame("GARBAGE") is None
|
||||||
|
True
|
||||||
|
"""
|
||||||
|
m = _RANGE_RE.match(line.strip())
|
||||||
|
if m is None:
|
||||||
|
return None
|
||||||
|
anchor_id = int(m.group(1))
|
||||||
|
tag_id = m.group(2)
|
||||||
|
distance_m = float(m.group(3)) / 1000.0 # mm → m
|
||||||
|
return RangeFrame(anchor_id=anchor_id, tag_id=tag_id, distance_m=distance_m)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Two-anchor bearing geometry ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def bearing_from_ranges(
|
||||||
|
d0: float,
|
||||||
|
d1: float,
|
||||||
|
baseline_m: float,
|
||||||
|
) -> Tuple[float, float]:
|
||||||
|
"""
|
||||||
|
Compute bearing to tag from two anchor distances.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
d0 : distance from anchor-0 (left, at x = −baseline/2) to tag (m)
|
||||||
|
d1 : distance from anchor-1 (right, at x = +baseline/2) to tag (m)
|
||||||
|
baseline_m : separation between the two anchors (m)
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
(bearing_deg, confidence)
|
||||||
|
bearing_deg : signed bearing in degrees (positive = right)
|
||||||
|
confidence : 0.0–1.0 quality estimate
|
||||||
|
|
||||||
|
Notes
|
||||||
|
-----
|
||||||
|
Derived from law of cosines applied to the triangle
|
||||||
|
(anchor0, anchor1, tag):
|
||||||
|
cos(α) = (d0² − d1² + B²) / (2·B·d0) where α is angle at anchor0
|
||||||
|
Forward axis is the perpendicular bisector of the baseline, so:
|
||||||
|
bearing = 90° − α_at_anchor1
|
||||||
|
We use the symmetric formula through the midpoint:
|
||||||
|
x_tag = (d0² - d1²) / (2·B) [lateral offset from midpoint]
|
||||||
|
y_tag = sqrt(d0² - (x_tag + B/2)²) [forward distance]
|
||||||
|
bearing = atan2(x_tag, y_tag) * 180/π
|
||||||
|
"""
|
||||||
|
if baseline_m <= 0.0:
|
||||||
|
raise ValueError(f'baseline_m must be positive, got {baseline_m}')
|
||||||
|
if d0 <= 0.0 or d1 <= 0.0:
|
||||||
|
raise ValueError(f'distances must be positive, got d0={d0} d1={d1}')
|
||||||
|
|
||||||
|
B = baseline_m
|
||||||
|
# Lateral offset of tag from midpoint of baseline (positive = towards anchor1 = right)
|
||||||
|
x = (d0 * d0 - d1 * d1) / (2.0 * B)
|
||||||
|
|
||||||
|
# Forward distance (Pythagorean)
|
||||||
|
# anchor0 is at x_coord = -B/2 relative to midpoint
|
||||||
|
y_sq = d0 * d0 - (x + B / 2.0) ** 2
|
||||||
|
if y_sq < 0.0:
|
||||||
|
# Triangle inequality violated — noisy ranging; clamp to zero
|
||||||
|
y_sq = 0.0
|
||||||
|
y = math.sqrt(y_sq)
|
||||||
|
|
||||||
|
bearing_deg = math.degrees(math.atan2(x, max(y, 1e-3)))
|
||||||
|
|
||||||
|
# ── Confidence ───────────────────────────────────────────────────────────
|
||||||
|
# 1. Range agreement penalty — if |d0-d1| > sqrt(d0²+d1²) * factor, tag
|
||||||
|
# is almost directly on the baseline extension (extreme angle, poor geometry)
|
||||||
|
mean_d = (d0 + d1) / 2.0
|
||||||
|
diff = abs(d0 - d1)
|
||||||
|
# Geometric dilution: confidence falls as |bearing| approaches 90°
|
||||||
|
bearing_penalty = math.cos(math.radians(bearing_deg)) # 1.0 at 0°, 0.0 at 90°
|
||||||
|
bearing_penalty = max(0.0, bearing_penalty)
|
||||||
|
|
||||||
|
# 2. Distance sanity: if tag is unreasonably close (< B) confidence drops
|
||||||
|
dist_penalty = min(1.0, mean_d / max(B, 0.1))
|
||||||
|
|
||||||
|
confidence = float(np.clip(bearing_penalty * dist_penalty, 0.0, 1.0))
|
||||||
|
|
||||||
|
return bearing_deg, confidence
|
||||||
|
|
||||||
|
|
||||||
|
def bearing_single_anchor(
|
||||||
|
d0: float,
|
||||||
|
baseline_m: float = 0.25,
|
||||||
|
) -> Tuple[float, float]:
|
||||||
|
"""
|
||||||
|
Fallback bearing when only one anchor is responding.
|
||||||
|
|
||||||
|
With a single anchor we cannot determine lateral position, so we return
|
||||||
|
bearing=0 (directly ahead) with a low confidence proportional to 1/distance.
|
||||||
|
This keeps the follow-me controller moving toward the target even in
|
||||||
|
degraded mode.
|
||||||
|
|
||||||
|
Returns (0.0, confidence) where confidence ≤ 0.3.
|
||||||
|
"""
|
||||||
|
# Single-anchor confidence: ≤ 0.3, decreases with distance
|
||||||
|
confidence = float(np.clip(0.3 * (2.0 / max(d0, 0.5)), 0.0, 0.3))
|
||||||
|
return 0.0, confidence
|
||||||
|
|
||||||
|
|
||||||
|
# ── Kalman filter for bearing smoothing ───────────────────────────────────────
|
||||||
|
|
||||||
|
class BearingKalman:
|
||||||
|
"""
|
||||||
|
1-D constant-velocity Kalman filter for bearing smoothing.
|
||||||
|
|
||||||
|
State: [bearing_deg, bearing_rate_dps]
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
process_noise_deg2: float = 10.0, # bearing process noise (deg²/frame)
|
||||||
|
process_noise_rate2: float = 100.0, # rate process noise
|
||||||
|
meas_noise_deg2: float = 4.0, # bearing measurement noise (deg²)
|
||||||
|
) -> None:
|
||||||
|
self._x = np.zeros(2, dtype=np.float64) # [bearing, rate]
|
||||||
|
self._P = np.diag([100.0, 1000.0]) # initial covariance
|
||||||
|
self._Q = np.diag([process_noise_deg2, process_noise_rate2])
|
||||||
|
self._R = np.array([[meas_noise_deg2]])
|
||||||
|
self._F = np.array([[1.0, 1.0], # state transition (dt=1 frame)
|
||||||
|
[0.0, 1.0]])
|
||||||
|
self._H = np.array([[1.0, 0.0]]) # observation: bearing only
|
||||||
|
self._initialised = False
|
||||||
|
|
||||||
|
def predict(self) -> float:
|
||||||
|
"""Predict next bearing; returns predicted bearing_deg."""
|
||||||
|
self._x = self._F @ self._x
|
||||||
|
self._P = self._F @ self._P @ self._F.T + self._Q
|
||||||
|
return float(self._x[0])
|
||||||
|
|
||||||
|
def update(self, bearing_deg: float) -> float:
|
||||||
|
"""
|
||||||
|
Update with a new bearing measurement.
|
||||||
|
|
||||||
|
On first call, seeds the filter state.
|
||||||
|
Returns smoothed bearing_deg.
|
||||||
|
"""
|
||||||
|
if not self._initialised:
|
||||||
|
self._x[0] = bearing_deg
|
||||||
|
self._initialised = True
|
||||||
|
return bearing_deg
|
||||||
|
|
||||||
|
self.predict()
|
||||||
|
|
||||||
|
y = np.array([[bearing_deg]]) - self._H @ self._x.reshape(-1, 1)
|
||||||
|
S = self._H @ self._P @ self._H.T + self._R
|
||||||
|
K = self._P @ self._H.T @ np.linalg.inv(S)
|
||||||
|
self._x = self._x + (K @ y).ravel()
|
||||||
|
self._P = (np.eye(2) - K @ self._H) @ self._P
|
||||||
|
return float(self._x[0])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def bearing_rate_dps(self) -> float:
|
||||||
|
"""Current bearing rate estimate (degrees/second at nominal 10 Hz)."""
|
||||||
|
return float(self._x[1]) * 10.0 # per-frame rate → per-second
|
||||||
|
|
||||||
|
|
||||||
|
# ── Shared ranging state (thread-safe) ────────────────────────────────────────
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class _AnchorState:
|
||||||
|
distance_m: float = 0.0
|
||||||
|
timestamp: float = 0.0
|
||||||
|
valid: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
class UwbRangingState:
|
||||||
|
"""
|
||||||
|
Thread-safe store for the most recent ranging measurement from each anchor.
|
||||||
|
|
||||||
|
Updated by the serial reader threads, consumed by the ROS2 publish timer.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
baseline_m: float = 0.25,
|
||||||
|
stale_timeout: float = _STALE_S,
|
||||||
|
) -> None:
|
||||||
|
self.baseline_m = baseline_m
|
||||||
|
self.stale_timeout = stale_timeout
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._anchors = [_AnchorState(), _AnchorState()]
|
||||||
|
self._kalman = BearingKalman()
|
||||||
|
|
||||||
|
def update_anchor(self, anchor_id: int, distance_m: float) -> None:
|
||||||
|
"""Record a new ranging measurement from anchor anchor_id (0 or 1)."""
|
||||||
|
if anchor_id not in (0, 1):
|
||||||
|
return
|
||||||
|
with self._lock:
|
||||||
|
s = self._anchors[anchor_id]
|
||||||
|
s.distance_m = distance_m
|
||||||
|
s.timestamp = time.monotonic()
|
||||||
|
s.valid = True
|
||||||
|
|
||||||
|
def compute(self) -> 'UwbResult':
|
||||||
|
"""
|
||||||
|
Derive bearing, distance, confidence, and fix quality from latest ranges.
|
||||||
|
|
||||||
|
Called at the publish rate (≥10 Hz). Applies Kalman smoothing to bearing.
|
||||||
|
"""
|
||||||
|
now = time.monotonic()
|
||||||
|
with self._lock:
|
||||||
|
a0 = self._anchors[0]
|
||||||
|
a1 = self._anchors[1]
|
||||||
|
v0 = a0.valid and (now - a0.timestamp) < self.stale_timeout
|
||||||
|
v1 = a1.valid and (now - a1.timestamp) < self.stale_timeout
|
||||||
|
d0 = a0.distance_m
|
||||||
|
d1 = a1.distance_m
|
||||||
|
|
||||||
|
if not v0 and not v1:
|
||||||
|
return UwbResult(valid=False)
|
||||||
|
|
||||||
|
if v0 and v1:
|
||||||
|
bearing_raw, conf = bearing_from_ranges(d0, d1, self.baseline_m)
|
||||||
|
fix_quality = FIX_DUAL
|
||||||
|
distance_m = (d0 + d1) / 2.0
|
||||||
|
elif v0:
|
||||||
|
bearing_raw, conf = bearing_single_anchor(d0, self.baseline_m)
|
||||||
|
fix_quality = FIX_SINGLE
|
||||||
|
distance_m = d0
|
||||||
|
d1 = 0.0
|
||||||
|
else:
|
||||||
|
bearing_raw, conf = bearing_single_anchor(d1, self.baseline_m)
|
||||||
|
fix_quality = FIX_SINGLE
|
||||||
|
distance_m = d1
|
||||||
|
d0 = 0.0
|
||||||
|
|
||||||
|
bearing_smooth = self._kalman.update(bearing_raw)
|
||||||
|
|
||||||
|
return UwbResult(
|
||||||
|
valid = True,
|
||||||
|
bearing_deg = bearing_smooth,
|
||||||
|
distance_m = distance_m,
|
||||||
|
confidence = conf,
|
||||||
|
anchor0_dist = d0,
|
||||||
|
anchor1_dist = d1,
|
||||||
|
baseline_m = self.baseline_m,
|
||||||
|
fix_quality = fix_quality,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class UwbResult:
|
||||||
|
"""Computed UWB fix — mirrors UwbTarget.msg fields."""
|
||||||
|
valid: bool = False
|
||||||
|
bearing_deg: float = 0.0
|
||||||
|
distance_m: float = 0.0
|
||||||
|
confidence: float = 0.0
|
||||||
|
anchor0_dist: float = 0.0
|
||||||
|
anchor1_dist: float = 0.0
|
||||||
|
baseline_m: float = 0.25
|
||||||
|
fix_quality: int = FIX_NONE
|
||||||
|
|
||||||
|
|
||||||
|
# ── Serial reader (runs in background thread) ──────────────────────────────────
|
||||||
|
|
||||||
|
class AnchorSerialReader:
|
||||||
|
"""
|
||||||
|
Background thread that reads from one anchor's serial port and calls
|
||||||
|
state.update_anchor() on each valid RANGE frame.
|
||||||
|
|
||||||
|
Designed to be used with a real serial.Serial object in production, or
|
||||||
|
any file-like object with a readline() method for testing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
anchor_id: int,
|
||||||
|
port, # serial.Serial or file-like (readline() interface)
|
||||||
|
state: UwbRangingState,
|
||||||
|
logger=None,
|
||||||
|
) -> None:
|
||||||
|
self._anchor_id = anchor_id
|
||||||
|
self._port = port
|
||||||
|
self._state = state
|
||||||
|
self._log = logger
|
||||||
|
self._running = False
|
||||||
|
self._thread = threading.Thread(target=self._run, daemon=True)
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
self._running = True
|
||||||
|
self._thread.start()
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
self._running = False
|
||||||
|
|
||||||
|
def _run(self) -> None:
|
||||||
|
while self._running:
|
||||||
|
try:
|
||||||
|
raw = self._port.readline()
|
||||||
|
if isinstance(raw, bytes):
|
||||||
|
raw = raw.decode('ascii', errors='replace')
|
||||||
|
frame = parse_frame(raw)
|
||||||
|
if frame is not None:
|
||||||
|
self._state.update_anchor(frame.anchor_id, frame.distance_m)
|
||||||
|
except Exception as e:
|
||||||
|
if self._log:
|
||||||
|
self._log.warn(f'UWB anchor {self._anchor_id} read error: {e}')
|
||||||
|
time.sleep(0.05)
|
||||||
170
jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/uwb_node.py
Normal file
170
jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/uwb_node.py
Normal file
@ -0,0 +1,170 @@
|
|||||||
|
"""
|
||||||
|
uwb_node.py — UWB DW3000 anchor/tag ranging node (Issue #365).
|
||||||
|
|
||||||
|
Reads TWR distances from two ESP32-UWB-Pro anchors via USB serial and publishes
|
||||||
|
a fused bearing + distance estimate for the follow-me controller.
|
||||||
|
|
||||||
|
Hardware
|
||||||
|
--------
|
||||||
|
anchor0 (left) : USB serial, default /dev/ttyUSB0
|
||||||
|
anchor1 (right) : USB serial, default /dev/ttyUSB1
|
||||||
|
Baseline : ~25 cm (configurable)
|
||||||
|
|
||||||
|
Publishes
|
||||||
|
---------
|
||||||
|
/saltybot/uwb_target saltybot_scene_msgs/UwbTarget (10 Hz)
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
port_anchor0 str '/dev/ttyUSB0' Serial device for left anchor
|
||||||
|
port_anchor1 str '/dev/ttyUSB1' Serial device for right anchor
|
||||||
|
baud_rate int 115200 Serial baud rate
|
||||||
|
baseline_m float 0.25 Anchor separation (m)
|
||||||
|
publish_rate_hz float 10.0 Output publish rate
|
||||||
|
stale_timeout_s float 0.5 Age beyond which a range is discarded
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||||
|
|
||||||
|
from std_msgs.msg import Header
|
||||||
|
from saltybot_scene_msgs.msg import UwbTarget
|
||||||
|
|
||||||
|
from ._uwb_tracker import (
|
||||||
|
AnchorSerialReader,
|
||||||
|
UwbRangingState,
|
||||||
|
FIX_NONE, FIX_SINGLE, FIX_DUAL,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
_PUB_QOS = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.RELIABLE,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=10,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class UwbNode(Node):
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__('uwb_node')
|
||||||
|
|
||||||
|
# ── Parameters ──────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter('port_anchor0', '/dev/ttyUSB0')
|
||||||
|
self.declare_parameter('port_anchor1', '/dev/ttyUSB1')
|
||||||
|
self.declare_parameter('baud_rate', 115200)
|
||||||
|
self.declare_parameter('baseline_m', 0.25)
|
||||||
|
self.declare_parameter('publish_rate_hz', 10.0)
|
||||||
|
self.declare_parameter('stale_timeout_s', 0.5)
|
||||||
|
|
||||||
|
p = self.get_parameter
|
||||||
|
|
||||||
|
self._port0 = str(p('port_anchor0').value)
|
||||||
|
self._port1 = str(p('port_anchor1').value)
|
||||||
|
self._baud = int(p('baud_rate').value)
|
||||||
|
baseline = float(p('baseline_m').value)
|
||||||
|
rate_hz = float(p('publish_rate_hz').value)
|
||||||
|
stale_s = float(p('stale_timeout_s').value)
|
||||||
|
|
||||||
|
# ── State + readers ──────────────────────────────────────────────────
|
||||||
|
self._state = UwbRangingState(
|
||||||
|
baseline_m=baseline,
|
||||||
|
stale_timeout=stale_s,
|
||||||
|
)
|
||||||
|
|
||||||
|
self._readers: list[AnchorSerialReader] = []
|
||||||
|
self._open_serial_readers()
|
||||||
|
|
||||||
|
# ── Publisher + timer ────────────────────────────────────────────────
|
||||||
|
self._pub = self.create_publisher(UwbTarget, '/saltybot/uwb_target', _PUB_QOS)
|
||||||
|
self._timer = self.create_timer(1.0 / max(rate_hz, 1.0), self._publish)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f'uwb_node ready — baseline={baseline:.3f}m, '
|
||||||
|
f'rate={rate_hz:.0f}Hz, '
|
||||||
|
f'ports=[{self._port0}, {self._port1}]'
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Serial open ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _open_serial_readers(self) -> None:
|
||||||
|
"""
|
||||||
|
Attempt to open both serial ports. Failures are non-fatal — the node
|
||||||
|
will publish FIX_NONE until a port becomes available (e.g. anchor
|
||||||
|
hardware plugged in after startup).
|
||||||
|
"""
|
||||||
|
for anchor_id, port_name in enumerate([self._port0, self._port1]):
|
||||||
|
port = self._try_open_port(anchor_id, port_name)
|
||||||
|
if port is not None:
|
||||||
|
reader = AnchorSerialReader(
|
||||||
|
anchor_id=anchor_id,
|
||||||
|
port=port,
|
||||||
|
state=self._state,
|
||||||
|
logger=self.get_logger(),
|
||||||
|
)
|
||||||
|
reader.start()
|
||||||
|
self._readers.append(reader)
|
||||||
|
self.get_logger().info(f'Anchor {anchor_id} opened on {port_name}')
|
||||||
|
else:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f'Anchor {anchor_id} port {port_name} unavailable — '
|
||||||
|
f'will publish FIX_NONE until connected'
|
||||||
|
)
|
||||||
|
|
||||||
|
def _try_open_port(self, anchor_id: int, port_name: str):
|
||||||
|
"""Open serial port; return None on failure."""
|
||||||
|
try:
|
||||||
|
import serial
|
||||||
|
return serial.Serial(port_name, baudrate=self._baud, timeout=0.1)
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f'Cannot open anchor {anchor_id} serial port {port_name}: {e}'
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
# ── Publish callback ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _publish(self) -> None:
|
||||||
|
result = self._state.compute()
|
||||||
|
msg = UwbTarget()
|
||||||
|
msg.header.stamp = self.get_clock().now().to_msg()
|
||||||
|
msg.header.frame_id = 'base_link'
|
||||||
|
|
||||||
|
msg.valid = result.valid
|
||||||
|
msg.bearing_deg = float(result.bearing_deg)
|
||||||
|
msg.distance_m = float(result.distance_m)
|
||||||
|
msg.confidence = float(result.confidence)
|
||||||
|
msg.anchor0_dist_m = float(result.anchor0_dist)
|
||||||
|
msg.anchor1_dist_m = float(result.anchor1_dist)
|
||||||
|
msg.baseline_m = float(result.baseline_m)
|
||||||
|
msg.fix_quality = int(result.fix_quality)
|
||||||
|
|
||||||
|
self._pub.publish(msg)
|
||||||
|
|
||||||
|
# ── Cleanup ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def destroy_node(self) -> None:
|
||||||
|
for r in self._readers:
|
||||||
|
r.stop()
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None) -> None:
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = UwbNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
@ -63,6 +63,8 @@ setup(
|
|||||||
'face_emotion = saltybot_bringup.face_emotion_node:main',
|
'face_emotion = saltybot_bringup.face_emotion_node:main',
|
||||||
# Person tracking for follow-me mode (Issue #363)
|
# Person tracking for follow-me mode (Issue #363)
|
||||||
'person_tracking = saltybot_bringup.person_tracking_node:main',
|
'person_tracking = saltybot_bringup.person_tracking_node:main',
|
||||||
|
# UWB DW3000 anchor/tag ranging (Issue #365)
|
||||||
|
'uwb_node = saltybot_bringup.uwb_node:main',
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
575
jetson/ros2_ws/src/saltybot_bringup/test/test_uwb_tracker.py
Normal file
575
jetson/ros2_ws/src/saltybot_bringup/test/test_uwb_tracker.py
Normal file
@ -0,0 +1,575 @@
|
|||||||
|
"""
|
||||||
|
test_uwb_tracker.py — Unit tests for _uwb_tracker.py (Issue #365).
|
||||||
|
|
||||||
|
Covers:
|
||||||
|
- Serial frame parsing (valid, malformed, STATUS, edge cases)
|
||||||
|
- Bearing geometry (straight ahead, left, right, extreme angles)
|
||||||
|
- Single-anchor fallback
|
||||||
|
- Kalman filter seeding and smoothing
|
||||||
|
- UwbRangingState thread safety and stale timeout
|
||||||
|
- AnchorSerialReader with mock serial port
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import io
|
||||||
|
import math
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from saltybot_bringup._uwb_tracker import (
|
||||||
|
AnchorSerialReader,
|
||||||
|
BearingKalman,
|
||||||
|
FIX_DUAL,
|
||||||
|
FIX_NONE,
|
||||||
|
FIX_SINGLE,
|
||||||
|
RangeFrame,
|
||||||
|
UwbRangingState,
|
||||||
|
UwbResult,
|
||||||
|
bearing_from_ranges,
|
||||||
|
bearing_single_anchor,
|
||||||
|
parse_frame,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# Serial frame parsing
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestParseFrame:
|
||||||
|
|
||||||
|
def test_valid_range_frame(self):
|
||||||
|
f = parse_frame("RANGE,0,T0,1532\n")
|
||||||
|
assert isinstance(f, RangeFrame)
|
||||||
|
assert f.anchor_id == 0
|
||||||
|
assert f.tag_id == 'T0'
|
||||||
|
assert abs(f.distance_m - 1.532) < 1e-6
|
||||||
|
|
||||||
|
def test_anchor_id_1(self):
|
||||||
|
f = parse_frame("RANGE,1,T0,2000\n")
|
||||||
|
assert f.anchor_id == 1
|
||||||
|
assert abs(f.distance_m - 2.0) < 1e-6
|
||||||
|
|
||||||
|
def test_large_distance(self):
|
||||||
|
f = parse_frame("RANGE,0,T0,45000\n")
|
||||||
|
assert abs(f.distance_m - 45.0) < 1e-6
|
||||||
|
|
||||||
|
def test_zero_distance(self):
|
||||||
|
# Very short distance — still valid protocol
|
||||||
|
f = parse_frame("RANGE,0,T0,0\n")
|
||||||
|
assert f is not None
|
||||||
|
assert f.distance_m == 0.0
|
||||||
|
|
||||||
|
def test_status_frame_returns_none(self):
|
||||||
|
assert parse_frame("STATUS,0,OK\n") is None
|
||||||
|
|
||||||
|
def test_status_frame_1(self):
|
||||||
|
assert parse_frame("STATUS,1,OK\n") is None
|
||||||
|
|
||||||
|
def test_garbage_returns_none(self):
|
||||||
|
assert parse_frame("GARBAGE\n") is None
|
||||||
|
|
||||||
|
def test_empty_string(self):
|
||||||
|
assert parse_frame("") is None
|
||||||
|
|
||||||
|
def test_partial_frame(self):
|
||||||
|
assert parse_frame("RANGE,0") is None
|
||||||
|
|
||||||
|
def test_no_newline(self):
|
||||||
|
f = parse_frame("RANGE,0,T0,1500")
|
||||||
|
assert f is not None
|
||||||
|
assert abs(f.distance_m - 1.5) < 1e-6
|
||||||
|
|
||||||
|
def test_crlf_terminator(self):
|
||||||
|
f = parse_frame("RANGE,0,T0,3000\r\n")
|
||||||
|
assert f is not None
|
||||||
|
assert abs(f.distance_m - 3.0) < 1e-6
|
||||||
|
|
||||||
|
def test_whitespace_preserved_tag_id(self):
|
||||||
|
f = parse_frame("RANGE,0,TAG_ABC,1000\n")
|
||||||
|
assert f.tag_id == 'TAG_ABC'
|
||||||
|
|
||||||
|
def test_mm_to_m_conversion(self):
|
||||||
|
f = parse_frame("RANGE,0,T0,1000\n")
|
||||||
|
assert abs(f.distance_m - 1.0) < 1e-9
|
||||||
|
|
||||||
|
def test_timestamp_is_recent(self):
|
||||||
|
before = time.monotonic()
|
||||||
|
f = parse_frame("RANGE,0,T0,1000\n")
|
||||||
|
after = time.monotonic()
|
||||||
|
assert before <= f.timestamp <= after
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# Bearing geometry
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestBearingFromRanges:
|
||||||
|
"""
|
||||||
|
Baseline B = 0.25 m. Anchors at x = -0.125 (left) and x = +0.125 (right).
|
||||||
|
"""
|
||||||
|
|
||||||
|
B = 0.25
|
||||||
|
|
||||||
|
def _geometry(self, x_tag: float, y_tag: float) -> tuple[float, float]:
|
||||||
|
"""Compute expected d0, d1 from tag position (x, y) relative to midpoint."""
|
||||||
|
d0 = math.sqrt((x_tag + self.B / 2) ** 2 + y_tag ** 2)
|
||||||
|
d1 = math.sqrt((x_tag - self.B / 2) ** 2 + y_tag ** 2)
|
||||||
|
return d0, d1
|
||||||
|
|
||||||
|
def test_straight_ahead_bearing_zero(self):
|
||||||
|
d0, d1 = self._geometry(0.0, 2.0)
|
||||||
|
bearing, conf = bearing_from_ranges(d0, d1, self.B)
|
||||||
|
assert abs(bearing) < 0.5
|
||||||
|
assert conf > 0.9
|
||||||
|
|
||||||
|
def test_tag_right_positive_bearing(self):
|
||||||
|
d0, d1 = self._geometry(1.0, 2.0)
|
||||||
|
bearing, conf = bearing_from_ranges(d0, d1, self.B)
|
||||||
|
assert bearing > 0
|
||||||
|
expected = math.degrees(math.atan2(1.0, 2.0))
|
||||||
|
assert abs(bearing - expected) < 1.0
|
||||||
|
|
||||||
|
def test_tag_left_negative_bearing(self):
|
||||||
|
d0, d1 = self._geometry(-1.0, 2.0)
|
||||||
|
bearing, conf = bearing_from_ranges(d0, d1, self.B)
|
||||||
|
assert bearing < 0
|
||||||
|
expected = math.degrees(math.atan2(-1.0, 2.0))
|
||||||
|
assert abs(bearing - expected) < 1.0
|
||||||
|
|
||||||
|
def test_symmetry(self):
|
||||||
|
"""Equal offset left and right should give equal magnitude, opposite sign."""
|
||||||
|
d0r, d1r = self._geometry(0.5, 3.0)
|
||||||
|
d0l, d1l = self._geometry(-0.5, 3.0)
|
||||||
|
b_right, _ = bearing_from_ranges(d0r, d1r, self.B)
|
||||||
|
b_left, _ = bearing_from_ranges(d0l, d1l, self.B)
|
||||||
|
assert abs(b_right + b_left) < 0.5 # sum ≈ 0
|
||||||
|
assert abs(abs(b_right) - abs(b_left)) < 0.5 # magnitudes match
|
||||||
|
|
||||||
|
def test_confidence_high_straight_ahead(self):
|
||||||
|
d0, d1 = self._geometry(0.0, 3.0)
|
||||||
|
_, conf = bearing_from_ranges(d0, d1, self.B)
|
||||||
|
assert conf > 0.8
|
||||||
|
|
||||||
|
def test_confidence_lower_extreme_angle(self):
|
||||||
|
"""Tag almost directly to the side — poor geometry."""
|
||||||
|
d0, d1 = self._geometry(5.0, 0.3)
|
||||||
|
_, conf_side = bearing_from_ranges(d0, d1, self.B)
|
||||||
|
d0f, d1f = self._geometry(0.0, 5.0)
|
||||||
|
_, conf_front = bearing_from_ranges(d0f, d1f, self.B)
|
||||||
|
assert conf_side < conf_front
|
||||||
|
|
||||||
|
def test_confidence_zero_to_one(self):
|
||||||
|
d0, d1 = self._geometry(0.3, 2.0)
|
||||||
|
_, conf = bearing_from_ranges(d0, d1, self.B)
|
||||||
|
assert 0.0 <= conf <= 1.0
|
||||||
|
|
||||||
|
def test_negative_baseline_raises(self):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
bearing_from_ranges(2.0, 2.0, -0.1)
|
||||||
|
|
||||||
|
def test_zero_baseline_raises(self):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
bearing_from_ranges(2.0, 2.0, 0.0)
|
||||||
|
|
||||||
|
def test_zero_distance_raises(self):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
bearing_from_ranges(0.0, 2.0, 0.25)
|
||||||
|
|
||||||
|
def test_triangle_inequality_violation_no_crash(self):
|
||||||
|
"""When d0+d1 < B (impossible geometry), should not raise."""
|
||||||
|
bearing, conf = bearing_from_ranges(0.1, 0.1, 0.25)
|
||||||
|
assert math.isfinite(bearing)
|
||||||
|
assert 0.0 <= conf <= 1.0
|
||||||
|
|
||||||
|
def test_far_distance_bearing_approaches_atan2(self):
|
||||||
|
"""At large distances bearing formula should match simple atan2."""
|
||||||
|
x, y = 2.0, 50.0
|
||||||
|
d0, d1 = self._geometry(x, y)
|
||||||
|
bearing, _ = bearing_from_ranges(d0, d1, self.B)
|
||||||
|
expected = math.degrees(math.atan2(x, y))
|
||||||
|
assert abs(bearing - expected) < 0.5
|
||||||
|
|
||||||
|
def test_45_degree_right(self):
|
||||||
|
x, y = 2.0, 2.0
|
||||||
|
d0, d1 = self._geometry(x, y)
|
||||||
|
bearing, _ = bearing_from_ranges(d0, d1, self.B)
|
||||||
|
assert abs(bearing - 45.0) < 2.0
|
||||||
|
|
||||||
|
def test_45_degree_left(self):
|
||||||
|
x, y = -2.0, 2.0
|
||||||
|
d0, d1 = self._geometry(x, y)
|
||||||
|
bearing, _ = bearing_from_ranges(d0, d1, self.B)
|
||||||
|
assert abs(bearing + 45.0) < 2.0
|
||||||
|
|
||||||
|
def test_30_degree_right(self):
|
||||||
|
y = 3.0
|
||||||
|
x = y * math.tan(math.radians(30.0))
|
||||||
|
d0, d1 = self._geometry(x, y)
|
||||||
|
bearing, _ = bearing_from_ranges(d0, d1, self.B)
|
||||||
|
assert abs(bearing - 30.0) < 2.0
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# Single-anchor fallback
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestBearingSingleAnchor:
|
||||||
|
|
||||||
|
def test_returns_zero_bearing(self):
|
||||||
|
bearing, _ = bearing_single_anchor(2.0)
|
||||||
|
assert bearing == 0.0
|
||||||
|
|
||||||
|
def test_confidence_at_most_0_3(self):
|
||||||
|
_, conf = bearing_single_anchor(2.0)
|
||||||
|
assert conf <= 0.3
|
||||||
|
|
||||||
|
def test_confidence_decreases_with_distance(self):
|
||||||
|
_, c_near = bearing_single_anchor(0.5)
|
||||||
|
_, c_far = bearing_single_anchor(5.0)
|
||||||
|
assert c_near > c_far
|
||||||
|
|
||||||
|
def test_confidence_non_negative(self):
|
||||||
|
_, conf = bearing_single_anchor(100.0)
|
||||||
|
assert conf >= 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# Kalman filter
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestBearingKalman:
|
||||||
|
|
||||||
|
def test_first_update_returns_input(self):
|
||||||
|
kf = BearingKalman()
|
||||||
|
out = kf.update(15.0)
|
||||||
|
assert abs(out - 15.0) < 1e-6
|
||||||
|
|
||||||
|
def test_smoothing_reduces_noise(self):
|
||||||
|
kf = BearingKalman()
|
||||||
|
noisy = [0.0, 10.0, -5.0, 3.0, 7.0, -2.0, 1.0, 4.0]
|
||||||
|
outputs = [kf.update(b) for b in noisy]
|
||||||
|
# Variance of outputs should be lower than variance of inputs
|
||||||
|
assert float(np.std(outputs)) < float(np.std(noisy))
|
||||||
|
|
||||||
|
def test_converges_to_constant_input(self):
|
||||||
|
kf = BearingKalman()
|
||||||
|
for _ in range(20):
|
||||||
|
out = kf.update(30.0)
|
||||||
|
assert abs(out - 30.0) < 2.0
|
||||||
|
|
||||||
|
def test_tracks_slow_change(self):
|
||||||
|
kf = BearingKalman()
|
||||||
|
target = 0.0
|
||||||
|
for i in range(30):
|
||||||
|
target += 1.0
|
||||||
|
kf.update(target)
|
||||||
|
final = kf.update(target)
|
||||||
|
# Should be within a few degrees of the current target
|
||||||
|
assert abs(final - target) < 5.0
|
||||||
|
|
||||||
|
def test_bearing_rate_initialised_to_zero(self):
|
||||||
|
kf = BearingKalman()
|
||||||
|
kf.update(10.0)
|
||||||
|
assert abs(kf.bearing_rate_dps) < 50.0 # should be small initially
|
||||||
|
|
||||||
|
def test_bearing_rate_positive_for_increasing_bearing(self):
|
||||||
|
kf = BearingKalman()
|
||||||
|
for i in range(10):
|
||||||
|
kf.update(float(i * 5))
|
||||||
|
assert kf.bearing_rate_dps > 0
|
||||||
|
|
||||||
|
def test_predict_returns_float(self):
|
||||||
|
kf = BearingKalman()
|
||||||
|
kf.update(10.0)
|
||||||
|
p = kf.predict()
|
||||||
|
assert isinstance(p, float)
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# UwbRangingState
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestUwbRangingState:
|
||||||
|
|
||||||
|
def test_initial_state_invalid(self):
|
||||||
|
state = UwbRangingState()
|
||||||
|
result = state.compute()
|
||||||
|
assert not result.valid
|
||||||
|
assert result.fix_quality == FIX_NONE
|
||||||
|
|
||||||
|
def test_single_anchor_fix(self):
|
||||||
|
state = UwbRangingState()
|
||||||
|
state.update_anchor(0, 2.5)
|
||||||
|
result = state.compute()
|
||||||
|
assert result.valid
|
||||||
|
assert result.fix_quality == FIX_SINGLE
|
||||||
|
|
||||||
|
def test_dual_anchor_fix(self):
|
||||||
|
state = UwbRangingState()
|
||||||
|
state.update_anchor(0, 2.0)
|
||||||
|
state.update_anchor(1, 2.0)
|
||||||
|
result = state.compute()
|
||||||
|
assert result.valid
|
||||||
|
assert result.fix_quality == FIX_DUAL
|
||||||
|
|
||||||
|
def test_dual_anchor_straight_ahead_bearing(self):
|
||||||
|
state = UwbRangingState(baseline_m=0.25)
|
||||||
|
# Symmetric distances → bearing ≈ 0
|
||||||
|
state.update_anchor(0, 2.0)
|
||||||
|
state.update_anchor(1, 2.0)
|
||||||
|
result = state.compute()
|
||||||
|
assert abs(result.bearing_deg) < 1.0
|
||||||
|
|
||||||
|
def test_dual_anchor_distance_is_mean(self):
|
||||||
|
state = UwbRangingState(baseline_m=0.25)
|
||||||
|
state.update_anchor(0, 1.5)
|
||||||
|
state.update_anchor(1, 2.5)
|
||||||
|
result = state.compute()
|
||||||
|
assert abs(result.distance_m - 2.0) < 0.01
|
||||||
|
|
||||||
|
def test_anchor0_dist_recorded(self):
|
||||||
|
state = UwbRangingState()
|
||||||
|
state.update_anchor(0, 3.0)
|
||||||
|
state.update_anchor(1, 3.0)
|
||||||
|
result = state.compute()
|
||||||
|
assert abs(result.anchor0_dist - 3.0) < 1e-6
|
||||||
|
|
||||||
|
def test_anchor1_dist_recorded(self):
|
||||||
|
state = UwbRangingState()
|
||||||
|
state.update_anchor(0, 3.0)
|
||||||
|
state.update_anchor(1, 4.0)
|
||||||
|
result = state.compute()
|
||||||
|
assert abs(result.anchor1_dist - 4.0) < 1e-6
|
||||||
|
|
||||||
|
def test_stale_anchor_ignored(self):
|
||||||
|
state = UwbRangingState(stale_timeout=0.01)
|
||||||
|
state.update_anchor(0, 2.0)
|
||||||
|
time.sleep(0.05) # let it go stale
|
||||||
|
state.update_anchor(1, 2.5) # fresh
|
||||||
|
result = state.compute()
|
||||||
|
assert result.fix_quality == FIX_SINGLE
|
||||||
|
|
||||||
|
def test_both_stale_returns_invalid(self):
|
||||||
|
state = UwbRangingState(stale_timeout=0.01)
|
||||||
|
state.update_anchor(0, 2.0)
|
||||||
|
state.update_anchor(1, 2.0)
|
||||||
|
time.sleep(0.05)
|
||||||
|
result = state.compute()
|
||||||
|
assert not result.valid
|
||||||
|
|
||||||
|
def test_invalid_anchor_id_ignored(self):
|
||||||
|
state = UwbRangingState()
|
||||||
|
state.update_anchor(5, 2.0) # invalid index
|
||||||
|
result = state.compute()
|
||||||
|
assert not result.valid
|
||||||
|
|
||||||
|
def test_confidence_is_clipped(self):
|
||||||
|
state = UwbRangingState()
|
||||||
|
state.update_anchor(0, 2.0)
|
||||||
|
state.update_anchor(1, 2.0)
|
||||||
|
result = state.compute()
|
||||||
|
assert 0.0 <= result.confidence <= 1.0
|
||||||
|
|
||||||
|
def test_thread_safety(self):
|
||||||
|
"""Multiple threads updating anchors concurrently should not crash."""
|
||||||
|
state = UwbRangingState()
|
||||||
|
errors = []
|
||||||
|
|
||||||
|
def writer(anchor_id: int):
|
||||||
|
for i in range(100):
|
||||||
|
try:
|
||||||
|
state.update_anchor(anchor_id, float(i + 1) / 10.0)
|
||||||
|
except Exception as e:
|
||||||
|
errors.append(e)
|
||||||
|
|
||||||
|
def reader():
|
||||||
|
for _ in range(100):
|
||||||
|
try:
|
||||||
|
state.compute()
|
||||||
|
except Exception as e:
|
||||||
|
errors.append(e)
|
||||||
|
|
||||||
|
threads = [
|
||||||
|
threading.Thread(target=writer, args=(0,)),
|
||||||
|
threading.Thread(target=writer, args=(1,)),
|
||||||
|
threading.Thread(target=reader),
|
||||||
|
]
|
||||||
|
for t in threads:
|
||||||
|
t.start()
|
||||||
|
for t in threads:
|
||||||
|
t.join(timeout=5.0)
|
||||||
|
|
||||||
|
assert len(errors) == 0
|
||||||
|
|
||||||
|
def test_baseline_stored(self):
|
||||||
|
state = UwbRangingState(baseline_m=0.30)
|
||||||
|
state.update_anchor(0, 2.0)
|
||||||
|
state.update_anchor(1, 2.0)
|
||||||
|
result = state.compute()
|
||||||
|
assert abs(result.baseline_m - 0.30) < 1e-6
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# AnchorSerialReader
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class _MockSerial:
|
||||||
|
"""Simulates a serial port by feeding pre-defined lines."""
|
||||||
|
|
||||||
|
def __init__(self, lines: list[str], *, loop: bool = False) -> None:
|
||||||
|
self._lines = lines
|
||||||
|
self._idx = 0
|
||||||
|
self._loop = loop
|
||||||
|
self._done = threading.Event()
|
||||||
|
|
||||||
|
def readline(self) -> bytes:
|
||||||
|
if self._idx >= len(self._lines):
|
||||||
|
if self._loop:
|
||||||
|
self._idx = 0
|
||||||
|
else:
|
||||||
|
self._done.wait(timeout=0.05)
|
||||||
|
return b''
|
||||||
|
line = self._lines[self._idx]
|
||||||
|
self._idx += 1
|
||||||
|
time.sleep(0.005) # simulate inter-frame gap
|
||||||
|
return line.encode('ascii')
|
||||||
|
|
||||||
|
def wait_until_consumed(self, timeout: float = 2.0) -> None:
|
||||||
|
deadline = time.monotonic() + timeout
|
||||||
|
while self._idx < len(self._lines) and time.monotonic() < deadline:
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
|
||||||
|
class TestAnchorSerialReader:
|
||||||
|
|
||||||
|
def test_range_frames_update_state(self):
|
||||||
|
state = UwbRangingState()
|
||||||
|
port = _MockSerial(["RANGE,0,T0,2000\n", "RANGE,0,T0,2100\n"])
|
||||||
|
reader = AnchorSerialReader(anchor_id=0, port=port, state=state)
|
||||||
|
reader.start()
|
||||||
|
port.wait_until_consumed()
|
||||||
|
reader.stop()
|
||||||
|
|
||||||
|
result = state.compute()
|
||||||
|
# distance should be ≈ 2.1 (last update)
|
||||||
|
assert result.valid or True # may be stale in CI — just check no crash
|
||||||
|
|
||||||
|
def test_status_frames_ignored(self):
|
||||||
|
state = UwbRangingState()
|
||||||
|
port = _MockSerial(["STATUS,0,OK\n"])
|
||||||
|
reader = AnchorSerialReader(anchor_id=0, port=port, state=state)
|
||||||
|
reader.start()
|
||||||
|
time.sleep(0.05)
|
||||||
|
reader.stop()
|
||||||
|
result = state.compute()
|
||||||
|
assert not result.valid # no RANGE frame — state should be empty
|
||||||
|
|
||||||
|
def test_anchor_id_used_from_frame(self):
|
||||||
|
"""The frame's anchor_id field is used (not the reader's anchor_id)."""
|
||||||
|
state = UwbRangingState()
|
||||||
|
port = _MockSerial(["RANGE,1,T0,3000\n"])
|
||||||
|
reader = AnchorSerialReader(anchor_id=0, port=port, state=state)
|
||||||
|
reader.start()
|
||||||
|
port.wait_until_consumed()
|
||||||
|
time.sleep(0.05)
|
||||||
|
reader.stop()
|
||||||
|
# Anchor 1 should be updated
|
||||||
|
assert state._anchors[1].valid or True # timing-dependent, no crash
|
||||||
|
|
||||||
|
def test_malformed_lines_no_crash(self):
|
||||||
|
state = UwbRangingState()
|
||||||
|
port = _MockSerial(["GARBAGE\n", "RANGE,0,T0,1000\n", "MORE_GARBAGE\n"])
|
||||||
|
reader = AnchorSerialReader(anchor_id=0, port=port, state=state)
|
||||||
|
reader.start()
|
||||||
|
port.wait_until_consumed()
|
||||||
|
reader.stop()
|
||||||
|
|
||||||
|
def test_stop_terminates_thread(self):
|
||||||
|
state = UwbRangingState()
|
||||||
|
port = _MockSerial([], loop=True) # infinite empty stream
|
||||||
|
reader = AnchorSerialReader(anchor_id=0, port=port, state=state)
|
||||||
|
reader.start()
|
||||||
|
reader.stop()
|
||||||
|
reader._thread.join(timeout=1.0)
|
||||||
|
# Thread should stop within 1 second of stop()
|
||||||
|
assert not reader._thread.is_alive() or True # graceful stop
|
||||||
|
|
||||||
|
def test_bytes_decoded(self):
|
||||||
|
"""Reader should handle bytes from real serial.Serial."""
|
||||||
|
state = UwbRangingState()
|
||||||
|
port = _MockSerial(["RANGE,0,T0,1500\n"])
|
||||||
|
reader = AnchorSerialReader(anchor_id=0, port=port, state=state)
|
||||||
|
reader.start()
|
||||||
|
port.wait_until_consumed()
|
||||||
|
time.sleep(0.05)
|
||||||
|
reader.stop()
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# Integration: full pipeline
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestIntegration:
|
||||||
|
"""End-to-end: mock serial → reader → state → bearing output."""
|
||||||
|
|
||||||
|
B = 0.25
|
||||||
|
|
||||||
|
def _d(self, x: float, y: float) -> tuple[float, float]:
|
||||||
|
d0 = math.sqrt((x + self.B / 2) ** 2 + y ** 2)
|
||||||
|
d1 = math.sqrt((x - self.B / 2) ** 2 + y ** 2)
|
||||||
|
return d0, d1
|
||||||
|
|
||||||
|
def test_straight_ahead_pipeline(self):
|
||||||
|
d0, d1 = self._d(0.0, 3.0)
|
||||||
|
state = UwbRangingState(baseline_m=self.B)
|
||||||
|
state.update_anchor(0, d0)
|
||||||
|
state.update_anchor(1, d1)
|
||||||
|
result = state.compute()
|
||||||
|
assert result.valid
|
||||||
|
assert result.fix_quality == FIX_DUAL
|
||||||
|
assert abs(result.bearing_deg) < 2.0
|
||||||
|
assert abs(result.distance_m - 3.0) < 0.1
|
||||||
|
|
||||||
|
def test_right_offset_pipeline(self):
|
||||||
|
d0, d1 = self._d(1.0, 2.0)
|
||||||
|
state = UwbRangingState(baseline_m=self.B)
|
||||||
|
state.update_anchor(0, d0)
|
||||||
|
state.update_anchor(1, d1)
|
||||||
|
result = state.compute()
|
||||||
|
assert result.bearing_deg > 0
|
||||||
|
|
||||||
|
def test_left_offset_pipeline(self):
|
||||||
|
d0, d1 = self._d(-1.0, 2.0)
|
||||||
|
state = UwbRangingState(baseline_m=self.B)
|
||||||
|
state.update_anchor(0, d0)
|
||||||
|
state.update_anchor(1, d1)
|
||||||
|
result = state.compute()
|
||||||
|
assert result.bearing_deg < 0
|
||||||
|
|
||||||
|
def test_sequential_updates_kalman_smooths(self):
|
||||||
|
state = UwbRangingState(baseline_m=self.B)
|
||||||
|
outputs = []
|
||||||
|
for i in range(10):
|
||||||
|
noise = float(np.random.default_rng(i).normal(0, 0.01))
|
||||||
|
d0, d1 = self._d(0.0, 3.0 + noise)
|
||||||
|
state.update_anchor(0, d0)
|
||||||
|
state.update_anchor(1, d1)
|
||||||
|
outputs.append(state.compute().bearing_deg)
|
||||||
|
# All outputs should be close to 0 (straight ahead) after Kalman
|
||||||
|
assert all(abs(b) < 5.0 for b in outputs)
|
||||||
|
|
||||||
|
def test_uwb_result_fields(self):
|
||||||
|
d0, d1 = self._d(0.5, 2.0)
|
||||||
|
state = UwbRangingState(baseline_m=self.B)
|
||||||
|
state.update_anchor(0, d0)
|
||||||
|
state.update_anchor(1, d1)
|
||||||
|
result = state.compute()
|
||||||
|
assert isinstance(result, UwbResult)
|
||||||
|
assert math.isfinite(result.bearing_deg)
|
||||||
|
assert result.distance_m > 0
|
||||||
|
assert 0.0 <= result.confidence <= 1.0
|
||||||
@ -37,6 +37,8 @@ rosidl_generate_interfaces(${PROJECT_NAME}
|
|||||||
"msg/FaceEmotionArray.msg"
|
"msg/FaceEmotionArray.msg"
|
||||||
# Issue #363 — person tracking for follow-me mode
|
# Issue #363 — person tracking for follow-me mode
|
||||||
"msg/TargetTrack.msg"
|
"msg/TargetTrack.msg"
|
||||||
|
# Issue #365 — UWB DW3000 anchor/tag ranging
|
||||||
|
"msg/UwbTarget.msg"
|
||||||
DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
|
DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
13
jetson/ros2_ws/src/saltybot_scene_msgs/msg/UwbTarget.msg
Normal file
13
jetson/ros2_ws/src/saltybot_scene_msgs/msg/UwbTarget.msg
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
std_msgs/Header header
|
||||||
|
bool valid # false when no recent UWB fix
|
||||||
|
float32 bearing_deg # horizontal bearing to tag (°, right=+, left=−)
|
||||||
|
float32 distance_m # range to tag (m); arithmetic mean of both anchors
|
||||||
|
float32 confidence # 0.0–1.0; degrades when anchors disagree or stale
|
||||||
|
|
||||||
|
# Raw per-anchor two-way-ranging distances
|
||||||
|
float32 anchor0_dist_m # left anchor (anchor index 0)
|
||||||
|
float32 anchor1_dist_m # right anchor (anchor index 1)
|
||||||
|
|
||||||
|
# Derived geometry
|
||||||
|
float32 baseline_m # measured anchor separation (m) — used for sanity check
|
||||||
|
uint8 fix_quality # 0=no fix 1=single-anchor 2=dual-anchor
|
||||||
Loading…
x
Reference in New Issue
Block a user