feat: UWB tag firmware (Issue #545) #568

Merged
sl-jetson merged 1 commits from sl-perception/issue-546-uwb-ros2 into main 2026-03-14 11:49:44 -04:00
9 changed files with 907 additions and 0 deletions

View File

@ -0,0 +1,58 @@
# uwb_position_params.yaml — UWB position node configuration (Issue #546)
#
# ROS2 Python node: saltybot_uwb_position
# Serial: ESP32 UWB tag → Jetson via USB-CDC (JSON per line)
#
# Usage:
# ros2 launch saltybot_uwb_position uwb_position.launch.py
#
# JSON protocol (ESP32 → Jetson):
# Full frame (preferred):
# {"ts": 123456, "ranges": [{"id": 0, "d_mm": 1500, "rssi": -65.0}, ...]}
# Per-anchor:
# {"id": 0, "d_mm": 1500, "rssi": -65.0}
# Both "d_mm" and "range_mm" accepted for the range field.
uwb_position:
ros__parameters:
# ── Serial (USB-CDC from ESP32 DW3000 tag) ──────────────────────────────
serial_port: /dev/ttyUSB0 # udev rule: /dev/ttyUWB_TAG recommended
baudrate: 115200
# ── Anchor definitions ──────────────────────────────────────────────────
# anchor_ids: integer list of anchor IDs
# anchor_positions: flat float list, 3 values per anchor [x, y, z] in
# the map frame (metres). Length must be 3 × len(anchor_ids).
#
# Example: 4-anchor rectangular room layout, anchors at 2 m height
# Corner NW (0,0), NE (5,0), SE (5,5), SW (0,5)
anchor_ids: [0, 1, 2, 3]
anchor_positions: [
0.0, 0.0, 2.0, # anchor 0 — NW corner
5.0, 0.0, 2.0, # anchor 1 — NE corner
5.0, 5.0, 2.0, # anchor 2 — SE corner
0.0, 5.0, 2.0, # anchor 3 — SW corner
]
# ── Trilateration mode ──────────────────────────────────────────────────
solve_z: false # false = 2D (robot_z fixed), true = full 3D (needs 4+ anchors)
robot_z: 0.0 # m — robot floor height in map frame (used when solve_z=false)
# ── Validity & timing ───────────────────────────────────────────────────
publish_rate: 10.0 # Hz — /saltybot/uwb/pose + /saltybot/uwb/status
range_timeout_s: 1.5 # s — discard anchor range after this age
min_range_m: 0.05 # m — minimum valid range
max_range_m: 30.0 # m — maximum valid range (DW3000: up to 120 m, use conservative)
# ── Outlier rejection ───────────────────────────────────────────────────
outlier_threshold_m: 0.40 # m — residual above this → outlier
outlier_strikes_max: 5 # consecutive outlier frames before anchor is flagged
# ── Kalman filter ───────────────────────────────────────────────────────
kf_process_noise: 0.05 # Q — lower = smoother but slower to respond
kf_meas_noise: 0.10 # R — higher = trust KF prediction more than measurement
# ── TF2 frames ──────────────────────────────────────────────────────────
map_frame: map # parent frame
uwb_frame: uwb_link # child frame (robot UWB tag position)

View File

@ -0,0 +1,35 @@
"""Launch file for saltybot_uwb_position (Issue #546)."""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description() -> LaunchDescription:
config_path = os.path.join(
get_package_share_directory("saltybot_uwb_position"),
"config",
"uwb_position_params.yaml",
)
port_arg = DeclareLaunchArgument(
"serial_port",
default_value="/dev/ttyUSB0",
description="USB serial port for ESP32 UWB tag (e.g. /dev/ttyUWB_TAG)",
)
uwb_node = Node(
package="saltybot_uwb_position",
executable="uwb_position",
name="uwb_position",
parameters=[
config_path,
{"serial_port": LaunchConfiguration("serial_port")},
],
output="screen",
)
return LaunchDescription([port_arg, uwb_node])

View File

@ -0,0 +1,36 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_uwb_position</name>
<version>0.1.0</version>
<description>
ROS2 UWB position node for Jetson (Issue #546).
Reads JSON range data from an ESP32 DW3000 UWB tag over USB serial,
trilaterates from 3+ fixed infrastructure anchors, publishes
PoseStamped on /saltybot/uwb/pose, per-anchor UwbRange on
/saltybot/uwb/range/&lt;id&gt;, JSON diagnostics on /saltybot/uwb/status,
and broadcasts the uwb_link→map TF2 transform.
</description>
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
<license>MIT</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<depend>tf2_ros</depend>
<depend>saltybot_uwb_msgs</depend>
<!-- numpy is a system dep on Jetson (python3-numpy) -->
<exec_depend>python3-numpy</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,254 @@
"""
trilateration.py Pure-math helpers for UWB trilateration (Issue #546).
No ROS2 dependencies importable in unit tests without a ROS2 install.
Algorithm: linear least-squares from N 3 anchors
Given anchors A_i = (x_i, y_i, z_i) with measured ranges r_i, the robot
position p = (x, y, z) satisfies:
(x - x_i)² + (y - y_i)² + (z - z_i)² = r_i²
Subtract the equation for anchor 0 from each subsequent equation to
linearise:
2(x_i - x_0)x + 2(y_i - y_0)y + 2(z_i - z_0)z
= r_0² - r_i² + a_i² - a_0²
This yields A·p = b where A is (N-1)×3 and b is (N-1)×1.
Solve with numpy least-squares (lstsq).
2D mode (solve_z=False):
z is fixed (robot_z parameter, default 0).
Reduce to 2 unknowns (x, y): subtract z terms from b, drop the z column.
Needs N 3 anchors.
Outlier rejection:
Compute residual for each anchor: |r_meas - p - a_i|.
Reject anchors with residual > threshold. Repeat if enough remain.
"""
from __future__ import annotations
import math
from typing import List, Optional, Tuple
import numpy as np
AnchorPos = Tuple[float, float, float] # (x, y, z) in map frame (metres)
RangeM = float # measured range (metres)
# ── Core trilateration ────────────────────────────────────────────────────────
def trilaterate(
anchors: List[AnchorPos],
ranges: List[RangeM],
fixed_z: Optional[float] = None,
) -> Tuple[float, float, float]:
"""Least-squares trilateration from N ≥ 3 anchor ranges.
Parameters
----------
anchors : list of (x, y, z) anchor positions in the map frame (metres)
ranges : measured distances from robot to each anchor (metres)
fixed_z : if not None, treat robot z as this value and solve 2D only
Returns
-------
(x, y, z) : estimated robot position in the map frame (metres).
When fixed_z is given, z = fixed_z.
Raises
------
ValueError : fewer than 3 anchors provided
RuntimeError: linear system is rank-deficient (e.g., collinear anchors)
"""
n = len(anchors)
if n < 3:
raise ValueError(f"Need ≥ 3 anchors for trilateration, got {n}")
if len(ranges) != n:
raise ValueError("len(anchors) != len(ranges)")
a = np.array(anchors, dtype=np.float64) # (N, 3)
r = np.array(ranges, dtype=np.float64) # (N,)
# Reference anchor (index 0)
a0 = a[0]
r0 = r[0]
norm_a0_sq = float(np.dot(a0, a0))
if fixed_z is not None:
# 2D mode: solve for (x, y), z is known
z = fixed_z
A_rows = []
b_rows = []
for i in range(1, n):
ai = a[i]
ri = r[i]
norm_ai_sq = float(np.dot(ai, ai))
# rhs adjusted for known z
rhs = (r0**2 - ri**2 + norm_ai_sq - norm_a0_sq
+ 2.0 * (ai[2] - a0[2]) * z)
A_rows.append([2.0 * (ai[0] - a0[0]),
2.0 * (ai[1] - a0[1])])
b_rows.append(rhs)
A_mat = np.array(A_rows, dtype=np.float64)
b_vec = np.array(b_rows, dtype=np.float64)
result, _, rank, _ = np.linalg.lstsq(A_mat, b_vec, rcond=None)
if rank < 2:
raise RuntimeError("Rank-deficient 2D trilateration system — collinear anchors?")
return float(result[0]), float(result[1]), z
else:
# 3D mode: solve for (x, y, z) — needs N ≥ 4 for well-conditioned solve
A_rows = []
b_rows = []
for i in range(1, n):
ai = a[i]
ri = r[i]
norm_ai_sq = float(np.dot(ai, ai))
rhs = r0**2 - ri**2 + norm_ai_sq - norm_a0_sq
A_rows.append([2.0 * (ai[0] - a0[0]),
2.0 * (ai[1] - a0[1]),
2.0 * (ai[2] - a0[2])])
b_rows.append(rhs)
A_mat = np.array(A_rows, dtype=np.float64)
b_vec = np.array(b_rows, dtype=np.float64)
result, _, rank, _ = np.linalg.lstsq(A_mat, b_vec, rcond=None)
if rank < 3:
raise RuntimeError("Rank-deficient 3D trilateration system — coplanar anchors?")
return float(result[0]), float(result[1]), float(result[2])
# ── Outlier rejection ─────────────────────────────────────────────────────────
def reject_outliers(
anchors: List[AnchorPos],
ranges: List[RangeM],
position: Tuple[float, float, float],
threshold_m: float = 0.4,
) -> List[int]:
"""Return indices of inlier anchors (residual ≤ threshold_m).
Parameters
----------
anchors : anchor positions in map frame
ranges : measured ranges (metres)
position : current position estimate (x, y, z)
threshold_m : max allowable range residual to be an inlier
Returns
-------
inlier_indices : sorted list of indices whose residual is within threshold
"""
px, py, pz = position
inliers = []
for i, (anchor, r_meas) in enumerate(zip(anchors, ranges)):
ax, ay, az = anchor
r_pred = math.sqrt((px - ax)**2 + (py - ay)**2 + (pz - az)**2)
residual = abs(r_meas - r_pred)
if residual <= threshold_m:
inliers.append(i)
return inliers
def residuals(
anchors: List[AnchorPos],
ranges: List[RangeM],
position: Tuple[float, float, float],
) -> List[float]:
"""Compute per-anchor range residual (metres) for diagnostics."""
px, py, pz = position
res = []
for anchor, r_meas in zip(anchors, ranges):
ax, ay, az = anchor
r_pred = math.sqrt((px - ax)**2 + (py - ay)**2 + (pz - az)**2)
res.append(abs(r_meas - r_pred))
return res
# ── 3D Kalman filter (constant-velocity, position-only observations) ──────────
class KalmanFilter3D:
"""Constant-velocity Kalman filter for 3D UWB position.
State: [x, y, z, vx, vy, vz]
Observation: [x, y, z] (position only)
"""
def __init__(
self,
process_noise: float = 0.1,
measurement_noise: float = 0.15,
dt: float = 0.1,
) -> None:
self._q = process_noise
self._r = measurement_noise
self._dt = dt
self._x = np.zeros(6)
self._P = np.diag([1.0, 1.0, 1.0, 0.5, 0.5, 0.5])
self._init = False
def _build_F(self, dt: float) -> np.ndarray:
F = np.eye(6)
F[0, 3] = dt
F[1, 4] = dt
F[2, 5] = dt
return F
def _build_Q(self, dt: float) -> np.ndarray:
q = self._q
dt2 = dt * dt
dt3 = dt2 * dt
dt4 = dt3 * dt
Q = np.zeros((6, 6))
# Position-velocity cross terms (constant white-noise model)
for i in range(3):
Q[i, i ] = q * dt4 / 4.0
Q[i, i+3] = q * dt3 / 2.0
Q[i+3, i ] = q * dt3 / 2.0
Q[i+3, i+3] = q * dt2
return Q
def predict(self, dt: float | None = None) -> None:
if dt is not None:
self._dt = dt
F = self._build_F(self._dt)
Q = self._build_Q(self._dt)
self._x = F @ self._x
self._P = F @ self._P @ F.T + Q
def update(self, x_m: float, y_m: float, z_m: float) -> None:
if not self._init:
self._x[:3] = [x_m, y_m, z_m]
self._init = True
return
H = np.zeros((3, 6))
H[0, 0] = 1.0
H[1, 1] = 1.0
H[2, 2] = 1.0
R = np.eye(3) * self._r
z_vec = np.array([x_m, y_m, z_m])
innov = z_vec - H @ self._x
S = H @ self._P @ H.T + R
K = self._P @ H.T @ np.linalg.inv(S)
self._x = self._x + K @ innov
self._P = (np.eye(6) - K @ H) @ self._P
def position(self) -> Tuple[float, float, float]:
return float(self._x[0]), float(self._x[1]), float(self._x[2])
def velocity(self) -> Tuple[float, float, float]:
return float(self._x[3]), float(self._x[4]), float(self._x[5])
def reset(self) -> None:
self._x = np.zeros(6)
self._P = np.diag([1.0, 1.0, 1.0, 0.5, 0.5, 0.5])
self._init = False

View File

@ -0,0 +1,487 @@
#!/usr/bin/env python3
"""
uwb_position_node.py ROS2 UWB position node for Jetson (Issue #546).
Reads JSON range measurements from an ESP32 UWB tag (DW3000) over USB
serial, trilaterates position from 3+ fixed infrastructure anchors, and
publishes the robot's absolute position in the map frame.
Serial protocol (one JSON object per line, UTF-8):
Tag Jetson (full frame, all anchors at once):
{"ts": 123456, "ranges": [
{"id": 0, "d_mm": 1500, "rssi": -65.0},
{"id": 1, "d_mm": 2100, "rssi": -68.0},
{"id": 2, "d_mm": 1800, "rssi": -70.0}
]}
Alternate (per-anchor, one line per measurement):
{"id": 0, "d_mm": 1500, "rssi": -65.0}
Both "d_mm" and "range_mm" are accepted.
Anchor positions:
Fixed anchors are configured in uwb_position_params.yaml as flat arrays:
anchor_ids: [0, 1, 2, 3]
anchor_positions: [x0, y0, z0, x1, y1, z1, ...] # metres in map frame
Publishes:
/saltybot/uwb/pose (geometry_msgs/PoseStamped) 10 Hz
/saltybot/uwb/range/<id> (saltybot_uwb_msgs/UwbRange) per anchor, on arrival
/saltybot/uwb/status (std_msgs/String) 10 Hz JSON diagnostics
TF2:
Broadcasts uwb_link map from /saltybot/uwb/pose position.
Outlier rejection:
After initial trilateration, anchors with range residual > outlier_threshold_m
are dropped. If 3 inliers remain (2D mode) the position is re-solved.
Anchors rejected in N_strikes_max consecutive frames are flagged in status.
"""
import json
import math
import threading
import time
from typing import Dict, List, Optional, Tuple
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from geometry_msgs.msg import (
PoseStamped, TransformStamped,
Quaternion,
)
from std_msgs.msg import Header, String
from tf2_ros import TransformBroadcaster
from saltybot_uwb_msgs.msg import UwbRange
from saltybot_uwb_position.trilateration import (
trilaterate,
reject_outliers,
residuals,
KalmanFilter3D,
)
try:
import serial
_SERIAL_OK = True
except ImportError:
_SERIAL_OK = False
# ── Serial reader thread ──────────────────────────────────────────────────────
class SerialJsonReader(threading.Thread):
"""Background thread: reads newline-delimited JSON from a serial port."""
def __init__(self, port: str, baudrate: int, callback, logger) -> None:
super().__init__(daemon=True)
self._port = port
self._baudrate = baudrate
self._callback = callback
self._logger = logger
self._running = False
self._ser = None
def run(self) -> None:
self._running = True
while self._running:
try:
self._ser = serial.Serial(
self._port, self._baudrate, timeout=1.0
)
self._logger.info(f"UWB serial: opened {self._port} @ {self._baudrate}")
self._read_loop()
except Exception as exc:
self._logger.warning(
f"UWB serial error ({self._port}): {exc} — retry in 2 s"
)
if self._ser and self._ser.is_open:
self._ser.close()
time.sleep(2.0)
def _read_loop(self) -> None:
while self._running:
try:
raw = self._ser.readline()
if not raw:
continue
line = raw.decode("utf-8", errors="replace").strip()
if not line:
continue
try:
obj = json.loads(line)
self._callback(obj)
except json.JSONDecodeError:
pass # ignore malformed lines silently
except Exception as exc:
self._logger.warning(f"UWB read error: {exc}")
break
def stop(self) -> None:
self._running = False
if self._ser and self._ser.is_open:
self._ser.close()
# ── Node ──────────────────────────────────────────────────────────────────────
class UwbPositionNode(Node):
"""ROS2 UWB position node — trilateration from 3+ fixed anchors."""
_MIN_ANCHORS_2D = 3
_MIN_ANCHORS_3D = 4
def __init__(self) -> None:
super().__init__("uwb_position")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("serial_port", "/dev/ttyUSB0")
self.declare_parameter("baudrate", 115200)
# Anchor configuration: flat arrays, 3 floats per anchor
self.declare_parameter("anchor_ids", [0, 1, 2])
self.declare_parameter("anchor_positions", [
0.0, 0.0, 2.0, # anchor 0: x, y, z (metres in map frame)
5.0, 0.0, 2.0, # anchor 1
5.0, 5.0, 2.0, # anchor 2
])
self.declare_parameter("solve_z", False) # 2D mode by default
self.declare_parameter("robot_z", 0.0) # m — robot floor height
self.declare_parameter("publish_rate", 10.0) # Hz — pose + status
self.declare_parameter("range_timeout_s", 1.5) # s — stale range threshold
self.declare_parameter("max_range_m", 30.0) # m — validity max
self.declare_parameter("min_range_m", 0.05) # m — validity min
self.declare_parameter("outlier_threshold_m", 0.4) # m — RANSAC threshold
self.declare_parameter("outlier_strikes_max", 5) # frames before flagging
self.declare_parameter("kf_process_noise", 0.05) # Kalman Q
self.declare_parameter("kf_meas_noise", 0.10) # Kalman R
self.declare_parameter("map_frame", "map")
self.declare_parameter("uwb_frame", "uwb_link")
# Load params
self._port = self.get_parameter("serial_port").value
self._baudrate = self.get_parameter("baudrate").value
self._solve_z = self.get_parameter("solve_z").value
self._robot_z = self.get_parameter("robot_z").value
self._publish_rate = self.get_parameter("publish_rate").value
self._timeout = self.get_parameter("range_timeout_s").value
self._max_r = self.get_parameter("max_range_m").value
self._min_r = self.get_parameter("min_range_m").value
self._outlier_thr = self.get_parameter("outlier_threshold_m").value
self._strikes_max = self.get_parameter("outlier_strikes_max").value
self._map_frame = self.get_parameter("map_frame").value
self._uwb_frame = self.get_parameter("uwb_frame").value
# Parse anchor config
anchor_ids_raw = self.get_parameter("anchor_ids").value
anchor_pos_flat = self.get_parameter("anchor_positions").value
self._anchor_ids, self._anchor_positions = self._parse_anchors(
anchor_ids_raw, anchor_pos_flat
)
self.get_logger().info(
f"UWB anchors: {[f'#{i}@({p[0]:.1f},{p[1]:.1f},{p[2]:.1f})' for i,p in zip(self._anchor_ids, self._anchor_positions)]}"
)
# ── State ─────────────────────────────────────────────────────────────
self._lock = threading.Lock()
# anchor_id → (range_m, rssi, timestamp_monotonic)
self._ranges: Dict[int, Tuple[float, float, float]] = {}
self._strikes: Dict[int, int] = {aid: 0 for aid in self._anchor_ids}
self._last_fix: Optional[Tuple[float, float, float]] = None
self._has_fix = False
self._fix_age = 0.0
# Kalman filter
self._kf = KalmanFilter3D(
process_noise=self.get_parameter("kf_process_noise").value,
measurement_noise=self.get_parameter("kf_meas_noise").value,
dt=1.0 / self._publish_rate,
)
# Per-anchor outlier publishers (pre-create for configured anchors)
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
self._range_pubs: Dict[int, object] = {}
for aid in self._anchor_ids:
self._range_pubs[aid] = self.create_publisher(
UwbRange, f"/saltybot/uwb/range/{aid}", sensor_qos
)
# Main publishers
self._pose_pub = self.create_publisher(PoseStamped, "/saltybot/uwb/pose", 10)
self._status_pub = self.create_publisher(String, "/saltybot/uwb/status", 10)
# TF2
self._tf_broadcaster = TransformBroadcaster(self)
# ── Serial reader ──────────────────────────────────────────────────────
if _SERIAL_OK:
self._reader = SerialJsonReader(
self._port, self._baudrate,
callback=self._on_serial_json,
logger=self.get_logger(),
)
self._reader.start()
else:
self.get_logger().warning(
"pyserial not installed — serial I/O disabled (simulation mode)"
)
# ── Publish timer ──────────────────────────────────────────────────────
self._prev_publish_t = self.get_clock().now().nanoseconds * 1e-9
self.create_timer(1.0 / self._publish_rate, self._publish_cb)
self.get_logger().info(
f"UwbPositionNode ready — port={self._port} "
f"anchors={len(self._anchor_ids)} "
f"mode={'3D' if self._solve_z else '2D'} "
f"rate={self._publish_rate:.0f}Hz"
)
# ── Anchor config parsing ─────────────────────────────────────────────────
def _parse_anchors(
self,
ids_raw,
pos_flat,
):
ids = list(ids_raw)
n = len(ids)
if len(pos_flat) < n * 3:
raise ValueError(
f"anchor_positions must have 3×anchor_ids entries. "
f"Got {len(pos_flat)} values for {n} anchors."
)
positions = []
for i in range(n):
base = i * 3
positions.append((
float(pos_flat[base]),
float(pos_flat[base + 1]),
float(pos_flat[base + 2]),
))
return ids, positions
# ── Serial JSON callback (called from reader thread) ──────────────────────
def _on_serial_json(self, obj: dict) -> None:
"""Parse incoming JSON and update range table."""
now = time.monotonic()
if "ranges" in obj:
# Full-frame format: {"ts": ..., "ranges": [{id, d_mm, rssi}, ...]}
for entry in obj["ranges"]:
self._ingest_range_entry(entry, now)
else:
# Per-anchor format: {"id": 0, "d_mm": 1500, "rssi": -65.0}
self._ingest_range_entry(obj, now)
def _ingest_range_entry(self, entry: dict, timestamp: float) -> None:
"""Store a single anchor range measurement after validity checks."""
try:
anchor_id = int(entry.get("id", entry.get("anchor_id", -1)))
# Accept both "d_mm" and "range_mm"
raw_mm = int(entry.get("d_mm", entry.get("range_mm", 0)))
rssi = float(entry.get("rssi", 0.0))
except (KeyError, TypeError, ValueError):
return
if anchor_id not in self._anchor_ids:
return # unknown / unconfigured anchor
range_m = raw_mm / 1000.0
if range_m < self._min_r or range_m > self._max_r:
return # outside validity window
with self._lock:
self._ranges[anchor_id] = (range_m, rssi, timestamp)
# Publish per-anchor range topic immediately
self._publish_anchor_range(anchor_id, range_m, raw_mm, rssi)
# ── Per-anchor range publisher ────────────────────────────────────────────
def _publish_anchor_range(
self, anchor_id: int, range_m: float, raw_mm: int, rssi: float
) -> None:
if anchor_id not in self._range_pubs:
return
msg = UwbRange()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = self._map_frame
msg.anchor_id = anchor_id
msg.range_m = float(range_m)
msg.raw_mm = int(raw_mm)
msg.rssi = float(rssi)
msg.tag_id = ""
self._range_pubs[anchor_id].publish(msg)
# ── Main publish callback (10 Hz) ─────────────────────────────────────────
def _publish_cb(self) -> None:
now_ros = self.get_clock().now()
now_mono = time.monotonic()
# Collect fresh ranges
with self._lock:
fresh = {
aid: entry
for aid, entry in self._ranges.items()
if (now_mono - entry[2]) <= self._timeout
}
# Check minimum anchor count for chosen mode
min_anch = self._MIN_ANCHORS_3D if self._solve_z else self._MIN_ANCHORS_2D
state = "no_fix"
pos = None
used_ids: List[int] = []
res_map: Dict[int, float] = {}
if len(fresh) >= min_anch:
active_ids = list(fresh.keys())
anch_pos = [self._anchor_positions[self._anchor_ids.index(i)]
for i in active_ids]
anch_r = [fresh[i][0] for i in active_ids]
try:
fixed_z = None if self._solve_z else self._robot_z
raw_pos = trilaterate(anch_pos, anch_r, fixed_z=fixed_z)
# Outlier rejection
inlier_idx = reject_outliers(
anch_pos, anch_r, raw_pos,
threshold_m=self._outlier_thr,
)
res_all = residuals(anch_pos, anch_r, raw_pos)
for k, aid in enumerate(active_ids):
res_map[aid] = round(res_all[k], 3)
# Update consecutive strike counters
for k, aid in enumerate(active_ids):
if k in inlier_idx:
self._strikes[aid] = 0
else:
self._strikes[aid] = self._strikes.get(aid, 0) + 1
# Re-solve with inliers if any were rejected
if len(inlier_idx) < len(active_ids) and len(inlier_idx) >= min_anch:
inlier_anch = [anch_pos[k] for k in inlier_idx]
inlier_r = [anch_r[k] for k in inlier_idx]
raw_pos = trilaterate(inlier_anch, inlier_r, fixed_z=fixed_z)
used_ids = [active_ids[k] for k in inlier_idx]
pos = raw_pos
# Kalman predict + update
dt = now_ros.nanoseconds * 1e-9 - self._prev_publish_t
self._kf.predict(dt=dt)
self._kf.update(pos[0], pos[1], pos[2])
pos = self._kf.position()
self._last_fix = pos
self._has_fix = True
self._fix_age = 0.0
state = "ok" if len(used_ids) >= min_anch else "degraded"
except (ValueError, RuntimeError) as exc:
self.get_logger().warning(f"Trilateration failed: {exc}")
state = "degraded"
elif self._has_fix:
# KF predict-only: coast on last known position
dt = now_ros.nanoseconds * 1e-9 - self._prev_publish_t
self._kf.predict(dt=dt)
pos = self._kf.position()
self._fix_age += 1.0 / self._publish_rate
state = "degraded" if self._fix_age < 5.0 else "no_fix"
self._prev_publish_t = now_ros.nanoseconds * 1e-9
if pos is None:
self._publish_status(state, 0, [], {})
return
x, y, z = pos
# ── Publish PoseStamped ────────────────────────────────────────────
pose_msg = PoseStamped()
pose_msg.header.stamp = now_ros.to_msg()
pose_msg.header.frame_id = self._map_frame
pose_msg.pose.position.x = x
pose_msg.pose.position.y = y
pose_msg.pose.position.z = z
pose_msg.pose.orientation.w = 1.0 # no orientation from UWB alone
self._pose_pub.publish(pose_msg)
# ── TF2: uwb_link → map ───────────────────────────────────────────
tf_msg = TransformStamped()
tf_msg.header.stamp = now_ros.to_msg()
tf_msg.header.frame_id = self._map_frame
tf_msg.child_frame_id = self._uwb_frame
tf_msg.transform.translation.x = x
tf_msg.transform.translation.y = y
tf_msg.transform.translation.z = z
tf_msg.transform.rotation.w = 1.0
self._tf_broadcaster.sendTransform(tf_msg)
# ── Diagnostics ───────────────────────────────────────────────────
self._publish_status(state, len(used_ids), used_ids, res_map)
# ── Status publisher ──────────────────────────────────────────────────────
def _publish_status(
self,
state: str,
n_anchors: int,
used_ids: List[int],
res_map: Dict[int, float],
) -> None:
# Flag anchors with too many consecutive outlier strikes
flagged = [
aid for aid, s in self._strikes.items() if s >= self._strikes_max
]
pos = self._kf.position() if self._has_fix else None
status = {
"state": state,
"active_anchors": n_anchors,
"used_anchor_ids": used_ids,
"flagged_anchors": flagged,
"position": {
"x": round(pos[0], 3),
"y": round(pos[1], 3),
"z": round(pos[2], 3),
} if pos else None,
"residuals_m": res_map,
"fix_age_s": round(self._fix_age, 2),
}
self._status_pub.publish(String(data=json.dumps(status)))
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None) -> None:
rclpy.init(args=args)
node = UwbPositionNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_uwb_position
[install]
install_scripts=$base/lib/saltybot_uwb_position

View File

@ -0,0 +1,32 @@
import os
from glob import glob
from setuptools import setup
package_name = "saltybot_uwb_position"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages",
["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
(os.path.join("share", package_name, "launch"),
glob("launch/*.py")),
(os.path.join("share", package_name, "config"),
glob("config/*.yaml")),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-perception",
maintainer_email="sl-perception@saltylab.local",
description="ROS2 UWB position node — JSON serial bridge with trilateration (Issue #546)",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"uwb_position = saltybot_uwb_position.uwb_position_node:main",
],
},
)