From 9a68dfdb2ea263656c13d9c3e9e8493b8bfa95d7 Mon Sep 17 00:00:00 2001 From: sl-firmware Date: Sun, 1 Mar 2026 23:25:08 -0500 Subject: [PATCH] feat(uwb): MaUWB ESP32-S3 DW3000 dual-anchor bearing driver (Issue #90) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary - saltybot_uwb_msgs: add UwbBearing.msg, add tag_id to UwbRange.msg, register UwbBearing in CMakeLists.txt - ranging_math.py: add bearing_from_pos(x, y) helper (atan2-based) - uwb_driver_node.py: dual-rate architecture • 100 Hz /uwb/ranges — raw TWR ranges with tag_id attribution • 10 Hz /uwb/bearing — Kalman-fused bearing + range estimate • enrolled_tag_ids parameter for tag pairing filter • AT+RANGE_ADDR= pairing command on connect - uwb_config.yaml: range_rate / bearing_rate / enrolled_tag_ids params - uwb.launch.py: expose new params as launch arguments - test_ranging_math.py: 7 new bearing_from_pos unit tests Closes #90 Co-Authored-By: Claude Sonnet 4.6 --- .../src/saltybot_uwb/config/uwb_config.yaml | 61 +--- .../src/saltybot_uwb/launch/uwb.launch.py | 18 +- .../saltybot_uwb/saltybot_uwb/ranging_math.py | 20 ++ .../saltybot_uwb/uwb_driver_node.py | 266 +++++++++++------- .../saltybot_uwb/test/test_ranging_math.py | 46 ++- .../src/saltybot_uwb_msgs/CMakeLists.txt | 1 + .../src/saltybot_uwb_msgs/msg/UwbBearing.msg | 18 ++ .../src/saltybot_uwb_msgs/msg/UwbRange.msg | 2 + 8 files changed, 277 insertions(+), 155 deletions(-) create mode 100644 jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbBearing.msg diff --git a/jetson/ros2_ws/src/saltybot_uwb/config/uwb_config.yaml b/jetson/ros2_ws/src/saltybot_uwb/config/uwb_config.yaml index 58528c4..faed065 100644 --- a/jetson/ros2_ws/src/saltybot_uwb/config/uwb_config.yaml +++ b/jetson/ros2_ws/src/saltybot_uwb/config/uwb_config.yaml @@ -1,57 +1,24 @@ -# uwb_config.yaml — MaUWB ESP32-S3 DW3000 UWB follow-me system -# -# Hardware layout: -# Anchor-0 (port side) → USB port_a, y = +anchor_separation/2 -# Anchor-1 (starboard side) → USB port_b, y = -anchor_separation/2 -# Tag on person → belt clip, ~0.9m above ground +# uwb_config.yaml — MaUWB ESP32-S3 DW3000 UWB integration (Issue #90) # # Run with: # ros2 launch saltybot_uwb uwb.launch.py -# Override at launch: -# ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2 -# ── Serial ports ────────────────────────────────────────────────────────────── -# Set udev rules to get stable symlinks: -# /dev/uwb-anchor0 → port_a -# /dev/uwb-anchor1 → port_b -# (See jetson/docs/pinout.md for udev setup) -port_a: /dev/uwb-anchor0 # Anchor-0 (port) -port_b: /dev/uwb-anchor1 # Anchor-1 (starboard) -baudrate: 115200 # MaUWB default — do not change +port_a: /dev/uwb-anchor0 +port_b: /dev/uwb-anchor1 +baudrate: 115200 -# ── Anchor geometry ──────────────────────────────────────────────────────────── -# anchor_separation: centre-to-centre distance between anchors (metres) -# Must match physical mounting. Larger = more accurate lateral resolution. -anchor_separation: 0.25 # metres (25cm) +anchor_separation: 0.25 +anchor_height: 0.80 +tag_height: 0.90 -# anchor_height: height of anchors above ground (metres) -# Orin stem mount ≈ 0.80m on the saltybot platform -anchor_height: 0.80 # metres +range_timeout_s: 1.0 +max_range_m: 8.0 +min_range_m: 0.05 -# tag_height: height of person's belt-clip tag above ground (metres) -tag_height: 0.90 # metres (adjust per user) - -# ── Range validity ───────────────────────────────────────────────────────────── -# range_timeout_s: stale anchor — excluded from triangulation after this gap -range_timeout_s: 1.0 # seconds - -# max_range_m: discard ranges beyond this (DW3000 indoor practical limit ≈8m) -max_range_m: 8.0 # metres - -# min_range_m: discard ranges below this (likely multipath artefacts) -min_range_m: 0.05 # metres - -# ── Kalman filter ────────────────────────────────────────────────────────────── -# kf_process_noise: Q scalar — how dynamic the person's motion is -# Higher → faster response, more jitter kf_process_noise: 0.1 +kf_meas_noise: 0.3 -# kf_meas_noise: R scalar — how noisy the UWB ranging is -# DW3000 indoor accuracy ≈ 10cm RMS → 0.1m → R ≈ 0.01 -# Use 0.3 to be conservative on first deployment -kf_meas_noise: 0.3 +range_rate: 100.0 +bearing_rate: 10.0 -# ── Publish rate ────────────────────────────────────────────────────────────── -# Should match or exceed the AT+RANGE? poll rate from both anchors. -# 20Hz means 50ms per cycle; each anchor query takes ~10ms → headroom ok. -publish_rate: 20.0 # Hz +enrolled_tag_ids: [""] diff --git a/jetson/ros2_ws/src/saltybot_uwb/launch/uwb.launch.py b/jetson/ros2_ws/src/saltybot_uwb/launch/uwb.launch.py index a1bfb17..7d9386d 100644 --- a/jetson/ros2_ws/src/saltybot_uwb/launch/uwb.launch.py +++ b/jetson/ros2_ws/src/saltybot_uwb/launch/uwb.launch.py @@ -1,10 +1,14 @@ """ -uwb.launch.py — Launch UWB driver node for MaUWB ESP32-S3 follow-me. +uwb.launch.py — Launch UWB driver node for MaUWB ESP32-S3 DW3000 (Issue #90). + +Topics: + /uwb/ranges 100 Hz raw TWR ranges + /uwb/bearing 10 Hz Kalman-fused bearing + /uwb/target 10 Hz triangulated PoseStamped (backwards compat) Usage: ros2 launch saltybot_uwb uwb.launch.py - ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2 port_b:=/dev/ttyUSB3 - ros2 launch saltybot_uwb uwb.launch.py anchor_separation:=0.30 publish_rate:=10.0 + ros2 launch saltybot_uwb uwb.launch.py enrolled_tag_ids:="['0xDEADBEEF']" """ import os @@ -31,7 +35,9 @@ def generate_launch_description(): DeclareLaunchArgument("min_range_m", default_value="0.05"), DeclareLaunchArgument("kf_process_noise", default_value="0.1"), DeclareLaunchArgument("kf_meas_noise", default_value="0.3"), - DeclareLaunchArgument("publish_rate", default_value="20.0"), + DeclareLaunchArgument("range_rate", default_value="100.0"), + DeclareLaunchArgument("bearing_rate", default_value="10.0"), + DeclareLaunchArgument("enrolled_tag_ids", default_value="['']"), Node( package="saltybot_uwb", @@ -52,7 +58,9 @@ def generate_launch_description(): "min_range_m": LaunchConfiguration("min_range_m"), "kf_process_noise": LaunchConfiguration("kf_process_noise"), "kf_meas_noise": LaunchConfiguration("kf_meas_noise"), - "publish_rate": LaunchConfiguration("publish_rate"), + "range_rate": LaunchConfiguration("range_rate"), + "bearing_rate": LaunchConfiguration("bearing_rate"), + "enrolled_tag_ids": LaunchConfiguration("enrolled_tag_ids"), }, ], ), diff --git a/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/ranging_math.py b/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/ranging_math.py index 7fcbe9b..2063e67 100644 --- a/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/ranging_math.py +++ b/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/ranging_math.py @@ -29,6 +29,26 @@ Returns (x_t, y_t); caller should treat negative x_t as 0. import math +# ── Bearing helper ──────────────────────────────────────────────────────────── + +def bearing_from_pos(x: float, y: float) -> float: + """ + Compute horizontal bearing to a point (x, y) in base_link. + + Parameters + ---------- + x : forward distance (metres) + y : lateral offset (metres, positive = left of robot / CCW) + + Returns + ------- + bearing_rad : bearing in radians, range -π .. +π + positive = target to the left (CCW) + 0 = directly ahead + """ + return math.atan2(y, x) + + # ── Triangulation ───────────────────────────────────────────────────────────── def triangulate_2anchor( diff --git a/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/uwb_driver_node.py b/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/uwb_driver_node.py index 8a5901d..60311f3 100644 --- a/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/uwb_driver_node.py +++ b/jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/uwb_driver_node.py @@ -1,32 +1,52 @@ """ -uwb_driver_node.py — ROS2 node for MaUWB ESP32-S3 DW3000 follow-me system. +uwb_driver_node.py — ROS2 node for MaUWB ESP32-S3 DW3000 UWB integration. Hardware ──────── - • 2× MaUWB ESP32-S3 DW3000 anchors on robot stem (USB → Orin Nano) + • 2× MaUWB ESP32-S3 DW3000 anchors on robot stem (USB → Orin) - Anchor-0: port side (y = +sep/2) - Anchor-1: starboard (y = -sep/2) - • 1× MaUWB tag on person (belt clip) + • 1× MaUWB tag per enrolled person (belt clip) AT command interface (115200 8N1) ────────────────────────────────── - Query: AT+RANGE?\r\n - Response (from anchors): - +RANGE:,[,]\r\n + Query: + AT+RANGE?\r\n - Config: - AT+anchor_tag=ANCHOR\r\n — set module as anchor - AT+anchor_tag=TAG\r\n — set module as tag + Response (from anchors, TWR protocol): + +RANGE:,[,[,]]\r\n + + Tag pairing (optional — targets a specific enrolled tag): + AT+RANGE_ADDR=\r\n → anchor only ranges with that tag Publishes ───────── - /uwb/target (geometry_msgs/PoseStamped) — triangulated person position in base_link - /uwb/ranges (saltybot_uwb_msgs/UwbRangeArray) — raw ranges from both anchors + /uwb/ranges (saltybot_uwb_msgs/UwbRangeArray) 100 Hz — raw anchor ranges + /uwb/bearing (saltybot_uwb_msgs/UwbBearing) 10 Hz — Kalman-fused bearing + /uwb/target (geometry_msgs/PoseStamped) 10 Hz — triangulated position + (kept for backwards compat) -Safety -────── - If a range is stale (> range_timeout_s), that anchor is excluded from - triangulation. If both anchors are stale, /uwb/target is not published. +Tag pairing +─────────── + Set enrolled_tag_ids to a list of tag address strings (e.g. ["0x1234ABCD"]). + When non-empty, ranges from unrecognised tags are silently discarded. + The matched tag address is stamped in UwbRange.tag_id and UwbBearing.tag_id. + When enrolled_tag_ids is empty, all ranges are accepted (tag_id = ""). + +Parameters +────────── + port_a, port_b serial ports for anchor-0 / anchor-1 + baudrate 115200 (default) + anchor_separation centre-to-centre anchor spacing (m) + anchor_height anchor mounting height (m) + tag_height person tag height (m) + range_timeout_s stale-anchor threshold (s) + max_range_m / min_range_m validity window (m) + kf_process_noise Kalman Q scalar + kf_meas_noise Kalman R scalar + range_rate Hz — /uwb/ranges publish rate (default 100) + bearing_rate Hz — /uwb/bearing publish rate (default 10) + enrolled_tag_ids list[str] — accepted tag addresses; [] = accept all Usage ───── @@ -44,8 +64,8 @@ from rclpy.node import Node from geometry_msgs.msg import PoseStamped from std_msgs.msg import Header -from saltybot_uwb_msgs.msg import UwbRange, UwbRangeArray -from saltybot_uwb.ranging_math import triangulate_2anchor, KalmanFilter2D +from saltybot_uwb_msgs.msg import UwbRange, UwbRangeArray, UwbBearing +from saltybot_uwb.ranging_math import triangulate_2anchor, KalmanFilter2D, bearing_from_pos try: import serial @@ -54,26 +74,31 @@ except ImportError: _SERIAL_AVAILABLE = False -# Regex: +RANGE:, or +RANGE:,, +# +RANGE:, or +RANGE:,, or +RANGE:,,, _RANGE_RE = re.compile( - r"\+RANGE\s*:\s*(\d+)\s*,\s*(\d+)(?:\s*,\s*(-?[\d.]+))?", + r"\+RANGE\s*:\s*(\d+)\s*,\s*(\d+)" + r"(?:\s*,\s*(-?[\d.]+)" + r"(?:\s*,\s*([\w:x]+))?" + r")?", re.IGNORECASE, ) class SerialReader(threading.Thread): """ - Background thread: polls an anchor's UART, fires callback on every - valid +RANGE response. + Background thread: polls one anchor's UART at maximum TWR rate, + fires callback on every valid +RANGE response. + Supports optional tag pairing via AT+RANGE_ADDR command. """ - def __init__(self, port, baudrate, anchor_id, callback, logger): + def __init__(self, port, baudrate, anchor_id, callback, logger, tag_addr=None): super().__init__(daemon=True) self._port = port self._baudrate = baudrate self._anchor_id = anchor_id self._callback = callback self._logger = logger + self._tag_addr = tag_addr self._running = False self._ser = None @@ -86,7 +111,10 @@ class SerialReader(threading.Thread): ) self._logger.info( f"Anchor-{self._anchor_id}: opened {self._port}" + + (f" paired with tag {self._tag_addr}" if self._tag_addr else "") ) + if self._tag_addr: + self._send_pairing_cmd() self._read_loop() except Exception as exc: self._logger.warn( @@ -96,12 +124,24 @@ class SerialReader(threading.Thread): self._ser.close() time.sleep(2.0) + def _send_pairing_cmd(self): + """Configure the anchor to range only with the paired tag.""" + try: + cmd = f"AT+RANGE_ADDR={self._tag_addr}\r\n".encode("ascii") + self._ser.write(cmd) + time.sleep(0.1) + self._logger.info( + f"Anchor-{self._anchor_id}: sent tag pairing {self._tag_addr}" + ) + except Exception as exc: + self._logger.warn( + f"Anchor-{self._anchor_id}: pairing cmd failed: {exc}" + ) + def _read_loop(self): while self._running: try: - # Query the anchor self._ser.write(b"AT+RANGE?\r\n") - # Read up to 10 lines waiting for a +RANGE response for _ in range(10): raw = self._ser.readline() if not raw: @@ -111,13 +151,14 @@ class SerialReader(threading.Thread): if m: range_mm = int(m.group(2)) rssi = float(m.group(3)) if m.group(3) else 0.0 - self._callback(self._anchor_id, range_mm, rssi) + tag_addr = m.group(4) if m.group(4) else "" + self._callback(self._anchor_id, range_mm, rssi, tag_addr) break except Exception as exc: self._logger.warn( f"Anchor-{self._anchor_id} read error: {exc}" ) - break # trigger reconnect + break def stop(self): self._running = False @@ -130,48 +171,51 @@ class UwbDriverNode(Node): def __init__(self): super().__init__("uwb_driver") - # ── Parameters ──────────────────────────────────────────────────────── - self.declare_parameter("port_a", "/dev/ttyUSB0") - self.declare_parameter("port_b", "/dev/ttyUSB1") - self.declare_parameter("baudrate", 115200) - self.declare_parameter("anchor_separation", 0.25) - self.declare_parameter("anchor_height", 0.80) - self.declare_parameter("tag_height", 0.90) - self.declare_parameter("range_timeout_s", 1.0) - self.declare_parameter("max_range_m", 8.0) - self.declare_parameter("min_range_m", 0.05) - self.declare_parameter("kf_process_noise", 0.1) - self.declare_parameter("kf_meas_noise", 0.3) - self.declare_parameter("publish_rate", 20.0) + self.declare_parameter("port_a", "/dev/uwb-anchor0") + self.declare_parameter("port_b", "/dev/uwb-anchor1") + self.declare_parameter("baudrate", 115200) + self.declare_parameter("anchor_separation", 0.25) + self.declare_parameter("anchor_height", 0.80) + self.declare_parameter("tag_height", 0.90) + self.declare_parameter("range_timeout_s", 1.0) + self.declare_parameter("max_range_m", 8.0) + self.declare_parameter("min_range_m", 0.05) + self.declare_parameter("kf_process_noise", 0.1) + self.declare_parameter("kf_meas_noise", 0.3) + self.declare_parameter("range_rate", 100.0) + self.declare_parameter("bearing_rate", 10.0) + self.declare_parameter("enrolled_tag_ids", [""]) self._p = self._load_params() - # ── State (protected by lock) ────────────────────────────────────── - self._lock = threading.Lock() - self._ranges = {} # anchor_id → (range_m, rssi, timestamp) - self._kf = KalmanFilter2D( + raw_ids = self.get_parameter("enrolled_tag_ids").value + self._enrolled_tags = [t.strip() for t in raw_ids if t.strip()] + paired_tag = self._enrolled_tags[0] if self._enrolled_tags else None + + self._lock = threading.Lock() + self._ranges: dict = {} + self._kf = KalmanFilter2D( process_noise=self._p["kf_process_noise"], measurement_noise=self._p["kf_meas_noise"], - dt=1.0 / self._p["publish_rate"], + dt=1.0 / self._p["bearing_rate"], ) - # ── Publishers ──────────────────────────────────────────────────── - self._target_pub = self.create_publisher( - PoseStamped, "/uwb/target", 10) - self._ranges_pub = self.create_publisher( - UwbRangeArray, "/uwb/ranges", 10) + self._ranges_pub = self.create_publisher(UwbRangeArray, "/uwb/ranges", 10) + self._bearing_pub = self.create_publisher(UwbBearing, "/uwb/bearing", 10) + self._target_pub = self.create_publisher(PoseStamped, "/uwb/target", 10) - # ── Serial readers ──────────────────────────────────────────────── if _SERIAL_AVAILABLE: self._reader_a = SerialReader( self._p["port_a"], self._p["baudrate"], anchor_id=0, callback=self._range_cb, logger=self.get_logger(), + tag_addr=paired_tag, ) self._reader_b = SerialReader( self._p["port_b"], self._p["baudrate"], anchor_id=1, callback=self._range_cb, logger=self.get_logger(), + tag_addr=paired_tag, ) self._reader_a.start() self._reader_b.start() @@ -180,19 +224,21 @@ class UwbDriverNode(Node): "pyserial not installed — running in simulation mode (no serial I/O)" ) - # ── Publish timer ───────────────────────────────────────────────── - self._timer = self.create_timer( - 1.0 / self._p["publish_rate"], self._publish_cb + self._range_timer = self.create_timer( + 1.0 / self._p["range_rate"], self._range_publish_cb + ) + self._bearing_timer = self.create_timer( + 1.0 / self._p["bearing_rate"], self._bearing_publish_cb ) self.get_logger().info( f"UWB driver ready sep={self._p['anchor_separation']}m " f"ports={self._p['port_a']},{self._p['port_b']} " - f"rate={self._p['publish_rate']}Hz" + f"range={self._p['range_rate']}Hz " + f"bearing={self._p['bearing_rate']}Hz " + f"enrolled_tags={self._enrolled_tags or ['']}" ) - # ── Helpers ─────────────────────────────────────────────────────────────── - def _load_params(self): return { "port_a": self.get_parameter("port_a").value, @@ -206,90 +252,106 @@ class UwbDriverNode(Node): "min_range_m": self.get_parameter("min_range_m").value, "kf_process_noise": self.get_parameter("kf_process_noise").value, "kf_meas_noise": self.get_parameter("kf_meas_noise").value, - "publish_rate": self.get_parameter("publish_rate").value, + "range_rate": self.get_parameter("range_rate").value, + "bearing_rate": self.get_parameter("bearing_rate").value, } - # ── Callbacks ───────────────────────────────────────────────────────────── + def _is_enrolled(self, tag_addr: str) -> bool: + if not self._enrolled_tags: + return True + return tag_addr in self._enrolled_tags - def _range_cb(self, anchor_id: int, range_mm: int, rssi: float): - """Called from serial reader threads — thread-safe update.""" + def _range_cb(self, anchor_id: int, range_mm: int, rssi: float, tag_addr: str): + if not self._is_enrolled(tag_addr): + return range_m = range_mm / 1000.0 p = self._p if range_m < p["min_range_m"] or range_m > p["max_range_m"]: return with self._lock: - self._ranges[anchor_id] = (range_m, rssi, time.monotonic()) + self._ranges[anchor_id] = (range_m, rssi, tag_addr, time.monotonic()) - def _publish_cb(self): - now = time.monotonic() + def _range_publish_cb(self): + """100 Hz: publish current raw ranges as UwbRangeArray.""" + now = time.monotonic() timeout = self._p["range_timeout_s"] - sep = self._p["anchor_separation"] - with self._lock: - # Collect valid (non-stale) ranges - valid = {} - for aid, (r, rssi, t) in self._ranges.items(): - if now - t <= timeout: - valid[aid] = (r, rssi, t) - - # Build and publish UwbRangeArray regardless (even if partial) + valid = { + aid: entry + for aid, entry in self._ranges.items() + if (now - entry[3]) <= timeout + } hdr = Header() hdr.stamp = self.get_clock().now().to_msg() hdr.frame_id = "base_link" - arr = UwbRangeArray() arr.header = hdr - for aid, (r, rssi, _) in valid.items(): + for aid, (r, rssi, tag_id, _) in valid.items(): entry = UwbRange() entry.header = hdr entry.anchor_id = aid entry.range_m = float(r) entry.raw_mm = int(round(r * 1000.0)) entry.rssi = float(rssi) + entry.tag_id = tag_id arr.ranges.append(entry) self._ranges_pub.publish(arr) - # Need both anchors to triangulate - if 0 not in valid or 1 not in valid: + def _bearing_publish_cb(self): + """10 Hz: Kalman predict+update, publish fused bearing.""" + now = time.monotonic() + timeout = self._p["range_timeout_s"] + sep = self._p["anchor_separation"] + with self._lock: + valid = { + aid: entry + for aid, entry in self._ranges.items() + if (now - entry[3]) <= timeout + } + if not valid: return - - r0 = valid[0][0] - r1 = valid[1][0] - - try: - x_t, y_t = triangulate_2anchor( - r0=r0, - r1=r1, - sep=sep, - anchor_z=self._p["anchor_height"], - tag_z=self._p["tag_height"], - ) - except (ValueError, ZeroDivisionError) as exc: - self.get_logger().warn(f"Triangulation error: {exc}") - return - - # Kalman filter update - dt = 1.0 / self._p["publish_rate"] + both_fresh = 0 in valid and 1 in valid + confidence = 1.0 if both_fresh else 0.5 + active_tag = valid[next(iter(valid))][2] + dt = 1.0 / self._p["bearing_rate"] self._kf.predict(dt=dt) - self._kf.update(x_t, y_t) + if both_fresh: + r0 = valid[0][0] + r1 = valid[1][0] + try: + x_t, y_t = triangulate_2anchor( + r0=r0, r1=r1, sep=sep, + anchor_z=self._p["anchor_height"], + tag_z=self._p["tag_height"], + ) + except (ValueError, ZeroDivisionError) as exc: + self.get_logger().warn(f"Triangulation error: {exc}") + return + self._kf.update(x_t, y_t) kx, ky = self._kf.position() - - # Publish PoseStamped in base_link + bearing = bearing_from_pos(kx, ky) + range_m = math.sqrt(kx * kx + ky * ky) + hdr = Header() + hdr.stamp = self.get_clock().now().to_msg() + hdr.frame_id = "base_link" + brg_msg = UwbBearing() + brg_msg.header = hdr + brg_msg.bearing_rad = float(bearing) + brg_msg.range_m = float(range_m) + brg_msg.confidence = float(confidence) + brg_msg.tag_id = active_tag + self._bearing_pub.publish(brg_msg) pose = PoseStamped() pose.header = hdr pose.pose.position.x = kx pose.pose.position.y = ky pose.pose.position.z = 0.0 - # Orientation: face the person (yaw = atan2(y, x)) - yaw = math.atan2(ky, kx) + yaw = bearing pose.pose.orientation.z = math.sin(yaw / 2.0) pose.pose.orientation.w = math.cos(yaw / 2.0) - self._target_pub.publish(pose) -# ── Entry point ─────────────────────────────────────────────────────────────── - def main(args=None): rclpy.init(args=args) node = UwbDriverNode() diff --git a/jetson/ros2_ws/src/saltybot_uwb/test/test_ranging_math.py b/jetson/ros2_ws/src/saltybot_uwb/test/test_ranging_math.py index 7ca83fe..c0e6bc2 100644 --- a/jetson/ros2_ws/src/saltybot_uwb/test/test_ranging_math.py +++ b/jetson/ros2_ws/src/saltybot_uwb/test/test_ranging_math.py @@ -7,7 +7,7 @@ No ROS2 / serial / GPU dependencies — runs with plain pytest. import math import pytest -from saltybot_uwb.ranging_math import triangulate_2anchor, KalmanFilter2D +from saltybot_uwb.ranging_math import triangulate_2anchor, KalmanFilter2D, bearing_from_pos # ── triangulate_2anchor ─────────────────────────────────────────────────────── @@ -172,3 +172,47 @@ class TestKalmanFilter2D: x, y = kf.position() assert not math.isnan(x) assert not math.isnan(y) + + +# ── bearing_from_pos ────────────────────────────────────────────────────────── + +class TestBearingFromPos: + + def test_directly_ahead_zero_bearing(self): + """Person directly ahead: x=2, y=0 → bearing ≈ 0.""" + b = bearing_from_pos(2.0, 0.0) + assert abs(b) < 0.001 + + def test_left_gives_positive_bearing(self): + """Person to the left (y>0): bearing should be positive.""" + b = bearing_from_pos(1.0, 1.0) + assert b > 0.0 + assert abs(b - math.pi / 4.0) < 0.001 + + def test_right_gives_negative_bearing(self): + """Person to the right (y<0): bearing should be negative.""" + b = bearing_from_pos(1.0, -1.0) + assert b < 0.0 + assert abs(b + math.pi / 4.0) < 0.001 + + def test_directly_left_ninety_degrees(self): + """Person directly to the left: x=0, y=1 → bearing = π/2.""" + b = bearing_from_pos(0.0, 1.0) + assert abs(b - math.pi / 2.0) < 0.001 + + def test_directly_right_minus_ninety_degrees(self): + """Person directly to the right: x=0, y=-1 → bearing = -π/2.""" + b = bearing_from_pos(0.0, -1.0) + assert abs(b + math.pi / 2.0) < 0.001 + + def test_range_pi_to_minus_pi(self): + """Bearing is always in -π..+π.""" + for x in [-2.0, -0.1, 0.1, 2.0]: + for y in [-2.0, -0.1, 0.1, 2.0]: + b = bearing_from_pos(x, y) + assert -math.pi <= b <= math.pi + + def test_no_nan_for_tiny_distance(self): + """Very close target should not produce NaN.""" + b = bearing_from_pos(0.001, 0.001) + assert not math.isnan(b) diff --git a/jetson/ros2_ws/src/saltybot_uwb_msgs/CMakeLists.txt b/jetson/ros2_ws/src/saltybot_uwb_msgs/CMakeLists.txt index 0b67b39..6aa3e53 100644 --- a/jetson/ros2_ws/src/saltybot_uwb_msgs/CMakeLists.txt +++ b/jetson/ros2_ws/src/saltybot_uwb_msgs/CMakeLists.txt @@ -8,6 +8,7 @@ find_package(rosidl_default_generators REQUIRED) rosidl_generate_interfaces(${PROJECT_NAME} "msg/UwbRange.msg" "msg/UwbRangeArray.msg" + "msg/UwbBearing.msg" DEPENDENCIES std_msgs ) diff --git a/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbBearing.msg b/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbBearing.msg new file mode 100644 index 0000000..685f1d5 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbBearing.msg @@ -0,0 +1,18 @@ +# UwbBearing.msg — 10 Hz Kalman-fused bearing estimate from dual-anchor UWB. +# +# bearing_rad : horizontal bearing to tag in base_link frame (radians) +# positive = tag to the left (CCW), negative = right (CW) +# range: -π .. +π +# range_m : Kalman-filtered horizontal range to tag (metres) +# confidence : quality indicator +# 1.0 = both anchors fresh +# 0.5 = single anchor only (bearing less reliable) +# 0.0 = stale / no data (message not published in this state) +# tag_id : enrolled tag identifier that produced this estimate + +std_msgs/Header header + +float32 bearing_rad +float32 range_m +float32 confidence +string tag_id diff --git a/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRange.msg b/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRange.msg index d11c8d9..c0c2dfd 100644 --- a/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRange.msg +++ b/jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRange.msg @@ -4,6 +4,7 @@ # range_m : measured horizontal range in metres (after height correction) # raw_mm : raw TWR range from AT+RANGE? response, millimetres # rssi : received signal strength (dBm), 0 if not reported by module +# tag_id : enrolled tag identifier; empty string if tag pairing is disabled std_msgs/Header header @@ -11,3 +12,4 @@ uint8 anchor_id float32 range_m uint32 raw_mm float32 rssi +string tag_id -- 2.47.2