Compare commits
No commits in common. "4c7fa938a557e52e0a7c19615dc3639dedad9cf5" and "45332f1a8b1d8b58fccccd6bcf05bf93af204765" have entirely different histories.
4c7fa938a5
...
45332f1a8b
@ -1,7 +0,0 @@
|
|||||||
uwb_logger:
|
|
||||||
ros__parameters:
|
|
||||||
log_dir: ~/uwb_logs
|
|
||||||
enable_csv: true
|
|
||||||
default_n_samples: 200
|
|
||||||
default_timeout_s: 30.0
|
|
||||||
flush_interval_s: 5.0
|
|
||||||
@ -1,28 +0,0 @@
|
|||||||
"""uwb_logger.launch.py — Launch UWB position logger (Issue #634)."""
|
|
||||||
|
|
||||||
from launch import LaunchDescription
|
|
||||||
from launch.actions import DeclareLaunchArgument
|
|
||||||
from launch.substitutions import LaunchConfiguration
|
|
||||||
from launch_ros.actions import Node
|
|
||||||
from ament_index_python.packages import get_package_share_directory
|
|
||||||
import os
|
|
||||||
|
|
||||||
|
|
||||||
def generate_launch_description() -> LaunchDescription:
|
|
||||||
pkg_dir = get_package_share_directory("saltybot_uwb_logger")
|
|
||||||
params_file = os.path.join(pkg_dir, "config", "uwb_logger_params.yaml")
|
|
||||||
|
|
||||||
return LaunchDescription([
|
|
||||||
DeclareLaunchArgument("log_dir", default_value="~/uwb_logs"),
|
|
||||||
Node(
|
|
||||||
package="saltybot_uwb_logger",
|
|
||||||
executable="uwb_logger",
|
|
||||||
name="uwb_logger",
|
|
||||||
parameters=[
|
|
||||||
params_file,
|
|
||||||
{"log_dir": LaunchConfiguration("log_dir")},
|
|
||||||
],
|
|
||||||
output="screen",
|
|
||||||
emulate_tty=True,
|
|
||||||
),
|
|
||||||
])
|
|
||||||
@ -1,31 +0,0 @@
|
|||||||
<?xml version="1.0"?>
|
|
||||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
|
||||||
<package format="3">
|
|
||||||
<name>saltybot_uwb_logger</name>
|
|
||||||
<version>0.1.0</version>
|
|
||||||
<description>
|
|
||||||
UWB position logger and accuracy analyzer (Issue #634).
|
|
||||||
Logs fused pose, raw UWB pose, per-anchor ranges to CSV.
|
|
||||||
Provides /saltybot/uwb/start_accuracy_test: collects N samples at a
|
|
||||||
known position and computes CEP50, CEP95, RMSE, max error.
|
|
||||||
</description>
|
|
||||||
<maintainer email="sl-uwb@saltylab.local">sl-uwb</maintainer>
|
|
||||||
<license>Apache-2.0</license>
|
|
||||||
|
|
||||||
<depend>rclpy</depend>
|
|
||||||
<depend>geometry_msgs</depend>
|
|
||||||
<depend>std_msgs</depend>
|
|
||||||
<depend>saltybot_uwb_msgs</depend>
|
|
||||||
<depend>saltybot_uwb_logger_msgs</depend>
|
|
||||||
|
|
||||||
<exec_depend>python3-numpy</exec_depend>
|
|
||||||
|
|
||||||
<test_depend>ament_copyright</test_depend>
|
|
||||||
<test_depend>ament_flake8</test_depend>
|
|
||||||
<test_depend>ament_pep257</test_depend>
|
|
||||||
<test_depend>python3-pytest</test_depend>
|
|
||||||
|
|
||||||
<export>
|
|
||||||
<build_type>ament_python</build_type>
|
|
||||||
</export>
|
|
||||||
</package>
|
|
||||||
Binary file not shown.
Binary file not shown.
@ -1,99 +0,0 @@
|
|||||||
"""
|
|
||||||
accuracy_stats.py — UWB positioning accuracy statistics (Issue #634)
|
|
||||||
|
|
||||||
Pure numpy, no ROS2 dependency.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import math
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class AccuracyStats:
|
|
||||||
n_samples: int
|
|
||||||
mean_x: float
|
|
||||||
mean_y: float
|
|
||||||
std_x: float
|
|
||||||
std_y: float
|
|
||||||
std_2d: float
|
|
||||||
bias_x: float
|
|
||||||
bias_y: float
|
|
||||||
cep50: float
|
|
||||||
cep95: float
|
|
||||||
max_error: float
|
|
||||||
rmse: float
|
|
||||||
|
|
||||||
|
|
||||||
def compute_stats(
|
|
||||||
xs: np.ndarray,
|
|
||||||
ys: np.ndarray,
|
|
||||||
truth_x: float,
|
|
||||||
truth_y: float,
|
|
||||||
) -> AccuracyStats:
|
|
||||||
"""
|
|
||||||
Compute accuracy statistics from position samples.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
xs, ys : 1-D arrays of measured x / y positions (metres)
|
|
||||||
truth_x/y : known ground-truth position (metres)
|
|
||||||
|
|
||||||
Raises
|
|
||||||
------
|
|
||||||
ValueError if fewer than 2 samples or mismatched lengths.
|
|
||||||
"""
|
|
||||||
xs = np.asarray(xs, dtype=float)
|
|
||||||
ys = np.asarray(ys, dtype=float)
|
|
||||||
n = len(xs)
|
|
||||||
if n < 2:
|
|
||||||
raise ValueError(f"Need at least 2 samples, got {n}")
|
|
||||||
if len(ys) != n:
|
|
||||||
raise ValueError("xs and ys must have the same length")
|
|
||||||
|
|
||||||
errors = np.sqrt((xs - truth_x) ** 2 + (ys - truth_y) ** 2)
|
|
||||||
|
|
||||||
mean_x = float(np.mean(xs))
|
|
||||||
mean_y = float(np.mean(ys))
|
|
||||||
std_x = float(np.std(xs, ddof=1))
|
|
||||||
std_y = float(np.std(ys, ddof=1))
|
|
||||||
|
|
||||||
return AccuracyStats(
|
|
||||||
n_samples = n,
|
|
||||||
mean_x = mean_x,
|
|
||||||
mean_y = mean_y,
|
|
||||||
std_x = std_x,
|
|
||||||
std_y = std_y,
|
|
||||||
std_2d = math.sqrt(std_x ** 2 + std_y ** 2),
|
|
||||||
bias_x = mean_x - truth_x,
|
|
||||||
bias_y = mean_y - truth_y,
|
|
||||||
cep50 = float(np.percentile(errors, 50)),
|
|
||||||
cep95 = float(np.percentile(errors, 95)),
|
|
||||||
max_error = float(np.max(errors)),
|
|
||||||
rmse = float(np.sqrt(np.mean(errors ** 2))),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class RangeAccum:
|
|
||||||
"""Running accumulator for per-anchor range statistics."""
|
|
||||||
anchor_id: int
|
|
||||||
_samples: list = field(default_factory=list, repr=False)
|
|
||||||
|
|
||||||
def add(self, range_m: float) -> None:
|
|
||||||
self._samples.append(range_m)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def count(self) -> int:
|
|
||||||
return len(self._samples)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def mean(self) -> float:
|
|
||||||
return float(np.mean(self._samples)) if self._samples else 0.0
|
|
||||||
|
|
||||||
@property
|
|
||||||
def std(self) -> float:
|
|
||||||
return float(np.std(self._samples, ddof=1)) if len(self._samples) >= 2 else 0.0
|
|
||||||
@ -1,409 +0,0 @@
|
|||||||
"""
|
|
||||||
logger_node.py — UWB position logger and accuracy analyzer (Issue #634)
|
|
||||||
|
|
||||||
Subscriptions
|
|
||||||
─────────────
|
|
||||||
/saltybot/pose/fused geometry_msgs/PoseStamped — EKF-fused pose (200 Hz)
|
|
||||||
/saltybot/uwb/pose geometry_msgs/PoseStamped — raw UWB pose (10 Hz)
|
|
||||||
/uwb/ranges saltybot_uwb_msgs/UwbRangeArray — per-anchor ranges
|
|
||||||
|
|
||||||
Service
|
|
||||||
───────
|
|
||||||
/saltybot/uwb/start_accuracy_test StartAccuracyTest
|
|
||||||
Collects N fused-pose samples at known position, computes accuracy
|
|
||||||
metrics, publishes AccuracyReport, writes JSON summary.
|
|
||||||
|
|
||||||
Publishes
|
|
||||||
─────────
|
|
||||||
/saltybot/uwb/accuracy_report AccuracyReport
|
|
||||||
|
|
||||||
CSV logging (continuous, to log_dir)
|
|
||||||
─────────────────────────────────────
|
|
||||||
fused_pose_<DATE>.csv — timestamp_ns, x_m, y_m, heading_rad
|
|
||||||
uwb_pose_<DATE>.csv — timestamp_ns, x_m, y_m
|
|
||||||
uwb_ranges_<DATE>.csv — timestamp_ns, anchor_id, range_m, raw_mm, rssi, tag_id
|
|
||||||
accuracy_<test_id>.json — accuracy test results
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
──────────
|
|
||||||
log_dir ~/uwb_logs
|
|
||||||
enable_csv true
|
|
||||||
default_n_samples 200
|
|
||||||
default_timeout_s 30.0
|
|
||||||
flush_interval_s 5.0
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import csv
|
|
||||||
import json
|
|
||||||
import math
|
|
||||||
import os
|
|
||||||
import threading
|
|
||||||
import time
|
|
||||||
from datetime import datetime, timezone
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
import rclpy
|
|
||||||
from rclpy.node import Node
|
|
||||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
|
||||||
|
|
||||||
from geometry_msgs.msg import PoseStamped
|
|
||||||
from saltybot_uwb_msgs.msg import UwbRangeArray
|
|
||||||
from saltybot_uwb_logger_msgs.msg import AccuracyReport
|
|
||||||
from saltybot_uwb_logger_msgs.srv import StartAccuracyTest
|
|
||||||
|
|
||||||
from saltybot_uwb_logger.accuracy_stats import compute_stats, RangeAccum
|
|
||||||
|
|
||||||
|
|
||||||
_SENSOR_QOS = QoSProfile(
|
|
||||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
|
||||||
history=HistoryPolicy.KEEP_LAST,
|
|
||||||
depth=10,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class UwbLoggerNode(Node):
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
|
||||||
super().__init__("uwb_logger")
|
|
||||||
|
|
||||||
# ── Parameters ────────────────────────────────────────────────────
|
|
||||||
self.declare_parameter("log_dir", os.path.expanduser("~/uwb_logs"))
|
|
||||||
self.declare_parameter("enable_csv", True)
|
|
||||||
self.declare_parameter("default_n_samples", 200)
|
|
||||||
self.declare_parameter("default_timeout_s", 30.0)
|
|
||||||
self.declare_parameter("flush_interval_s", 5.0)
|
|
||||||
|
|
||||||
self._log_dir = Path(self.get_parameter("log_dir").value)
|
|
||||||
self._enable_csv = self.get_parameter("enable_csv").value
|
|
||||||
self._default_n = self.get_parameter("default_n_samples").value
|
|
||||||
self._default_to = self.get_parameter("default_timeout_s").value
|
|
||||||
self._flush_iv = self.get_parameter("flush_interval_s").value
|
|
||||||
|
|
||||||
if self._enable_csv:
|
|
||||||
self._log_dir.mkdir(parents=True, exist_ok=True)
|
|
||||||
|
|
||||||
# ── CSV writers ───────────────────────────────────────────────────
|
|
||||||
date_tag = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
|
||||||
self._csv_lock = threading.Lock()
|
|
||||||
self._csv_files = []
|
|
||||||
self._fused_csv = self._open_csv(
|
|
||||||
f"fused_pose_{date_tag}.csv",
|
|
||||||
["timestamp_ns", "x_m", "y_m", "heading_rad"])
|
|
||||||
self._uwb_csv = self._open_csv(
|
|
||||||
f"uwb_pose_{date_tag}.csv",
|
|
||||||
["timestamp_ns", "x_m", "y_m"])
|
|
||||||
self._ranges_csv = self._open_csv(
|
|
||||||
f"uwb_ranges_{date_tag}.csv",
|
|
||||||
["timestamp_ns", "anchor_id", "range_m", "raw_mm", "rssi", "tag_id"])
|
|
||||||
|
|
||||||
# ── Accuracy test state ────────────────────────────────────────────
|
|
||||||
self._test_lock = threading.Lock()
|
|
||||||
self._test_active = False
|
|
||||||
self._test_id = ""
|
|
||||||
self._test_truth_x = 0.0
|
|
||||||
self._test_truth_y = 0.0
|
|
||||||
self._test_n_target = 0
|
|
||||||
self._test_xs: list = []
|
|
||||||
self._test_ys: list = []
|
|
||||||
self._test_ranges: dict[int, RangeAccum] = {}
|
|
||||||
self._test_t0 = 0.0
|
|
||||||
self._test_timeout = 0.0
|
|
||||||
self._test_done_event = threading.Event()
|
|
||||||
|
|
||||||
# ── Publisher ─────────────────────────────────────────────────────
|
|
||||||
self._report_pub = self.create_publisher(
|
|
||||||
AccuracyReport, "/saltybot/uwb/accuracy_report", 10
|
|
||||||
)
|
|
||||||
|
|
||||||
# ── Subscriptions ─────────────────────────────────────────────────
|
|
||||||
self.create_subscription(
|
|
||||||
PoseStamped, "/saltybot/pose/fused",
|
|
||||||
self._fused_cb, _SENSOR_QOS)
|
|
||||||
self.create_subscription(
|
|
||||||
PoseStamped, "/saltybot/uwb/pose",
|
|
||||||
self._uwb_pose_cb, _SENSOR_QOS)
|
|
||||||
self.create_subscription(
|
|
||||||
UwbRangeArray, "/uwb/ranges",
|
|
||||||
self._ranges_cb, _SENSOR_QOS)
|
|
||||||
|
|
||||||
# ── Service ───────────────────────────────────────────────────────
|
|
||||||
self._srv = self.create_service(
|
|
||||||
StartAccuracyTest,
|
|
||||||
"/saltybot/uwb/start_accuracy_test",
|
|
||||||
self._handle_start_test)
|
|
||||||
|
|
||||||
# ── Periodic flush ────────────────────────────────────────────────
|
|
||||||
self.create_timer(self._flush_iv, self._flush_csv)
|
|
||||||
|
|
||||||
self.get_logger().info(
|
|
||||||
f"UWB logger ready — log_dir={self._log_dir} csv={self._enable_csv}"
|
|
||||||
)
|
|
||||||
|
|
||||||
def destroy_node(self) -> None:
|
|
||||||
self._flush_csv()
|
|
||||||
for fh in self._csv_files:
|
|
||||||
try:
|
|
||||||
fh.close()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
super().destroy_node()
|
|
||||||
|
|
||||||
# ── Callbacks ─────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _fused_cb(self, msg: PoseStamped) -> None:
|
|
||||||
ts = _stamp_ns(msg.header.stamp)
|
|
||||||
x = msg.pose.position.x
|
|
||||||
y = msg.pose.position.y
|
|
||||||
q = msg.pose.orientation
|
|
||||||
heading = 2.0 * math.atan2(float(q.z), float(q.w))
|
|
||||||
|
|
||||||
if self._enable_csv:
|
|
||||||
with self._csv_lock:
|
|
||||||
if self._fused_csv:
|
|
||||||
self._fused_csv.writerow([ts, x, y, heading])
|
|
||||||
|
|
||||||
with self._test_lock:
|
|
||||||
if self._test_active:
|
|
||||||
self._test_xs.append(x)
|
|
||||||
self._test_ys.append(y)
|
|
||||||
if len(self._test_xs) >= self._test_n_target:
|
|
||||||
self._test_active = False
|
|
||||||
self._test_done_event.set()
|
|
||||||
|
|
||||||
def _uwb_pose_cb(self, msg: PoseStamped) -> None:
|
|
||||||
ts = _stamp_ns(msg.header.stamp)
|
|
||||||
x = msg.pose.position.x
|
|
||||||
y = msg.pose.position.y
|
|
||||||
if self._enable_csv:
|
|
||||||
with self._csv_lock:
|
|
||||||
if self._uwb_csv:
|
|
||||||
self._uwb_csv.writerow([ts, x, y])
|
|
||||||
|
|
||||||
def _ranges_cb(self, msg: UwbRangeArray) -> None:
|
|
||||||
ts = _stamp_ns(msg.header.stamp)
|
|
||||||
for r in msg.ranges:
|
|
||||||
if self._enable_csv:
|
|
||||||
with self._csv_lock:
|
|
||||||
if self._ranges_csv:
|
|
||||||
self._ranges_csv.writerow([
|
|
||||||
ts, r.anchor_id, r.range_m,
|
|
||||||
r.raw_mm, r.rssi, r.tag_id,
|
|
||||||
])
|
|
||||||
with self._test_lock:
|
|
||||||
if self._test_active:
|
|
||||||
aid = int(r.anchor_id)
|
|
||||||
if aid not in self._test_ranges:
|
|
||||||
self._test_ranges[aid] = RangeAccum(anchor_id=aid)
|
|
||||||
self._test_ranges[aid].add(float(r.range_m))
|
|
||||||
|
|
||||||
# ── Service ────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _handle_start_test(
|
|
||||||
self,
|
|
||||||
request: StartAccuracyTest.Request,
|
|
||||||
response: StartAccuracyTest.Response,
|
|
||||||
) -> StartAccuracyTest.Response:
|
|
||||||
|
|
||||||
with self._test_lock:
|
|
||||||
if self._test_active:
|
|
||||||
response.success = False
|
|
||||||
response.message = "Another test is already running"
|
|
||||||
response.test_id = self._test_id
|
|
||||||
return response
|
|
||||||
|
|
||||||
n = int(request.n_samples) if request.n_samples > 0 else self._default_n
|
|
||||||
timeout = float(request.timeout_s) if request.timeout_s > 0.0 else self._default_to
|
|
||||||
test_id = (
|
|
||||||
request.test_id.strip()
|
|
||||||
if request.test_id.strip()
|
|
||||||
else datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
|
||||||
)
|
|
||||||
|
|
||||||
self.get_logger().info(
|
|
||||||
f"Accuracy test '{test_id}' — "
|
|
||||||
f"truth=({request.truth_x_m:.3f}, {request.truth_y_m:.3f}) "
|
|
||||||
f"n={n} timeout={timeout}s"
|
|
||||||
)
|
|
||||||
|
|
||||||
with self._test_lock:
|
|
||||||
self._test_id = test_id
|
|
||||||
self._test_truth_x = float(request.truth_x_m)
|
|
||||||
self._test_truth_y = float(request.truth_y_m)
|
|
||||||
self._test_n_target = n
|
|
||||||
self._test_xs = []
|
|
||||||
self._test_ys = []
|
|
||||||
self._test_ranges = {}
|
|
||||||
self._test_t0 = time.monotonic()
|
|
||||||
self._test_timeout = timeout
|
|
||||||
self._test_done_event.clear()
|
|
||||||
self._test_active = True
|
|
||||||
|
|
||||||
threading.Thread(
|
|
||||||
target=self._run_test,
|
|
||||||
args=(test_id, request.truth_x_m, request.truth_y_m, n, timeout),
|
|
||||||
daemon=True,
|
|
||||||
).start()
|
|
||||||
|
|
||||||
response.success = True
|
|
||||||
response.message = f"Test '{test_id}' started, collecting {n} samples"
|
|
||||||
response.test_id = test_id
|
|
||||||
return response
|
|
||||||
|
|
||||||
def _run_test(
|
|
||||||
self,
|
|
||||||
test_id: str,
|
|
||||||
truth_x: float,
|
|
||||||
truth_y: float,
|
|
||||||
n_target: int,
|
|
||||||
timeout: float,
|
|
||||||
) -> None:
|
|
||||||
finished = self._test_done_event.wait(timeout=timeout)
|
|
||||||
|
|
||||||
with self._test_lock:
|
|
||||||
self._test_active = False
|
|
||||||
xs = list(self._test_xs)
|
|
||||||
ys = list(self._test_ys)
|
|
||||||
ranges_snap = dict(self._test_ranges)
|
|
||||||
t0 = self._test_t0
|
|
||||||
|
|
||||||
duration = time.monotonic() - t0
|
|
||||||
n_got = len(xs)
|
|
||||||
|
|
||||||
if n_got < 2:
|
|
||||||
self.get_logger().error(
|
|
||||||
f"Test '{test_id}' failed: only {n_got} samples in {duration:.1f}s"
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
if not finished:
|
|
||||||
self.get_logger().warn(
|
|
||||||
f"Test '{test_id}' timed out — {n_got}/{n_target} samples"
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
stats = compute_stats(np.array(xs), np.array(ys), truth_x, truth_y)
|
|
||||||
except ValueError as exc:
|
|
||||||
self.get_logger().error(f"Stats failed: {exc}")
|
|
||||||
return
|
|
||||||
|
|
||||||
self.get_logger().info(
|
|
||||||
f"Test '{test_id}' — n={stats.n_samples} "
|
|
||||||
f"CEP50={stats.cep50*100:.1f}cm CEP95={stats.cep95*100:.1f}cm "
|
|
||||||
f"RMSE={stats.rmse*100:.1f}cm max={stats.max_error*100:.1f}cm"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Publish report
|
|
||||||
report = AccuracyReport()
|
|
||||||
report.header.stamp = self.get_clock().now().to_msg()
|
|
||||||
report.header.frame_id = "map"
|
|
||||||
report.truth_x_m = truth_x
|
|
||||||
report.truth_y_m = truth_y
|
|
||||||
report.n_samples = stats.n_samples
|
|
||||||
report.mean_x_m = stats.mean_x
|
|
||||||
report.mean_y_m = stats.mean_y
|
|
||||||
report.std_x_m = stats.std_x
|
|
||||||
report.std_y_m = stats.std_y
|
|
||||||
report.std_2d_m = stats.std_2d
|
|
||||||
report.bias_x_m = stats.bias_x
|
|
||||||
report.bias_y_m = stats.bias_y
|
|
||||||
report.cep50_m = stats.cep50
|
|
||||||
report.cep95_m = stats.cep95
|
|
||||||
report.max_error_m = stats.max_error
|
|
||||||
report.rmse_m = stats.rmse
|
|
||||||
report.test_id = test_id
|
|
||||||
report.duration_s = duration
|
|
||||||
|
|
||||||
anchor_ids = sorted(ranges_snap.keys())
|
|
||||||
report.anchor_ids = [int(a) for a in anchor_ids]
|
|
||||||
report.anchor_range_mean_m = [ranges_snap[a].mean for a in anchor_ids]
|
|
||||||
report.anchor_range_std_m = [ranges_snap[a].std for a in anchor_ids]
|
|
||||||
|
|
||||||
self._report_pub.publish(report)
|
|
||||||
|
|
||||||
# Write JSON
|
|
||||||
if self._enable_csv:
|
|
||||||
self._write_json(test_id, truth_x, truth_y, stats,
|
|
||||||
anchor_ids, ranges_snap, duration)
|
|
||||||
|
|
||||||
# ── CSV / JSON helpers ─────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _open_csv(
|
|
||||||
self, filename: str, headers: list
|
|
||||||
) -> Optional[csv.writer]:
|
|
||||||
if not self._enable_csv:
|
|
||||||
return None
|
|
||||||
path = self._log_dir / filename
|
|
||||||
fh = open(path, "w", newline="", buffering=1)
|
|
||||||
writer = csv.writer(fh)
|
|
||||||
writer.writerow(headers)
|
|
||||||
self._csv_files.append(fh)
|
|
||||||
return writer
|
|
||||||
|
|
||||||
def _flush_csv(self) -> None:
|
|
||||||
for fh in self._csv_files:
|
|
||||||
try:
|
|
||||||
fh.flush()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def _write_json(self, test_id, truth_x, truth_y, stats,
|
|
||||||
anchor_ids, ranges_snap, duration) -> None:
|
|
||||||
path = self._log_dir / f"accuracy_{test_id}.json"
|
|
||||||
data = {
|
|
||||||
"test_id": test_id,
|
|
||||||
"truth": {"x_m": truth_x, "y_m": truth_y},
|
|
||||||
"n_samples": stats.n_samples,
|
|
||||||
"duration_s": round(duration, 3),
|
|
||||||
"mean": {"x_m": round(stats.mean_x, 4),
|
|
||||||
"y_m": round(stats.mean_y, 4)},
|
|
||||||
"bias": {"x_m": round(stats.bias_x, 4),
|
|
||||||
"y_m": round(stats.bias_y, 4)},
|
|
||||||
"std": {"x_m": round(stats.std_x, 4),
|
|
||||||
"y_m": round(stats.std_y, 4),
|
|
||||||
"2d_m": round(stats.std_2d, 4)},
|
|
||||||
"cep50_m": round(stats.cep50, 4),
|
|
||||||
"cep95_m": round(stats.cep95, 4),
|
|
||||||
"rmse_m": round(stats.rmse, 4),
|
|
||||||
"max_error_m": round(stats.max_error, 4),
|
|
||||||
"anchors": {
|
|
||||||
str(aid): {
|
|
||||||
"range_mean_m": round(ranges_snap[aid].mean, 4),
|
|
||||||
"range_std_m": round(ranges_snap[aid].std, 4),
|
|
||||||
"n": ranges_snap[aid].count,
|
|
||||||
}
|
|
||||||
for aid in anchor_ids
|
|
||||||
},
|
|
||||||
}
|
|
||||||
try:
|
|
||||||
with open(path, "w") as f:
|
|
||||||
json.dump(data, f, indent=2)
|
|
||||||
self.get_logger().info(f"Report written: {path}")
|
|
||||||
except OSError as exc:
|
|
||||||
self.get_logger().error(f"JSON write failed: {exc}")
|
|
||||||
|
|
||||||
|
|
||||||
# ── Helpers ────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _stamp_ns(stamp) -> int:
|
|
||||||
return stamp.sec * 1_000_000_000 + stamp.nanosec
|
|
||||||
|
|
||||||
|
|
||||||
def main(args=None) -> None:
|
|
||||||
rclpy.init(args=args)
|
|
||||||
node = UwbLoggerNode()
|
|
||||||
try:
|
|
||||||
rclpy.spin(node)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
finally:
|
|
||||||
node.destroy_node()
|
|
||||||
rclpy.try_shutdown()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
@ -1,4 +0,0 @@
|
|||||||
[develop]
|
|
||||||
script_dir=$base/lib/saltybot_uwb_logger
|
|
||||||
[install]
|
|
||||||
install_scripts=$base/lib/saltybot_uwb_logger
|
|
||||||
@ -1,29 +0,0 @@
|
|||||||
import os
|
|
||||||
from glob import glob
|
|
||||||
from setuptools import setup
|
|
||||||
|
|
||||||
package_name = "saltybot_uwb_logger"
|
|
||||||
|
|
||||||
setup(
|
|
||||||
name=package_name,
|
|
||||||
version="0.1.0",
|
|
||||||
packages=[package_name],
|
|
||||||
data_files=[
|
|
||||||
("share/ament_index/resource_index/packages",
|
|
||||||
[f"resource/{package_name}"]),
|
|
||||||
(f"share/{package_name}", ["package.xml"]),
|
|
||||||
(os.path.join("share", package_name, "launch"), glob("launch/*.py")),
|
|
||||||
(os.path.join("share", package_name, "config"), glob("config/*.yaml")),
|
|
||||||
],
|
|
||||||
install_requires=["setuptools"],
|
|
||||||
zip_safe=True,
|
|
||||||
maintainer="sl-uwb",
|
|
||||||
maintainer_email="sl-uwb@saltylab.local",
|
|
||||||
description="UWB position logger and accuracy analyzer (Issue #634)",
|
|
||||||
license="Apache-2.0",
|
|
||||||
entry_points={
|
|
||||||
"console_scripts": [
|
|
||||||
"uwb_logger = saltybot_uwb_logger.logger_node:main",
|
|
||||||
],
|
|
||||||
},
|
|
||||||
)
|
|
||||||
Binary file not shown.
@ -1,116 +0,0 @@
|
|||||||
"""Unit tests for saltybot_uwb_logger.accuracy_stats (Issue #634). No ROS2 needed."""
|
|
||||||
|
|
||||||
import math
|
|
||||||
import sys
|
|
||||||
import os
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
|
||||||
from saltybot_uwb_logger.accuracy_stats import compute_stats, RangeAccum
|
|
||||||
|
|
||||||
|
|
||||||
def _circle(n, r, cx=0.0, cy=0.0):
|
|
||||||
a = np.linspace(0, 2 * math.pi, n, endpoint=False)
|
|
||||||
return cx + r * np.cos(a), cy + r * np.sin(a)
|
|
||||||
|
|
||||||
|
|
||||||
class TestComputeStats:
|
|
||||||
def test_zero_error(self):
|
|
||||||
xs = np.full(5, 2.0); ys = np.full(5, 3.0)
|
|
||||||
s = compute_stats(xs, ys, 2.0, 3.0)
|
|
||||||
assert abs(s.cep50) < 1e-9
|
|
||||||
assert abs(s.rmse) < 1e-9
|
|
||||||
assert abs(s.max_error) < 1e-9
|
|
||||||
|
|
||||||
def test_known_bias(self):
|
|
||||||
rng = np.random.default_rng(0)
|
|
||||||
xs = rng.normal(4.0, 0.01, 100); ys = rng.normal(2.0, 0.01, 100)
|
|
||||||
s = compute_stats(xs, ys, 3.0, 2.0)
|
|
||||||
assert abs(s.bias_x - 1.0) < 0.05
|
|
||||||
assert abs(s.bias_y) < 0.05
|
|
||||||
|
|
||||||
def test_cep50_below_cep95(self):
|
|
||||||
rng = np.random.default_rng(1)
|
|
||||||
xs = rng.normal(0, 0.1, 500); ys = rng.normal(0, 0.1, 500)
|
|
||||||
s = compute_stats(xs, ys, 0.0, 0.0)
|
|
||||||
assert s.cep50 < s.cep95
|
|
||||||
|
|
||||||
def test_cep50_is_median(self):
|
|
||||||
rng = np.random.default_rng(2)
|
|
||||||
xs = rng.normal(0, 0.15, 1000); ys = rng.normal(0, 0.15, 1000)
|
|
||||||
errors = np.sqrt(xs**2 + ys**2)
|
|
||||||
s = compute_stats(xs, ys, 0.0, 0.0)
|
|
||||||
assert abs(s.cep50 - float(np.median(errors))) < 1e-9
|
|
||||||
|
|
||||||
def test_rmse(self):
|
|
||||||
xs = np.array([0.1, -0.1, 0.2, -0.2]); ys = np.zeros(4)
|
|
||||||
s = compute_stats(xs, ys, 0.0, 0.0)
|
|
||||||
assert abs(s.rmse - math.sqrt(np.mean(xs**2))) < 1e-9
|
|
||||||
|
|
||||||
def test_max_error(self):
|
|
||||||
xs = np.array([0.0, 0.0, 1.0]); ys = np.zeros(3)
|
|
||||||
s = compute_stats(xs, ys, 0.0, 0.0)
|
|
||||||
assert abs(s.max_error - 1.0) < 1e-9
|
|
||||||
|
|
||||||
def test_circle_symmetry(self):
|
|
||||||
xs, ys = _circle(360, 0.12)
|
|
||||||
s = compute_stats(xs, ys, 0.0, 0.0)
|
|
||||||
assert abs(s.bias_x) < 1e-6
|
|
||||||
assert abs(s.cep50 - 0.12) < 1e-6
|
|
||||||
|
|
||||||
def test_std_2d_pythagorean(self):
|
|
||||||
rng = np.random.default_rng(3)
|
|
||||||
xs = rng.normal(0, 0.1, 200); ys = rng.normal(0, 0.2, 200)
|
|
||||||
s = compute_stats(xs, ys, 0.0, 0.0)
|
|
||||||
assert abs(s.std_2d - math.sqrt(s.std_x**2 + s.std_y**2)) < 1e-9
|
|
||||||
|
|
||||||
def test_n_samples(self):
|
|
||||||
xs = np.ones(42); ys = np.zeros(42)
|
|
||||||
s = compute_stats(xs, ys, 0.0, 0.0)
|
|
||||||
assert s.n_samples == 42
|
|
||||||
|
|
||||||
def test_too_few_raises(self):
|
|
||||||
with pytest.raises(ValueError):
|
|
||||||
compute_stats(np.array([1.0]), np.array([1.0]), 0.0, 0.0)
|
|
||||||
|
|
||||||
def test_length_mismatch_raises(self):
|
|
||||||
with pytest.raises(ValueError):
|
|
||||||
compute_stats(np.array([1.0, 2.0]), np.array([1.0]), 0.0, 0.0)
|
|
||||||
|
|
||||||
|
|
||||||
class TestRealistic:
|
|
||||||
def test_10cm_cep50(self):
|
|
||||||
rng = np.random.default_rng(42)
|
|
||||||
xs = rng.normal(3.0, 0.07, 500); ys = rng.normal(2.0, 0.07, 500)
|
|
||||||
s = compute_stats(xs, ys, 3.0, 2.0)
|
|
||||||
assert 0.05 < s.cep50 < 0.20
|
|
||||||
|
|
||||||
def test_biased_uwb(self):
|
|
||||||
rng = np.random.default_rng(7)
|
|
||||||
xs = rng.normal(3.15, 0.05, 300); ys = rng.normal(2.0, 0.05, 300)
|
|
||||||
s = compute_stats(xs, ys, 3.0, 2.0)
|
|
||||||
assert abs(s.bias_x - 0.15) < 0.02
|
|
||||||
|
|
||||||
|
|
||||||
class TestRangeAccum:
|
|
||||||
def test_empty(self):
|
|
||||||
a = RangeAccum(0)
|
|
||||||
assert a.mean == 0.0 and a.std == 0.0 and a.count == 0
|
|
||||||
|
|
||||||
def test_single_std_zero(self):
|
|
||||||
a = RangeAccum(0); a.add(2.0)
|
|
||||||
assert a.std == 0.0
|
|
||||||
|
|
||||||
def test_mean_std(self):
|
|
||||||
a = RangeAccum(0)
|
|
||||||
for v in [1.0, 2.0, 3.0, 4.0, 5.0]:
|
|
||||||
a.add(v)
|
|
||||||
assert abs(a.mean - 3.0) < 1e-9
|
|
||||||
assert abs(a.std - float(np.std([1,2,3,4,5], ddof=1))) < 1e-9
|
|
||||||
assert a.count == 5
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
pytest.main([__file__, "-v"])
|
|
||||||
@ -1,15 +0,0 @@
|
|||||||
cmake_minimum_required(VERSION 3.8)
|
|
||||||
project(saltybot_uwb_logger_msgs)
|
|
||||||
|
|
||||||
find_package(ament_cmake REQUIRED)
|
|
||||||
find_package(std_msgs REQUIRED)
|
|
||||||
find_package(rosidl_default_generators REQUIRED)
|
|
||||||
|
|
||||||
rosidl_generate_interfaces(${PROJECT_NAME}
|
|
||||||
"msg/AccuracyReport.msg"
|
|
||||||
"srv/StartAccuracyTest.srv"
|
|
||||||
DEPENDENCIES std_msgs
|
|
||||||
)
|
|
||||||
|
|
||||||
ament_export_dependencies(rosidl_default_runtime)
|
|
||||||
ament_package()
|
|
||||||
@ -1,32 +0,0 @@
|
|||||||
# AccuracyReport.msg — UWB positioning accuracy test results (Issue #634)
|
|
||||||
#
|
|
||||||
# Published on /saltybot/uwb/accuracy_report after StartAccuracyTest completes.
|
|
||||||
|
|
||||||
std_msgs/Header header
|
|
||||||
|
|
||||||
float64 truth_x_m
|
|
||||||
float64 truth_y_m
|
|
||||||
|
|
||||||
uint32 n_samples
|
|
||||||
|
|
||||||
float64 mean_x_m
|
|
||||||
float64 mean_y_m
|
|
||||||
|
|
||||||
float64 std_x_m
|
|
||||||
float64 std_y_m
|
|
||||||
float64 std_2d_m
|
|
||||||
|
|
||||||
float64 bias_x_m
|
|
||||||
float64 bias_y_m
|
|
||||||
|
|
||||||
float64 cep50_m
|
|
||||||
float64 cep95_m
|
|
||||||
float64 max_error_m
|
|
||||||
float64 rmse_m
|
|
||||||
|
|
||||||
uint8[] anchor_ids
|
|
||||||
float64[] anchor_range_mean_m
|
|
||||||
float64[] anchor_range_std_m
|
|
||||||
|
|
||||||
string test_id
|
|
||||||
float64 duration_s
|
|
||||||
@ -1,21 +0,0 @@
|
|||||||
<?xml version="1.0"?>
|
|
||||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
|
||||||
<package format="3">
|
|
||||||
<name>saltybot_uwb_logger_msgs</name>
|
|
||||||
<version>0.1.0</version>
|
|
||||||
<description>Message and service definitions for UWB logger (Issue #634).</description>
|
|
||||||
<maintainer email="sl-uwb@saltylab.local">sl-uwb</maintainer>
|
|
||||||
<license>Apache-2.0</license>
|
|
||||||
|
|
||||||
<buildtool_depend>ament_cmake</buildtool_depend>
|
|
||||||
<buildtool_depend>rosidl_default_generators</buildtool_depend>
|
|
||||||
|
|
||||||
<depend>std_msgs</depend>
|
|
||||||
<exec_depend>rosidl_default_runtime</exec_depend>
|
|
||||||
|
|
||||||
<member_of_group>rosidl_interface_packages</member_of_group>
|
|
||||||
|
|
||||||
<export>
|
|
||||||
<build_type>ament_cmake</build_type>
|
|
||||||
</export>
|
|
||||||
</package>
|
|
||||||
@ -1,15 +0,0 @@
|
|||||||
# StartAccuracyTest.srv — trigger UWB accuracy test at a known position (Issue #634)
|
|
||||||
|
|
||||||
# ── Request ────────────────────────────────────────────────────────────────
|
|
||||||
float64 truth_x_m
|
|
||||||
float64 truth_y_m
|
|
||||||
uint32 n_samples
|
|
||||||
float64 timeout_s
|
|
||||||
string test_id
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
# ── Response ───────────────────────────────────────────────────────────────
|
|
||||||
bool success
|
|
||||||
string message
|
|
||||||
string test_id
|
|
||||||
Loading…
x
Reference in New Issue
Block a user