feat: UWB anchor auto-calibration via inter-anchor ranging + MDS (Issue #602) #608

Merged
sl-jetson merged 3 commits from sl-uwb/issue-602-anchor-calibration into main 2026-03-14 15:54:57 -04:00
13 changed files with 729 additions and 0 deletions
Showing only changes of commit 587ca8a98e - Show all commits

View File

@ -0,0 +1,17 @@
uwb_anchor_calibration:
ros__parameters:
# Serial ports for each anchor, indexed by anchor ID
# udev rules create stable symlinks in /dev/uwb-anchor<N>
anchor_serials:
- /dev/uwb-anchor0
- /dev/uwb-anchor1
- /dev/uwb-anchor2
- /dev/uwb-anchor3
baud_rate: 115200
# Seconds to wait for a +PEER_RANGE response per attempt
peer_range_timeout_s: 2.0
# Range measurements to average per anchor pair (higher = more accurate)
default_n_samples: 5

View File

@ -0,0 +1,32 @@
"""
uwb_calibration.launch.py Launch the UWB anchor auto-calibration service.
Usage:
ros2 launch saltybot_uwb_calibration uwb_calibration.launch.py
Then trigger from command line:
ros2 service call /saltybot/uwb/calibrate_anchors \\
saltybot_uwb_calibration_msgs/srv/CalibrateAnchors \\
"{anchor_ids: [], n_samples: 5}"
"""
from launch import LaunchDescription
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description() -> LaunchDescription:
pkg_dir = get_package_share_directory("saltybot_uwb_calibration")
params_file = os.path.join(pkg_dir, "config", "calibration_params.yaml")
calibration_node = Node(
package="saltybot_uwb_calibration",
executable="uwb_calibration",
name="uwb_anchor_calibration",
parameters=[params_file],
output="screen",
emulate_tty=True,
)
return LaunchDescription([calibration_node])

View File

@ -0,0 +1,29 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_uwb_calibration</name>
<version>0.1.0</version>
<description>
UWB anchor auto-calibration for SaltyBot (Issue #602).
Commands anchors to range against each other via AT+PEER_RANGE=,
builds a pairwise distance matrix, and uses classical MDS to recover
2-D anchor positions. Exposes /saltybot/uwb/calibrate_anchors service.
</description>
<maintainer email="sl-uwb@saltylab.local">sl-uwb</maintainer>
<license>Apache-2.0</license>
<depend>rclpy</depend>
<depend>saltybot_uwb_calibration_msgs</depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-serial</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,272 @@
"""
calibration_node.py UWB anchor auto-calibration ROS2 service (Issue #602)
Service: /saltybot/uwb/calibrate_anchors (CalibrateAnchors)
Workflow
1. For every ordered pair (i, j) with i < j, send AT+PEER_RANGE=<j> over
anchor i's serial port, collect `n_samples` measurements, average them.
2. Build the symmetric N×N distance matrix D.
3. Run classical MDS (mds_math.mds_2d) to recover (x, y) positions.
4. Align the frame: anchor-0 at origin, anchor-1 on +X axis.
5. Return positions + residual RMS + JSON config string.
Parameters (ROS2, see config/calibration_params.yaml)
anchor_serials list of serial port paths, index = anchor ID
e.g. ["/dev/uwb-anchor0", "/dev/uwb-anchor1", ...]
baud_rate 115200
peer_range_timeout_s 2.0 (per AT+PEER_RANGE= attempt)
default_n_samples 5
"""
from __future__ import annotations
import json
import re
import time
from typing import Optional
import numpy as np
import serial
import rclpy
from rclpy.node import Node
from saltybot_uwb_calibration_msgs.srv import CalibrateAnchors
from saltybot_uwb_calibration.mds_math import mds_2d, anchor_frame_align
_PEER_RANGE_RE = re.compile(
r"\+PEER_RANGE:(\d+),(\d+),(\d+),([-\d.]+)"
)
_PEER_ERR_RE = re.compile(
r"\+PEER_RANGE:ERR,(\d+),(\w+)"
)
class CalibrationNode(Node):
def __init__(self) -> None:
super().__init__("uwb_anchor_calibration")
self.declare_parameter("anchor_serials", [
"/dev/uwb-anchor0",
"/dev/uwb-anchor1",
"/dev/uwb-anchor2",
"/dev/uwb-anchor3",
])
self.declare_parameter("baud_rate", 115200)
self.declare_parameter("peer_range_timeout_s", 2.0)
self.declare_parameter("default_n_samples", 5)
self._serials: list[str] = (
self.get_parameter("anchor_serials").value
)
self._baud: int = self.get_parameter("baud_rate").value
self._timeout: float = self.get_parameter("peer_range_timeout_s").value
self._default_n: int = self.get_parameter("default_n_samples").value
self._srv = self.create_service(
CalibrateAnchors,
"/saltybot/uwb/calibrate_anchors",
self._handle_calibrate,
)
self.get_logger().info(
f"UWB calibration service ready — "
f"{len(self._serials)} configured anchors"
)
# ── Service handler ────────────────────────────────────────────────────
def _handle_calibrate(
self,
request: CalibrateAnchors.Request,
response: CalibrateAnchors.Response,
) -> CalibrateAnchors.Response:
# Determine anchor IDs to calibrate
if request.anchor_ids:
ids = list(request.anchor_ids)
else:
ids = list(range(len(self._serials)))
n_anchors = len(ids)
if n_anchors < 4:
response.success = False
response.message = (
f"Need at least 4 anchors, got {n_anchors}"
)
return response
n_samples = int(request.n_samples) if request.n_samples > 0 \
else self._default_n
self.get_logger().info(
f"Starting calibration: anchors={ids}, n_samples={n_samples}"
)
# Open serial ports
ports: dict[int, serial.Serial] = {}
try:
for anchor_id in ids:
if anchor_id >= len(self._serials):
raise RuntimeError(
f"No serial port configured for anchor {anchor_id}"
)
port_path = self._serials[anchor_id]
ports[anchor_id] = serial.Serial(
port_path,
baudrate=self._baud,
timeout=self._timeout,
)
# Flush any stale data
ports[anchor_id].reset_input_buffer()
time.sleep(0.05)
self.get_logger().info(
f" Opened anchor {anchor_id}{port_path}"
)
except Exception as exc:
_close_all(ports)
response.success = False
response.message = f"Serial open failed: {exc}"
return response
# Measure all unique pairs
D = np.zeros((n_anchors, n_anchors), dtype=float)
id_to_idx = {aid: idx for idx, aid in enumerate(ids)}
try:
for i_idx, i_id in enumerate(ids):
for j_idx, j_id in enumerate(ids):
if j_idx <= i_idx:
continue
dist_m = self._measure_pair(
ports[i_id], i_id, j_id, n_samples
)
if dist_m is None:
raise RuntimeError(
f"No valid range between anchor {i_id} "
f"and anchor {j_id}"
)
D[i_idx, j_idx] = dist_m
D[j_idx, i_idx] = dist_m
self.get_logger().info(
f" [{i_id}{j_id}] = {dist_m:.3f} m"
)
except Exception as exc:
_close_all(ports)
response.success = False
response.message = str(exc)
return response
_close_all(ports)
# MDS
try:
positions, residual = mds_2d(D)
positions = anchor_frame_align(positions)
except Exception as exc:
response.success = False
response.message = f"MDS failed: {exc}"
return response
# Build JSON config
pos_dict = {}
for idx, aid in enumerate(ids):
pos_dict[str(aid)] = [
round(float(positions[idx, 0]), 4),
round(float(positions[idx, 1]), 4),
0.0,
]
json_str = json.dumps(pos_dict, indent=2)
self.get_logger().info(
f"Calibration complete — residual RMS {residual:.4f} m\n{json_str}"
)
# Fill response
response.success = True
response.message = (
f"OK — residual RMS {residual*1000:.1f} mm"
)
response.anchor_ids = [int(a) for a in ids]
response.positions_x = [float(positions[i, 0]) for i in range(n_anchors)]
response.positions_y = [float(positions[i, 1]) for i in range(n_anchors)]
response.positions_z = [0.0] * n_anchors
response.residual_rms_m = float(residual)
response.anchor_positions_json = json_str
return response
# ── Per-pair measurement ───────────────────────────────────────────────
def _measure_pair(
self,
port: serial.Serial,
my_id: int,
peer_id: int,
n_samples: int,
) -> Optional[float]:
"""
Send AT+PEER_RANGE=<peer_id> `n_samples` times over `port`.
Returns the mean range in metres, or None if all attempts fail.
"""
measurements: list[float] = []
for _ in range(n_samples):
cmd = f"AT+PEER_RANGE={peer_id}\r\n"
port.write(cmd.encode())
port.flush()
deadline = time.monotonic() + self._timeout
while time.monotonic() < deadline:
line = port.readline().decode(errors="replace").strip()
if not line:
continue
m = _PEER_RANGE_RE.match(line)
if m:
# +PEER_RANGE:<my_id>,<peer_id>,<mm>,<rssi>
if int(m.group(1)) == my_id and int(m.group(2)) == peer_id:
mm = float(m.group(3))
measurements.append(mm / 1000.0) # mm → m
break
if _PEER_ERR_RE.match(line):
break # timeout from firmware — try next sample
time.sleep(0.05) # inter-command gap
if not measurements:
return None
return float(np.mean(measurements))
# ── Helpers ────────────────────────────────────────────────────────────────
def _close_all(ports: dict) -> None:
for p in ports.values():
try:
p.close()
except Exception:
pass
def main(args=None) -> None:
rclpy.init(args=args)
node = CalibrationNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,140 @@
"""
mds_math.py Classical Multi-Dimensional Scaling for UWB anchor calibration
(Issue #602)
Given an N×N symmetric distance matrix D (in metres), recover the 2-D
Cartesian positions of N anchors using classical (metric) MDS.
Algorithm
1. Compute the double-centred squared-distance matrix B = -½ H H
where H = I - (1/N) 11 (centering matrix).
2. Eigen-decompose B = V Λ Vᵀ (symmetric).
3. Keep the 2 largest positive eigenvalues (λ₁, λ₂).
4. Positions X = V[:, :2] · diag(λ₁, λ₂).
The recovered configuration is up to a global rigid transformation
(rotation, reflection, translation). The caller is responsible for
anchoring the coordinate frame (e.g. fix anchor-0 at origin, anchor-1
on the positive-X axis).
Usage
from saltybot_uwb_calibration.mds_math import mds_2d, anchor_frame_align
D = np.array([[0, d01, d02, d03],
[d01, 0, d12, d13],
...])
positions, residual = mds_2d(D) # (N,2) array, float RMS (m)
positions = anchor_frame_align(positions) # align frame: anchor-0 at origin
"""
from __future__ import annotations
import numpy as np
def mds_2d(D: np.ndarray) -> tuple[np.ndarray, float]:
"""
Recover 2-D anchor positions from pairwise distance matrix D.
Parameters
----------
D : (N, N) symmetric ndarray of float
Pairwise distances in metres. Diagonal must be 0.
N must be >= 3 (4+ recommended for overdetermined system).
Returns
-------
positions : (N, 2) ndarray
Recovered x, y coordinates in metres.
residual_rms : float
RMS of (D_measured - D_recovered) in metres across all unique pairs.
Raises
------
ValueError
If D is not square, not symmetric, or N < 3.
"""
D = np.asarray(D, dtype=float)
n = D.shape[0]
if D.ndim != 2 or D.shape[1] != n:
raise ValueError(f"D must be square, got shape {D.shape}")
if n < 3:
raise ValueError(f"Need at least 3 anchors, got {n}")
if not np.allclose(D, D.T, atol=1e-3):
raise ValueError("Distance matrix D is not symmetric")
# ── Step 1: double-centre D² ──────────────────────────────────────────
D2 = D ** 2
H = np.eye(n) - np.ones((n, n)) / n
B = -0.5 * H @ D2 @ H
# ── Step 2: eigendecomposition ────────────────────────────────────────
eigvals, eigvecs = np.linalg.eigh(B) # eigvals in ascending order
# ── Step 3: keep top-2 positive eigenvalues ───────────────────────────
idx = np.argsort(eigvals)[::-1] # descending
top2_idx = idx[:2]
lam = eigvals[top2_idx]
lam = np.maximum(lam, 0.0) # clamp numerical negatives
# ── Step 4: recover positions ─────────────────────────────────────────
V = eigvecs[:, top2_idx]
positions = V * np.sqrt(lam) # (N, 2)
# ── Residual RMS ──────────────────────────────────────────────────────
residual_rms = _residual(D, positions)
return positions, residual_rms
def anchor_frame_align(positions: np.ndarray) -> np.ndarray:
"""
Align recovered positions so that:
- anchor-0 is at the origin (0, 0)
- anchor-1 lies on the positive-X axis (y = 0)
This removes the MDS ambiguity (translation + rotation + reflection).
Parameters
----------
positions : (N, 2) ndarray
Returns
-------
(N, 2) ndarray aligned positions
"""
positions = np.array(positions, dtype=float)
if positions.shape[0] < 2:
return positions
# Translate so anchor-0 is at origin
positions -= positions[0]
# Rotate so anchor-1 is on positive-X axis
dx, dy = positions[1, 0], positions[1, 1]
angle = np.arctan2(dy, dx)
cos_a, sin_a = np.cos(-angle), np.sin(-angle)
R = np.array([[cos_a, -sin_a],
[sin_a, cos_a]])
positions = (R @ positions.T).T
# Ensure anchor-1 has positive X (handle reflection)
if positions[1, 0] < 0:
positions[:, 0] *= -1.0
return positions
def _residual(D_meas: np.ndarray, positions: np.ndarray) -> float:
"""Compute RMS of (measured - recovered) distances over all unique pairs."""
n = D_meas.shape[0]
errors = []
for i in range(n):
for j in range(i + 1, n):
d_rec = float(np.linalg.norm(positions[i] - positions[j]))
errors.append(D_meas[i, j] - d_rec)
if not errors:
return 0.0
return float(np.sqrt(np.mean(np.array(errors) ** 2)))

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_uwb_calibration
[install]
install_scripts=$base/lib/saltybot_uwb_calibration

View File

@ -0,0 +1,29 @@
import os
from glob import glob
from setuptools import setup
package_name = "saltybot_uwb_calibration"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages",
[f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(os.path.join("share", package_name, "launch"), glob("launch/*.py")),
(os.path.join("share", package_name, "config"), glob("config/*.yaml")),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-uwb",
maintainer_email="sl-uwb@saltylab.local",
description="UWB anchor auto-calibration via MDS (Issue #602)",
license="Apache-2.0",
entry_points={
"console_scripts": [
"uwb_calibration = saltybot_uwb_calibration.calibration_node:main",
],
},
)

View File

@ -0,0 +1,142 @@
"""
Unit tests for saltybot_uwb_calibration.mds_math (Issue #602).
No ROS2 or hardware required pure numpy.
"""
import math
import sys
import os
import numpy as np
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from saltybot_uwb_calibration.mds_math import mds_2d, anchor_frame_align, _residual
def _dist_matrix(positions: np.ndarray) -> np.ndarray:
"""Build exact pairwise distance matrix from ground-truth positions."""
n = len(positions)
D = np.zeros((n, n))
for i in range(n):
for j in range(n):
D[i, j] = np.linalg.norm(positions[i] - positions[j])
return D
# ── Square configuration (4 anchors) ──────────────────────────────────────
GT_SQUARE = np.array([
[0.0, 0.0],
[4.0, 0.0],
[4.0, 3.0],
[0.0, 3.0],
])
class TestMdsSquare:
def test_residual_near_zero(self):
D = _dist_matrix(GT_SQUARE)
positions, rms = mds_2d(D)
assert rms < 1e-6, f"Residual too large: {rms}"
def test_correct_distances_preserved(self):
D = _dist_matrix(GT_SQUARE)
positions, _ = mds_2d(D)
positions = anchor_frame_align(positions)
D_rec = _dist_matrix(positions)
# All pairwise distances should match to < 1 mm
assert np.max(np.abs(D - D_rec)) < 1e-3
def test_anchor_0_at_origin(self):
D = _dist_matrix(GT_SQUARE)
positions, _ = mds_2d(D)
positions = anchor_frame_align(positions)
assert abs(positions[0, 0]) < 1e-9
assert abs(positions[0, 1]) < 1e-9
def test_anchor_1_on_positive_x(self):
D = _dist_matrix(GT_SQUARE)
positions, _ = mds_2d(D)
positions = anchor_frame_align(positions)
assert positions[1, 0] > 0
assert abs(positions[1, 1]) < 1e-9
# ── Non-uniform configuration (5 anchors) ─────────────────────────────────
GT_5 = np.array([
[0.0, 0.0],
[5.0, 0.0],
[5.0, 4.0],
[2.5, 6.0],
[0.0, 4.0],
])
class TestMds5Anchors:
def test_residual_near_zero(self):
D = _dist_matrix(GT_5)
positions, rms = mds_2d(D)
assert rms < 1e-6
def test_shape(self):
D = _dist_matrix(GT_5)
positions, _ = mds_2d(D)
assert positions.shape == (5, 2)
# ── Noisy distances (simulate UWB measurement noise) ──────────────────────
class TestMdsNoisyInput:
def test_residual_acceptable_with_5cm_noise(self):
rng = np.random.default_rng(42)
D = _dist_matrix(GT_SQUARE)
noise = rng.normal(0, 0.05, D.shape)
noise = (noise + noise.T) / 2 # keep symmetric
np.fill_diagonal(noise, 0)
D_noisy = np.maximum(D + noise, 0)
positions, rms = mds_2d(D_noisy)
# With 5 cm noise the residual should be sub-10 cm
assert rms < 0.10, f"Residual {rms:.4f} m too large"
# ── Error handling ─────────────────────────────────────────────────────────
class TestMdsErrors:
def test_non_square_raises(self):
with pytest.raises(ValueError):
mds_2d(np.ones((3, 4)))
def test_too_few_anchors_raises(self):
with pytest.raises(ValueError):
mds_2d(np.array([[0.0, 1.0], [1.0, 0.0]]))
def test_asymmetric_raises(self):
D = _dist_matrix(GT_SQUARE)
D[0, 1] += 0.5 # break symmetry
with pytest.raises(ValueError):
mds_2d(D)
# ── anchor_frame_align ─────────────────────────────────────────────────────
class TestFrameAlign:
def test_translated_positions_aligned(self):
pts = GT_SQUARE.copy() + np.array([10.0, -5.0])
aligned = anchor_frame_align(pts)
assert abs(aligned[0, 0]) < 1e-9
assert abs(aligned[0, 1]) < 1e-9
def test_rotated_positions_aligned(self):
angle = math.pi / 3
R = np.array([[math.cos(angle), -math.sin(angle)],
[math.sin(angle), math.cos(angle)]])
pts = (R @ GT_SQUARE.T).T
aligned = anchor_frame_align(pts)
assert aligned[1, 0] > 0
assert abs(aligned[1, 1]) < 1e-9
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@ -0,0 +1,12 @@
cmake_minimum_required(VERSION 3.8)
project(saltybot_uwb_calibration_msgs)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"srv/CalibrateAnchors.srv"
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_uwb_calibration_msgs</name>
<version>0.1.0</version>
<description>
Service definitions for UWB anchor auto-calibration (Issue #602).
Provides CalibrateAnchors.srv used by saltybot_uwb_calibration node.
</description>
<maintainer email="sl-uwb@saltylab.local">sl-uwb</maintainer>
<license>Apache-2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@ -0,0 +1,29 @@
# CalibrateAnchors.srv — UWB anchor auto-calibration service (Issue #602)
#
# Request: list of anchor IDs to calibrate (minimum 4).
# If anchor_ids is empty, the node uses all discovered anchors.
#
# Each anchor is commanded via AT+PEER_RANGE=<peer_id> to range against every
# other anchor. The resulting N×N distance matrix is passed through classical
# MDS to recover 2-D positions. Results are written to anchor_positions_json.
#
# Response: success flag, per-anchor [x, y, z] positions, residual RMS error,
# and JSON string suitable for dropping into fusion_params.yaml.
# ── Request ────────────────────────────────────────────────────────────────
uint8[] anchor_ids # IDs of anchors to calibrate (empty = auto-discover)
uint8 n_samples # range measurements to average per pair (default 5)
---
# ── Response ───────────────────────────────────────────────────────────────
bool success
string message # human-readable status / error description
uint8[] anchor_ids # same order as positions below
float64[] positions_x # metres, MDS-recovered X (2-D plane)
float64[] positions_y # metres, MDS-recovered Y
float64[] positions_z # metres, always 0.0 (flat-floor assumption)
float64 residual_rms_m # RMS of |D_measured - D_recovered| across all pairs
string anchor_positions_json # JSON: {"0": [x,y,z], "1": [x,y,z], ...}