Merge pull request 'feat: UWB follow-me system (#57)' (#66) from sl-jetson/uwb-follow-me into main

This commit is contained in:
seb 2026-03-01 00:51:20 -05:00
commit 64d411b48a
16 changed files with 1239 additions and 69 deletions

View File

@ -1,30 +1,42 @@
""" """
person_follower_node.py Proportional person-following control loop. person_follower_node.py -- Proportional person-following control loop.
Subscribes: Subscribes:
/person/target (geometry_msgs/PoseStamped) person pose in base_link frame /uwb/target (geometry_msgs/PoseStamped) -- UWB-triangulated position (primary)
/local_costmap/costmap (nav_msgs/OccupancyGrid) Nav2 local obstacle map /person/target (geometry_msgs/PoseStamped) -- camera-detected position (secondary)
/local_costmap/costmap (nav_msgs/OccupancyGrid) -- Nav2 local obstacle map
Publishes: Publishes:
/cmd_vel (geometry_msgs/Twist) velocity commands /cmd_vel (geometry_msgs/Twist) -- velocity commands
Sensor fusion
-------------
When both UWB and camera sources are fresh:
fused = uwb_weight * uwb + (1 - uwb_weight) * camera
When only UWB is fresh: use UWB directly.
When only camera is fresh: use camera directly.
When neither is fresh: STOPPING -> SEARCHING.
UWB is declared stale after uwb_timeout seconds (default 1.0s).
Camera is declared stale after lost_timeout seconds (default 2.0s).
State machine State machine
-------------
FOLLOWING person visible (last detection < lost_timeout ago) FOLLOWING -- at least one source fresh (last detection < timeout ago)
STOPPING person lost, publishing zero until search_timeout STOPPING -- all sources lost, publishing zero until search_timeout
SEARCHING person lost > search_timeout, slow rotation to re-acquire SEARCHING -- all sources lost > search_timeout, slow rotation to re-acquire
Safety wiring Safety wiring
-------------
cmd_vel bridge (PR #46) applies ramp + deadman + STM32 AUTONOMOUS mode gate — * cmd_vel bridge (PR #46) applies ramp + deadman + STM32 AUTONOMOUS mode gate --
this node publishes raw /cmd_vel, the bridge handles hardware safety. this node publishes raw /cmd_vel, the bridge handles hardware safety.
follow_enabled param (default True) lets the operator disable the controller * follow_enabled param (default True) lets the operator disable the controller
at runtime: ros2 param set /person_follower follow_enabled false at runtime: ros2 param set /person_follower follow_enabled false
Obstacle override: if Nav2 local costmap shows a lethal cell in the forward * Obstacle override: if Nav2 local costmap shows a lethal cell in the forward
corridor, forward cmd_vel is zeroed; turning is still allowed. corridor, forward cmd_vel is zeroed; turning is still allowed.
Usage Usage
-----
ros2 launch saltybot_follower person_follower.launch.py ros2 launch saltybot_follower person_follower.launch.py
ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.2 ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.2
""" """
@ -37,13 +49,13 @@ from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, Twist from geometry_msgs.msg import PoseStamped, Twist
from nav_msgs.msg import OccupancyGrid from nav_msgs.msg import OccupancyGrid
# ── State constants ──────────────────────────────────────────────────────────── # -- State constants -----------------------------------------------------------
_FOLLOWING = "following" _FOLLOWING = "following"
_STOPPING = "stopping" # person lost < search_timeout, publishing zero _STOPPING = "stopping" # all sources lost < search_timeout, publishing zero
_SEARCHING = "searching" # person lost > search_timeout, slow rotation _SEARCHING = "searching" # all sources lost > search_timeout, slow rotation
# ── Pure helper functions (also exercised by unit tests) ────────────────────── # -- Pure helper functions (also exercised by unit tests) ----------------------
def clamp(v: float, lo: float, hi: float) -> float: def clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v)) return max(lo, min(hi, v))
@ -58,6 +70,40 @@ def compute_bearing(px: float, py: float) -> float:
return math.atan2(py, px) return math.atan2(py, px)
def fuse_targets(
uwb_x: float,
uwb_y: float,
uwb_fresh: bool,
cam_x: float,
cam_y: float,
cam_fresh: bool,
uwb_weight: float,
) -> tuple:
"""
Fuse UWB (primary) and camera (secondary) position estimates.
Returns (fused_x, fused_y, any_fresh).
Fusion rules:
Both fresh -> weighted blend: uwb_weight * uwb + (1-uwb_weight) * camera
UWB only -> UWB position, full weight
Camera only -> camera position, full weight
Neither -> (0, 0, False)
"""
if uwb_fresh and cam_fresh:
w = clamp(uwb_weight, 0.0, 1.0)
return (
w * uwb_x + (1.0 - w) * cam_x,
w * uwb_y + (1.0 - w) * cam_y,
True,
)
if uwb_fresh:
return uwb_x, uwb_y, True
if cam_fresh:
return cam_x, cam_y, True
return 0.0, 0.0, False
def compute_follow_cmd( def compute_follow_cmd(
px: float, px: float,
py: float, py: float,
@ -75,30 +121,27 @@ def compute_follow_cmd(
Returns (linear_x, angular_z) as floats. Returns (linear_x, angular_z) as floats.
linear_x: linear_x:
Proportional to signed distance error (distance follow_distance). Proportional to signed distance error (distance - follow_distance).
Zero inside the dead zone. Clamped to ±max_linear_vel. Zero inside the dead zone. Clamped to +-max_linear_vel.
Zeroed (not reversed) if obstacle_ahead and moving forward. Zeroed (not reversed) if obstacle_ahead and moving forward.
angular_z: angular_z:
Proportional to bearing. Always corrects even when inside the dead zone, Proportional to bearing. Always corrects even when inside the dead zone,
so the robot stays pointed at the person while holding distance. so the robot stays pointed at the person while holding distance.
Clamped to ±max_angular_vel. Clamped to +-max_angular_vel.
""" """
distance = compute_distance(px, py) distance = compute_distance(px, py)
bearing = compute_bearing(px, py) bearing = compute_bearing(px, py)
# Linear — dead zone suppresses jitter at target distance
dist_error = distance - follow_distance dist_error = distance - follow_distance
if abs(dist_error) > dead_zone: if abs(dist_error) > dead_zone:
linear_x = clamp(kp_linear * dist_error, -max_linear_vel, max_linear_vel) linear_x = clamp(kp_linear * dist_error, -max_linear_vel, max_linear_vel)
else: else:
linear_x = 0.0 linear_x = 0.0
# Obstacle override: suppress forward motion, allow turning
if obstacle_ahead and linear_x > 0.0: if obstacle_ahead and linear_x > 0.0:
linear_x = 0.0 linear_x = 0.0
# Angular — always correct bearing
angular_z = clamp(kp_angular * bearing, -max_angular_vel, max_angular_vel) angular_z = clamp(kp_angular * bearing, -max_angular_vel, max_angular_vel)
return linear_x, angular_z return linear_x, angular_z
@ -116,7 +159,7 @@ def check_obstacle_in_costmap(
""" """
Check for obstacle cells in the robot's forward corridor. Check for obstacle cells in the robot's forward corridor.
Assumes a Nav2 rolling-window local costmap (robot centre cell). Assumes a Nav2 rolling-window local costmap (robot ~= centre cell).
Forward = +col direction (robot +X). Forward = +col direction (robot +X).
costmap_data : flat row-major int8 list (from OccupancyGrid.data) costmap_data : flat row-major int8 list (from OccupancyGrid.data)
@ -124,7 +167,7 @@ def check_obstacle_in_costmap(
resolution : metres per cell resolution : metres per cell
check_dist : how far ahead to look (metres) check_dist : how far ahead to look (metres)
robot_half_width: corridor half-width (metres) robot_half_width: corridor half-width (metres)
threshold : costmap value this is treated as obstacle (Nav2 lethal=99) threshold : costmap value >= this is treated as obstacle (Nav2 lethal=99)
""" """
if resolution <= 0.0: if resolution <= 0.0:
return False return False
@ -132,8 +175,8 @@ def check_obstacle_in_costmap(
cx = width // 2 cx = width // 2
cy = height // 2 cy = height // 2
n_fwd = int(check_dist / resolution) n_fwd = int(check_dist / resolution)
n_side = int(robot_half_width / resolution) n_side = int(robot_half_width / resolution)
for dx in range(1, n_fwd + 1): for dx in range(1, n_fwd + 1):
col = cx + dx col = cx + dx
@ -147,14 +190,14 @@ def check_obstacle_in_costmap(
return False return False
# ── ROS2 Node ───────────────────────────────────────────────────────────────── # -- ROS2 Node ----------------------------------------------------------------
class PersonFollowerNode(Node): class PersonFollowerNode(Node):
def __init__(self): def __init__(self):
super().__init__("person_follower") super().__init__("person_follower")
# ── Parameters ──────────────────────────────────────────────────────── # -- Parameters --------------------------------------------------------
self.declare_parameter("follow_distance", 1.5) self.declare_parameter("follow_distance", 1.5)
self.declare_parameter("dead_zone", 0.3) self.declare_parameter("dead_zone", 0.3)
self.declare_parameter("kp_linear", 0.6) self.declare_parameter("kp_linear", 0.6)
@ -169,38 +212,48 @@ class PersonFollowerNode(Node):
self.declare_parameter("robot_width", 0.3) self.declare_parameter("robot_width", 0.3)
self.declare_parameter("control_rate", 20.0) self.declare_parameter("control_rate", 20.0)
self.declare_parameter("follow_enabled", True) self.declare_parameter("follow_enabled", True)
# UWB fusion
self.declare_parameter("uwb_weight", 0.7)
self.declare_parameter("uwb_timeout", 1.0)
self._p = {} self._p = {}
self._reload_params() self._reload_params()
# ── State ───────────────────────────────────────────────────────────── # -- State -------------------------------------------------------------
self._state = _STOPPING self._state = _STOPPING
self._last_target_t = 0.0 # monotonic clock; 0 = never received self._last_target_t = 0.0 # camera; monotonic; 0 = never received
self._person_x = 0.0 self._person_x = 0.0
self._person_y = 0.0 self._person_y = 0.0
self._last_uwb_t = 0.0 # UWB; monotonic; 0 = never received
self._uwb_x = 0.0
self._uwb_y = 0.0
self._obstacle_ahead = False self._obstacle_ahead = False
# ── Subscriptions ───────────────────────────────────────────────────── # -- Subscriptions -----------------------------------------------------
self.create_subscription(
PoseStamped, "/uwb/target", self._uwb_cb, 10)
self.create_subscription( self.create_subscription(
PoseStamped, "/person/target", self._target_cb, 10) PoseStamped, "/person/target", self._target_cb, 10)
self.create_subscription( self.create_subscription(
OccupancyGrid, "/local_costmap/costmap", self._costmap_cb, 1) OccupancyGrid, "/local_costmap/costmap", self._costmap_cb, 1)
# ── Publisher ───────────────────────────────────────────────────────── # -- Publisher ---------------------------------------------------------
self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10) self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10)
# ── Control timer ───────────────────────────────────────────────────── # -- Control timer -----------------------------------------------------
self._timer = self.create_timer( self._timer = self.create_timer(
1.0 / self._p["control_rate"], self._control_cb) 1.0 / self._p["control_rate"], self._control_cb)
self.get_logger().info( self.get_logger().info(
f"PersonFollower ready follow_dist={self._p['follow_distance']}m " "PersonFollower ready "
f"dead_zone=±{self._p['dead_zone']}m " f"follow_dist={self._p['follow_distance']}m "
f"dead_zone=+/-{self._p['dead_zone']}m "
f"max_vel={self._p['max_linear_vel']}m/s " f"max_vel={self._p['max_linear_vel']}m/s "
f"uwb_weight={self._p['uwb_weight']} "
f"lost_timeout={self._p['lost_timeout']}s" f"lost_timeout={self._p['lost_timeout']}s"
) )
# ── Parameter helper ────────────────────────────────────────────────────── # -- Parameter helper ------------------------------------------------------
def _reload_params(self): def _reload_params(self):
self._p = { self._p = {
@ -218,12 +271,21 @@ class PersonFollowerNode(Node):
"robot_width": self.get_parameter("robot_width").value, "robot_width": self.get_parameter("robot_width").value,
"control_rate": self.get_parameter("control_rate").value, "control_rate": self.get_parameter("control_rate").value,
"follow_enabled": self.get_parameter("follow_enabled").value, "follow_enabled": self.get_parameter("follow_enabled").value,
"uwb_weight": self.get_parameter("uwb_weight").value,
"uwb_timeout": self.get_parameter("uwb_timeout").value,
} }
# ── Callbacks ───────────────────────────────────────────────────────────── # -- Callbacks -------------------------------------------------------------
def _uwb_cb(self, msg):
"""UWB-triangulated person position (primary source)."""
self._uwb_x = msg.pose.position.x
self._uwb_y = msg.pose.position.y
self._last_uwb_t = time.monotonic()
self._state = _FOLLOWING
def _target_cb(self, msg): def _target_cb(self, msg):
"""Person position in base_link frame.""" """Camera-detected person position (secondary source)."""
self._person_x = msg.pose.position.x self._person_x = msg.pose.position.x
self._person_y = msg.pose.position.y self._person_y = msg.pose.position.y
self._last_target_t = time.monotonic() self._last_target_t = time.monotonic()
@ -240,10 +302,9 @@ class PersonFollowerNode(Node):
threshold=self._p["obstacle_threshold"], threshold=self._p["obstacle_threshold"],
) )
# ── Control loop ────────────────────────────────────────────────────────── # -- Control loop ----------------------------------------------------------
def _control_cb(self): def _control_cb(self):
# Re-read mutable params every tick (cheap dict lookup)
self._reload_params() self._reload_params()
twist = Twist() twist = Twist()
@ -252,51 +313,69 @@ class PersonFollowerNode(Node):
self._cmd_pub.publish(twist) self._cmd_pub.publish(twist)
return return
now = time.monotonic() now = time.monotonic()
dt_lost = (now - self._last_target_t) if self._last_target_t > 0.0 else 1e9
# State transitions (don't transition back from STOPPING/SEARCHING uwb_fresh = (
# here — _target_cb resets to FOLLOWING on new detection) self._last_uwb_t > 0.0 and
(now - self._last_uwb_t) <= self._p["uwb_timeout"]
)
cam_fresh = (
self._last_target_t > 0.0 and
(now - self._last_target_t) <= self._p["lost_timeout"]
)
any_fresh = uwb_fresh or cam_fresh
if self._state == _FOLLOWING: if self._state == _FOLLOWING:
if dt_lost > self._p["search_timeout"]: if not any_fresh:
self._state = _SEARCHING uwb_stale = (now - self._last_uwb_t) if self._last_uwb_t > 0.0 else 1e9
self.get_logger().warn("Person lost — entering SEARCHING state") cam_stale = (now - self._last_target_t) if self._last_target_t > 0.0 else 1e9
elif dt_lost > self._p["lost_timeout"]: dt_lost = min(uwb_stale, cam_stale)
self._state = _STOPPING if dt_lost > self._p["search_timeout"]:
self.get_logger().warn("Person lost — stopping") self._state = _SEARCHING
self.get_logger().warn("Person lost -- entering SEARCHING state")
else:
self._state = _STOPPING
self.get_logger().warn("Person lost -- stopping")
linear_x, angular_z = 0.0, 0.0 linear_x, angular_z = 0.0, 0.0
if self._state == _FOLLOWING: if self._state == _FOLLOWING:
linear_x, angular_z = compute_follow_cmd( fx, fy, fresh = fuse_targets(
px=self._person_x, uwb_x=self._uwb_x,
py=self._person_y, uwb_y=self._uwb_y,
follow_distance=self._p["follow_distance"], uwb_fresh=uwb_fresh,
dead_zone=self._p["dead_zone"], cam_x=self._person_x,
kp_linear=self._p["kp_linear"], cam_y=self._person_y,
kp_angular=self._p["kp_angular"], cam_fresh=cam_fresh,
max_linear_vel=self._p["max_linear_vel"], uwb_weight=self._p["uwb_weight"],
max_angular_vel=self._p["max_angular_vel"],
obstacle_ahead=self._obstacle_ahead,
) )
if self._obstacle_ahead and linear_x == 0.0: if fresh:
self.get_logger().warn( linear_x, angular_z = compute_follow_cmd(
"Obstacle in path — forward motion suppressed", px=fx,
throttle_duration_sec=2.0, py=fy,
follow_distance=self._p["follow_distance"],
dead_zone=self._p["dead_zone"],
kp_linear=self._p["kp_linear"],
kp_angular=self._p["kp_angular"],
max_linear_vel=self._p["max_linear_vel"],
max_angular_vel=self._p["max_angular_vel"],
obstacle_ahead=self._obstacle_ahead,
) )
if self._obstacle_ahead and linear_x == 0.0:
self.get_logger().warn(
"Obstacle in path -- forward motion suppressed",
throttle_duration_sec=2.0,
)
elif self._state == _SEARCHING: elif self._state == _SEARCHING:
angular_z = float(self._p["search_angular_vel"]) angular_z = float(self._p["search_angular_vel"])
# linear_x stays 0 — rotate in place to search
# _STOPPING: both stay 0
twist.linear.x = linear_x twist.linear.x = linear_x
twist.angular.z = angular_z twist.angular.z = angular_z
self._cmd_pub.publish(twist) self._cmd_pub.publish(twist)
# ── Entry point ─────────────────────────────────────────────────────────────── # -- Entry point ---------------------------------------------------------------
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)

View File

@ -388,3 +388,98 @@ class TestFollowEnabled:
lin, ang = (0.0, 0.0) if not follow_enabled else \ lin, ang = (0.0, 0.0) if not follow_enabled else \
_compute_follow_cmd(3.0, 0.0, **_DEF) _compute_follow_cmd(3.0, 0.0, **_DEF)
assert lin != pytest.approx(0.0) assert lin != pytest.approx(0.0)
# ── fuse_targets — UWB + camera fusion ──────────────────────────────────────
def _fuse_targets(uwb_x, uwb_y, uwb_fresh, cam_x, cam_y, cam_fresh, uwb_weight):
"""Mirror of person_follower_node.fuse_targets (no ROS2)."""
def clamp(v, lo, hi): return max(lo, min(hi, v))
if uwb_fresh and cam_fresh:
w = clamp(uwb_weight, 0.0, 1.0)
return w * uwb_x + (1.0 - w) * cam_x, w * uwb_y + (1.0 - w) * cam_y, True
if uwb_fresh:
return uwb_x, uwb_y, True
if cam_fresh:
return cam_x, cam_y, True
return 0.0, 0.0, False
class TestFuseTargets:
def test_both_fresh_blends(self):
"""Both sources fresh → weighted blend."""
fx, fy, ok = _fuse_targets(
uwb_x=4.0, uwb_y=0.0, uwb_fresh=True,
cam_x=2.0, cam_y=0.0, cam_fresh=True,
uwb_weight=0.7,
)
assert ok is True
assert fx == pytest.approx(0.7 * 4.0 + 0.3 * 2.0)
assert fy == pytest.approx(0.0)
def test_uwb_only(self):
"""Only UWB fresh → UWB position returned as-is."""
fx, fy, ok = _fuse_targets(
uwb_x=3.0, uwb_y=1.0, uwb_fresh=True,
cam_x=0.0, cam_y=0.0, cam_fresh=False,
uwb_weight=0.7,
)
assert ok is True
assert fx == pytest.approx(3.0)
assert fy == pytest.approx(1.0)
def test_camera_only(self):
"""Only camera fresh → camera position returned as-is."""
fx, fy, ok = _fuse_targets(
uwb_x=0.0, uwb_y=0.0, uwb_fresh=False,
cam_x=2.5, cam_y=-0.5, cam_fresh=True,
uwb_weight=0.7,
)
assert ok is True
assert fx == pytest.approx(2.5)
assert fy == pytest.approx(-0.5)
def test_neither_fresh(self):
"""Both stale → (0, 0, False)."""
fx, fy, ok = _fuse_targets(
uwb_x=3.0, uwb_y=1.0, uwb_fresh=False,
cam_x=2.0, cam_y=0.0, cam_fresh=False,
uwb_weight=0.7,
)
assert ok is False
assert fx == pytest.approx(0.0)
assert fy == pytest.approx(0.0)
def test_uwb_weight_zero_gives_camera(self):
"""uwb_weight=0.0 → pure camera even when both fresh."""
fx, fy, ok = _fuse_targets(
uwb_x=4.0, uwb_y=2.0, uwb_fresh=True,
cam_x=1.0, cam_y=0.5, cam_fresh=True,
uwb_weight=0.0,
)
assert ok is True
assert fx == pytest.approx(1.0)
assert fy == pytest.approx(0.5)
def test_uwb_weight_one_gives_uwb(self):
"""uwb_weight=1.0 → pure UWB even when both fresh."""
fx, fy, ok = _fuse_targets(
uwb_x=4.0, uwb_y=2.0, uwb_fresh=True,
cam_x=1.0, cam_y=0.5, cam_fresh=True,
uwb_weight=1.0,
)
assert ok is True
assert fx == pytest.approx(4.0)
assert fy == pytest.approx(2.0)
def test_lateral_blend(self):
"""Lateral (Y) component is also blended correctly."""
fx, fy, ok = _fuse_targets(
uwb_x=2.0, uwb_y=1.0, uwb_fresh=True,
cam_x=2.0, cam_y=-1.0, cam_fresh=True,
uwb_weight=0.5,
)
assert ok is True
assert fx == pytest.approx(2.0)
assert fy == pytest.approx(0.0) # 0.5*1.0 + 0.5*(-1.0) = 0

View File

@ -0,0 +1,57 @@
# uwb_config.yaml — MaUWB ESP32-S3 DW3000 UWB follow-me system
#
# Hardware layout:
# Anchor-0 (port side) → USB port_a, y = +anchor_separation/2
# Anchor-1 (starboard side) → USB port_b, y = -anchor_separation/2
# Tag on person → belt clip, ~0.9m above ground
#
# Run with:
# ros2 launch saltybot_uwb uwb.launch.py
# Override at launch:
# ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2
# ── Serial ports ──────────────────────────────────────────────────────────────
# Set udev rules to get stable symlinks:
# /dev/uwb-anchor0 → port_a
# /dev/uwb-anchor1 → port_b
# (See jetson/docs/pinout.md for udev setup)
port_a: /dev/uwb-anchor0 # Anchor-0 (port)
port_b: /dev/uwb-anchor1 # Anchor-1 (starboard)
baudrate: 115200 # MaUWB default — do not change
# ── Anchor geometry ────────────────────────────────────────────────────────────
# anchor_separation: centre-to-centre distance between anchors (metres)
# Must match physical mounting. Larger = more accurate lateral resolution.
anchor_separation: 0.25 # metres (25cm)
# anchor_height: height of anchors above ground (metres)
# Orin stem mount ≈ 0.80m on the saltybot platform
anchor_height: 0.80 # metres
# tag_height: height of person's belt-clip tag above ground (metres)
tag_height: 0.90 # metres (adjust per user)
# ── Range validity ─────────────────────────────────────────────────────────────
# range_timeout_s: stale anchor — excluded from triangulation after this gap
range_timeout_s: 1.0 # seconds
# max_range_m: discard ranges beyond this (DW3000 indoor practical limit ≈8m)
max_range_m: 8.0 # metres
# min_range_m: discard ranges below this (likely multipath artefacts)
min_range_m: 0.05 # metres
# ── Kalman filter ──────────────────────────────────────────────────────────────
# kf_process_noise: Q scalar — how dynamic the person's motion is
# Higher → faster response, more jitter
kf_process_noise: 0.1
# kf_meas_noise: R scalar — how noisy the UWB ranging is
# DW3000 indoor accuracy ≈ 10cm RMS → 0.1m → R ≈ 0.01
# Use 0.3 to be conservative on first deployment
kf_meas_noise: 0.3
# ── Publish rate ──────────────────────────────────────────────────────────────
# Should match or exceed the AT+RANGE? poll rate from both anchors.
# 20Hz means 50ms per cycle; each anchor query takes ~10ms → headroom ok.
publish_rate: 20.0 # Hz

View File

@ -0,0 +1,59 @@
"""
uwb.launch.py Launch UWB driver node for MaUWB ESP32-S3 follow-me.
Usage:
ros2 launch saltybot_uwb uwb.launch.py
ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2 port_b:=/dev/ttyUSB3
ros2 launch saltybot_uwb uwb.launch.py anchor_separation:=0.30 publish_rate:=10.0
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg = get_package_share_directory("saltybot_uwb")
cfg = os.path.join(pkg, "config", "uwb_config.yaml")
return LaunchDescription([
DeclareLaunchArgument("port_a", default_value="/dev/uwb-anchor0"),
DeclareLaunchArgument("port_b", default_value="/dev/uwb-anchor1"),
DeclareLaunchArgument("baudrate", default_value="115200"),
DeclareLaunchArgument("anchor_separation", default_value="0.25"),
DeclareLaunchArgument("anchor_height", default_value="0.80"),
DeclareLaunchArgument("tag_height", default_value="0.90"),
DeclareLaunchArgument("range_timeout_s", default_value="1.0"),
DeclareLaunchArgument("max_range_m", default_value="8.0"),
DeclareLaunchArgument("min_range_m", default_value="0.05"),
DeclareLaunchArgument("kf_process_noise", default_value="0.1"),
DeclareLaunchArgument("kf_meas_noise", default_value="0.3"),
DeclareLaunchArgument("publish_rate", default_value="20.0"),
Node(
package="saltybot_uwb",
executable="uwb_driver",
name="uwb_driver",
output="screen",
parameters=[
cfg,
{
"port_a": LaunchConfiguration("port_a"),
"port_b": LaunchConfiguration("port_b"),
"baudrate": LaunchConfiguration("baudrate"),
"anchor_separation": LaunchConfiguration("anchor_separation"),
"anchor_height": LaunchConfiguration("anchor_height"),
"tag_height": LaunchConfiguration("tag_height"),
"range_timeout_s": LaunchConfiguration("range_timeout_s"),
"max_range_m": LaunchConfiguration("max_range_m"),
"min_range_m": LaunchConfiguration("min_range_m"),
"kf_process_noise": LaunchConfiguration("kf_process_noise"),
"kf_meas_noise": LaunchConfiguration("kf_meas_noise"),
"publish_rate": LaunchConfiguration("publish_rate"),
},
],
),
])

View File

@ -0,0 +1,28 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_uwb</name>
<version>0.1.0</version>
<description>
UWB follow-me driver for saltybot — MaUWB ESP32-S3 DW3000 two-anchor
ranging, triangulation, Kalman filtering, and /uwb/target PoseStamped publisher.
</description>
<maintainer email="seb@vayrette.com">seb</maintainer>
<license>MIT</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<depend>saltybot_uwb_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,278 @@
"""
ranging_math.py Pure triangulation and Kalman filter helpers for UWB follow-me.
No ROS2 dependencies importable in unit tests without a ROS2 install.
2-Anchor geometry
Anchors are mounted on the robot stem, separated by `sep` metres along the Y-axis:
A0 (0, +sep/2, anchor_z) port side
A1 (0, -sep/2, anchor_z) starboard side
The person tag is at (x_t, y_t, tag_z) in base_link.
Step 1 height compensation:
dz = anchor_z - tag_z
r0_h = sqrt(max(0, r0² - dz²))
r1_h = sqrt(max(0, r1² - dz²))
Step 2 lateral offset (Y):
y_t = (r1_h² - r0_h²) / (2 * sep)
Step 3 forward distance (X):
x_t = sqrt(max(0, r0_h² - (y_t - sep/2)²))
Returns (x_t, y_t); caller should treat negative x_t as 0.
"""
import math
# ── Triangulation ─────────────────────────────────────────────────────────────
def triangulate_2anchor(
r0: float,
r1: float,
sep: float,
anchor_z: float = 0.0,
tag_z: float = 0.0,
) -> tuple:
"""
Triangulate person position in base_link from two anchor ranges.
Parameters
----------
r0 : range to anchor-0 (port, y = +sep/2) in metres
r1 : range to anchor-1 (starboard, y = -sep/2) in metres
sep : anchor separation in metres (centre-to-centre)
anchor_z : height of anchors above ground (metres, default 0)
tag_z : height of person tag above ground (metres, default 0)
Returns
-------
(x_t, y_t) : person position in metres (base_link X = forward, Y = left)
x_t is clamped to 0 (can't be behind anchors geometrically).
"""
if sep <= 0.0:
raise ValueError(f"sep must be > 0, got {sep}")
if r0 < 0.0 or r1 < 0.0:
raise ValueError(f"ranges must be ≥ 0, got r0={r0}, r1={r1}")
# Height correction — bring to horizontal plane
dz = anchor_z - tag_z
dz2 = dz * dz
r0_h2 = max(0.0, r0 * r0 - dz2)
r1_h2 = max(0.0, r1 * r1 - dz2)
r0_h = math.sqrt(r0_h2)
# Lateral (Y) offset — from circle-circle intersection formula
y_t = (r1_h2 - r0_h2) / (2.0 * sep)
# Forward (X) distance
dx2 = r0_h2 - (y_t - sep / 2.0) ** 2
x_t = math.sqrt(max(0.0, dx2))
return x_t, y_t
# ── Simple 2-D Kalman filter ──────────────────────────────────────────────────
class KalmanFilter2D:
"""
Constant-velocity Kalman filter for a 2-D position estimate.
State vector: [x, y, vx, vy]
Parameters
----------
process_noise : Q scalar how much we trust the motion model
measurement_noise : R scalar how noisy the UWB measurement is
dt : expected update interval in seconds (used for F matrix)
"""
def __init__(
self,
process_noise: float = 0.1,
measurement_noise: float = 0.3,
dt: float = 0.05,
):
self._q = process_noise
self._r = measurement_noise
self._dt = dt
# State: [x, y, vx, vy]
self._x = [0.0, 0.0, 0.0, 0.0]
# Covariance matrix P (4×4, stored as flat list, row-major)
init_pos_var = 1.0
init_vel_var = 0.5
self._P = [
init_pos_var, 0.0, 0.0, 0.0,
0.0, init_pos_var, 0.0, 0.0,
0.0, 0.0, init_vel_var, 0.0,
0.0, 0.0, 0.0, init_vel_var,
]
self._initialised = False
# ── Matrix helpers (minimal, no numpy — importable without GPU deps) ──────
@staticmethod
def _mat_add(A, B):
return [a + b for a, b in zip(A, B)]
@staticmethod
def _mat_sub(A, B):
return [a - b for a, b in zip(A, B)]
@staticmethod
def _mat_mul(A, B, n):
"""n×n matrix multiply."""
C = [0.0] * (n * n)
for i in range(n):
for k in range(n):
for j in range(n):
C[i * n + j] += A[i * n + k] * B[k * n + j]
return C
@staticmethod
def _mat_T(A, n):
"""n×n transpose."""
T = [0.0] * (n * n)
for i in range(n):
for j in range(n):
T[j * n + i] = A[i * n + j]
return T
def _build_F(self):
dt = self._dt
return [
1.0, 0.0, dt, 0.0,
0.0, 1.0, 0.0, dt,
0.0, 0.0, 1.0, 0.0,
0.0, 0.0, 0.0, 1.0,
]
def _build_Q(self):
q = self._q
dt = self._dt
dt2 = dt * dt
dt3 = dt2 * dt
dt4 = dt3 * dt
# Continuous white-noise model discretised for constant-velocity
return [
q * dt4 / 4, 0.0, q * dt3 / 2, 0.0,
0.0, q * dt4 / 4, 0.0, q * dt3 / 2,
q * dt3 / 2, 0.0, q * dt2, 0.0,
0.0, q * dt3 / 2, 0.0, q * dt2,
]
def predict(self, dt: float = None):
"""Predict step — advances state by dt (defaults to self._dt)."""
if dt is not None:
self._dt = dt
F = self._build_F()
Q = self._build_Q()
n = 4
x = self._x
# x = F @ x
new_x = [
F[0] * x[0] + F[1] * x[1] + F[2] * x[2] + F[3] * x[3],
F[4] * x[0] + F[5] * x[1] + F[6] * x[2] + F[7] * x[3],
F[8] * x[0] + F[9] * x[1] + F[10] * x[2] + F[11] * x[3],
F[12] * x[0] + F[13] * x[1] + F[14] * x[2] + F[15] * x[3],
]
# P = F @ P @ F.T + Q
FP = self._mat_mul(F, self._P, n)
FT = self._mat_T(F, n)
FPF = self._mat_mul(FP, FT, n)
self._P = self._mat_add(FPF, Q)
self._x = new_x
def update(self, x_meas: float, y_meas: float):
"""
Update step with a 2-D position measurement.
H = [[1,0,0,0],[0,1,0,0]] (observe position only)
"""
if not self._initialised:
self._x[0] = x_meas
self._x[1] = y_meas
self._initialised = True
return
# Innovation
innov_x = x_meas - self._x[0]
innov_y = y_meas - self._x[1]
# S = H @ P @ H.T + R (2×2, position rows/cols only)
P = self._P
r = self._r
# H picks rows 0,1 and cols 0,1 from P
S00 = P[0] + r
S01 = P[1]
S10 = P[4]
S11 = P[5] + r
# K = P @ H.T @ S⁻¹ (4×2 Kalman gain)
det = S00 * S11 - S01 * S10
if abs(det) < 1e-12:
return
inv_det = 1.0 / det
# S⁻¹
Si00 = S11 * inv_det
Si01 = -S01 * inv_det
Si10 = -S10 * inv_det
Si11 = S00 * inv_det
# P @ H.T = first two columns of P (H.T selects cols 0,1)
PH = [P[0], P[1], P[4], P[5], P[8], P[9], P[12], P[13]] # 4×2
# K (4×2) = PH @ Si
K = [
PH[0] * Si00 + PH[1] * Si10, PH[0] * Si01 + PH[1] * Si11,
PH[2] * Si00 + PH[3] * Si10, PH[2] * Si01 + PH[3] * Si11,
PH[4] * Si00 + PH[5] * Si10, PH[4] * Si01 + PH[5] * Si11,
PH[6] * Si00 + PH[7] * Si10, PH[6] * Si01 + PH[7] * Si11,
]
# x = x + K @ innov
self._x[0] += K[0] * innov_x + K[1] * innov_y
self._x[1] += K[2] * innov_x + K[3] * innov_y
self._x[2] += K[4] * innov_x + K[5] * innov_y
self._x[3] += K[6] * innov_x + K[7] * innov_y
# P = (I - K @ H) @ P (Joseph form for numerical stability skipped —
# this is a low-latency embedded context where simplicity wins)
n = 4
# K @ H selects first two columns → rows of P
KH = [0.0] * 16
for row in range(4):
KH[row * 4 + 0] = K[row * 2 + 0]
KH[row * 4 + 1] = K[row * 2 + 1]
# I - KH
IKH = [-KH[i] for i in range(16)]
for i in range(4):
IKH[i * 4 + i] += 1.0
self._P = self._mat_mul(IKH, self._P, n)
def position(self) -> tuple:
"""Return current (x, y) position estimate."""
return self._x[0], self._x[1]
def velocity(self) -> tuple:
"""Return current (vx, vy) velocity estimate."""
return self._x[2], self._x[3]
def reset(self):
"""Reset filter to uninitialised state."""
self._x = [0.0, 0.0, 0.0, 0.0]
init_pos_var = 1.0
init_vel_var = 0.5
self._P = [
init_pos_var, 0.0, 0.0, 0.0,
0.0, init_pos_var, 0.0, 0.0,
0.0, 0.0, init_vel_var, 0.0,
0.0, 0.0, 0.0, init_vel_var,
]
self._initialised = False

View File

@ -0,0 +1,306 @@
"""
uwb_driver_node.py ROS2 node for MaUWB ESP32-S3 DW3000 follow-me system.
Hardware
2× MaUWB ESP32-S3 DW3000 anchors on robot stem (USB Orin Nano)
- Anchor-0: port side (y = +sep/2)
- Anchor-1: starboard (y = -sep/2)
1× MaUWB tag on person (belt clip)
AT command interface (115200 8N1)
Query: AT+RANGE?\r\n
Response (from anchors):
+RANGE:<anchor_id>,<range_mm>[,<rssi>]\r\n
Config:
AT+anchor_tag=ANCHOR\r\n set module as anchor
AT+anchor_tag=TAG\r\n set module as tag
Publishes
/uwb/target (geometry_msgs/PoseStamped) triangulated person position in base_link
/uwb/ranges (saltybot_uwb_msgs/UwbRangeArray) raw ranges from both anchors
Safety
If a range is stale (> range_timeout_s), that anchor is excluded from
triangulation. If both anchors are stale, /uwb/target is not published.
Usage
ros2 launch saltybot_uwb uwb.launch.py
ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2 port_b:=/dev/ttyUSB3
"""
import math
import re
import threading
import time
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped
from std_msgs.msg import Header
from saltybot_uwb_msgs.msg import UwbRange, UwbRangeArray
from saltybot_uwb.ranging_math import triangulate_2anchor, KalmanFilter2D
try:
import serial
_SERIAL_AVAILABLE = True
except ImportError:
_SERIAL_AVAILABLE = False
# Regex: +RANGE:<id>,<mm> or +RANGE:<id>,<mm>,<rssi>
_RANGE_RE = re.compile(
r"\+RANGE\s*:\s*(\d+)\s*,\s*(\d+)(?:\s*,\s*(-?[\d.]+))?",
re.IGNORECASE,
)
class SerialReader(threading.Thread):
"""
Background thread: polls an anchor's UART, fires callback on every
valid +RANGE response.
"""
def __init__(self, port, baudrate, anchor_id, callback, logger):
super().__init__(daemon=True)
self._port = port
self._baudrate = baudrate
self._anchor_id = anchor_id
self._callback = callback
self._logger = logger
self._running = False
self._ser = None
def run(self):
self._running = True
while self._running:
try:
self._ser = serial.Serial(
self._port, self._baudrate, timeout=0.5
)
self._logger.info(
f"Anchor-{self._anchor_id}: opened {self._port}"
)
self._read_loop()
except Exception as exc:
self._logger.warn(
f"Anchor-{self._anchor_id} serial error: {exc} — retrying in 2s"
)
if self._ser and self._ser.is_open:
self._ser.close()
time.sleep(2.0)
def _read_loop(self):
while self._running:
try:
# Query the anchor
self._ser.write(b"AT+RANGE?\r\n")
# Read up to 10 lines waiting for a +RANGE response
for _ in range(10):
raw = self._ser.readline()
if not raw:
break
line = raw.decode("ascii", errors="replace").strip()
m = _RANGE_RE.search(line)
if m:
range_mm = int(m.group(2))
rssi = float(m.group(3)) if m.group(3) else 0.0
self._callback(self._anchor_id, range_mm, rssi)
break
except Exception as exc:
self._logger.warn(
f"Anchor-{self._anchor_id} read error: {exc}"
)
break # trigger reconnect
def stop(self):
self._running = False
if self._ser and self._ser.is_open:
self._ser.close()
class UwbDriverNode(Node):
def __init__(self):
super().__init__("uwb_driver")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("port_a", "/dev/ttyUSB0")
self.declare_parameter("port_b", "/dev/ttyUSB1")
self.declare_parameter("baudrate", 115200)
self.declare_parameter("anchor_separation", 0.25)
self.declare_parameter("anchor_height", 0.80)
self.declare_parameter("tag_height", 0.90)
self.declare_parameter("range_timeout_s", 1.0)
self.declare_parameter("max_range_m", 8.0)
self.declare_parameter("min_range_m", 0.05)
self.declare_parameter("kf_process_noise", 0.1)
self.declare_parameter("kf_meas_noise", 0.3)
self.declare_parameter("publish_rate", 20.0)
self._p = self._load_params()
# ── State (protected by lock) ──────────────────────────────────────
self._lock = threading.Lock()
self._ranges = {} # anchor_id → (range_m, rssi, timestamp)
self._kf = KalmanFilter2D(
process_noise=self._p["kf_process_noise"],
measurement_noise=self._p["kf_meas_noise"],
dt=1.0 / self._p["publish_rate"],
)
# ── Publishers ────────────────────────────────────────────────────
self._target_pub = self.create_publisher(
PoseStamped, "/uwb/target", 10)
self._ranges_pub = self.create_publisher(
UwbRangeArray, "/uwb/ranges", 10)
# ── Serial readers ────────────────────────────────────────────────
if _SERIAL_AVAILABLE:
self._reader_a = SerialReader(
self._p["port_a"], self._p["baudrate"],
anchor_id=0, callback=self._range_cb,
logger=self.get_logger(),
)
self._reader_b = SerialReader(
self._p["port_b"], self._p["baudrate"],
anchor_id=1, callback=self._range_cb,
logger=self.get_logger(),
)
self._reader_a.start()
self._reader_b.start()
else:
self.get_logger().warn(
"pyserial not installed — running in simulation mode (no serial I/O)"
)
# ── Publish timer ─────────────────────────────────────────────────
self._timer = self.create_timer(
1.0 / self._p["publish_rate"], self._publish_cb
)
self.get_logger().info(
f"UWB driver ready sep={self._p['anchor_separation']}m "
f"ports={self._p['port_a']},{self._p['port_b']} "
f"rate={self._p['publish_rate']}Hz"
)
# ── Helpers ───────────────────────────────────────────────────────────────
def _load_params(self):
return {
"port_a": self.get_parameter("port_a").value,
"port_b": self.get_parameter("port_b").value,
"baudrate": self.get_parameter("baudrate").value,
"anchor_separation": self.get_parameter("anchor_separation").value,
"anchor_height": self.get_parameter("anchor_height").value,
"tag_height": self.get_parameter("tag_height").value,
"range_timeout_s": self.get_parameter("range_timeout_s").value,
"max_range_m": self.get_parameter("max_range_m").value,
"min_range_m": self.get_parameter("min_range_m").value,
"kf_process_noise": self.get_parameter("kf_process_noise").value,
"kf_meas_noise": self.get_parameter("kf_meas_noise").value,
"publish_rate": self.get_parameter("publish_rate").value,
}
# ── Callbacks ─────────────────────────────────────────────────────────────
def _range_cb(self, anchor_id: int, range_mm: int, rssi: float):
"""Called from serial reader threads — thread-safe update."""
range_m = range_mm / 1000.0
p = self._p
if range_m < p["min_range_m"] or range_m > p["max_range_m"]:
return
with self._lock:
self._ranges[anchor_id] = (range_m, rssi, time.monotonic())
def _publish_cb(self):
now = time.monotonic()
timeout = self._p["range_timeout_s"]
sep = self._p["anchor_separation"]
with self._lock:
# Collect valid (non-stale) ranges
valid = {}
for aid, (r, rssi, t) in self._ranges.items():
if now - t <= timeout:
valid[aid] = (r, rssi, t)
# Build and publish UwbRangeArray regardless (even if partial)
hdr = Header()
hdr.stamp = self.get_clock().now().to_msg()
hdr.frame_id = "base_link"
arr = UwbRangeArray()
arr.header = hdr
for aid, (r, rssi, _) in valid.items():
entry = UwbRange()
entry.header = hdr
entry.anchor_id = aid
entry.range_m = float(r)
entry.raw_mm = int(round(r * 1000.0))
entry.rssi = float(rssi)
arr.ranges.append(entry)
self._ranges_pub.publish(arr)
# Need both anchors to triangulate
if 0 not in valid or 1 not in valid:
return
r0 = valid[0][0]
r1 = valid[1][0]
try:
x_t, y_t = triangulate_2anchor(
r0=r0,
r1=r1,
sep=sep,
anchor_z=self._p["anchor_height"],
tag_z=self._p["tag_height"],
)
except (ValueError, ZeroDivisionError) as exc:
self.get_logger().warn(f"Triangulation error: {exc}")
return
# Kalman filter update
dt = 1.0 / self._p["publish_rate"]
self._kf.predict(dt=dt)
self._kf.update(x_t, y_t)
kx, ky = self._kf.position()
# Publish PoseStamped in base_link
pose = PoseStamped()
pose.header = hdr
pose.pose.position.x = kx
pose.pose.position.y = ky
pose.pose.position.z = 0.0
# Orientation: face the person (yaw = atan2(y, x))
yaw = math.atan2(ky, kx)
pose.pose.orientation.z = math.sin(yaw / 2.0)
pose.pose.orientation.w = math.cos(yaw / 2.0)
self._target_pub.publish(pose)
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = UwbDriverNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_uwb
[install]
install_scripts=$base/lib/saltybot_uwb

View File

@ -0,0 +1,32 @@
from setuptools import setup
import os
from glob import glob
package_name = "saltybot_uwb"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages",
["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
(os.path.join("share", package_name, "launch"),
glob("launch/*.py")),
(os.path.join("share", package_name, "config"),
glob("config/*.yaml")),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="seb",
maintainer_email="seb@vayrette.com",
description="UWB follow-me driver for saltybot (MaUWB ESP32-S3 DW3000)",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"uwb_driver = saltybot_uwb.uwb_driver_node:main",
],
},
)

View File

@ -0,0 +1,174 @@
"""
test_ranging_math.py Unit tests for UWB triangulation and Kalman filter.
No ROS2 / serial / GPU dependencies runs with plain pytest.
"""
import math
import pytest
from saltybot_uwb.ranging_math import triangulate_2anchor, KalmanFilter2D
# ── triangulate_2anchor ───────────────────────────────────────────────────────
class TestTriangulate2Anchor:
def test_person_directly_ahead(self):
"""Person 2m ahead, centred: x≈2, y≈0."""
sep = 0.25
# A0 at y=+0.125, A1 at y=-0.125 → symmetric → y_t ≈ 0
r0 = math.sqrt(2.0**2 + 0.125**2)
r1 = r0 # symmetric
x, y = triangulate_2anchor(r0, r1, sep)
assert abs(x - 2.0) < 0.02
assert abs(y) < 0.01
def test_person_offset_left(self):
"""Person 2m ahead, 0.5m to the left: y_t ≈ +0.5."""
sep = 0.25
# A0 at y=+0.125, A1 at y=-0.125
px, py = 2.0, 0.5
r0 = math.sqrt(px**2 + (py - 0.125)**2)
r1 = math.sqrt(px**2 + (py + 0.125)**2)
x, y = triangulate_2anchor(r0, r1, sep)
assert abs(x - px) < 0.05
assert abs(y - py) < 0.05
def test_person_offset_right(self):
"""Person 2m ahead, 0.5m to the right: y_t ≈ -0.5."""
sep = 0.25
px, py = 2.0, -0.5
r0 = math.sqrt(px**2 + (py - 0.125)**2)
r1 = math.sqrt(px**2 + (py + 0.125)**2)
x, y = triangulate_2anchor(r0, r1, sep)
assert abs(x - px) < 0.05
assert abs(y - py) < 0.05
def test_height_compensation_reduces_horizontal_range(self):
"""With height difference, horizontal range < slant range → x smaller."""
sep = 0.25
# No height: x ≈ 2.0
x_flat, _ = triangulate_2anchor(2.0, 2.0, sep, anchor_z=0.0, tag_z=0.0)
# Same slant range but 0.5m height difference
slant = math.sqrt(2.0**2 + 0.5**2)
x_tall, _ = triangulate_2anchor(slant, slant, sep, anchor_z=0.8, tag_z=0.3)
# x_tall should be close to 2.0 (height-compensated back to horizontal)
assert abs(x_tall - x_flat) < 0.05
def test_returns_nonnegative_x(self):
"""x_t is always ≥ 0 (person can't be geometrically behind anchors)."""
sep = 0.25
# Pathological: extremely mismatched ranges → sqrt clamped to 0
x, _ = triangulate_2anchor(0.1, 0.1, sep)
assert x >= 0.0
def test_invalid_sep_raises(self):
with pytest.raises(ValueError):
triangulate_2anchor(1.0, 1.0, sep=0.0)
def test_negative_range_raises(self):
with pytest.raises(ValueError):
triangulate_2anchor(-1.0, 1.0, sep=0.25)
def test_close_range(self):
"""Person very close (0.3m) — should not crash."""
sep = 0.25
px, py = 0.3, 0.0
r0 = math.sqrt(px**2 + 0.125**2)
r1 = r0
x, y = triangulate_2anchor(r0, r1, sep)
assert x >= 0.0
def test_large_range(self):
"""Person 7m ahead — within DW3000 practical range."""
sep = 0.25
r0 = math.sqrt(7.0**2 + 0.125**2)
r1 = r0
x, y = triangulate_2anchor(r0, r1, sep)
assert abs(x - 7.0) < 0.1
assert abs(y) < 0.05
# ── KalmanFilter2D ────────────────────────────────────────────────────────────
class TestKalmanFilter2D:
def test_initial_position_after_first_update(self):
"""First update initialises position to measurement."""
kf = KalmanFilter2D()
kf.predict()
kf.update(3.0, 1.0)
x, y = kf.position()
assert abs(x - 3.0) < 0.01
assert abs(y - 1.0) < 0.01
def test_converges_to_stationary_target(self):
"""Repeated updates to same position → Kalman converges to it."""
kf = KalmanFilter2D(process_noise=0.1, measurement_noise=0.3)
for _ in range(50):
kf.predict(dt=0.05)
kf.update(2.0, 0.5)
x, y = kf.position()
assert abs(x - 2.0) < 0.1
assert abs(y - 0.5) < 0.1
def test_smooths_noisy_measurements(self):
"""Filter output should be smoother than raw measurements."""
import random
rng = random.Random(42)
kf = KalmanFilter2D(process_noise=0.05, measurement_noise=0.5)
raw_errors = []
kf_errors = []
true_x, true_y = 2.0, 1.0
for _ in range(30):
noisy_x = true_x + rng.gauss(0, 0.3)
noisy_y = true_y + rng.gauss(0, 0.3)
kf.predict(dt=0.05)
kf.update(noisy_x, noisy_y)
kx, ky = kf.position()
raw_errors.append((noisy_x - true_x)**2 + (noisy_y - true_y)**2)
kf_errors.append((kx - true_x)**2 + (ky - true_y)**2)
avg_raw = sum(raw_errors[-10:]) / 10
avg_kf = sum(kf_errors[-10:]) / 10
assert avg_kf < avg_raw # filter is smoother
def test_reset_clears_state(self):
"""After reset(), position returns to ~0."""
kf = KalmanFilter2D()
for _ in range(10):
kf.predict()
kf.update(5.0, 3.0)
kf.reset()
kf.predict()
kf.update(0.0, 0.0)
x, y = kf.position()
assert abs(x) < 0.1
assert abs(y) < 0.1
def test_velocity_nonzero_for_moving_target(self):
"""For a target moving at constant velocity, vx should be detected."""
kf = KalmanFilter2D(process_noise=1.0, measurement_noise=0.1, dt=0.1)
# Target moving at 0.5 m/s in X
for i in range(40):
kf.predict(dt=0.1)
kf.update(0.5 * i * 0.1, 0.0)
vx, vy = kf.velocity()
assert abs(vx - 0.5) < 0.15
def test_custom_dt(self):
"""predict() accepts custom dt without raising."""
kf = KalmanFilter2D()
kf.predict(dt=0.02)
kf.update(1.0, 1.0)
x, y = kf.position()
assert abs(x - 1.0) < 0.01
def test_no_nan_on_zero_measurement(self):
"""Zero measurement should not produce NaN."""
kf = KalmanFilter2D()
kf.predict()
kf.update(0.0, 0.0)
x, y = kf.position()
assert not math.isnan(x)
assert not math.isnan(y)

View File

@ -0,0 +1,15 @@
cmake_minimum_required(VERSION 3.8)
project(saltybot_uwb_msgs)
find_package(ament_cmake REQUIRED)
find_package(std_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/UwbRange.msg"
"msg/UwbRangeArray.msg"
DEPENDENCIES std_msgs
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()

View File

@ -0,0 +1,13 @@
# UwbRange.msg — range measurement from a single UWB anchor
#
# anchor_id : 0 = anchor-0 (port side), 1 = anchor-1 (starboard side)
# range_m : measured horizontal range in metres (after height correction)
# raw_mm : raw TWR range from AT+RANGE? response, millimetres
# rssi : received signal strength (dBm), 0 if not reported by module
std_msgs/Header header
uint8 anchor_id
float32 range_m
uint32 raw_mm
float32 rssi

View File

@ -0,0 +1,8 @@
# UwbRangeArray.msg — all UWB anchor ranges in one message
#
# Published by uwb_driver_node on /uwb/ranges.
# Contains one UwbRange entry per active anchor.
std_msgs/Header header
saltybot_uwb_msgs/UwbRange[] ranges

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_uwb_msgs</name>
<version>0.1.0</version>
<description>Custom ROS2 message definitions for UWB ranging (saltybot).</description>
<maintainer email="seb@vayrette.com">seb</maintainer>
<license>MIT</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<depend>std_msgs</depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>