feat: UWB position logger and accuracy analyzer (Issue #634)
saltybot_uwb_logger_msgs (new package):
- AccuracyReport.msg: n_samples, mean/bias/std (x,y,2D), CEP50, CEP95,
RMSE, max_error, per-anchor range stats, test_id, duration_s
- StartAccuracyTest.srv: request (truth_x/y_m, n_samples, timeout_s,
test_id) → response (success, message, test_id)
saltybot_uwb_logger (new package):
- accuracy_stats.py: compute_stats() + RangeAccum — pure numpy, no ROS2
CEP50/CEP95 = 50th/95th percentile of 2-D error; bias, std, RMSE, max
- logger_node.py: /uwb_logger ROS2 node
Subscribes:
/saltybot/pose/fused → fused_pose_<DATE>.csv (ts, x, y, heading)
/saltybot/uwb/pose → uwb_pose_<DATE>.csv (ts, x, y)
/uwb/ranges → uwb_ranges_<DATE>.csv (ts, anchor_id, range_m,
raw_mm, rssi, tag_id)
Service /saltybot/uwb/start_accuracy_test:
Collects N fused-pose samples at known (truth_x, truth_y) in background
thread. On completion or timeout: publishes AccuracyReport on
/saltybot/uwb/accuracy_report + writes accuracy_<test_id>.json.
Per-anchor range stats included. CSV flushed every 5 s.
Tests: 16/16 passing (test/test_accuracy_stats.py, no ROS2/hardware)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
0e8758e9e1
commit
343e53081a
@ -0,0 +1,7 @@
|
|||||||
|
uwb_logger:
|
||||||
|
ros__parameters:
|
||||||
|
log_dir: ~/uwb_logs
|
||||||
|
enable_csv: true
|
||||||
|
default_n_samples: 200
|
||||||
|
default_timeout_s: 30.0
|
||||||
|
flush_interval_s: 5.0
|
||||||
@ -0,0 +1,28 @@
|
|||||||
|
"""uwb_logger.launch.py — Launch UWB position logger (Issue #634)."""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description() -> LaunchDescription:
|
||||||
|
pkg_dir = get_package_share_directory("saltybot_uwb_logger")
|
||||||
|
params_file = os.path.join(pkg_dir, "config", "uwb_logger_params.yaml")
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument("log_dir", default_value="~/uwb_logs"),
|
||||||
|
Node(
|
||||||
|
package="saltybot_uwb_logger",
|
||||||
|
executable="uwb_logger",
|
||||||
|
name="uwb_logger",
|
||||||
|
parameters=[
|
||||||
|
params_file,
|
||||||
|
{"log_dir": LaunchConfiguration("log_dir")},
|
||||||
|
],
|
||||||
|
output="screen",
|
||||||
|
emulate_tty=True,
|
||||||
|
),
|
||||||
|
])
|
||||||
31
jetson/ros2_ws/src/saltybot_uwb_logger/package.xml
Normal file
31
jetson/ros2_ws/src/saltybot_uwb_logger/package.xml
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_uwb_logger</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
UWB position logger and accuracy analyzer (Issue #634).
|
||||||
|
Logs fused pose, raw UWB pose, per-anchor ranges to CSV.
|
||||||
|
Provides /saltybot/uwb/start_accuracy_test: collects N samples at a
|
||||||
|
known position and computes CEP50, CEP95, RMSE, max error.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-uwb@saltylab.local">sl-uwb</maintainer>
|
||||||
|
<license>Apache-2.0</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>saltybot_uwb_msgs</depend>
|
||||||
|
<depend>saltybot_uwb_logger_msgs</depend>
|
||||||
|
|
||||||
|
<exec_depend>python3-numpy</exec_depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
Binary file not shown.
Binary file not shown.
@ -0,0 +1,99 @@
|
|||||||
|
"""
|
||||||
|
accuracy_stats.py — UWB positioning accuracy statistics (Issue #634)
|
||||||
|
|
||||||
|
Pure numpy, no ROS2 dependency.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class AccuracyStats:
|
||||||
|
n_samples: int
|
||||||
|
mean_x: float
|
||||||
|
mean_y: float
|
||||||
|
std_x: float
|
||||||
|
std_y: float
|
||||||
|
std_2d: float
|
||||||
|
bias_x: float
|
||||||
|
bias_y: float
|
||||||
|
cep50: float
|
||||||
|
cep95: float
|
||||||
|
max_error: float
|
||||||
|
rmse: float
|
||||||
|
|
||||||
|
|
||||||
|
def compute_stats(
|
||||||
|
xs: np.ndarray,
|
||||||
|
ys: np.ndarray,
|
||||||
|
truth_x: float,
|
||||||
|
truth_y: float,
|
||||||
|
) -> AccuracyStats:
|
||||||
|
"""
|
||||||
|
Compute accuracy statistics from position samples.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
xs, ys : 1-D arrays of measured x / y positions (metres)
|
||||||
|
truth_x/y : known ground-truth position (metres)
|
||||||
|
|
||||||
|
Raises
|
||||||
|
------
|
||||||
|
ValueError if fewer than 2 samples or mismatched lengths.
|
||||||
|
"""
|
||||||
|
xs = np.asarray(xs, dtype=float)
|
||||||
|
ys = np.asarray(ys, dtype=float)
|
||||||
|
n = len(xs)
|
||||||
|
if n < 2:
|
||||||
|
raise ValueError(f"Need at least 2 samples, got {n}")
|
||||||
|
if len(ys) != n:
|
||||||
|
raise ValueError("xs and ys must have the same length")
|
||||||
|
|
||||||
|
errors = np.sqrt((xs - truth_x) ** 2 + (ys - truth_y) ** 2)
|
||||||
|
|
||||||
|
mean_x = float(np.mean(xs))
|
||||||
|
mean_y = float(np.mean(ys))
|
||||||
|
std_x = float(np.std(xs, ddof=1))
|
||||||
|
std_y = float(np.std(ys, ddof=1))
|
||||||
|
|
||||||
|
return AccuracyStats(
|
||||||
|
n_samples = n,
|
||||||
|
mean_x = mean_x,
|
||||||
|
mean_y = mean_y,
|
||||||
|
std_x = std_x,
|
||||||
|
std_y = std_y,
|
||||||
|
std_2d = math.sqrt(std_x ** 2 + std_y ** 2),
|
||||||
|
bias_x = mean_x - truth_x,
|
||||||
|
bias_y = mean_y - truth_y,
|
||||||
|
cep50 = float(np.percentile(errors, 50)),
|
||||||
|
cep95 = float(np.percentile(errors, 95)),
|
||||||
|
max_error = float(np.max(errors)),
|
||||||
|
rmse = float(np.sqrt(np.mean(errors ** 2))),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class RangeAccum:
|
||||||
|
"""Running accumulator for per-anchor range statistics."""
|
||||||
|
anchor_id: int
|
||||||
|
_samples: list = field(default_factory=list, repr=False)
|
||||||
|
|
||||||
|
def add(self, range_m: float) -> None:
|
||||||
|
self._samples.append(range_m)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def count(self) -> int:
|
||||||
|
return len(self._samples)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def mean(self) -> float:
|
||||||
|
return float(np.mean(self._samples)) if self._samples else 0.0
|
||||||
|
|
||||||
|
@property
|
||||||
|
def std(self) -> float:
|
||||||
|
return float(np.std(self._samples, ddof=1)) if len(self._samples) >= 2 else 0.0
|
||||||
@ -0,0 +1,409 @@
|
|||||||
|
"""
|
||||||
|
logger_node.py — UWB position logger and accuracy analyzer (Issue #634)
|
||||||
|
|
||||||
|
Subscriptions
|
||||||
|
─────────────
|
||||||
|
/saltybot/pose/fused geometry_msgs/PoseStamped — EKF-fused pose (200 Hz)
|
||||||
|
/saltybot/uwb/pose geometry_msgs/PoseStamped — raw UWB pose (10 Hz)
|
||||||
|
/uwb/ranges saltybot_uwb_msgs/UwbRangeArray — per-anchor ranges
|
||||||
|
|
||||||
|
Service
|
||||||
|
───────
|
||||||
|
/saltybot/uwb/start_accuracy_test StartAccuracyTest
|
||||||
|
Collects N fused-pose samples at known position, computes accuracy
|
||||||
|
metrics, publishes AccuracyReport, writes JSON summary.
|
||||||
|
|
||||||
|
Publishes
|
||||||
|
─────────
|
||||||
|
/saltybot/uwb/accuracy_report AccuracyReport
|
||||||
|
|
||||||
|
CSV logging (continuous, to log_dir)
|
||||||
|
─────────────────────────────────────
|
||||||
|
fused_pose_<DATE>.csv — timestamp_ns, x_m, y_m, heading_rad
|
||||||
|
uwb_pose_<DATE>.csv — timestamp_ns, x_m, y_m
|
||||||
|
uwb_ranges_<DATE>.csv — timestamp_ns, anchor_id, range_m, raw_mm, rssi, tag_id
|
||||||
|
accuracy_<test_id>.json — accuracy test results
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
──────────
|
||||||
|
log_dir ~/uwb_logs
|
||||||
|
enable_csv true
|
||||||
|
default_n_samples 200
|
||||||
|
default_timeout_s 30.0
|
||||||
|
flush_interval_s 5.0
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import csv
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||||
|
|
||||||
|
from geometry_msgs.msg import PoseStamped
|
||||||
|
from saltybot_uwb_msgs.msg import UwbRangeArray
|
||||||
|
from saltybot_uwb_logger_msgs.msg import AccuracyReport
|
||||||
|
from saltybot_uwb_logger_msgs.srv import StartAccuracyTest
|
||||||
|
|
||||||
|
from saltybot_uwb_logger.accuracy_stats import compute_stats, RangeAccum
|
||||||
|
|
||||||
|
|
||||||
|
_SENSOR_QOS = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=10,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class UwbLoggerNode(Node):
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__("uwb_logger")
|
||||||
|
|
||||||
|
# ── Parameters ────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter("log_dir", os.path.expanduser("~/uwb_logs"))
|
||||||
|
self.declare_parameter("enable_csv", True)
|
||||||
|
self.declare_parameter("default_n_samples", 200)
|
||||||
|
self.declare_parameter("default_timeout_s", 30.0)
|
||||||
|
self.declare_parameter("flush_interval_s", 5.0)
|
||||||
|
|
||||||
|
self._log_dir = Path(self.get_parameter("log_dir").value)
|
||||||
|
self._enable_csv = self.get_parameter("enable_csv").value
|
||||||
|
self._default_n = self.get_parameter("default_n_samples").value
|
||||||
|
self._default_to = self.get_parameter("default_timeout_s").value
|
||||||
|
self._flush_iv = self.get_parameter("flush_interval_s").value
|
||||||
|
|
||||||
|
if self._enable_csv:
|
||||||
|
self._log_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
# ── CSV writers ───────────────────────────────────────────────────
|
||||||
|
date_tag = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
||||||
|
self._csv_lock = threading.Lock()
|
||||||
|
self._csv_files = []
|
||||||
|
self._fused_csv = self._open_csv(
|
||||||
|
f"fused_pose_{date_tag}.csv",
|
||||||
|
["timestamp_ns", "x_m", "y_m", "heading_rad"])
|
||||||
|
self._uwb_csv = self._open_csv(
|
||||||
|
f"uwb_pose_{date_tag}.csv",
|
||||||
|
["timestamp_ns", "x_m", "y_m"])
|
||||||
|
self._ranges_csv = self._open_csv(
|
||||||
|
f"uwb_ranges_{date_tag}.csv",
|
||||||
|
["timestamp_ns", "anchor_id", "range_m", "raw_mm", "rssi", "tag_id"])
|
||||||
|
|
||||||
|
# ── Accuracy test state ────────────────────────────────────────────
|
||||||
|
self._test_lock = threading.Lock()
|
||||||
|
self._test_active = False
|
||||||
|
self._test_id = ""
|
||||||
|
self._test_truth_x = 0.0
|
||||||
|
self._test_truth_y = 0.0
|
||||||
|
self._test_n_target = 0
|
||||||
|
self._test_xs: list = []
|
||||||
|
self._test_ys: list = []
|
||||||
|
self._test_ranges: dict[int, RangeAccum] = {}
|
||||||
|
self._test_t0 = 0.0
|
||||||
|
self._test_timeout = 0.0
|
||||||
|
self._test_done_event = threading.Event()
|
||||||
|
|
||||||
|
# ── Publisher ─────────────────────────────────────────────────────
|
||||||
|
self._report_pub = self.create_publisher(
|
||||||
|
AccuracyReport, "/saltybot/uwb/accuracy_report", 10
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Subscriptions ─────────────────────────────────────────────────
|
||||||
|
self.create_subscription(
|
||||||
|
PoseStamped, "/saltybot/pose/fused",
|
||||||
|
self._fused_cb, _SENSOR_QOS)
|
||||||
|
self.create_subscription(
|
||||||
|
PoseStamped, "/saltybot/uwb/pose",
|
||||||
|
self._uwb_pose_cb, _SENSOR_QOS)
|
||||||
|
self.create_subscription(
|
||||||
|
UwbRangeArray, "/uwb/ranges",
|
||||||
|
self._ranges_cb, _SENSOR_QOS)
|
||||||
|
|
||||||
|
# ── Service ───────────────────────────────────────────────────────
|
||||||
|
self._srv = self.create_service(
|
||||||
|
StartAccuracyTest,
|
||||||
|
"/saltybot/uwb/start_accuracy_test",
|
||||||
|
self._handle_start_test)
|
||||||
|
|
||||||
|
# ── Periodic flush ────────────────────────────────────────────────
|
||||||
|
self.create_timer(self._flush_iv, self._flush_csv)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"UWB logger ready — log_dir={self._log_dir} csv={self._enable_csv}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def destroy_node(self) -> None:
|
||||||
|
self._flush_csv()
|
||||||
|
for fh in self._csv_files:
|
||||||
|
try:
|
||||||
|
fh.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
# ── Callbacks ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _fused_cb(self, msg: PoseStamped) -> None:
|
||||||
|
ts = _stamp_ns(msg.header.stamp)
|
||||||
|
x = msg.pose.position.x
|
||||||
|
y = msg.pose.position.y
|
||||||
|
q = msg.pose.orientation
|
||||||
|
heading = 2.0 * math.atan2(float(q.z), float(q.w))
|
||||||
|
|
||||||
|
if self._enable_csv:
|
||||||
|
with self._csv_lock:
|
||||||
|
if self._fused_csv:
|
||||||
|
self._fused_csv.writerow([ts, x, y, heading])
|
||||||
|
|
||||||
|
with self._test_lock:
|
||||||
|
if self._test_active:
|
||||||
|
self._test_xs.append(x)
|
||||||
|
self._test_ys.append(y)
|
||||||
|
if len(self._test_xs) >= self._test_n_target:
|
||||||
|
self._test_active = False
|
||||||
|
self._test_done_event.set()
|
||||||
|
|
||||||
|
def _uwb_pose_cb(self, msg: PoseStamped) -> None:
|
||||||
|
ts = _stamp_ns(msg.header.stamp)
|
||||||
|
x = msg.pose.position.x
|
||||||
|
y = msg.pose.position.y
|
||||||
|
if self._enable_csv:
|
||||||
|
with self._csv_lock:
|
||||||
|
if self._uwb_csv:
|
||||||
|
self._uwb_csv.writerow([ts, x, y])
|
||||||
|
|
||||||
|
def _ranges_cb(self, msg: UwbRangeArray) -> None:
|
||||||
|
ts = _stamp_ns(msg.header.stamp)
|
||||||
|
for r in msg.ranges:
|
||||||
|
if self._enable_csv:
|
||||||
|
with self._csv_lock:
|
||||||
|
if self._ranges_csv:
|
||||||
|
self._ranges_csv.writerow([
|
||||||
|
ts, r.anchor_id, r.range_m,
|
||||||
|
r.raw_mm, r.rssi, r.tag_id,
|
||||||
|
])
|
||||||
|
with self._test_lock:
|
||||||
|
if self._test_active:
|
||||||
|
aid = int(r.anchor_id)
|
||||||
|
if aid not in self._test_ranges:
|
||||||
|
self._test_ranges[aid] = RangeAccum(anchor_id=aid)
|
||||||
|
self._test_ranges[aid].add(float(r.range_m))
|
||||||
|
|
||||||
|
# ── Service ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _handle_start_test(
|
||||||
|
self,
|
||||||
|
request: StartAccuracyTest.Request,
|
||||||
|
response: StartAccuracyTest.Response,
|
||||||
|
) -> StartAccuracyTest.Response:
|
||||||
|
|
||||||
|
with self._test_lock:
|
||||||
|
if self._test_active:
|
||||||
|
response.success = False
|
||||||
|
response.message = "Another test is already running"
|
||||||
|
response.test_id = self._test_id
|
||||||
|
return response
|
||||||
|
|
||||||
|
n = int(request.n_samples) if request.n_samples > 0 else self._default_n
|
||||||
|
timeout = float(request.timeout_s) if request.timeout_s > 0.0 else self._default_to
|
||||||
|
test_id = (
|
||||||
|
request.test_id.strip()
|
||||||
|
if request.test_id.strip()
|
||||||
|
else datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
||||||
|
)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Accuracy test '{test_id}' — "
|
||||||
|
f"truth=({request.truth_x_m:.3f}, {request.truth_y_m:.3f}) "
|
||||||
|
f"n={n} timeout={timeout}s"
|
||||||
|
)
|
||||||
|
|
||||||
|
with self._test_lock:
|
||||||
|
self._test_id = test_id
|
||||||
|
self._test_truth_x = float(request.truth_x_m)
|
||||||
|
self._test_truth_y = float(request.truth_y_m)
|
||||||
|
self._test_n_target = n
|
||||||
|
self._test_xs = []
|
||||||
|
self._test_ys = []
|
||||||
|
self._test_ranges = {}
|
||||||
|
self._test_t0 = time.monotonic()
|
||||||
|
self._test_timeout = timeout
|
||||||
|
self._test_done_event.clear()
|
||||||
|
self._test_active = True
|
||||||
|
|
||||||
|
threading.Thread(
|
||||||
|
target=self._run_test,
|
||||||
|
args=(test_id, request.truth_x_m, request.truth_y_m, n, timeout),
|
||||||
|
daemon=True,
|
||||||
|
).start()
|
||||||
|
|
||||||
|
response.success = True
|
||||||
|
response.message = f"Test '{test_id}' started, collecting {n} samples"
|
||||||
|
response.test_id = test_id
|
||||||
|
return response
|
||||||
|
|
||||||
|
def _run_test(
|
||||||
|
self,
|
||||||
|
test_id: str,
|
||||||
|
truth_x: float,
|
||||||
|
truth_y: float,
|
||||||
|
n_target: int,
|
||||||
|
timeout: float,
|
||||||
|
) -> None:
|
||||||
|
finished = self._test_done_event.wait(timeout=timeout)
|
||||||
|
|
||||||
|
with self._test_lock:
|
||||||
|
self._test_active = False
|
||||||
|
xs = list(self._test_xs)
|
||||||
|
ys = list(self._test_ys)
|
||||||
|
ranges_snap = dict(self._test_ranges)
|
||||||
|
t0 = self._test_t0
|
||||||
|
|
||||||
|
duration = time.monotonic() - t0
|
||||||
|
n_got = len(xs)
|
||||||
|
|
||||||
|
if n_got < 2:
|
||||||
|
self.get_logger().error(
|
||||||
|
f"Test '{test_id}' failed: only {n_got} samples in {duration:.1f}s"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
if not finished:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"Test '{test_id}' timed out — {n_got}/{n_target} samples"
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
stats = compute_stats(np.array(xs), np.array(ys), truth_x, truth_y)
|
||||||
|
except ValueError as exc:
|
||||||
|
self.get_logger().error(f"Stats failed: {exc}")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Test '{test_id}' — n={stats.n_samples} "
|
||||||
|
f"CEP50={stats.cep50*100:.1f}cm CEP95={stats.cep95*100:.1f}cm "
|
||||||
|
f"RMSE={stats.rmse*100:.1f}cm max={stats.max_error*100:.1f}cm"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Publish report
|
||||||
|
report = AccuracyReport()
|
||||||
|
report.header.stamp = self.get_clock().now().to_msg()
|
||||||
|
report.header.frame_id = "map"
|
||||||
|
report.truth_x_m = truth_x
|
||||||
|
report.truth_y_m = truth_y
|
||||||
|
report.n_samples = stats.n_samples
|
||||||
|
report.mean_x_m = stats.mean_x
|
||||||
|
report.mean_y_m = stats.mean_y
|
||||||
|
report.std_x_m = stats.std_x
|
||||||
|
report.std_y_m = stats.std_y
|
||||||
|
report.std_2d_m = stats.std_2d
|
||||||
|
report.bias_x_m = stats.bias_x
|
||||||
|
report.bias_y_m = stats.bias_y
|
||||||
|
report.cep50_m = stats.cep50
|
||||||
|
report.cep95_m = stats.cep95
|
||||||
|
report.max_error_m = stats.max_error
|
||||||
|
report.rmse_m = stats.rmse
|
||||||
|
report.test_id = test_id
|
||||||
|
report.duration_s = duration
|
||||||
|
|
||||||
|
anchor_ids = sorted(ranges_snap.keys())
|
||||||
|
report.anchor_ids = [int(a) for a in anchor_ids]
|
||||||
|
report.anchor_range_mean_m = [ranges_snap[a].mean for a in anchor_ids]
|
||||||
|
report.anchor_range_std_m = [ranges_snap[a].std for a in anchor_ids]
|
||||||
|
|
||||||
|
self._report_pub.publish(report)
|
||||||
|
|
||||||
|
# Write JSON
|
||||||
|
if self._enable_csv:
|
||||||
|
self._write_json(test_id, truth_x, truth_y, stats,
|
||||||
|
anchor_ids, ranges_snap, duration)
|
||||||
|
|
||||||
|
# ── CSV / JSON helpers ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _open_csv(
|
||||||
|
self, filename: str, headers: list
|
||||||
|
) -> Optional[csv.writer]:
|
||||||
|
if not self._enable_csv:
|
||||||
|
return None
|
||||||
|
path = self._log_dir / filename
|
||||||
|
fh = open(path, "w", newline="", buffering=1)
|
||||||
|
writer = csv.writer(fh)
|
||||||
|
writer.writerow(headers)
|
||||||
|
self._csv_files.append(fh)
|
||||||
|
return writer
|
||||||
|
|
||||||
|
def _flush_csv(self) -> None:
|
||||||
|
for fh in self._csv_files:
|
||||||
|
try:
|
||||||
|
fh.flush()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _write_json(self, test_id, truth_x, truth_y, stats,
|
||||||
|
anchor_ids, ranges_snap, duration) -> None:
|
||||||
|
path = self._log_dir / f"accuracy_{test_id}.json"
|
||||||
|
data = {
|
||||||
|
"test_id": test_id,
|
||||||
|
"truth": {"x_m": truth_x, "y_m": truth_y},
|
||||||
|
"n_samples": stats.n_samples,
|
||||||
|
"duration_s": round(duration, 3),
|
||||||
|
"mean": {"x_m": round(stats.mean_x, 4),
|
||||||
|
"y_m": round(stats.mean_y, 4)},
|
||||||
|
"bias": {"x_m": round(stats.bias_x, 4),
|
||||||
|
"y_m": round(stats.bias_y, 4)},
|
||||||
|
"std": {"x_m": round(stats.std_x, 4),
|
||||||
|
"y_m": round(stats.std_y, 4),
|
||||||
|
"2d_m": round(stats.std_2d, 4)},
|
||||||
|
"cep50_m": round(stats.cep50, 4),
|
||||||
|
"cep95_m": round(stats.cep95, 4),
|
||||||
|
"rmse_m": round(stats.rmse, 4),
|
||||||
|
"max_error_m": round(stats.max_error, 4),
|
||||||
|
"anchors": {
|
||||||
|
str(aid): {
|
||||||
|
"range_mean_m": round(ranges_snap[aid].mean, 4),
|
||||||
|
"range_std_m": round(ranges_snap[aid].std, 4),
|
||||||
|
"n": ranges_snap[aid].count,
|
||||||
|
}
|
||||||
|
for aid in anchor_ids
|
||||||
|
},
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
with open(path, "w") as f:
|
||||||
|
json.dump(data, f, indent=2)
|
||||||
|
self.get_logger().info(f"Report written: {path}")
|
||||||
|
except OSError as exc:
|
||||||
|
self.get_logger().error(f"JSON write failed: {exc}")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helpers ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _stamp_ns(stamp) -> int:
|
||||||
|
return stamp.sec * 1_000_000_000 + stamp.nanosec
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None) -> None:
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = UwbLoggerNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
4
jetson/ros2_ws/src/saltybot_uwb_logger/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_uwb_logger/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_uwb_logger
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_uwb_logger
|
||||||
29
jetson/ros2_ws/src/saltybot_uwb_logger/setup.py
Normal file
29
jetson/ros2_ws/src/saltybot_uwb_logger/setup.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
import os
|
||||||
|
from glob import glob
|
||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
package_name = "saltybot_uwb_logger"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages",
|
||||||
|
[f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(os.path.join("share", package_name, "launch"), glob("launch/*.py")),
|
||||||
|
(os.path.join("share", package_name, "config"), glob("config/*.yaml")),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-uwb",
|
||||||
|
maintainer_email="sl-uwb@saltylab.local",
|
||||||
|
description="UWB position logger and accuracy analyzer (Issue #634)",
|
||||||
|
license="Apache-2.0",
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"uwb_logger = saltybot_uwb_logger.logger_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
Binary file not shown.
@ -0,0 +1,116 @@
|
|||||||
|
"""Unit tests for saltybot_uwb_logger.accuracy_stats (Issue #634). No ROS2 needed."""
|
||||||
|
|
||||||
|
import math
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||||
|
from saltybot_uwb_logger.accuracy_stats import compute_stats, RangeAccum
|
||||||
|
|
||||||
|
|
||||||
|
def _circle(n, r, cx=0.0, cy=0.0):
|
||||||
|
a = np.linspace(0, 2 * math.pi, n, endpoint=False)
|
||||||
|
return cx + r * np.cos(a), cy + r * np.sin(a)
|
||||||
|
|
||||||
|
|
||||||
|
class TestComputeStats:
|
||||||
|
def test_zero_error(self):
|
||||||
|
xs = np.full(5, 2.0); ys = np.full(5, 3.0)
|
||||||
|
s = compute_stats(xs, ys, 2.0, 3.0)
|
||||||
|
assert abs(s.cep50) < 1e-9
|
||||||
|
assert abs(s.rmse) < 1e-9
|
||||||
|
assert abs(s.max_error) < 1e-9
|
||||||
|
|
||||||
|
def test_known_bias(self):
|
||||||
|
rng = np.random.default_rng(0)
|
||||||
|
xs = rng.normal(4.0, 0.01, 100); ys = rng.normal(2.0, 0.01, 100)
|
||||||
|
s = compute_stats(xs, ys, 3.0, 2.0)
|
||||||
|
assert abs(s.bias_x - 1.0) < 0.05
|
||||||
|
assert abs(s.bias_y) < 0.05
|
||||||
|
|
||||||
|
def test_cep50_below_cep95(self):
|
||||||
|
rng = np.random.default_rng(1)
|
||||||
|
xs = rng.normal(0, 0.1, 500); ys = rng.normal(0, 0.1, 500)
|
||||||
|
s = compute_stats(xs, ys, 0.0, 0.0)
|
||||||
|
assert s.cep50 < s.cep95
|
||||||
|
|
||||||
|
def test_cep50_is_median(self):
|
||||||
|
rng = np.random.default_rng(2)
|
||||||
|
xs = rng.normal(0, 0.15, 1000); ys = rng.normal(0, 0.15, 1000)
|
||||||
|
errors = np.sqrt(xs**2 + ys**2)
|
||||||
|
s = compute_stats(xs, ys, 0.0, 0.0)
|
||||||
|
assert abs(s.cep50 - float(np.median(errors))) < 1e-9
|
||||||
|
|
||||||
|
def test_rmse(self):
|
||||||
|
xs = np.array([0.1, -0.1, 0.2, -0.2]); ys = np.zeros(4)
|
||||||
|
s = compute_stats(xs, ys, 0.0, 0.0)
|
||||||
|
assert abs(s.rmse - math.sqrt(np.mean(xs**2))) < 1e-9
|
||||||
|
|
||||||
|
def test_max_error(self):
|
||||||
|
xs = np.array([0.0, 0.0, 1.0]); ys = np.zeros(3)
|
||||||
|
s = compute_stats(xs, ys, 0.0, 0.0)
|
||||||
|
assert abs(s.max_error - 1.0) < 1e-9
|
||||||
|
|
||||||
|
def test_circle_symmetry(self):
|
||||||
|
xs, ys = _circle(360, 0.12)
|
||||||
|
s = compute_stats(xs, ys, 0.0, 0.0)
|
||||||
|
assert abs(s.bias_x) < 1e-6
|
||||||
|
assert abs(s.cep50 - 0.12) < 1e-6
|
||||||
|
|
||||||
|
def test_std_2d_pythagorean(self):
|
||||||
|
rng = np.random.default_rng(3)
|
||||||
|
xs = rng.normal(0, 0.1, 200); ys = rng.normal(0, 0.2, 200)
|
||||||
|
s = compute_stats(xs, ys, 0.0, 0.0)
|
||||||
|
assert abs(s.std_2d - math.sqrt(s.std_x**2 + s.std_y**2)) < 1e-9
|
||||||
|
|
||||||
|
def test_n_samples(self):
|
||||||
|
xs = np.ones(42); ys = np.zeros(42)
|
||||||
|
s = compute_stats(xs, ys, 0.0, 0.0)
|
||||||
|
assert s.n_samples == 42
|
||||||
|
|
||||||
|
def test_too_few_raises(self):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
compute_stats(np.array([1.0]), np.array([1.0]), 0.0, 0.0)
|
||||||
|
|
||||||
|
def test_length_mismatch_raises(self):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
compute_stats(np.array([1.0, 2.0]), np.array([1.0]), 0.0, 0.0)
|
||||||
|
|
||||||
|
|
||||||
|
class TestRealistic:
|
||||||
|
def test_10cm_cep50(self):
|
||||||
|
rng = np.random.default_rng(42)
|
||||||
|
xs = rng.normal(3.0, 0.07, 500); ys = rng.normal(2.0, 0.07, 500)
|
||||||
|
s = compute_stats(xs, ys, 3.0, 2.0)
|
||||||
|
assert 0.05 < s.cep50 < 0.20
|
||||||
|
|
||||||
|
def test_biased_uwb(self):
|
||||||
|
rng = np.random.default_rng(7)
|
||||||
|
xs = rng.normal(3.15, 0.05, 300); ys = rng.normal(2.0, 0.05, 300)
|
||||||
|
s = compute_stats(xs, ys, 3.0, 2.0)
|
||||||
|
assert abs(s.bias_x - 0.15) < 0.02
|
||||||
|
|
||||||
|
|
||||||
|
class TestRangeAccum:
|
||||||
|
def test_empty(self):
|
||||||
|
a = RangeAccum(0)
|
||||||
|
assert a.mean == 0.0 and a.std == 0.0 and a.count == 0
|
||||||
|
|
||||||
|
def test_single_std_zero(self):
|
||||||
|
a = RangeAccum(0); a.add(2.0)
|
||||||
|
assert a.std == 0.0
|
||||||
|
|
||||||
|
def test_mean_std(self):
|
||||||
|
a = RangeAccum(0)
|
||||||
|
for v in [1.0, 2.0, 3.0, 4.0, 5.0]:
|
||||||
|
a.add(v)
|
||||||
|
assert abs(a.mean - 3.0) < 1e-9
|
||||||
|
assert abs(a.std - float(np.std([1,2,3,4,5], ddof=1))) < 1e-9
|
||||||
|
assert a.count == 5
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
pytest.main([__file__, "-v"])
|
||||||
15
jetson/ros2_ws/src/saltybot_uwb_logger_msgs/CMakeLists.txt
Normal file
15
jetson/ros2_ws/src/saltybot_uwb_logger_msgs/CMakeLists.txt
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
cmake_minimum_required(VERSION 3.8)
|
||||||
|
project(saltybot_uwb_logger_msgs)
|
||||||
|
|
||||||
|
find_package(ament_cmake REQUIRED)
|
||||||
|
find_package(std_msgs REQUIRED)
|
||||||
|
find_package(rosidl_default_generators REQUIRED)
|
||||||
|
|
||||||
|
rosidl_generate_interfaces(${PROJECT_NAME}
|
||||||
|
"msg/AccuracyReport.msg"
|
||||||
|
"srv/StartAccuracyTest.srv"
|
||||||
|
DEPENDENCIES std_msgs
|
||||||
|
)
|
||||||
|
|
||||||
|
ament_export_dependencies(rosidl_default_runtime)
|
||||||
|
ament_package()
|
||||||
@ -0,0 +1,32 @@
|
|||||||
|
# AccuracyReport.msg — UWB positioning accuracy test results (Issue #634)
|
||||||
|
#
|
||||||
|
# Published on /saltybot/uwb/accuracy_report after StartAccuracyTest completes.
|
||||||
|
|
||||||
|
std_msgs/Header header
|
||||||
|
|
||||||
|
float64 truth_x_m
|
||||||
|
float64 truth_y_m
|
||||||
|
|
||||||
|
uint32 n_samples
|
||||||
|
|
||||||
|
float64 mean_x_m
|
||||||
|
float64 mean_y_m
|
||||||
|
|
||||||
|
float64 std_x_m
|
||||||
|
float64 std_y_m
|
||||||
|
float64 std_2d_m
|
||||||
|
|
||||||
|
float64 bias_x_m
|
||||||
|
float64 bias_y_m
|
||||||
|
|
||||||
|
float64 cep50_m
|
||||||
|
float64 cep95_m
|
||||||
|
float64 max_error_m
|
||||||
|
float64 rmse_m
|
||||||
|
|
||||||
|
uint8[] anchor_ids
|
||||||
|
float64[] anchor_range_mean_m
|
||||||
|
float64[] anchor_range_std_m
|
||||||
|
|
||||||
|
string test_id
|
||||||
|
float64 duration_s
|
||||||
21
jetson/ros2_ws/src/saltybot_uwb_logger_msgs/package.xml
Normal file
21
jetson/ros2_ws/src/saltybot_uwb_logger_msgs/package.xml
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_uwb_logger_msgs</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>Message and service definitions for UWB logger (Issue #634).</description>
|
||||||
|
<maintainer email="sl-uwb@saltylab.local">sl-uwb</maintainer>
|
||||||
|
<license>Apache-2.0</license>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_cmake</buildtool_depend>
|
||||||
|
<buildtool_depend>rosidl_default_generators</buildtool_depend>
|
||||||
|
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<exec_depend>rosidl_default_runtime</exec_depend>
|
||||||
|
|
||||||
|
<member_of_group>rosidl_interface_packages</member_of_group>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_cmake</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,15 @@
|
|||||||
|
# StartAccuracyTest.srv — trigger UWB accuracy test at a known position (Issue #634)
|
||||||
|
|
||||||
|
# ── Request ────────────────────────────────────────────────────────────────
|
||||||
|
float64 truth_x_m
|
||||||
|
float64 truth_y_m
|
||||||
|
uint32 n_samples
|
||||||
|
float64 timeout_s
|
||||||
|
string test_id
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
# ── Response ───────────────────────────────────────────────────────────────
|
||||||
|
bool success
|
||||||
|
string message
|
||||||
|
string test_id
|
||||||
Loading…
x
Reference in New Issue
Block a user