diff --git a/jetson/ros2_ws/src/saltybot_follower/saltybot_follower/person_follower_node.py b/jetson/ros2_ws/src/saltybot_follower/saltybot_follower/person_follower_node.py
index 502bc22..c9bf3ad 100644
--- a/jetson/ros2_ws/src/saltybot_follower/saltybot_follower/person_follower_node.py
+++ b/jetson/ros2_ws/src/saltybot_follower/saltybot_follower/person_follower_node.py
@@ -1,30 +1,42 @@
"""
-person_follower_node.py — Proportional person-following control loop.
+person_follower_node.py -- Proportional person-following control loop.
Subscribes:
- /person/target (geometry_msgs/PoseStamped) — person pose in base_link frame
- /local_costmap/costmap (nav_msgs/OccupancyGrid) — Nav2 local obstacle map
+ /uwb/target (geometry_msgs/PoseStamped) -- UWB-triangulated position (primary)
+ /person/target (geometry_msgs/PoseStamped) -- camera-detected position (secondary)
+ /local_costmap/costmap (nav_msgs/OccupancyGrid) -- Nav2 local obstacle map
Publishes:
- /cmd_vel (geometry_msgs/Twist) — velocity commands
+ /cmd_vel (geometry_msgs/Twist) -- velocity commands
+
+Sensor fusion
+-------------
+ When both UWB and camera sources are fresh:
+ fused = uwb_weight * uwb + (1 - uwb_weight) * camera
+ When only UWB is fresh: use UWB directly.
+ When only camera is fresh: use camera directly.
+ When neither is fresh: STOPPING -> SEARCHING.
+
+ UWB is declared stale after uwb_timeout seconds (default 1.0s).
+ Camera is declared stale after lost_timeout seconds (default 2.0s).
State machine
-─────────────
- FOLLOWING — person visible (last detection < lost_timeout ago)
- STOPPING — person lost, publishing zero until search_timeout
- SEARCHING — person lost > search_timeout, slow rotation to re-acquire
+-------------
+ FOLLOWING -- at least one source fresh (last detection < timeout ago)
+ STOPPING -- all sources lost, publishing zero until search_timeout
+ SEARCHING -- all sources lost > search_timeout, slow rotation to re-acquire
Safety wiring
-─────────────
- • cmd_vel bridge (PR #46) applies ramp + deadman + STM32 AUTONOMOUS mode gate —
+-------------
+ * cmd_vel bridge (PR #46) applies ramp + deadman + STM32 AUTONOMOUS mode gate --
this node publishes raw /cmd_vel, the bridge handles hardware safety.
- • follow_enabled param (default True) lets the operator disable the controller
+ * follow_enabled param (default True) lets the operator disable the controller
at runtime: ros2 param set /person_follower follow_enabled false
- • Obstacle override: if Nav2 local costmap shows a lethal cell in the forward
+ * Obstacle override: if Nav2 local costmap shows a lethal cell in the forward
corridor, forward cmd_vel is zeroed; turning is still allowed.
Usage
-─────
+-----
ros2 launch saltybot_follower person_follower.launch.py
ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.2
"""
@@ -37,13 +49,13 @@ from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, Twist
from nav_msgs.msg import OccupancyGrid
-# ── State constants ────────────────────────────────────────────────────────────
+# -- State constants -----------------------------------------------------------
_FOLLOWING = "following"
-_STOPPING = "stopping" # person lost < search_timeout, publishing zero
-_SEARCHING = "searching" # person lost > search_timeout, slow rotation
+_STOPPING = "stopping" # all sources lost < search_timeout, publishing zero
+_SEARCHING = "searching" # all sources lost > search_timeout, slow rotation
-# ── Pure helper functions (also exercised by unit tests) ──────────────────────
+# -- Pure helper functions (also exercised by unit tests) ----------------------
def clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v))
@@ -58,6 +70,40 @@ def compute_bearing(px: float, py: float) -> float:
return math.atan2(py, px)
+def fuse_targets(
+ uwb_x: float,
+ uwb_y: float,
+ uwb_fresh: bool,
+ cam_x: float,
+ cam_y: float,
+ cam_fresh: bool,
+ uwb_weight: float,
+) -> tuple:
+ """
+ Fuse UWB (primary) and camera (secondary) position estimates.
+
+ Returns (fused_x, fused_y, any_fresh).
+
+ Fusion rules:
+ Both fresh -> weighted blend: uwb_weight * uwb + (1-uwb_weight) * camera
+ UWB only -> UWB position, full weight
+ Camera only -> camera position, full weight
+ Neither -> (0, 0, False)
+ """
+ if uwb_fresh and cam_fresh:
+ w = clamp(uwb_weight, 0.0, 1.0)
+ return (
+ w * uwb_x + (1.0 - w) * cam_x,
+ w * uwb_y + (1.0 - w) * cam_y,
+ True,
+ )
+ if uwb_fresh:
+ return uwb_x, uwb_y, True
+ if cam_fresh:
+ return cam_x, cam_y, True
+ return 0.0, 0.0, False
+
+
def compute_follow_cmd(
px: float,
py: float,
@@ -75,30 +121,27 @@ def compute_follow_cmd(
Returns (linear_x, angular_z) as floats.
linear_x:
- Proportional to signed distance error (distance − follow_distance).
- Zero inside the dead zone. Clamped to ±max_linear_vel.
+ Proportional to signed distance error (distance - follow_distance).
+ Zero inside the dead zone. Clamped to +-max_linear_vel.
Zeroed (not reversed) if obstacle_ahead and moving forward.
angular_z:
Proportional to bearing. Always corrects even when inside the dead zone,
so the robot stays pointed at the person while holding distance.
- Clamped to ±max_angular_vel.
+ Clamped to +-max_angular_vel.
"""
distance = compute_distance(px, py)
bearing = compute_bearing(px, py)
- # Linear — dead zone suppresses jitter at target distance
dist_error = distance - follow_distance
if abs(dist_error) > dead_zone:
linear_x = clamp(kp_linear * dist_error, -max_linear_vel, max_linear_vel)
else:
linear_x = 0.0
- # Obstacle override: suppress forward motion, allow turning
if obstacle_ahead and linear_x > 0.0:
linear_x = 0.0
- # Angular — always correct bearing
angular_z = clamp(kp_angular * bearing, -max_angular_vel, max_angular_vel)
return linear_x, angular_z
@@ -116,7 +159,7 @@ def check_obstacle_in_costmap(
"""
Check for obstacle cells in the robot's forward corridor.
- Assumes a Nav2 rolling-window local costmap (robot ≈ centre cell).
+ Assumes a Nav2 rolling-window local costmap (robot ~= centre cell).
Forward = +col direction (robot +X).
costmap_data : flat row-major int8 list (from OccupancyGrid.data)
@@ -124,7 +167,7 @@ def check_obstacle_in_costmap(
resolution : metres per cell
check_dist : how far ahead to look (metres)
robot_half_width: corridor half-width (metres)
- threshold : costmap value ≥ this is treated as obstacle (Nav2 lethal=99)
+ threshold : costmap value >= this is treated as obstacle (Nav2 lethal=99)
"""
if resolution <= 0.0:
return False
@@ -132,8 +175,8 @@ def check_obstacle_in_costmap(
cx = width // 2
cy = height // 2
- n_fwd = int(check_dist / resolution)
- n_side = int(robot_half_width / resolution)
+ n_fwd = int(check_dist / resolution)
+ n_side = int(robot_half_width / resolution)
for dx in range(1, n_fwd + 1):
col = cx + dx
@@ -147,14 +190,14 @@ def check_obstacle_in_costmap(
return False
-# ── ROS2 Node ─────────────────────────────────────────────────────────────────
+# -- ROS2 Node ----------------------------------------------------------------
class PersonFollowerNode(Node):
def __init__(self):
super().__init__("person_follower")
- # ── Parameters ────────────────────────────────────────────────────────
+ # -- Parameters --------------------------------------------------------
self.declare_parameter("follow_distance", 1.5)
self.declare_parameter("dead_zone", 0.3)
self.declare_parameter("kp_linear", 0.6)
@@ -169,38 +212,48 @@ class PersonFollowerNode(Node):
self.declare_parameter("robot_width", 0.3)
self.declare_parameter("control_rate", 20.0)
self.declare_parameter("follow_enabled", True)
+ # UWB fusion
+ self.declare_parameter("uwb_weight", 0.7)
+ self.declare_parameter("uwb_timeout", 1.0)
self._p = {}
self._reload_params()
- # ── State ─────────────────────────────────────────────────────────────
+ # -- State -------------------------------------------------------------
self._state = _STOPPING
- self._last_target_t = 0.0 # monotonic clock; 0 = never received
+ self._last_target_t = 0.0 # camera; monotonic; 0 = never received
self._person_x = 0.0
self._person_y = 0.0
+ self._last_uwb_t = 0.0 # UWB; monotonic; 0 = never received
+ self._uwb_x = 0.0
+ self._uwb_y = 0.0
self._obstacle_ahead = False
- # ── Subscriptions ─────────────────────────────────────────────────────
+ # -- Subscriptions -----------------------------------------------------
+ self.create_subscription(
+ PoseStamped, "/uwb/target", self._uwb_cb, 10)
self.create_subscription(
PoseStamped, "/person/target", self._target_cb, 10)
self.create_subscription(
OccupancyGrid, "/local_costmap/costmap", self._costmap_cb, 1)
- # ── Publisher ─────────────────────────────────────────────────────────
+ # -- Publisher ---------------------------------------------------------
self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10)
- # ── Control timer ─────────────────────────────────────────────────────
+ # -- Control timer -----------------------------------------------------
self._timer = self.create_timer(
1.0 / self._p["control_rate"], self._control_cb)
self.get_logger().info(
- f"PersonFollower ready follow_dist={self._p['follow_distance']}m "
- f"dead_zone=±{self._p['dead_zone']}m "
+ "PersonFollower ready "
+ f"follow_dist={self._p['follow_distance']}m "
+ f"dead_zone=+/-{self._p['dead_zone']}m "
f"max_vel={self._p['max_linear_vel']}m/s "
+ f"uwb_weight={self._p['uwb_weight']} "
f"lost_timeout={self._p['lost_timeout']}s"
)
- # ── Parameter helper ──────────────────────────────────────────────────────
+ # -- Parameter helper ------------------------------------------------------
def _reload_params(self):
self._p = {
@@ -218,12 +271,21 @@ class PersonFollowerNode(Node):
"robot_width": self.get_parameter("robot_width").value,
"control_rate": self.get_parameter("control_rate").value,
"follow_enabled": self.get_parameter("follow_enabled").value,
+ "uwb_weight": self.get_parameter("uwb_weight").value,
+ "uwb_timeout": self.get_parameter("uwb_timeout").value,
}
- # ── Callbacks ─────────────────────────────────────────────────────────────
+ # -- Callbacks -------------------------------------------------------------
+
+ def _uwb_cb(self, msg):
+ """UWB-triangulated person position (primary source)."""
+ self._uwb_x = msg.pose.position.x
+ self._uwb_y = msg.pose.position.y
+ self._last_uwb_t = time.monotonic()
+ self._state = _FOLLOWING
def _target_cb(self, msg):
- """Person position in base_link frame."""
+ """Camera-detected person position (secondary source)."""
self._person_x = msg.pose.position.x
self._person_y = msg.pose.position.y
self._last_target_t = time.monotonic()
@@ -240,10 +302,9 @@ class PersonFollowerNode(Node):
threshold=self._p["obstacle_threshold"],
)
- # ── Control loop ──────────────────────────────────────────────────────────
+ # -- Control loop ----------------------------------------------------------
def _control_cb(self):
- # Re-read mutable params every tick (cheap dict lookup)
self._reload_params()
twist = Twist()
@@ -252,51 +313,69 @@ class PersonFollowerNode(Node):
self._cmd_pub.publish(twist)
return
- now = time.monotonic()
- dt_lost = (now - self._last_target_t) if self._last_target_t > 0.0 else 1e9
+ now = time.monotonic()
- # State transitions (don't transition back from STOPPING/SEARCHING
- # here — _target_cb resets to FOLLOWING on new detection)
+ uwb_fresh = (
+ self._last_uwb_t > 0.0 and
+ (now - self._last_uwb_t) <= self._p["uwb_timeout"]
+ )
+ cam_fresh = (
+ self._last_target_t > 0.0 and
+ (now - self._last_target_t) <= self._p["lost_timeout"]
+ )
+
+ any_fresh = uwb_fresh or cam_fresh
if self._state == _FOLLOWING:
- if dt_lost > self._p["search_timeout"]:
- self._state = _SEARCHING
- self.get_logger().warn("Person lost — entering SEARCHING state")
- elif dt_lost > self._p["lost_timeout"]:
- self._state = _STOPPING
- self.get_logger().warn("Person lost — stopping")
+ if not any_fresh:
+ uwb_stale = (now - self._last_uwb_t) if self._last_uwb_t > 0.0 else 1e9
+ cam_stale = (now - self._last_target_t) if self._last_target_t > 0.0 else 1e9
+ dt_lost = min(uwb_stale, cam_stale)
+ if dt_lost > self._p["search_timeout"]:
+ self._state = _SEARCHING
+ self.get_logger().warn("Person lost -- entering SEARCHING state")
+ else:
+ self._state = _STOPPING
+ self.get_logger().warn("Person lost -- stopping")
linear_x, angular_z = 0.0, 0.0
if self._state == _FOLLOWING:
- linear_x, angular_z = compute_follow_cmd(
- px=self._person_x,
- py=self._person_y,
- follow_distance=self._p["follow_distance"],
- dead_zone=self._p["dead_zone"],
- kp_linear=self._p["kp_linear"],
- kp_angular=self._p["kp_angular"],
- max_linear_vel=self._p["max_linear_vel"],
- max_angular_vel=self._p["max_angular_vel"],
- obstacle_ahead=self._obstacle_ahead,
+ fx, fy, fresh = fuse_targets(
+ uwb_x=self._uwb_x,
+ uwb_y=self._uwb_y,
+ uwb_fresh=uwb_fresh,
+ cam_x=self._person_x,
+ cam_y=self._person_y,
+ cam_fresh=cam_fresh,
+ uwb_weight=self._p["uwb_weight"],
)
- if self._obstacle_ahead and linear_x == 0.0:
- self.get_logger().warn(
- "Obstacle in path — forward motion suppressed",
- throttle_duration_sec=2.0,
+ if fresh:
+ linear_x, angular_z = compute_follow_cmd(
+ px=fx,
+ py=fy,
+ follow_distance=self._p["follow_distance"],
+ dead_zone=self._p["dead_zone"],
+ kp_linear=self._p["kp_linear"],
+ kp_angular=self._p["kp_angular"],
+ max_linear_vel=self._p["max_linear_vel"],
+ max_angular_vel=self._p["max_angular_vel"],
+ obstacle_ahead=self._obstacle_ahead,
)
+ if self._obstacle_ahead and linear_x == 0.0:
+ self.get_logger().warn(
+ "Obstacle in path -- forward motion suppressed",
+ throttle_duration_sec=2.0,
+ )
elif self._state == _SEARCHING:
angular_z = float(self._p["search_angular_vel"])
- # linear_x stays 0 — rotate in place to search
-
- # _STOPPING: both stay 0
twist.linear.x = linear_x
twist.angular.z = angular_z
self._cmd_pub.publish(twist)
-# ── Entry point ───────────────────────────────────────────────────────────────
+# -- Entry point ---------------------------------------------------------------
def main(args=None):
rclpy.init(args=args)
diff --git a/jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py b/jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py
index 80a602a..b59493d 100644
--- a/jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py
+++ b/jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py
@@ -388,3 +388,98 @@ class TestFollowEnabled:
lin, ang = (0.0, 0.0) if not follow_enabled else \
_compute_follow_cmd(3.0, 0.0, **_DEF)
assert lin != pytest.approx(0.0)
+
+
+# ── fuse_targets — UWB + camera fusion ──────────────────────────────────────
+
+def _fuse_targets(uwb_x, uwb_y, uwb_fresh, cam_x, cam_y, cam_fresh, uwb_weight):
+ """Mirror of person_follower_node.fuse_targets (no ROS2)."""
+ def clamp(v, lo, hi): return max(lo, min(hi, v))
+ if uwb_fresh and cam_fresh:
+ w = clamp(uwb_weight, 0.0, 1.0)
+ return w * uwb_x + (1.0 - w) * cam_x, w * uwb_y + (1.0 - w) * cam_y, True
+ if uwb_fresh:
+ return uwb_x, uwb_y, True
+ if cam_fresh:
+ return cam_x, cam_y, True
+ return 0.0, 0.0, False
+
+
+class TestFuseTargets:
+
+ def test_both_fresh_blends(self):
+ """Both sources fresh → weighted blend."""
+ fx, fy, ok = _fuse_targets(
+ uwb_x=4.0, uwb_y=0.0, uwb_fresh=True,
+ cam_x=2.0, cam_y=0.0, cam_fresh=True,
+ uwb_weight=0.7,
+ )
+ assert ok is True
+ assert fx == pytest.approx(0.7 * 4.0 + 0.3 * 2.0)
+ assert fy == pytest.approx(0.0)
+
+ def test_uwb_only(self):
+ """Only UWB fresh → UWB position returned as-is."""
+ fx, fy, ok = _fuse_targets(
+ uwb_x=3.0, uwb_y=1.0, uwb_fresh=True,
+ cam_x=0.0, cam_y=0.0, cam_fresh=False,
+ uwb_weight=0.7,
+ )
+ assert ok is True
+ assert fx == pytest.approx(3.0)
+ assert fy == pytest.approx(1.0)
+
+ def test_camera_only(self):
+ """Only camera fresh → camera position returned as-is."""
+ fx, fy, ok = _fuse_targets(
+ uwb_x=0.0, uwb_y=0.0, uwb_fresh=False,
+ cam_x=2.5, cam_y=-0.5, cam_fresh=True,
+ uwb_weight=0.7,
+ )
+ assert ok is True
+ assert fx == pytest.approx(2.5)
+ assert fy == pytest.approx(-0.5)
+
+ def test_neither_fresh(self):
+ """Both stale → (0, 0, False)."""
+ fx, fy, ok = _fuse_targets(
+ uwb_x=3.0, uwb_y=1.0, uwb_fresh=False,
+ cam_x=2.0, cam_y=0.0, cam_fresh=False,
+ uwb_weight=0.7,
+ )
+ assert ok is False
+ assert fx == pytest.approx(0.0)
+ assert fy == pytest.approx(0.0)
+
+ def test_uwb_weight_zero_gives_camera(self):
+ """uwb_weight=0.0 → pure camera even when both fresh."""
+ fx, fy, ok = _fuse_targets(
+ uwb_x=4.0, uwb_y=2.0, uwb_fresh=True,
+ cam_x=1.0, cam_y=0.5, cam_fresh=True,
+ uwb_weight=0.0,
+ )
+ assert ok is True
+ assert fx == pytest.approx(1.0)
+ assert fy == pytest.approx(0.5)
+
+ def test_uwb_weight_one_gives_uwb(self):
+ """uwb_weight=1.0 → pure UWB even when both fresh."""
+ fx, fy, ok = _fuse_targets(
+ uwb_x=4.0, uwb_y=2.0, uwb_fresh=True,
+ cam_x=1.0, cam_y=0.5, cam_fresh=True,
+ uwb_weight=1.0,
+ )
+ assert ok is True
+ assert fx == pytest.approx(4.0)
+ assert fy == pytest.approx(2.0)
+
+ def test_lateral_blend(self):
+ """Lateral (Y) component is also blended correctly."""
+ fx, fy, ok = _fuse_targets(
+ uwb_x=2.0, uwb_y=1.0, uwb_fresh=True,
+ cam_x=2.0, cam_y=-1.0, cam_fresh=True,
+ uwb_weight=0.5,
+ )
+ assert ok is True
+ assert fx == pytest.approx(2.0)
+ assert fy == pytest.approx(0.0) # 0.5*1.0 + 0.5*(-1.0) = 0
diff --git a/jetson/ros2_ws/src/saltybot_uwb/config/uwb_config.yaml b/jetson/ros2_ws/src/saltybot_uwb/config/uwb_config.yaml
new file mode 100644
index 0000000..58528c4
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_uwb/config/uwb_config.yaml
@@ -0,0 +1,57 @@
+# uwb_config.yaml — MaUWB ESP32-S3 DW3000 UWB follow-me system
+#
+# Hardware layout:
+# Anchor-0 (port side) → USB port_a, y = +anchor_separation/2
+# Anchor-1 (starboard side) → USB port_b, y = -anchor_separation/2
+# Tag on person → belt clip, ~0.9m above ground
+#
+# Run with:
+# ros2 launch saltybot_uwb uwb.launch.py
+# Override at launch:
+# ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2
+
+# ── Serial ports ──────────────────────────────────────────────────────────────
+# Set udev rules to get stable symlinks:
+# /dev/uwb-anchor0 → port_a
+# /dev/uwb-anchor1 → port_b
+# (See jetson/docs/pinout.md for udev setup)
+port_a: /dev/uwb-anchor0 # Anchor-0 (port)
+port_b: /dev/uwb-anchor1 # Anchor-1 (starboard)
+baudrate: 115200 # MaUWB default — do not change
+
+# ── Anchor geometry ────────────────────────────────────────────────────────────
+# anchor_separation: centre-to-centre distance between anchors (metres)
+# Must match physical mounting. Larger = more accurate lateral resolution.
+anchor_separation: 0.25 # metres (25cm)
+
+# anchor_height: height of anchors above ground (metres)
+# Orin stem mount ≈ 0.80m on the saltybot platform
+anchor_height: 0.80 # metres
+
+# tag_height: height of person's belt-clip tag above ground (metres)
+tag_height: 0.90 # metres (adjust per user)
+
+# ── Range validity ─────────────────────────────────────────────────────────────
+# range_timeout_s: stale anchor — excluded from triangulation after this gap
+range_timeout_s: 1.0 # seconds
+
+# max_range_m: discard ranges beyond this (DW3000 indoor practical limit ≈8m)
+max_range_m: 8.0 # metres
+
+# min_range_m: discard ranges below this (likely multipath artefacts)
+min_range_m: 0.05 # metres
+
+# ── Kalman filter ──────────────────────────────────────────────────────────────
+# kf_process_noise: Q scalar — how dynamic the person's motion is
+# Higher → faster response, more jitter
+kf_process_noise: 0.1
+
+# kf_meas_noise: R scalar — how noisy the UWB ranging is
+# DW3000 indoor accuracy ≈ 10cm RMS → 0.1m → R ≈ 0.01
+# Use 0.3 to be conservative on first deployment
+kf_meas_noise: 0.3
+
+# ── Publish rate ──────────────────────────────────────────────────────────────
+# Should match or exceed the AT+RANGE? poll rate from both anchors.
+# 20Hz means 50ms per cycle; each anchor query takes ~10ms → headroom ok.
+publish_rate: 20.0 # Hz
diff --git a/jetson/ros2_ws/src/saltybot_uwb/launch/uwb.launch.py b/jetson/ros2_ws/src/saltybot_uwb/launch/uwb.launch.py
new file mode 100644
index 0000000..a1bfb17
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_uwb/launch/uwb.launch.py
@@ -0,0 +1,59 @@
+"""
+uwb.launch.py — Launch UWB driver node for MaUWB ESP32-S3 follow-me.
+
+Usage:
+ ros2 launch saltybot_uwb uwb.launch.py
+ ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2 port_b:=/dev/ttyUSB3
+ ros2 launch saltybot_uwb uwb.launch.py anchor_separation:=0.30 publish_rate:=10.0
+"""
+
+import os
+from ament_index_python.packages import get_package_share_directory
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration
+from launch_ros.actions import Node
+
+
+def generate_launch_description():
+ pkg = get_package_share_directory("saltybot_uwb")
+ cfg = os.path.join(pkg, "config", "uwb_config.yaml")
+
+ return LaunchDescription([
+ DeclareLaunchArgument("port_a", default_value="/dev/uwb-anchor0"),
+ DeclareLaunchArgument("port_b", default_value="/dev/uwb-anchor1"),
+ DeclareLaunchArgument("baudrate", default_value="115200"),
+ DeclareLaunchArgument("anchor_separation", default_value="0.25"),
+ DeclareLaunchArgument("anchor_height", default_value="0.80"),
+ DeclareLaunchArgument("tag_height", default_value="0.90"),
+ DeclareLaunchArgument("range_timeout_s", default_value="1.0"),
+ DeclareLaunchArgument("max_range_m", default_value="8.0"),
+ DeclareLaunchArgument("min_range_m", default_value="0.05"),
+ DeclareLaunchArgument("kf_process_noise", default_value="0.1"),
+ DeclareLaunchArgument("kf_meas_noise", default_value="0.3"),
+ DeclareLaunchArgument("publish_rate", default_value="20.0"),
+
+ Node(
+ package="saltybot_uwb",
+ executable="uwb_driver",
+ name="uwb_driver",
+ output="screen",
+ parameters=[
+ cfg,
+ {
+ "port_a": LaunchConfiguration("port_a"),
+ "port_b": LaunchConfiguration("port_b"),
+ "baudrate": LaunchConfiguration("baudrate"),
+ "anchor_separation": LaunchConfiguration("anchor_separation"),
+ "anchor_height": LaunchConfiguration("anchor_height"),
+ "tag_height": LaunchConfiguration("tag_height"),
+ "range_timeout_s": LaunchConfiguration("range_timeout_s"),
+ "max_range_m": LaunchConfiguration("max_range_m"),
+ "min_range_m": LaunchConfiguration("min_range_m"),
+ "kf_process_noise": LaunchConfiguration("kf_process_noise"),
+ "kf_meas_noise": LaunchConfiguration("kf_meas_noise"),
+ "publish_rate": LaunchConfiguration("publish_rate"),
+ },
+ ],
+ ),
+ ])
diff --git a/jetson/ros2_ws/src/saltybot_uwb/package.xml b/jetson/ros2_ws/src/saltybot_uwb/package.xml
new file mode 100644
index 0000000..e0bae4d
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_uwb/package.xml
@@ -0,0 +1,28 @@
+
+
+
+ saltybot_uwb
+ 0.1.0
+
+ UWB follow-me driver for saltybot — MaUWB ESP32-S3 DW3000 two-anchor
+ ranging, triangulation, Kalman filtering, and /uwb/target PoseStamped publisher.
+
+ seb
+ MIT
+
+ ament_python
+
+ rclpy
+ geometry_msgs
+ std_msgs
+ saltybot_uwb_msgs
+
+ ament_copyright
+ ament_flake8
+ ament_pep257
+ pytest
+
+
+ ament_python
+
+
diff --git a/jetson/ros2_ws/src/saltybot_uwb/resource/saltybot_uwb b/jetson/ros2_ws/src/saltybot_uwb/resource/saltybot_uwb
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/__init__.py b/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/ranging_math.py b/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/ranging_math.py
new file mode 100644
index 0000000..7fcbe9b
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/ranging_math.py
@@ -0,0 +1,278 @@
+"""
+ranging_math.py — Pure triangulation and Kalman filter helpers for UWB follow-me.
+
+No ROS2 dependencies — importable in unit tests without a ROS2 install.
+
+2-Anchor geometry
+─────────────────
+Anchors are mounted on the robot stem, separated by `sep` metres along the Y-axis:
+
+ A0 (0, +sep/2, anchor_z) ← port side
+ A1 (0, -sep/2, anchor_z) ← starboard side
+
+The person tag is at (x_t, y_t, tag_z) in base_link.
+
+Step 1 — height compensation:
+ dz = anchor_z - tag_z
+ r0_h = sqrt(max(0, r0² - dz²))
+ r1_h = sqrt(max(0, r1² - dz²))
+
+Step 2 — lateral offset (Y):
+ y_t = (r1_h² - r0_h²) / (2 * sep)
+
+Step 3 — forward distance (X):
+ x_t = sqrt(max(0, r0_h² - (y_t - sep/2)²))
+
+Returns (x_t, y_t); caller should treat negative x_t as 0.
+"""
+
+import math
+
+
+# ── Triangulation ─────────────────────────────────────────────────────────────
+
+def triangulate_2anchor(
+ r0: float,
+ r1: float,
+ sep: float,
+ anchor_z: float = 0.0,
+ tag_z: float = 0.0,
+) -> tuple:
+ """
+ Triangulate person position in base_link from two anchor ranges.
+
+ Parameters
+ ----------
+ r0 : range to anchor-0 (port, y = +sep/2) in metres
+ r1 : range to anchor-1 (starboard, y = -sep/2) in metres
+ sep : anchor separation in metres (centre-to-centre)
+ anchor_z : height of anchors above ground (metres, default 0)
+ tag_z : height of person tag above ground (metres, default 0)
+
+ Returns
+ -------
+ (x_t, y_t) : person position in metres (base_link X = forward, Y = left)
+ x_t is clamped to ≥ 0 (can't be behind anchors geometrically).
+ """
+ if sep <= 0.0:
+ raise ValueError(f"sep must be > 0, got {sep}")
+ if r0 < 0.0 or r1 < 0.0:
+ raise ValueError(f"ranges must be ≥ 0, got r0={r0}, r1={r1}")
+
+ # Height correction — bring to horizontal plane
+ dz = anchor_z - tag_z
+ dz2 = dz * dz
+ r0_h2 = max(0.0, r0 * r0 - dz2)
+ r1_h2 = max(0.0, r1 * r1 - dz2)
+ r0_h = math.sqrt(r0_h2)
+
+ # Lateral (Y) offset — from circle-circle intersection formula
+ y_t = (r1_h2 - r0_h2) / (2.0 * sep)
+
+ # Forward (X) distance
+ dx2 = r0_h2 - (y_t - sep / 2.0) ** 2
+ x_t = math.sqrt(max(0.0, dx2))
+
+ return x_t, y_t
+
+
+# ── Simple 2-D Kalman filter ──────────────────────────────────────────────────
+
+class KalmanFilter2D:
+ """
+ Constant-velocity Kalman filter for a 2-D position estimate.
+
+ State vector: [x, y, vx, vy]ᵀ
+
+ Parameters
+ ----------
+ process_noise : Q scalar — how much we trust the motion model
+ measurement_noise : R scalar — how noisy the UWB measurement is
+ dt : expected update interval in seconds (used for F matrix)
+ """
+
+ def __init__(
+ self,
+ process_noise: float = 0.1,
+ measurement_noise: float = 0.3,
+ dt: float = 0.05,
+ ):
+ self._q = process_noise
+ self._r = measurement_noise
+ self._dt = dt
+
+ # State: [x, y, vx, vy]
+ self._x = [0.0, 0.0, 0.0, 0.0]
+
+ # Covariance matrix P (4×4, stored as flat list, row-major)
+ init_pos_var = 1.0
+ init_vel_var = 0.5
+ self._P = [
+ init_pos_var, 0.0, 0.0, 0.0,
+ 0.0, init_pos_var, 0.0, 0.0,
+ 0.0, 0.0, init_vel_var, 0.0,
+ 0.0, 0.0, 0.0, init_vel_var,
+ ]
+ self._initialised = False
+
+ # ── Matrix helpers (minimal, no numpy — importable without GPU deps) ──────
+
+ @staticmethod
+ def _mat_add(A, B):
+ return [a + b for a, b in zip(A, B)]
+
+ @staticmethod
+ def _mat_sub(A, B):
+ return [a - b for a, b in zip(A, B)]
+
+ @staticmethod
+ def _mat_mul(A, B, n):
+ """n×n matrix multiply."""
+ C = [0.0] * (n * n)
+ for i in range(n):
+ for k in range(n):
+ for j in range(n):
+ C[i * n + j] += A[i * n + k] * B[k * n + j]
+ return C
+
+ @staticmethod
+ def _mat_T(A, n):
+ """n×n transpose."""
+ T = [0.0] * (n * n)
+ for i in range(n):
+ for j in range(n):
+ T[j * n + i] = A[i * n + j]
+ return T
+
+ def _build_F(self):
+ dt = self._dt
+ return [
+ 1.0, 0.0, dt, 0.0,
+ 0.0, 1.0, 0.0, dt,
+ 0.0, 0.0, 1.0, 0.0,
+ 0.0, 0.0, 0.0, 1.0,
+ ]
+
+ def _build_Q(self):
+ q = self._q
+ dt = self._dt
+ dt2 = dt * dt
+ dt3 = dt2 * dt
+ dt4 = dt3 * dt
+ # Continuous white-noise model discretised for constant-velocity
+ return [
+ q * dt4 / 4, 0.0, q * dt3 / 2, 0.0,
+ 0.0, q * dt4 / 4, 0.0, q * dt3 / 2,
+ q * dt3 / 2, 0.0, q * dt2, 0.0,
+ 0.0, q * dt3 / 2, 0.0, q * dt2,
+ ]
+
+ def predict(self, dt: float = None):
+ """Predict step — advances state by dt (defaults to self._dt)."""
+ if dt is not None:
+ self._dt = dt
+ F = self._build_F()
+ Q = self._build_Q()
+ n = 4
+ x = self._x
+ # x = F @ x
+ new_x = [
+ F[0] * x[0] + F[1] * x[1] + F[2] * x[2] + F[3] * x[3],
+ F[4] * x[0] + F[5] * x[1] + F[6] * x[2] + F[7] * x[3],
+ F[8] * x[0] + F[9] * x[1] + F[10] * x[2] + F[11] * x[3],
+ F[12] * x[0] + F[13] * x[1] + F[14] * x[2] + F[15] * x[3],
+ ]
+ # P = F @ P @ F.T + Q
+ FP = self._mat_mul(F, self._P, n)
+ FT = self._mat_T(F, n)
+ FPF = self._mat_mul(FP, FT, n)
+ self._P = self._mat_add(FPF, Q)
+ self._x = new_x
+
+ def update(self, x_meas: float, y_meas: float):
+ """
+ Update step with a 2-D position measurement.
+
+ H = [[1,0,0,0],[0,1,0,0]] (observe position only)
+ """
+ if not self._initialised:
+ self._x[0] = x_meas
+ self._x[1] = y_meas
+ self._initialised = True
+ return
+
+ # Innovation
+ innov_x = x_meas - self._x[0]
+ innov_y = y_meas - self._x[1]
+
+ # S = H @ P @ H.T + R (2×2, position rows/cols only)
+ P = self._P
+ r = self._r
+ # H picks rows 0,1 and cols 0,1 from P
+ S00 = P[0] + r
+ S01 = P[1]
+ S10 = P[4]
+ S11 = P[5] + r
+
+ # K = P @ H.T @ S⁻¹ (4×2 Kalman gain)
+ det = S00 * S11 - S01 * S10
+ if abs(det) < 1e-12:
+ return
+ inv_det = 1.0 / det
+ # S⁻¹
+ Si00 = S11 * inv_det
+ Si01 = -S01 * inv_det
+ Si10 = -S10 * inv_det
+ Si11 = S00 * inv_det
+
+ # P @ H.T = first two columns of P (H.T selects cols 0,1)
+ PH = [P[0], P[1], P[4], P[5], P[8], P[9], P[12], P[13]] # 4×2
+
+ # K (4×2) = PH @ Si
+ K = [
+ PH[0] * Si00 + PH[1] * Si10, PH[0] * Si01 + PH[1] * Si11,
+ PH[2] * Si00 + PH[3] * Si10, PH[2] * Si01 + PH[3] * Si11,
+ PH[4] * Si00 + PH[5] * Si10, PH[4] * Si01 + PH[5] * Si11,
+ PH[6] * Si00 + PH[7] * Si10, PH[6] * Si01 + PH[7] * Si11,
+ ]
+
+ # x = x + K @ innov
+ self._x[0] += K[0] * innov_x + K[1] * innov_y
+ self._x[1] += K[2] * innov_x + K[3] * innov_y
+ self._x[2] += K[4] * innov_x + K[5] * innov_y
+ self._x[3] += K[6] * innov_x + K[7] * innov_y
+
+ # P = (I - K @ H) @ P (Joseph form for numerical stability skipped —
+ # this is a low-latency embedded context where simplicity wins)
+ n = 4
+ # K @ H selects first two columns → rows of P
+ KH = [0.0] * 16
+ for row in range(4):
+ KH[row * 4 + 0] = K[row * 2 + 0]
+ KH[row * 4 + 1] = K[row * 2 + 1]
+ # I - KH
+ IKH = [-KH[i] for i in range(16)]
+ for i in range(4):
+ IKH[i * 4 + i] += 1.0
+ self._P = self._mat_mul(IKH, self._P, n)
+
+ def position(self) -> tuple:
+ """Return current (x, y) position estimate."""
+ return self._x[0], self._x[1]
+
+ def velocity(self) -> tuple:
+ """Return current (vx, vy) velocity estimate."""
+ return self._x[2], self._x[3]
+
+ def reset(self):
+ """Reset filter to uninitialised state."""
+ self._x = [0.0, 0.0, 0.0, 0.0]
+ init_pos_var = 1.0
+ init_vel_var = 0.5
+ self._P = [
+ init_pos_var, 0.0, 0.0, 0.0,
+ 0.0, init_pos_var, 0.0, 0.0,
+ 0.0, 0.0, init_vel_var, 0.0,
+ 0.0, 0.0, 0.0, init_vel_var,
+ ]
+ self._initialised = False
diff --git a/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/uwb_driver_node.py b/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/uwb_driver_node.py
new file mode 100644
index 0000000..8a5901d
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/uwb_driver_node.py
@@ -0,0 +1,306 @@
+"""
+uwb_driver_node.py — ROS2 node for MaUWB ESP32-S3 DW3000 follow-me system.
+
+Hardware
+────────
+ • 2× MaUWB ESP32-S3 DW3000 anchors on robot stem (USB → Orin Nano)
+ - Anchor-0: port side (y = +sep/2)
+ - Anchor-1: starboard (y = -sep/2)
+ • 1× MaUWB tag on person (belt clip)
+
+AT command interface (115200 8N1)
+──────────────────────────────────
+ Query: AT+RANGE?\r\n
+ Response (from anchors):
+ +RANGE:,[,]\r\n
+
+ Config:
+ AT+anchor_tag=ANCHOR\r\n — set module as anchor
+ AT+anchor_tag=TAG\r\n — set module as tag
+
+Publishes
+─────────
+ /uwb/target (geometry_msgs/PoseStamped) — triangulated person position in base_link
+ /uwb/ranges (saltybot_uwb_msgs/UwbRangeArray) — raw ranges from both anchors
+
+Safety
+──────
+ If a range is stale (> range_timeout_s), that anchor is excluded from
+ triangulation. If both anchors are stale, /uwb/target is not published.
+
+Usage
+─────
+ ros2 launch saltybot_uwb uwb.launch.py
+ ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2 port_b:=/dev/ttyUSB3
+"""
+
+import math
+import re
+import threading
+import time
+
+import rclpy
+from rclpy.node import Node
+from geometry_msgs.msg import PoseStamped
+from std_msgs.msg import Header
+
+from saltybot_uwb_msgs.msg import UwbRange, UwbRangeArray
+from saltybot_uwb.ranging_math import triangulate_2anchor, KalmanFilter2D
+
+try:
+ import serial
+ _SERIAL_AVAILABLE = True
+except ImportError:
+ _SERIAL_AVAILABLE = False
+
+
+# Regex: +RANGE:, or +RANGE:,,
+_RANGE_RE = re.compile(
+ r"\+RANGE\s*:\s*(\d+)\s*,\s*(\d+)(?:\s*,\s*(-?[\d.]+))?",
+ re.IGNORECASE,
+)
+
+
+class SerialReader(threading.Thread):
+ """
+ Background thread: polls an anchor's UART, fires callback on every
+ valid +RANGE response.
+ """
+
+ def __init__(self, port, baudrate, anchor_id, callback, logger):
+ super().__init__(daemon=True)
+ self._port = port
+ self._baudrate = baudrate
+ self._anchor_id = anchor_id
+ self._callback = callback
+ self._logger = logger
+ self._running = False
+ self._ser = None
+
+ def run(self):
+ self._running = True
+ while self._running:
+ try:
+ self._ser = serial.Serial(
+ self._port, self._baudrate, timeout=0.5
+ )
+ self._logger.info(
+ f"Anchor-{self._anchor_id}: opened {self._port}"
+ )
+ self._read_loop()
+ except Exception as exc:
+ self._logger.warn(
+ f"Anchor-{self._anchor_id} serial error: {exc} — retrying in 2s"
+ )
+ if self._ser and self._ser.is_open:
+ self._ser.close()
+ time.sleep(2.0)
+
+ def _read_loop(self):
+ while self._running:
+ try:
+ # Query the anchor
+ self._ser.write(b"AT+RANGE?\r\n")
+ # Read up to 10 lines waiting for a +RANGE response
+ for _ in range(10):
+ raw = self._ser.readline()
+ if not raw:
+ break
+ line = raw.decode("ascii", errors="replace").strip()
+ m = _RANGE_RE.search(line)
+ if m:
+ range_mm = int(m.group(2))
+ rssi = float(m.group(3)) if m.group(3) else 0.0
+ self._callback(self._anchor_id, range_mm, rssi)
+ break
+ except Exception as exc:
+ self._logger.warn(
+ f"Anchor-{self._anchor_id} read error: {exc}"
+ )
+ break # trigger reconnect
+
+ def stop(self):
+ self._running = False
+ if self._ser and self._ser.is_open:
+ self._ser.close()
+
+
+class UwbDriverNode(Node):
+
+ def __init__(self):
+ super().__init__("uwb_driver")
+
+ # ── Parameters ────────────────────────────────────────────────────────
+ self.declare_parameter("port_a", "/dev/ttyUSB0")
+ self.declare_parameter("port_b", "/dev/ttyUSB1")
+ self.declare_parameter("baudrate", 115200)
+ self.declare_parameter("anchor_separation", 0.25)
+ self.declare_parameter("anchor_height", 0.80)
+ self.declare_parameter("tag_height", 0.90)
+ self.declare_parameter("range_timeout_s", 1.0)
+ self.declare_parameter("max_range_m", 8.0)
+ self.declare_parameter("min_range_m", 0.05)
+ self.declare_parameter("kf_process_noise", 0.1)
+ self.declare_parameter("kf_meas_noise", 0.3)
+ self.declare_parameter("publish_rate", 20.0)
+
+ self._p = self._load_params()
+
+ # ── State (protected by lock) ──────────────────────────────────────
+ self._lock = threading.Lock()
+ self._ranges = {} # anchor_id → (range_m, rssi, timestamp)
+ self._kf = KalmanFilter2D(
+ process_noise=self._p["kf_process_noise"],
+ measurement_noise=self._p["kf_meas_noise"],
+ dt=1.0 / self._p["publish_rate"],
+ )
+
+ # ── Publishers ────────────────────────────────────────────────────
+ self._target_pub = self.create_publisher(
+ PoseStamped, "/uwb/target", 10)
+ self._ranges_pub = self.create_publisher(
+ UwbRangeArray, "/uwb/ranges", 10)
+
+ # ── Serial readers ────────────────────────────────────────────────
+ if _SERIAL_AVAILABLE:
+ self._reader_a = SerialReader(
+ self._p["port_a"], self._p["baudrate"],
+ anchor_id=0, callback=self._range_cb,
+ logger=self.get_logger(),
+ )
+ self._reader_b = SerialReader(
+ self._p["port_b"], self._p["baudrate"],
+ anchor_id=1, callback=self._range_cb,
+ logger=self.get_logger(),
+ )
+ self._reader_a.start()
+ self._reader_b.start()
+ else:
+ self.get_logger().warn(
+ "pyserial not installed — running in simulation mode (no serial I/O)"
+ )
+
+ # ── Publish timer ─────────────────────────────────────────────────
+ self._timer = self.create_timer(
+ 1.0 / self._p["publish_rate"], self._publish_cb
+ )
+
+ self.get_logger().info(
+ f"UWB driver ready sep={self._p['anchor_separation']}m "
+ f"ports={self._p['port_a']},{self._p['port_b']} "
+ f"rate={self._p['publish_rate']}Hz"
+ )
+
+ # ── Helpers ───────────────────────────────────────────────────────────────
+
+ def _load_params(self):
+ return {
+ "port_a": self.get_parameter("port_a").value,
+ "port_b": self.get_parameter("port_b").value,
+ "baudrate": self.get_parameter("baudrate").value,
+ "anchor_separation": self.get_parameter("anchor_separation").value,
+ "anchor_height": self.get_parameter("anchor_height").value,
+ "tag_height": self.get_parameter("tag_height").value,
+ "range_timeout_s": self.get_parameter("range_timeout_s").value,
+ "max_range_m": self.get_parameter("max_range_m").value,
+ "min_range_m": self.get_parameter("min_range_m").value,
+ "kf_process_noise": self.get_parameter("kf_process_noise").value,
+ "kf_meas_noise": self.get_parameter("kf_meas_noise").value,
+ "publish_rate": self.get_parameter("publish_rate").value,
+ }
+
+ # ── Callbacks ─────────────────────────────────────────────────────────────
+
+ def _range_cb(self, anchor_id: int, range_mm: int, rssi: float):
+ """Called from serial reader threads — thread-safe update."""
+ range_m = range_mm / 1000.0
+ p = self._p
+ if range_m < p["min_range_m"] or range_m > p["max_range_m"]:
+ return
+ with self._lock:
+ self._ranges[anchor_id] = (range_m, rssi, time.monotonic())
+
+ def _publish_cb(self):
+ now = time.monotonic()
+ timeout = self._p["range_timeout_s"]
+ sep = self._p["anchor_separation"]
+
+ with self._lock:
+ # Collect valid (non-stale) ranges
+ valid = {}
+ for aid, (r, rssi, t) in self._ranges.items():
+ if now - t <= timeout:
+ valid[aid] = (r, rssi, t)
+
+ # Build and publish UwbRangeArray regardless (even if partial)
+ hdr = Header()
+ hdr.stamp = self.get_clock().now().to_msg()
+ hdr.frame_id = "base_link"
+
+ arr = UwbRangeArray()
+ arr.header = hdr
+ for aid, (r, rssi, _) in valid.items():
+ entry = UwbRange()
+ entry.header = hdr
+ entry.anchor_id = aid
+ entry.range_m = float(r)
+ entry.raw_mm = int(round(r * 1000.0))
+ entry.rssi = float(rssi)
+ arr.ranges.append(entry)
+ self._ranges_pub.publish(arr)
+
+ # Need both anchors to triangulate
+ if 0 not in valid or 1 not in valid:
+ return
+
+ r0 = valid[0][0]
+ r1 = valid[1][0]
+
+ try:
+ x_t, y_t = triangulate_2anchor(
+ r0=r0,
+ r1=r1,
+ sep=sep,
+ anchor_z=self._p["anchor_height"],
+ tag_z=self._p["tag_height"],
+ )
+ except (ValueError, ZeroDivisionError) as exc:
+ self.get_logger().warn(f"Triangulation error: {exc}")
+ return
+
+ # Kalman filter update
+ dt = 1.0 / self._p["publish_rate"]
+ self._kf.predict(dt=dt)
+ self._kf.update(x_t, y_t)
+ kx, ky = self._kf.position()
+
+ # Publish PoseStamped in base_link
+ pose = PoseStamped()
+ pose.header = hdr
+ pose.pose.position.x = kx
+ pose.pose.position.y = ky
+ pose.pose.position.z = 0.0
+ # Orientation: face the person (yaw = atan2(y, x))
+ yaw = math.atan2(ky, kx)
+ pose.pose.orientation.z = math.sin(yaw / 2.0)
+ pose.pose.orientation.w = math.cos(yaw / 2.0)
+
+ self._target_pub.publish(pose)
+
+
+# ── Entry point ───────────────────────────────────────────────────────────────
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = UwbDriverNode()
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ node.destroy_node()
+ rclpy.try_shutdown()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/jetson/ros2_ws/src/saltybot_uwb/setup.cfg b/jetson/ros2_ws/src/saltybot_uwb/setup.cfg
new file mode 100644
index 0000000..a5b4a97
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_uwb/setup.cfg
@@ -0,0 +1,4 @@
+[develop]
+script_dir=$base/lib/saltybot_uwb
+[install]
+install_scripts=$base/lib/saltybot_uwb
diff --git a/jetson/ros2_ws/src/saltybot_uwb/setup.py b/jetson/ros2_ws/src/saltybot_uwb/setup.py
new file mode 100644
index 0000000..6e3af9b
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_uwb/setup.py
@@ -0,0 +1,32 @@
+from setuptools import setup
+import os
+from glob import glob
+
+package_name = "saltybot_uwb"
+
+setup(
+ name=package_name,
+ version="0.1.0",
+ packages=[package_name],
+ data_files=[
+ ("share/ament_index/resource_index/packages",
+ ["resource/" + package_name]),
+ ("share/" + package_name, ["package.xml"]),
+ (os.path.join("share", package_name, "launch"),
+ glob("launch/*.py")),
+ (os.path.join("share", package_name, "config"),
+ glob("config/*.yaml")),
+ ],
+ install_requires=["setuptools"],
+ zip_safe=True,
+ maintainer="seb",
+ maintainer_email="seb@vayrette.com",
+ description="UWB follow-me driver for saltybot (MaUWB ESP32-S3 DW3000)",
+ license="MIT",
+ tests_require=["pytest"],
+ entry_points={
+ "console_scripts": [
+ "uwb_driver = saltybot_uwb.uwb_driver_node:main",
+ ],
+ },
+)
diff --git a/jetson/ros2_ws/src/saltybot_uwb/test/test_ranging_math.py b/jetson/ros2_ws/src/saltybot_uwb/test/test_ranging_math.py
new file mode 100644
index 0000000..7ca83fe
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_uwb/test/test_ranging_math.py
@@ -0,0 +1,174 @@
+"""
+test_ranging_math.py — Unit tests for UWB triangulation and Kalman filter.
+
+No ROS2 / serial / GPU dependencies — runs with plain pytest.
+"""
+
+import math
+import pytest
+
+from saltybot_uwb.ranging_math import triangulate_2anchor, KalmanFilter2D
+
+
+# ── triangulate_2anchor ───────────────────────────────────────────────────────
+
+class TestTriangulate2Anchor:
+
+ def test_person_directly_ahead(self):
+ """Person 2m ahead, centred: x≈2, y≈0."""
+ sep = 0.25
+ # A0 at y=+0.125, A1 at y=-0.125 → symmetric → y_t ≈ 0
+ r0 = math.sqrt(2.0**2 + 0.125**2)
+ r1 = r0 # symmetric
+ x, y = triangulate_2anchor(r0, r1, sep)
+ assert abs(x - 2.0) < 0.02
+ assert abs(y) < 0.01
+
+ def test_person_offset_left(self):
+ """Person 2m ahead, 0.5m to the left: y_t ≈ +0.5."""
+ sep = 0.25
+ # A0 at y=+0.125, A1 at y=-0.125
+ px, py = 2.0, 0.5
+ r0 = math.sqrt(px**2 + (py - 0.125)**2)
+ r1 = math.sqrt(px**2 + (py + 0.125)**2)
+ x, y = triangulate_2anchor(r0, r1, sep)
+ assert abs(x - px) < 0.05
+ assert abs(y - py) < 0.05
+
+ def test_person_offset_right(self):
+ """Person 2m ahead, 0.5m to the right: y_t ≈ -0.5."""
+ sep = 0.25
+ px, py = 2.0, -0.5
+ r0 = math.sqrt(px**2 + (py - 0.125)**2)
+ r1 = math.sqrt(px**2 + (py + 0.125)**2)
+ x, y = triangulate_2anchor(r0, r1, sep)
+ assert abs(x - px) < 0.05
+ assert abs(y - py) < 0.05
+
+ def test_height_compensation_reduces_horizontal_range(self):
+ """With height difference, horizontal range < slant range → x smaller."""
+ sep = 0.25
+ # No height: x ≈ 2.0
+ x_flat, _ = triangulate_2anchor(2.0, 2.0, sep, anchor_z=0.0, tag_z=0.0)
+ # Same slant range but 0.5m height difference
+ slant = math.sqrt(2.0**2 + 0.5**2)
+ x_tall, _ = triangulate_2anchor(slant, slant, sep, anchor_z=0.8, tag_z=0.3)
+ # x_tall should be close to 2.0 (height-compensated back to horizontal)
+ assert abs(x_tall - x_flat) < 0.05
+
+ def test_returns_nonnegative_x(self):
+ """x_t is always ≥ 0 (person can't be geometrically behind anchors)."""
+ sep = 0.25
+ # Pathological: extremely mismatched ranges → sqrt clamped to 0
+ x, _ = triangulate_2anchor(0.1, 0.1, sep)
+ assert x >= 0.0
+
+ def test_invalid_sep_raises(self):
+ with pytest.raises(ValueError):
+ triangulate_2anchor(1.0, 1.0, sep=0.0)
+
+ def test_negative_range_raises(self):
+ with pytest.raises(ValueError):
+ triangulate_2anchor(-1.0, 1.0, sep=0.25)
+
+ def test_close_range(self):
+ """Person very close (0.3m) — should not crash."""
+ sep = 0.25
+ px, py = 0.3, 0.0
+ r0 = math.sqrt(px**2 + 0.125**2)
+ r1 = r0
+ x, y = triangulate_2anchor(r0, r1, sep)
+ assert x >= 0.0
+
+ def test_large_range(self):
+ """Person 7m ahead — within DW3000 practical range."""
+ sep = 0.25
+ r0 = math.sqrt(7.0**2 + 0.125**2)
+ r1 = r0
+ x, y = triangulate_2anchor(r0, r1, sep)
+ assert abs(x - 7.0) < 0.1
+ assert abs(y) < 0.05
+
+
+# ── KalmanFilter2D ────────────────────────────────────────────────────────────
+
+class TestKalmanFilter2D:
+
+ def test_initial_position_after_first_update(self):
+ """First update initialises position to measurement."""
+ kf = KalmanFilter2D()
+ kf.predict()
+ kf.update(3.0, 1.0)
+ x, y = kf.position()
+ assert abs(x - 3.0) < 0.01
+ assert abs(y - 1.0) < 0.01
+
+ def test_converges_to_stationary_target(self):
+ """Repeated updates to same position → Kalman converges to it."""
+ kf = KalmanFilter2D(process_noise=0.1, measurement_noise=0.3)
+ for _ in range(50):
+ kf.predict(dt=0.05)
+ kf.update(2.0, 0.5)
+ x, y = kf.position()
+ assert abs(x - 2.0) < 0.1
+ assert abs(y - 0.5) < 0.1
+
+ def test_smooths_noisy_measurements(self):
+ """Filter output should be smoother than raw measurements."""
+ import random
+ rng = random.Random(42)
+ kf = KalmanFilter2D(process_noise=0.05, measurement_noise=0.5)
+ raw_errors = []
+ kf_errors = []
+ true_x, true_y = 2.0, 1.0
+ for _ in range(30):
+ noisy_x = true_x + rng.gauss(0, 0.3)
+ noisy_y = true_y + rng.gauss(0, 0.3)
+ kf.predict(dt=0.05)
+ kf.update(noisy_x, noisy_y)
+ kx, ky = kf.position()
+ raw_errors.append((noisy_x - true_x)**2 + (noisy_y - true_y)**2)
+ kf_errors.append((kx - true_x)**2 + (ky - true_y)**2)
+ avg_raw = sum(raw_errors[-10:]) / 10
+ avg_kf = sum(kf_errors[-10:]) / 10
+ assert avg_kf < avg_raw # filter is smoother
+
+ def test_reset_clears_state(self):
+ """After reset(), position returns to ~0."""
+ kf = KalmanFilter2D()
+ for _ in range(10):
+ kf.predict()
+ kf.update(5.0, 3.0)
+ kf.reset()
+ kf.predict()
+ kf.update(0.0, 0.0)
+ x, y = kf.position()
+ assert abs(x) < 0.1
+ assert abs(y) < 0.1
+
+ def test_velocity_nonzero_for_moving_target(self):
+ """For a target moving at constant velocity, vx should be detected."""
+ kf = KalmanFilter2D(process_noise=1.0, measurement_noise=0.1, dt=0.1)
+ # Target moving at 0.5 m/s in X
+ for i in range(40):
+ kf.predict(dt=0.1)
+ kf.update(0.5 * i * 0.1, 0.0)
+ vx, vy = kf.velocity()
+ assert abs(vx - 0.5) < 0.15
+
+ def test_custom_dt(self):
+ """predict() accepts custom dt without raising."""
+ kf = KalmanFilter2D()
+ kf.predict(dt=0.02)
+ kf.update(1.0, 1.0)
+ x, y = kf.position()
+ assert abs(x - 1.0) < 0.01
+
+ def test_no_nan_on_zero_measurement(self):
+ """Zero measurement should not produce NaN."""
+ kf = KalmanFilter2D()
+ kf.predict()
+ kf.update(0.0, 0.0)
+ x, y = kf.position()
+ assert not math.isnan(x)
+ assert not math.isnan(y)
diff --git a/jetson/ros2_ws/src/saltybot_uwb_msgs/CMakeLists.txt b/jetson/ros2_ws/src/saltybot_uwb_msgs/CMakeLists.txt
new file mode 100644
index 0000000..0b67b39
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_uwb_msgs/CMakeLists.txt
@@ -0,0 +1,15 @@
+cmake_minimum_required(VERSION 3.8)
+project(saltybot_uwb_msgs)
+
+find_package(ament_cmake REQUIRED)
+find_package(std_msgs REQUIRED)
+find_package(rosidl_default_generators REQUIRED)
+
+rosidl_generate_interfaces(${PROJECT_NAME}
+ "msg/UwbRange.msg"
+ "msg/UwbRangeArray.msg"
+ DEPENDENCIES std_msgs
+)
+
+ament_export_dependencies(rosidl_default_runtime)
+ament_package()
diff --git a/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRange.msg b/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRange.msg
new file mode 100644
index 0000000..d11c8d9
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRange.msg
@@ -0,0 +1,13 @@
+# UwbRange.msg — range measurement from a single UWB anchor
+#
+# anchor_id : 0 = anchor-0 (port side), 1 = anchor-1 (starboard side)
+# range_m : measured horizontal range in metres (after height correction)
+# raw_mm : raw TWR range from AT+RANGE? response, millimetres
+# rssi : received signal strength (dBm), 0 if not reported by module
+
+std_msgs/Header header
+
+uint8 anchor_id
+float32 range_m
+uint32 raw_mm
+float32 rssi
diff --git a/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRangeArray.msg b/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRangeArray.msg
new file mode 100644
index 0000000..6f1bc8d
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRangeArray.msg
@@ -0,0 +1,8 @@
+# UwbRangeArray.msg — all UWB anchor ranges in one message
+#
+# Published by uwb_driver_node on /uwb/ranges.
+# Contains one UwbRange entry per active anchor.
+
+std_msgs/Header header
+
+saltybot_uwb_msgs/UwbRange[] ranges
diff --git a/jetson/ros2_ws/src/saltybot_uwb_msgs/package.xml b/jetson/ros2_ws/src/saltybot_uwb_msgs/package.xml
new file mode 100644
index 0000000..d12fde8
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_uwb_msgs/package.xml
@@ -0,0 +1,22 @@
+
+
+
+ saltybot_uwb_msgs
+ 0.1.0
+ Custom ROS2 message definitions for UWB ranging (saltybot).
+ seb
+ MIT
+
+ ament_cmake
+ rosidl_default_generators
+
+ std_msgs
+
+ rosidl_default_runtime
+
+ rosidl_interface_packages
+
+
+ ament_cmake
+
+