diff --git a/jetson/ros2_ws/src/saltybot_follower/saltybot_follower/person_follower_node.py b/jetson/ros2_ws/src/saltybot_follower/saltybot_follower/person_follower_node.py index 502bc22..c9bf3ad 100644 --- a/jetson/ros2_ws/src/saltybot_follower/saltybot_follower/person_follower_node.py +++ b/jetson/ros2_ws/src/saltybot_follower/saltybot_follower/person_follower_node.py @@ -1,30 +1,42 @@ """ -person_follower_node.py — Proportional person-following control loop. +person_follower_node.py -- Proportional person-following control loop. Subscribes: - /person/target (geometry_msgs/PoseStamped) — person pose in base_link frame - /local_costmap/costmap (nav_msgs/OccupancyGrid) — Nav2 local obstacle map + /uwb/target (geometry_msgs/PoseStamped) -- UWB-triangulated position (primary) + /person/target (geometry_msgs/PoseStamped) -- camera-detected position (secondary) + /local_costmap/costmap (nav_msgs/OccupancyGrid) -- Nav2 local obstacle map Publishes: - /cmd_vel (geometry_msgs/Twist) — velocity commands + /cmd_vel (geometry_msgs/Twist) -- velocity commands + +Sensor fusion +------------- + When both UWB and camera sources are fresh: + fused = uwb_weight * uwb + (1 - uwb_weight) * camera + When only UWB is fresh: use UWB directly. + When only camera is fresh: use camera directly. + When neither is fresh: STOPPING -> SEARCHING. + + UWB is declared stale after uwb_timeout seconds (default 1.0s). + Camera is declared stale after lost_timeout seconds (default 2.0s). State machine -───────────── - FOLLOWING — person visible (last detection < lost_timeout ago) - STOPPING — person lost, publishing zero until search_timeout - SEARCHING — person lost > search_timeout, slow rotation to re-acquire +------------- + FOLLOWING -- at least one source fresh (last detection < timeout ago) + STOPPING -- all sources lost, publishing zero until search_timeout + SEARCHING -- all sources lost > search_timeout, slow rotation to re-acquire Safety wiring -───────────── - • cmd_vel bridge (PR #46) applies ramp + deadman + STM32 AUTONOMOUS mode gate — +------------- + * cmd_vel bridge (PR #46) applies ramp + deadman + STM32 AUTONOMOUS mode gate -- this node publishes raw /cmd_vel, the bridge handles hardware safety. - • follow_enabled param (default True) lets the operator disable the controller + * follow_enabled param (default True) lets the operator disable the controller at runtime: ros2 param set /person_follower follow_enabled false - • Obstacle override: if Nav2 local costmap shows a lethal cell in the forward + * Obstacle override: if Nav2 local costmap shows a lethal cell in the forward corridor, forward cmd_vel is zeroed; turning is still allowed. Usage -───── +----- ros2 launch saltybot_follower person_follower.launch.py ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.2 """ @@ -37,13 +49,13 @@ from rclpy.node import Node from geometry_msgs.msg import PoseStamped, Twist from nav_msgs.msg import OccupancyGrid -# ── State constants ──────────────────────────────────────────────────────────── +# -- State constants ----------------------------------------------------------- _FOLLOWING = "following" -_STOPPING = "stopping" # person lost < search_timeout, publishing zero -_SEARCHING = "searching" # person lost > search_timeout, slow rotation +_STOPPING = "stopping" # all sources lost < search_timeout, publishing zero +_SEARCHING = "searching" # all sources lost > search_timeout, slow rotation -# ── Pure helper functions (also exercised by unit tests) ────────────────────── +# -- Pure helper functions (also exercised by unit tests) ---------------------- def clamp(v: float, lo: float, hi: float) -> float: return max(lo, min(hi, v)) @@ -58,6 +70,40 @@ def compute_bearing(px: float, py: float) -> float: return math.atan2(py, px) +def fuse_targets( + uwb_x: float, + uwb_y: float, + uwb_fresh: bool, + cam_x: float, + cam_y: float, + cam_fresh: bool, + uwb_weight: float, +) -> tuple: + """ + Fuse UWB (primary) and camera (secondary) position estimates. + + Returns (fused_x, fused_y, any_fresh). + + Fusion rules: + Both fresh -> weighted blend: uwb_weight * uwb + (1-uwb_weight) * camera + UWB only -> UWB position, full weight + Camera only -> camera position, full weight + Neither -> (0, 0, False) + """ + if uwb_fresh and cam_fresh: + w = clamp(uwb_weight, 0.0, 1.0) + return ( + w * uwb_x + (1.0 - w) * cam_x, + w * uwb_y + (1.0 - w) * cam_y, + True, + ) + if uwb_fresh: + return uwb_x, uwb_y, True + if cam_fresh: + return cam_x, cam_y, True + return 0.0, 0.0, False + + def compute_follow_cmd( px: float, py: float, @@ -75,30 +121,27 @@ def compute_follow_cmd( Returns (linear_x, angular_z) as floats. linear_x: - Proportional to signed distance error (distance − follow_distance). - Zero inside the dead zone. Clamped to ±max_linear_vel. + Proportional to signed distance error (distance - follow_distance). + Zero inside the dead zone. Clamped to +-max_linear_vel. Zeroed (not reversed) if obstacle_ahead and moving forward. angular_z: Proportional to bearing. Always corrects even when inside the dead zone, so the robot stays pointed at the person while holding distance. - Clamped to ±max_angular_vel. + Clamped to +-max_angular_vel. """ distance = compute_distance(px, py) bearing = compute_bearing(px, py) - # Linear — dead zone suppresses jitter at target distance dist_error = distance - follow_distance if abs(dist_error) > dead_zone: linear_x = clamp(kp_linear * dist_error, -max_linear_vel, max_linear_vel) else: linear_x = 0.0 - # Obstacle override: suppress forward motion, allow turning if obstacle_ahead and linear_x > 0.0: linear_x = 0.0 - # Angular — always correct bearing angular_z = clamp(kp_angular * bearing, -max_angular_vel, max_angular_vel) return linear_x, angular_z @@ -116,7 +159,7 @@ def check_obstacle_in_costmap( """ Check for obstacle cells in the robot's forward corridor. - Assumes a Nav2 rolling-window local costmap (robot ≈ centre cell). + Assumes a Nav2 rolling-window local costmap (robot ~= centre cell). Forward = +col direction (robot +X). costmap_data : flat row-major int8 list (from OccupancyGrid.data) @@ -124,7 +167,7 @@ def check_obstacle_in_costmap( resolution : metres per cell check_dist : how far ahead to look (metres) robot_half_width: corridor half-width (metres) - threshold : costmap value ≥ this is treated as obstacle (Nav2 lethal=99) + threshold : costmap value >= this is treated as obstacle (Nav2 lethal=99) """ if resolution <= 0.0: return False @@ -132,8 +175,8 @@ def check_obstacle_in_costmap( cx = width // 2 cy = height // 2 - n_fwd = int(check_dist / resolution) - n_side = int(robot_half_width / resolution) + n_fwd = int(check_dist / resolution) + n_side = int(robot_half_width / resolution) for dx in range(1, n_fwd + 1): col = cx + dx @@ -147,14 +190,14 @@ def check_obstacle_in_costmap( return False -# ── ROS2 Node ───────────────────────────────────────────────────────────────── +# -- ROS2 Node ---------------------------------------------------------------- class PersonFollowerNode(Node): def __init__(self): super().__init__("person_follower") - # ── Parameters ──────────────────────────────────────────────────────── + # -- Parameters -------------------------------------------------------- self.declare_parameter("follow_distance", 1.5) self.declare_parameter("dead_zone", 0.3) self.declare_parameter("kp_linear", 0.6) @@ -169,38 +212,48 @@ class PersonFollowerNode(Node): self.declare_parameter("robot_width", 0.3) self.declare_parameter("control_rate", 20.0) self.declare_parameter("follow_enabled", True) + # UWB fusion + self.declare_parameter("uwb_weight", 0.7) + self.declare_parameter("uwb_timeout", 1.0) self._p = {} self._reload_params() - # ── State ───────────────────────────────────────────────────────────── + # -- State ------------------------------------------------------------- self._state = _STOPPING - self._last_target_t = 0.0 # monotonic clock; 0 = never received + self._last_target_t = 0.0 # camera; monotonic; 0 = never received self._person_x = 0.0 self._person_y = 0.0 + self._last_uwb_t = 0.0 # UWB; monotonic; 0 = never received + self._uwb_x = 0.0 + self._uwb_y = 0.0 self._obstacle_ahead = False - # ── Subscriptions ───────────────────────────────────────────────────── + # -- Subscriptions ----------------------------------------------------- + self.create_subscription( + PoseStamped, "/uwb/target", self._uwb_cb, 10) self.create_subscription( PoseStamped, "/person/target", self._target_cb, 10) self.create_subscription( OccupancyGrid, "/local_costmap/costmap", self._costmap_cb, 1) - # ── Publisher ───────────────────────────────────────────────────────── + # -- Publisher --------------------------------------------------------- self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10) - # ── Control timer ───────────────────────────────────────────────────── + # -- Control timer ----------------------------------------------------- self._timer = self.create_timer( 1.0 / self._p["control_rate"], self._control_cb) self.get_logger().info( - f"PersonFollower ready follow_dist={self._p['follow_distance']}m " - f"dead_zone=±{self._p['dead_zone']}m " + "PersonFollower ready " + f"follow_dist={self._p['follow_distance']}m " + f"dead_zone=+/-{self._p['dead_zone']}m " f"max_vel={self._p['max_linear_vel']}m/s " + f"uwb_weight={self._p['uwb_weight']} " f"lost_timeout={self._p['lost_timeout']}s" ) - # ── Parameter helper ────────────────────────────────────────────────────── + # -- Parameter helper ------------------------------------------------------ def _reload_params(self): self._p = { @@ -218,12 +271,21 @@ class PersonFollowerNode(Node): "robot_width": self.get_parameter("robot_width").value, "control_rate": self.get_parameter("control_rate").value, "follow_enabled": self.get_parameter("follow_enabled").value, + "uwb_weight": self.get_parameter("uwb_weight").value, + "uwb_timeout": self.get_parameter("uwb_timeout").value, } - # ── Callbacks ───────────────────────────────────────────────────────────── + # -- Callbacks ------------------------------------------------------------- + + def _uwb_cb(self, msg): + """UWB-triangulated person position (primary source).""" + self._uwb_x = msg.pose.position.x + self._uwb_y = msg.pose.position.y + self._last_uwb_t = time.monotonic() + self._state = _FOLLOWING def _target_cb(self, msg): - """Person position in base_link frame.""" + """Camera-detected person position (secondary source).""" self._person_x = msg.pose.position.x self._person_y = msg.pose.position.y self._last_target_t = time.monotonic() @@ -240,10 +302,9 @@ class PersonFollowerNode(Node): threshold=self._p["obstacle_threshold"], ) - # ── Control loop ────────────────────────────────────────────────────────── + # -- Control loop ---------------------------------------------------------- def _control_cb(self): - # Re-read mutable params every tick (cheap dict lookup) self._reload_params() twist = Twist() @@ -252,51 +313,69 @@ class PersonFollowerNode(Node): self._cmd_pub.publish(twist) return - now = time.monotonic() - dt_lost = (now - self._last_target_t) if self._last_target_t > 0.0 else 1e9 + now = time.monotonic() - # State transitions (don't transition back from STOPPING/SEARCHING - # here — _target_cb resets to FOLLOWING on new detection) + uwb_fresh = ( + self._last_uwb_t > 0.0 and + (now - self._last_uwb_t) <= self._p["uwb_timeout"] + ) + cam_fresh = ( + self._last_target_t > 0.0 and + (now - self._last_target_t) <= self._p["lost_timeout"] + ) + + any_fresh = uwb_fresh or cam_fresh if self._state == _FOLLOWING: - if dt_lost > self._p["search_timeout"]: - self._state = _SEARCHING - self.get_logger().warn("Person lost — entering SEARCHING state") - elif dt_lost > self._p["lost_timeout"]: - self._state = _STOPPING - self.get_logger().warn("Person lost — stopping") + if not any_fresh: + uwb_stale = (now - self._last_uwb_t) if self._last_uwb_t > 0.0 else 1e9 + cam_stale = (now - self._last_target_t) if self._last_target_t > 0.0 else 1e9 + dt_lost = min(uwb_stale, cam_stale) + if dt_lost > self._p["search_timeout"]: + self._state = _SEARCHING + self.get_logger().warn("Person lost -- entering SEARCHING state") + else: + self._state = _STOPPING + self.get_logger().warn("Person lost -- stopping") linear_x, angular_z = 0.0, 0.0 if self._state == _FOLLOWING: - linear_x, angular_z = compute_follow_cmd( - px=self._person_x, - py=self._person_y, - follow_distance=self._p["follow_distance"], - dead_zone=self._p["dead_zone"], - kp_linear=self._p["kp_linear"], - kp_angular=self._p["kp_angular"], - max_linear_vel=self._p["max_linear_vel"], - max_angular_vel=self._p["max_angular_vel"], - obstacle_ahead=self._obstacle_ahead, + fx, fy, fresh = fuse_targets( + uwb_x=self._uwb_x, + uwb_y=self._uwb_y, + uwb_fresh=uwb_fresh, + cam_x=self._person_x, + cam_y=self._person_y, + cam_fresh=cam_fresh, + uwb_weight=self._p["uwb_weight"], ) - if self._obstacle_ahead and linear_x == 0.0: - self.get_logger().warn( - "Obstacle in path — forward motion suppressed", - throttle_duration_sec=2.0, + if fresh: + linear_x, angular_z = compute_follow_cmd( + px=fx, + py=fy, + follow_distance=self._p["follow_distance"], + dead_zone=self._p["dead_zone"], + kp_linear=self._p["kp_linear"], + kp_angular=self._p["kp_angular"], + max_linear_vel=self._p["max_linear_vel"], + max_angular_vel=self._p["max_angular_vel"], + obstacle_ahead=self._obstacle_ahead, ) + if self._obstacle_ahead and linear_x == 0.0: + self.get_logger().warn( + "Obstacle in path -- forward motion suppressed", + throttle_duration_sec=2.0, + ) elif self._state == _SEARCHING: angular_z = float(self._p["search_angular_vel"]) - # linear_x stays 0 — rotate in place to search - - # _STOPPING: both stay 0 twist.linear.x = linear_x twist.angular.z = angular_z self._cmd_pub.publish(twist) -# ── Entry point ─────────────────────────────────────────────────────────────── +# -- Entry point --------------------------------------------------------------- def main(args=None): rclpy.init(args=args) diff --git a/jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py b/jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py index 80a602a..b59493d 100644 --- a/jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py +++ b/jetson/ros2_ws/src/saltybot_follower/test/test_person_follower.py @@ -388,3 +388,98 @@ class TestFollowEnabled: lin, ang = (0.0, 0.0) if not follow_enabled else \ _compute_follow_cmd(3.0, 0.0, **_DEF) assert lin != pytest.approx(0.0) + + +# ── fuse_targets — UWB + camera fusion ────────────────────────────────────── + +def _fuse_targets(uwb_x, uwb_y, uwb_fresh, cam_x, cam_y, cam_fresh, uwb_weight): + """Mirror of person_follower_node.fuse_targets (no ROS2).""" + def clamp(v, lo, hi): return max(lo, min(hi, v)) + if uwb_fresh and cam_fresh: + w = clamp(uwb_weight, 0.0, 1.0) + return w * uwb_x + (1.0 - w) * cam_x, w * uwb_y + (1.0 - w) * cam_y, True + if uwb_fresh: + return uwb_x, uwb_y, True + if cam_fresh: + return cam_x, cam_y, True + return 0.0, 0.0, False + + +class TestFuseTargets: + + def test_both_fresh_blends(self): + """Both sources fresh → weighted blend.""" + fx, fy, ok = _fuse_targets( + uwb_x=4.0, uwb_y=0.0, uwb_fresh=True, + cam_x=2.0, cam_y=0.0, cam_fresh=True, + uwb_weight=0.7, + ) + assert ok is True + assert fx == pytest.approx(0.7 * 4.0 + 0.3 * 2.0) + assert fy == pytest.approx(0.0) + + def test_uwb_only(self): + """Only UWB fresh → UWB position returned as-is.""" + fx, fy, ok = _fuse_targets( + uwb_x=3.0, uwb_y=1.0, uwb_fresh=True, + cam_x=0.0, cam_y=0.0, cam_fresh=False, + uwb_weight=0.7, + ) + assert ok is True + assert fx == pytest.approx(3.0) + assert fy == pytest.approx(1.0) + + def test_camera_only(self): + """Only camera fresh → camera position returned as-is.""" + fx, fy, ok = _fuse_targets( + uwb_x=0.0, uwb_y=0.0, uwb_fresh=False, + cam_x=2.5, cam_y=-0.5, cam_fresh=True, + uwb_weight=0.7, + ) + assert ok is True + assert fx == pytest.approx(2.5) + assert fy == pytest.approx(-0.5) + + def test_neither_fresh(self): + """Both stale → (0, 0, False).""" + fx, fy, ok = _fuse_targets( + uwb_x=3.0, uwb_y=1.0, uwb_fresh=False, + cam_x=2.0, cam_y=0.0, cam_fresh=False, + uwb_weight=0.7, + ) + assert ok is False + assert fx == pytest.approx(0.0) + assert fy == pytest.approx(0.0) + + def test_uwb_weight_zero_gives_camera(self): + """uwb_weight=0.0 → pure camera even when both fresh.""" + fx, fy, ok = _fuse_targets( + uwb_x=4.0, uwb_y=2.0, uwb_fresh=True, + cam_x=1.0, cam_y=0.5, cam_fresh=True, + uwb_weight=0.0, + ) + assert ok is True + assert fx == pytest.approx(1.0) + assert fy == pytest.approx(0.5) + + def test_uwb_weight_one_gives_uwb(self): + """uwb_weight=1.0 → pure UWB even when both fresh.""" + fx, fy, ok = _fuse_targets( + uwb_x=4.0, uwb_y=2.0, uwb_fresh=True, + cam_x=1.0, cam_y=0.5, cam_fresh=True, + uwb_weight=1.0, + ) + assert ok is True + assert fx == pytest.approx(4.0) + assert fy == pytest.approx(2.0) + + def test_lateral_blend(self): + """Lateral (Y) component is also blended correctly.""" + fx, fy, ok = _fuse_targets( + uwb_x=2.0, uwb_y=1.0, uwb_fresh=True, + cam_x=2.0, cam_y=-1.0, cam_fresh=True, + uwb_weight=0.5, + ) + assert ok is True + assert fx == pytest.approx(2.0) + assert fy == pytest.approx(0.0) # 0.5*1.0 + 0.5*(-1.0) = 0 diff --git a/jetson/ros2_ws/src/saltybot_uwb/config/uwb_config.yaml b/jetson/ros2_ws/src/saltybot_uwb/config/uwb_config.yaml new file mode 100644 index 0000000..58528c4 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb/config/uwb_config.yaml @@ -0,0 +1,57 @@ +# uwb_config.yaml — MaUWB ESP32-S3 DW3000 UWB follow-me system +# +# Hardware layout: +# Anchor-0 (port side) → USB port_a, y = +anchor_separation/2 +# Anchor-1 (starboard side) → USB port_b, y = -anchor_separation/2 +# Tag on person → belt clip, ~0.9m above ground +# +# Run with: +# ros2 launch saltybot_uwb uwb.launch.py +# Override at launch: +# ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2 + +# ── Serial ports ────────────────────────────────────────────────────────────── +# Set udev rules to get stable symlinks: +# /dev/uwb-anchor0 → port_a +# /dev/uwb-anchor1 → port_b +# (See jetson/docs/pinout.md for udev setup) +port_a: /dev/uwb-anchor0 # Anchor-0 (port) +port_b: /dev/uwb-anchor1 # Anchor-1 (starboard) +baudrate: 115200 # MaUWB default — do not change + +# ── Anchor geometry ──────────────────────────────────────────────────────────── +# anchor_separation: centre-to-centre distance between anchors (metres) +# Must match physical mounting. Larger = more accurate lateral resolution. +anchor_separation: 0.25 # metres (25cm) + +# anchor_height: height of anchors above ground (metres) +# Orin stem mount ≈ 0.80m on the saltybot platform +anchor_height: 0.80 # metres + +# tag_height: height of person's belt-clip tag above ground (metres) +tag_height: 0.90 # metres (adjust per user) + +# ── Range validity ───────────────────────────────────────────────────────────── +# range_timeout_s: stale anchor — excluded from triangulation after this gap +range_timeout_s: 1.0 # seconds + +# max_range_m: discard ranges beyond this (DW3000 indoor practical limit ≈8m) +max_range_m: 8.0 # metres + +# min_range_m: discard ranges below this (likely multipath artefacts) +min_range_m: 0.05 # metres + +# ── Kalman filter ────────────────────────────────────────────────────────────── +# kf_process_noise: Q scalar — how dynamic the person's motion is +# Higher → faster response, more jitter +kf_process_noise: 0.1 + +# kf_meas_noise: R scalar — how noisy the UWB ranging is +# DW3000 indoor accuracy ≈ 10cm RMS → 0.1m → R ≈ 0.01 +# Use 0.3 to be conservative on first deployment +kf_meas_noise: 0.3 + +# ── Publish rate ────────────────────────────────────────────────────────────── +# Should match or exceed the AT+RANGE? poll rate from both anchors. +# 20Hz means 50ms per cycle; each anchor query takes ~10ms → headroom ok. +publish_rate: 20.0 # Hz diff --git a/jetson/ros2_ws/src/saltybot_uwb/launch/uwb.launch.py b/jetson/ros2_ws/src/saltybot_uwb/launch/uwb.launch.py new file mode 100644 index 0000000..a1bfb17 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb/launch/uwb.launch.py @@ -0,0 +1,59 @@ +""" +uwb.launch.py — Launch UWB driver node for MaUWB ESP32-S3 follow-me. + +Usage: + ros2 launch saltybot_uwb uwb.launch.py + ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2 port_b:=/dev/ttyUSB3 + ros2 launch saltybot_uwb uwb.launch.py anchor_separation:=0.30 publish_rate:=10.0 +""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + pkg = get_package_share_directory("saltybot_uwb") + cfg = os.path.join(pkg, "config", "uwb_config.yaml") + + return LaunchDescription([ + DeclareLaunchArgument("port_a", default_value="/dev/uwb-anchor0"), + DeclareLaunchArgument("port_b", default_value="/dev/uwb-anchor1"), + DeclareLaunchArgument("baudrate", default_value="115200"), + DeclareLaunchArgument("anchor_separation", default_value="0.25"), + DeclareLaunchArgument("anchor_height", default_value="0.80"), + DeclareLaunchArgument("tag_height", default_value="0.90"), + DeclareLaunchArgument("range_timeout_s", default_value="1.0"), + DeclareLaunchArgument("max_range_m", default_value="8.0"), + DeclareLaunchArgument("min_range_m", default_value="0.05"), + DeclareLaunchArgument("kf_process_noise", default_value="0.1"), + DeclareLaunchArgument("kf_meas_noise", default_value="0.3"), + DeclareLaunchArgument("publish_rate", default_value="20.0"), + + Node( + package="saltybot_uwb", + executable="uwb_driver", + name="uwb_driver", + output="screen", + parameters=[ + cfg, + { + "port_a": LaunchConfiguration("port_a"), + "port_b": LaunchConfiguration("port_b"), + "baudrate": LaunchConfiguration("baudrate"), + "anchor_separation": LaunchConfiguration("anchor_separation"), + "anchor_height": LaunchConfiguration("anchor_height"), + "tag_height": LaunchConfiguration("tag_height"), + "range_timeout_s": LaunchConfiguration("range_timeout_s"), + "max_range_m": LaunchConfiguration("max_range_m"), + "min_range_m": LaunchConfiguration("min_range_m"), + "kf_process_noise": LaunchConfiguration("kf_process_noise"), + "kf_meas_noise": LaunchConfiguration("kf_meas_noise"), + "publish_rate": LaunchConfiguration("publish_rate"), + }, + ], + ), + ]) diff --git a/jetson/ros2_ws/src/saltybot_uwb/package.xml b/jetson/ros2_ws/src/saltybot_uwb/package.xml new file mode 100644 index 0000000..e0bae4d --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb/package.xml @@ -0,0 +1,28 @@ + + + + saltybot_uwb + 0.1.0 + + UWB follow-me driver for saltybot — MaUWB ESP32-S3 DW3000 two-anchor + ranging, triangulation, Kalman filtering, and /uwb/target PoseStamped publisher. + + seb + MIT + + ament_python + + rclpy + geometry_msgs + std_msgs + saltybot_uwb_msgs + + ament_copyright + ament_flake8 + ament_pep257 + pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_uwb/resource/saltybot_uwb b/jetson/ros2_ws/src/saltybot_uwb/resource/saltybot_uwb new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/__init__.py b/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/ranging_math.py b/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/ranging_math.py new file mode 100644 index 0000000..7fcbe9b --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/ranging_math.py @@ -0,0 +1,278 @@ +""" +ranging_math.py — Pure triangulation and Kalman filter helpers for UWB follow-me. + +No ROS2 dependencies — importable in unit tests without a ROS2 install. + +2-Anchor geometry +───────────────── +Anchors are mounted on the robot stem, separated by `sep` metres along the Y-axis: + + A0 (0, +sep/2, anchor_z) ← port side + A1 (0, -sep/2, anchor_z) ← starboard side + +The person tag is at (x_t, y_t, tag_z) in base_link. + +Step 1 — height compensation: + dz = anchor_z - tag_z + r0_h = sqrt(max(0, r0² - dz²)) + r1_h = sqrt(max(0, r1² - dz²)) + +Step 2 — lateral offset (Y): + y_t = (r1_h² - r0_h²) / (2 * sep) + +Step 3 — forward distance (X): + x_t = sqrt(max(0, r0_h² - (y_t - sep/2)²)) + +Returns (x_t, y_t); caller should treat negative x_t as 0. +""" + +import math + + +# ── Triangulation ───────────────────────────────────────────────────────────── + +def triangulate_2anchor( + r0: float, + r1: float, + sep: float, + anchor_z: float = 0.0, + tag_z: float = 0.0, +) -> tuple: + """ + Triangulate person position in base_link from two anchor ranges. + + Parameters + ---------- + r0 : range to anchor-0 (port, y = +sep/2) in metres + r1 : range to anchor-1 (starboard, y = -sep/2) in metres + sep : anchor separation in metres (centre-to-centre) + anchor_z : height of anchors above ground (metres, default 0) + tag_z : height of person tag above ground (metres, default 0) + + Returns + ------- + (x_t, y_t) : person position in metres (base_link X = forward, Y = left) + x_t is clamped to ≥ 0 (can't be behind anchors geometrically). + """ + if sep <= 0.0: + raise ValueError(f"sep must be > 0, got {sep}") + if r0 < 0.0 or r1 < 0.0: + raise ValueError(f"ranges must be ≥ 0, got r0={r0}, r1={r1}") + + # Height correction — bring to horizontal plane + dz = anchor_z - tag_z + dz2 = dz * dz + r0_h2 = max(0.0, r0 * r0 - dz2) + r1_h2 = max(0.0, r1 * r1 - dz2) + r0_h = math.sqrt(r0_h2) + + # Lateral (Y) offset — from circle-circle intersection formula + y_t = (r1_h2 - r0_h2) / (2.0 * sep) + + # Forward (X) distance + dx2 = r0_h2 - (y_t - sep / 2.0) ** 2 + x_t = math.sqrt(max(0.0, dx2)) + + return x_t, y_t + + +# ── Simple 2-D Kalman filter ────────────────────────────────────────────────── + +class KalmanFilter2D: + """ + Constant-velocity Kalman filter for a 2-D position estimate. + + State vector: [x, y, vx, vy]ᵀ + + Parameters + ---------- + process_noise : Q scalar — how much we trust the motion model + measurement_noise : R scalar — how noisy the UWB measurement is + dt : expected update interval in seconds (used for F matrix) + """ + + def __init__( + self, + process_noise: float = 0.1, + measurement_noise: float = 0.3, + dt: float = 0.05, + ): + self._q = process_noise + self._r = measurement_noise + self._dt = dt + + # State: [x, y, vx, vy] + self._x = [0.0, 0.0, 0.0, 0.0] + + # Covariance matrix P (4×4, stored as flat list, row-major) + init_pos_var = 1.0 + init_vel_var = 0.5 + self._P = [ + init_pos_var, 0.0, 0.0, 0.0, + 0.0, init_pos_var, 0.0, 0.0, + 0.0, 0.0, init_vel_var, 0.0, + 0.0, 0.0, 0.0, init_vel_var, + ] + self._initialised = False + + # ── Matrix helpers (minimal, no numpy — importable without GPU deps) ────── + + @staticmethod + def _mat_add(A, B): + return [a + b for a, b in zip(A, B)] + + @staticmethod + def _mat_sub(A, B): + return [a - b for a, b in zip(A, B)] + + @staticmethod + def _mat_mul(A, B, n): + """n×n matrix multiply.""" + C = [0.0] * (n * n) + for i in range(n): + for k in range(n): + for j in range(n): + C[i * n + j] += A[i * n + k] * B[k * n + j] + return C + + @staticmethod + def _mat_T(A, n): + """n×n transpose.""" + T = [0.0] * (n * n) + for i in range(n): + for j in range(n): + T[j * n + i] = A[i * n + j] + return T + + def _build_F(self): + dt = self._dt + return [ + 1.0, 0.0, dt, 0.0, + 0.0, 1.0, 0.0, dt, + 0.0, 0.0, 1.0, 0.0, + 0.0, 0.0, 0.0, 1.0, + ] + + def _build_Q(self): + q = self._q + dt = self._dt + dt2 = dt * dt + dt3 = dt2 * dt + dt4 = dt3 * dt + # Continuous white-noise model discretised for constant-velocity + return [ + q * dt4 / 4, 0.0, q * dt3 / 2, 0.0, + 0.0, q * dt4 / 4, 0.0, q * dt3 / 2, + q * dt3 / 2, 0.0, q * dt2, 0.0, + 0.0, q * dt3 / 2, 0.0, q * dt2, + ] + + def predict(self, dt: float = None): + """Predict step — advances state by dt (defaults to self._dt).""" + if dt is not None: + self._dt = dt + F = self._build_F() + Q = self._build_Q() + n = 4 + x = self._x + # x = F @ x + new_x = [ + F[0] * x[0] + F[1] * x[1] + F[2] * x[2] + F[3] * x[3], + F[4] * x[0] + F[5] * x[1] + F[6] * x[2] + F[7] * x[3], + F[8] * x[0] + F[9] * x[1] + F[10] * x[2] + F[11] * x[3], + F[12] * x[0] + F[13] * x[1] + F[14] * x[2] + F[15] * x[3], + ] + # P = F @ P @ F.T + Q + FP = self._mat_mul(F, self._P, n) + FT = self._mat_T(F, n) + FPF = self._mat_mul(FP, FT, n) + self._P = self._mat_add(FPF, Q) + self._x = new_x + + def update(self, x_meas: float, y_meas: float): + """ + Update step with a 2-D position measurement. + + H = [[1,0,0,0],[0,1,0,0]] (observe position only) + """ + if not self._initialised: + self._x[0] = x_meas + self._x[1] = y_meas + self._initialised = True + return + + # Innovation + innov_x = x_meas - self._x[0] + innov_y = y_meas - self._x[1] + + # S = H @ P @ H.T + R (2×2, position rows/cols only) + P = self._P + r = self._r + # H picks rows 0,1 and cols 0,1 from P + S00 = P[0] + r + S01 = P[1] + S10 = P[4] + S11 = P[5] + r + + # K = P @ H.T @ S⁻¹ (4×2 Kalman gain) + det = S00 * S11 - S01 * S10 + if abs(det) < 1e-12: + return + inv_det = 1.0 / det + # S⁻¹ + Si00 = S11 * inv_det + Si01 = -S01 * inv_det + Si10 = -S10 * inv_det + Si11 = S00 * inv_det + + # P @ H.T = first two columns of P (H.T selects cols 0,1) + PH = [P[0], P[1], P[4], P[5], P[8], P[9], P[12], P[13]] # 4×2 + + # K (4×2) = PH @ Si + K = [ + PH[0] * Si00 + PH[1] * Si10, PH[0] * Si01 + PH[1] * Si11, + PH[2] * Si00 + PH[3] * Si10, PH[2] * Si01 + PH[3] * Si11, + PH[4] * Si00 + PH[5] * Si10, PH[4] * Si01 + PH[5] * Si11, + PH[6] * Si00 + PH[7] * Si10, PH[6] * Si01 + PH[7] * Si11, + ] + + # x = x + K @ innov + self._x[0] += K[0] * innov_x + K[1] * innov_y + self._x[1] += K[2] * innov_x + K[3] * innov_y + self._x[2] += K[4] * innov_x + K[5] * innov_y + self._x[3] += K[6] * innov_x + K[7] * innov_y + + # P = (I - K @ H) @ P (Joseph form for numerical stability skipped — + # this is a low-latency embedded context where simplicity wins) + n = 4 + # K @ H selects first two columns → rows of P + KH = [0.0] * 16 + for row in range(4): + KH[row * 4 + 0] = K[row * 2 + 0] + KH[row * 4 + 1] = K[row * 2 + 1] + # I - KH + IKH = [-KH[i] for i in range(16)] + for i in range(4): + IKH[i * 4 + i] += 1.0 + self._P = self._mat_mul(IKH, self._P, n) + + def position(self) -> tuple: + """Return current (x, y) position estimate.""" + return self._x[0], self._x[1] + + def velocity(self) -> tuple: + """Return current (vx, vy) velocity estimate.""" + return self._x[2], self._x[3] + + def reset(self): + """Reset filter to uninitialised state.""" + self._x = [0.0, 0.0, 0.0, 0.0] + init_pos_var = 1.0 + init_vel_var = 0.5 + self._P = [ + init_pos_var, 0.0, 0.0, 0.0, + 0.0, init_pos_var, 0.0, 0.0, + 0.0, 0.0, init_vel_var, 0.0, + 0.0, 0.0, 0.0, init_vel_var, + ] + self._initialised = False diff --git a/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/uwb_driver_node.py b/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/uwb_driver_node.py new file mode 100644 index 0000000..8a5901d --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/uwb_driver_node.py @@ -0,0 +1,306 @@ +""" +uwb_driver_node.py — ROS2 node for MaUWB ESP32-S3 DW3000 follow-me system. + +Hardware +──────── + • 2× MaUWB ESP32-S3 DW3000 anchors on robot stem (USB → Orin Nano) + - Anchor-0: port side (y = +sep/2) + - Anchor-1: starboard (y = -sep/2) + • 1× MaUWB tag on person (belt clip) + +AT command interface (115200 8N1) +────────────────────────────────── + Query: AT+RANGE?\r\n + Response (from anchors): + +RANGE:,[,]\r\n + + Config: + AT+anchor_tag=ANCHOR\r\n — set module as anchor + AT+anchor_tag=TAG\r\n — set module as tag + +Publishes +───────── + /uwb/target (geometry_msgs/PoseStamped) — triangulated person position in base_link + /uwb/ranges (saltybot_uwb_msgs/UwbRangeArray) — raw ranges from both anchors + +Safety +────── + If a range is stale (> range_timeout_s), that anchor is excluded from + triangulation. If both anchors are stale, /uwb/target is not published. + +Usage +───── + ros2 launch saltybot_uwb uwb.launch.py + ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2 port_b:=/dev/ttyUSB3 +""" + +import math +import re +import threading +import time + +import rclpy +from rclpy.node import Node +from geometry_msgs.msg import PoseStamped +from std_msgs.msg import Header + +from saltybot_uwb_msgs.msg import UwbRange, UwbRangeArray +from saltybot_uwb.ranging_math import triangulate_2anchor, KalmanFilter2D + +try: + import serial + _SERIAL_AVAILABLE = True +except ImportError: + _SERIAL_AVAILABLE = False + + +# Regex: +RANGE:, or +RANGE:,, +_RANGE_RE = re.compile( + r"\+RANGE\s*:\s*(\d+)\s*,\s*(\d+)(?:\s*,\s*(-?[\d.]+))?", + re.IGNORECASE, +) + + +class SerialReader(threading.Thread): + """ + Background thread: polls an anchor's UART, fires callback on every + valid +RANGE response. + """ + + def __init__(self, port, baudrate, anchor_id, callback, logger): + super().__init__(daemon=True) + self._port = port + self._baudrate = baudrate + self._anchor_id = anchor_id + self._callback = callback + self._logger = logger + self._running = False + self._ser = None + + def run(self): + self._running = True + while self._running: + try: + self._ser = serial.Serial( + self._port, self._baudrate, timeout=0.5 + ) + self._logger.info( + f"Anchor-{self._anchor_id}: opened {self._port}" + ) + self._read_loop() + except Exception as exc: + self._logger.warn( + f"Anchor-{self._anchor_id} serial error: {exc} — retrying in 2s" + ) + if self._ser and self._ser.is_open: + self._ser.close() + time.sleep(2.0) + + def _read_loop(self): + while self._running: + try: + # Query the anchor + self._ser.write(b"AT+RANGE?\r\n") + # Read up to 10 lines waiting for a +RANGE response + for _ in range(10): + raw = self._ser.readline() + if not raw: + break + line = raw.decode("ascii", errors="replace").strip() + m = _RANGE_RE.search(line) + if m: + range_mm = int(m.group(2)) + rssi = float(m.group(3)) if m.group(3) else 0.0 + self._callback(self._anchor_id, range_mm, rssi) + break + except Exception as exc: + self._logger.warn( + f"Anchor-{self._anchor_id} read error: {exc}" + ) + break # trigger reconnect + + def stop(self): + self._running = False + if self._ser and self._ser.is_open: + self._ser.close() + + +class UwbDriverNode(Node): + + def __init__(self): + super().__init__("uwb_driver") + + # ── Parameters ──────────────────────────────────────────────────────── + self.declare_parameter("port_a", "/dev/ttyUSB0") + self.declare_parameter("port_b", "/dev/ttyUSB1") + self.declare_parameter("baudrate", 115200) + self.declare_parameter("anchor_separation", 0.25) + self.declare_parameter("anchor_height", 0.80) + self.declare_parameter("tag_height", 0.90) + self.declare_parameter("range_timeout_s", 1.0) + self.declare_parameter("max_range_m", 8.0) + self.declare_parameter("min_range_m", 0.05) + self.declare_parameter("kf_process_noise", 0.1) + self.declare_parameter("kf_meas_noise", 0.3) + self.declare_parameter("publish_rate", 20.0) + + self._p = self._load_params() + + # ── State (protected by lock) ────────────────────────────────────── + self._lock = threading.Lock() + self._ranges = {} # anchor_id → (range_m, rssi, timestamp) + self._kf = KalmanFilter2D( + process_noise=self._p["kf_process_noise"], + measurement_noise=self._p["kf_meas_noise"], + dt=1.0 / self._p["publish_rate"], + ) + + # ── Publishers ──────────────────────────────────────────────────── + self._target_pub = self.create_publisher( + PoseStamped, "/uwb/target", 10) + self._ranges_pub = self.create_publisher( + UwbRangeArray, "/uwb/ranges", 10) + + # ── Serial readers ──────────────────────────────────────────────── + if _SERIAL_AVAILABLE: + self._reader_a = SerialReader( + self._p["port_a"], self._p["baudrate"], + anchor_id=0, callback=self._range_cb, + logger=self.get_logger(), + ) + self._reader_b = SerialReader( + self._p["port_b"], self._p["baudrate"], + anchor_id=1, callback=self._range_cb, + logger=self.get_logger(), + ) + self._reader_a.start() + self._reader_b.start() + else: + self.get_logger().warn( + "pyserial not installed — running in simulation mode (no serial I/O)" + ) + + # ── Publish timer ───────────────────────────────────────────────── + self._timer = self.create_timer( + 1.0 / self._p["publish_rate"], self._publish_cb + ) + + self.get_logger().info( + f"UWB driver ready sep={self._p['anchor_separation']}m " + f"ports={self._p['port_a']},{self._p['port_b']} " + f"rate={self._p['publish_rate']}Hz" + ) + + # ── Helpers ─────────────────────────────────────────────────────────────── + + def _load_params(self): + return { + "port_a": self.get_parameter("port_a").value, + "port_b": self.get_parameter("port_b").value, + "baudrate": self.get_parameter("baudrate").value, + "anchor_separation": self.get_parameter("anchor_separation").value, + "anchor_height": self.get_parameter("anchor_height").value, + "tag_height": self.get_parameter("tag_height").value, + "range_timeout_s": self.get_parameter("range_timeout_s").value, + "max_range_m": self.get_parameter("max_range_m").value, + "min_range_m": self.get_parameter("min_range_m").value, + "kf_process_noise": self.get_parameter("kf_process_noise").value, + "kf_meas_noise": self.get_parameter("kf_meas_noise").value, + "publish_rate": self.get_parameter("publish_rate").value, + } + + # ── Callbacks ───────────────────────────────────────────────────────────── + + def _range_cb(self, anchor_id: int, range_mm: int, rssi: float): + """Called from serial reader threads — thread-safe update.""" + range_m = range_mm / 1000.0 + p = self._p + if range_m < p["min_range_m"] or range_m > p["max_range_m"]: + return + with self._lock: + self._ranges[anchor_id] = (range_m, rssi, time.monotonic()) + + def _publish_cb(self): + now = time.monotonic() + timeout = self._p["range_timeout_s"] + sep = self._p["anchor_separation"] + + with self._lock: + # Collect valid (non-stale) ranges + valid = {} + for aid, (r, rssi, t) in self._ranges.items(): + if now - t <= timeout: + valid[aid] = (r, rssi, t) + + # Build and publish UwbRangeArray regardless (even if partial) + hdr = Header() + hdr.stamp = self.get_clock().now().to_msg() + hdr.frame_id = "base_link" + + arr = UwbRangeArray() + arr.header = hdr + for aid, (r, rssi, _) in valid.items(): + entry = UwbRange() + entry.header = hdr + entry.anchor_id = aid + entry.range_m = float(r) + entry.raw_mm = int(round(r * 1000.0)) + entry.rssi = float(rssi) + arr.ranges.append(entry) + self._ranges_pub.publish(arr) + + # Need both anchors to triangulate + if 0 not in valid or 1 not in valid: + return + + r0 = valid[0][0] + r1 = valid[1][0] + + try: + x_t, y_t = triangulate_2anchor( + r0=r0, + r1=r1, + sep=sep, + anchor_z=self._p["anchor_height"], + tag_z=self._p["tag_height"], + ) + except (ValueError, ZeroDivisionError) as exc: + self.get_logger().warn(f"Triangulation error: {exc}") + return + + # Kalman filter update + dt = 1.0 / self._p["publish_rate"] + self._kf.predict(dt=dt) + self._kf.update(x_t, y_t) + kx, ky = self._kf.position() + + # Publish PoseStamped in base_link + pose = PoseStamped() + pose.header = hdr + pose.pose.position.x = kx + pose.pose.position.y = ky + pose.pose.position.z = 0.0 + # Orientation: face the person (yaw = atan2(y, x)) + yaw = math.atan2(ky, kx) + pose.pose.orientation.z = math.sin(yaw / 2.0) + pose.pose.orientation.w = math.cos(yaw / 2.0) + + self._target_pub.publish(pose) + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = UwbDriverNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_uwb/setup.cfg b/jetson/ros2_ws/src/saltybot_uwb/setup.cfg new file mode 100644 index 0000000..a5b4a97 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_uwb +[install] +install_scripts=$base/lib/saltybot_uwb diff --git a/jetson/ros2_ws/src/saltybot_uwb/setup.py b/jetson/ros2_ws/src/saltybot_uwb/setup.py new file mode 100644 index 0000000..6e3af9b --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb/setup.py @@ -0,0 +1,32 @@ +from setuptools import setup +import os +from glob import glob + +package_name = "saltybot_uwb" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", + ["resource/" + package_name]), + ("share/" + package_name, ["package.xml"]), + (os.path.join("share", package_name, "launch"), + glob("launch/*.py")), + (os.path.join("share", package_name, "config"), + glob("config/*.yaml")), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="seb", + maintainer_email="seb@vayrette.com", + description="UWB follow-me driver for saltybot (MaUWB ESP32-S3 DW3000)", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + "uwb_driver = saltybot_uwb.uwb_driver_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_uwb/test/test_ranging_math.py b/jetson/ros2_ws/src/saltybot_uwb/test/test_ranging_math.py new file mode 100644 index 0000000..7ca83fe --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb/test/test_ranging_math.py @@ -0,0 +1,174 @@ +""" +test_ranging_math.py — Unit tests for UWB triangulation and Kalman filter. + +No ROS2 / serial / GPU dependencies — runs with plain pytest. +""" + +import math +import pytest + +from saltybot_uwb.ranging_math import triangulate_2anchor, KalmanFilter2D + + +# ── triangulate_2anchor ─────────────────────────────────────────────────────── + +class TestTriangulate2Anchor: + + def test_person_directly_ahead(self): + """Person 2m ahead, centred: x≈2, y≈0.""" + sep = 0.25 + # A0 at y=+0.125, A1 at y=-0.125 → symmetric → y_t ≈ 0 + r0 = math.sqrt(2.0**2 + 0.125**2) + r1 = r0 # symmetric + x, y = triangulate_2anchor(r0, r1, sep) + assert abs(x - 2.0) < 0.02 + assert abs(y) < 0.01 + + def test_person_offset_left(self): + """Person 2m ahead, 0.5m to the left: y_t ≈ +0.5.""" + sep = 0.25 + # A0 at y=+0.125, A1 at y=-0.125 + px, py = 2.0, 0.5 + r0 = math.sqrt(px**2 + (py - 0.125)**2) + r1 = math.sqrt(px**2 + (py + 0.125)**2) + x, y = triangulate_2anchor(r0, r1, sep) + assert abs(x - px) < 0.05 + assert abs(y - py) < 0.05 + + def test_person_offset_right(self): + """Person 2m ahead, 0.5m to the right: y_t ≈ -0.5.""" + sep = 0.25 + px, py = 2.0, -0.5 + r0 = math.sqrt(px**2 + (py - 0.125)**2) + r1 = math.sqrt(px**2 + (py + 0.125)**2) + x, y = triangulate_2anchor(r0, r1, sep) + assert abs(x - px) < 0.05 + assert abs(y - py) < 0.05 + + def test_height_compensation_reduces_horizontal_range(self): + """With height difference, horizontal range < slant range → x smaller.""" + sep = 0.25 + # No height: x ≈ 2.0 + x_flat, _ = triangulate_2anchor(2.0, 2.0, sep, anchor_z=0.0, tag_z=0.0) + # Same slant range but 0.5m height difference + slant = math.sqrt(2.0**2 + 0.5**2) + x_tall, _ = triangulate_2anchor(slant, slant, sep, anchor_z=0.8, tag_z=0.3) + # x_tall should be close to 2.0 (height-compensated back to horizontal) + assert abs(x_tall - x_flat) < 0.05 + + def test_returns_nonnegative_x(self): + """x_t is always ≥ 0 (person can't be geometrically behind anchors).""" + sep = 0.25 + # Pathological: extremely mismatched ranges → sqrt clamped to 0 + x, _ = triangulate_2anchor(0.1, 0.1, sep) + assert x >= 0.0 + + def test_invalid_sep_raises(self): + with pytest.raises(ValueError): + triangulate_2anchor(1.0, 1.0, sep=0.0) + + def test_negative_range_raises(self): + with pytest.raises(ValueError): + triangulate_2anchor(-1.0, 1.0, sep=0.25) + + def test_close_range(self): + """Person very close (0.3m) — should not crash.""" + sep = 0.25 + px, py = 0.3, 0.0 + r0 = math.sqrt(px**2 + 0.125**2) + r1 = r0 + x, y = triangulate_2anchor(r0, r1, sep) + assert x >= 0.0 + + def test_large_range(self): + """Person 7m ahead — within DW3000 practical range.""" + sep = 0.25 + r0 = math.sqrt(7.0**2 + 0.125**2) + r1 = r0 + x, y = triangulate_2anchor(r0, r1, sep) + assert abs(x - 7.0) < 0.1 + assert abs(y) < 0.05 + + +# ── KalmanFilter2D ──────────────────────────────────────────────────────────── + +class TestKalmanFilter2D: + + def test_initial_position_after_first_update(self): + """First update initialises position to measurement.""" + kf = KalmanFilter2D() + kf.predict() + kf.update(3.0, 1.0) + x, y = kf.position() + assert abs(x - 3.0) < 0.01 + assert abs(y - 1.0) < 0.01 + + def test_converges_to_stationary_target(self): + """Repeated updates to same position → Kalman converges to it.""" + kf = KalmanFilter2D(process_noise=0.1, measurement_noise=0.3) + for _ in range(50): + kf.predict(dt=0.05) + kf.update(2.0, 0.5) + x, y = kf.position() + assert abs(x - 2.0) < 0.1 + assert abs(y - 0.5) < 0.1 + + def test_smooths_noisy_measurements(self): + """Filter output should be smoother than raw measurements.""" + import random + rng = random.Random(42) + kf = KalmanFilter2D(process_noise=0.05, measurement_noise=0.5) + raw_errors = [] + kf_errors = [] + true_x, true_y = 2.0, 1.0 + for _ in range(30): + noisy_x = true_x + rng.gauss(0, 0.3) + noisy_y = true_y + rng.gauss(0, 0.3) + kf.predict(dt=0.05) + kf.update(noisy_x, noisy_y) + kx, ky = kf.position() + raw_errors.append((noisy_x - true_x)**2 + (noisy_y - true_y)**2) + kf_errors.append((kx - true_x)**2 + (ky - true_y)**2) + avg_raw = sum(raw_errors[-10:]) / 10 + avg_kf = sum(kf_errors[-10:]) / 10 + assert avg_kf < avg_raw # filter is smoother + + def test_reset_clears_state(self): + """After reset(), position returns to ~0.""" + kf = KalmanFilter2D() + for _ in range(10): + kf.predict() + kf.update(5.0, 3.0) + kf.reset() + kf.predict() + kf.update(0.0, 0.0) + x, y = kf.position() + assert abs(x) < 0.1 + assert abs(y) < 0.1 + + def test_velocity_nonzero_for_moving_target(self): + """For a target moving at constant velocity, vx should be detected.""" + kf = KalmanFilter2D(process_noise=1.0, measurement_noise=0.1, dt=0.1) + # Target moving at 0.5 m/s in X + for i in range(40): + kf.predict(dt=0.1) + kf.update(0.5 * i * 0.1, 0.0) + vx, vy = kf.velocity() + assert abs(vx - 0.5) < 0.15 + + def test_custom_dt(self): + """predict() accepts custom dt without raising.""" + kf = KalmanFilter2D() + kf.predict(dt=0.02) + kf.update(1.0, 1.0) + x, y = kf.position() + assert abs(x - 1.0) < 0.01 + + def test_no_nan_on_zero_measurement(self): + """Zero measurement should not produce NaN.""" + kf = KalmanFilter2D() + kf.predict() + kf.update(0.0, 0.0) + x, y = kf.position() + assert not math.isnan(x) + assert not math.isnan(y) diff --git a/jetson/ros2_ws/src/saltybot_uwb_msgs/CMakeLists.txt b/jetson/ros2_ws/src/saltybot_uwb_msgs/CMakeLists.txt new file mode 100644 index 0000000..0b67b39 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_msgs/CMakeLists.txt @@ -0,0 +1,15 @@ +cmake_minimum_required(VERSION 3.8) +project(saltybot_uwb_msgs) + +find_package(ament_cmake REQUIRED) +find_package(std_msgs REQUIRED) +find_package(rosidl_default_generators REQUIRED) + +rosidl_generate_interfaces(${PROJECT_NAME} + "msg/UwbRange.msg" + "msg/UwbRangeArray.msg" + DEPENDENCIES std_msgs +) + +ament_export_dependencies(rosidl_default_runtime) +ament_package() diff --git a/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRange.msg b/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRange.msg new file mode 100644 index 0000000..d11c8d9 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRange.msg @@ -0,0 +1,13 @@ +# UwbRange.msg — range measurement from a single UWB anchor +# +# anchor_id : 0 = anchor-0 (port side), 1 = anchor-1 (starboard side) +# range_m : measured horizontal range in metres (after height correction) +# raw_mm : raw TWR range from AT+RANGE? response, millimetres +# rssi : received signal strength (dBm), 0 if not reported by module + +std_msgs/Header header + +uint8 anchor_id +float32 range_m +uint32 raw_mm +float32 rssi diff --git a/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRangeArray.msg b/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRangeArray.msg new file mode 100644 index 0000000..6f1bc8d --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRangeArray.msg @@ -0,0 +1,8 @@ +# UwbRangeArray.msg — all UWB anchor ranges in one message +# +# Published by uwb_driver_node on /uwb/ranges. +# Contains one UwbRange entry per active anchor. + +std_msgs/Header header + +saltybot_uwb_msgs/UwbRange[] ranges diff --git a/jetson/ros2_ws/src/saltybot_uwb_msgs/package.xml b/jetson/ros2_ws/src/saltybot_uwb_msgs/package.xml new file mode 100644 index 0000000..d12fde8 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_msgs/package.xml @@ -0,0 +1,22 @@ + + + + saltybot_uwb_msgs + 0.1.0 + Custom ROS2 message definitions for UWB ranging (saltybot). + seb + MIT + + ament_cmake + rosidl_default_generators + + std_msgs + + rosidl_default_runtime + + rosidl_interface_packages + + + ament_cmake + +