Merge pull request 'feat: UWB accuracy analyzer (Issue #634)' (#641) from sl-uwb/issue-634-uwb-logger into main
This commit is contained in:
commit
4c7fa938a5
@ -0,0 +1,7 @@
|
||||
uwb_logger:
|
||||
ros__parameters:
|
||||
log_dir: ~/uwb_logs
|
||||
enable_csv: true
|
||||
default_n_samples: 200
|
||||
default_timeout_s: 30.0
|
||||
flush_interval_s: 5.0
|
||||
@ -0,0 +1,28 @@
|
||||
"""uwb_logger.launch.py — Launch UWB position logger (Issue #634)."""
|
||||
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
import os
|
||||
|
||||
|
||||
def generate_launch_description() -> LaunchDescription:
|
||||
pkg_dir = get_package_share_directory("saltybot_uwb_logger")
|
||||
params_file = os.path.join(pkg_dir, "config", "uwb_logger_params.yaml")
|
||||
|
||||
return LaunchDescription([
|
||||
DeclareLaunchArgument("log_dir", default_value="~/uwb_logs"),
|
||||
Node(
|
||||
package="saltybot_uwb_logger",
|
||||
executable="uwb_logger",
|
||||
name="uwb_logger",
|
||||
parameters=[
|
||||
params_file,
|
||||
{"log_dir": LaunchConfiguration("log_dir")},
|
||||
],
|
||||
output="screen",
|
||||
emulate_tty=True,
|
||||
),
|
||||
])
|
||||
31
jetson/ros2_ws/src/saltybot_uwb_logger/package.xml
Normal file
31
jetson/ros2_ws/src/saltybot_uwb_logger/package.xml
Normal file
@ -0,0 +1,31 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_uwb_logger</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
UWB position logger and accuracy analyzer (Issue #634).
|
||||
Logs fused pose, raw UWB pose, per-anchor ranges to CSV.
|
||||
Provides /saltybot/uwb/start_accuracy_test: collects N samples at a
|
||||
known position and computes CEP50, CEP95, RMSE, max error.
|
||||
</description>
|
||||
<maintainer email="sl-uwb@saltylab.local">sl-uwb</maintainer>
|
||||
<license>Apache-2.0</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>saltybot_uwb_msgs</depend>
|
||||
<depend>saltybot_uwb_logger_msgs</depend>
|
||||
|
||||
<exec_depend>python3-numpy</exec_depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
Binary file not shown.
Binary file not shown.
@ -0,0 +1,99 @@
|
||||
"""
|
||||
accuracy_stats.py — UWB positioning accuracy statistics (Issue #634)
|
||||
|
||||
Pure numpy, no ROS2 dependency.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
@dataclass
|
||||
class AccuracyStats:
|
||||
n_samples: int
|
||||
mean_x: float
|
||||
mean_y: float
|
||||
std_x: float
|
||||
std_y: float
|
||||
std_2d: float
|
||||
bias_x: float
|
||||
bias_y: float
|
||||
cep50: float
|
||||
cep95: float
|
||||
max_error: float
|
||||
rmse: float
|
||||
|
||||
|
||||
def compute_stats(
|
||||
xs: np.ndarray,
|
||||
ys: np.ndarray,
|
||||
truth_x: float,
|
||||
truth_y: float,
|
||||
) -> AccuracyStats:
|
||||
"""
|
||||
Compute accuracy statistics from position samples.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
xs, ys : 1-D arrays of measured x / y positions (metres)
|
||||
truth_x/y : known ground-truth position (metres)
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError if fewer than 2 samples or mismatched lengths.
|
||||
"""
|
||||
xs = np.asarray(xs, dtype=float)
|
||||
ys = np.asarray(ys, dtype=float)
|
||||
n = len(xs)
|
||||
if n < 2:
|
||||
raise ValueError(f"Need at least 2 samples, got {n}")
|
||||
if len(ys) != n:
|
||||
raise ValueError("xs and ys must have the same length")
|
||||
|
||||
errors = np.sqrt((xs - truth_x) ** 2 + (ys - truth_y) ** 2)
|
||||
|
||||
mean_x = float(np.mean(xs))
|
||||
mean_y = float(np.mean(ys))
|
||||
std_x = float(np.std(xs, ddof=1))
|
||||
std_y = float(np.std(ys, ddof=1))
|
||||
|
||||
return AccuracyStats(
|
||||
n_samples = n,
|
||||
mean_x = mean_x,
|
||||
mean_y = mean_y,
|
||||
std_x = std_x,
|
||||
std_y = std_y,
|
||||
std_2d = math.sqrt(std_x ** 2 + std_y ** 2),
|
||||
bias_x = mean_x - truth_x,
|
||||
bias_y = mean_y - truth_y,
|
||||
cep50 = float(np.percentile(errors, 50)),
|
||||
cep95 = float(np.percentile(errors, 95)),
|
||||
max_error = float(np.max(errors)),
|
||||
rmse = float(np.sqrt(np.mean(errors ** 2))),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RangeAccum:
|
||||
"""Running accumulator for per-anchor range statistics."""
|
||||
anchor_id: int
|
||||
_samples: list = field(default_factory=list, repr=False)
|
||||
|
||||
def add(self, range_m: float) -> None:
|
||||
self._samples.append(range_m)
|
||||
|
||||
@property
|
||||
def count(self) -> int:
|
||||
return len(self._samples)
|
||||
|
||||
@property
|
||||
def mean(self) -> float:
|
||||
return float(np.mean(self._samples)) if self._samples else 0.0
|
||||
|
||||
@property
|
||||
def std(self) -> float:
|
||||
return float(np.std(self._samples, ddof=1)) if len(self._samples) >= 2 else 0.0
|
||||
@ -0,0 +1,409 @@
|
||||
"""
|
||||
logger_node.py — UWB position logger and accuracy analyzer (Issue #634)
|
||||
|
||||
Subscriptions
|
||||
─────────────
|
||||
/saltybot/pose/fused geometry_msgs/PoseStamped — EKF-fused pose (200 Hz)
|
||||
/saltybot/uwb/pose geometry_msgs/PoseStamped — raw UWB pose (10 Hz)
|
||||
/uwb/ranges saltybot_uwb_msgs/UwbRangeArray — per-anchor ranges
|
||||
|
||||
Service
|
||||
───────
|
||||
/saltybot/uwb/start_accuracy_test StartAccuracyTest
|
||||
Collects N fused-pose samples at known position, computes accuracy
|
||||
metrics, publishes AccuracyReport, writes JSON summary.
|
||||
|
||||
Publishes
|
||||
─────────
|
||||
/saltybot/uwb/accuracy_report AccuracyReport
|
||||
|
||||
CSV logging (continuous, to log_dir)
|
||||
─────────────────────────────────────
|
||||
fused_pose_<DATE>.csv — timestamp_ns, x_m, y_m, heading_rad
|
||||
uwb_pose_<DATE>.csv — timestamp_ns, x_m, y_m
|
||||
uwb_ranges_<DATE>.csv — timestamp_ns, anchor_id, range_m, raw_mm, rssi, tag_id
|
||||
accuracy_<test_id>.json — accuracy test results
|
||||
|
||||
Parameters
|
||||
──────────
|
||||
log_dir ~/uwb_logs
|
||||
enable_csv true
|
||||
default_n_samples 200
|
||||
default_timeout_s 30.0
|
||||
flush_interval_s 5.0
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import json
|
||||
import math
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||
|
||||
from geometry_msgs.msg import PoseStamped
|
||||
from saltybot_uwb_msgs.msg import UwbRangeArray
|
||||
from saltybot_uwb_logger_msgs.msg import AccuracyReport
|
||||
from saltybot_uwb_logger_msgs.srv import StartAccuracyTest
|
||||
|
||||
from saltybot_uwb_logger.accuracy_stats import compute_stats, RangeAccum
|
||||
|
||||
|
||||
_SENSOR_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=10,
|
||||
)
|
||||
|
||||
|
||||
class UwbLoggerNode(Node):
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__("uwb_logger")
|
||||
|
||||
# ── Parameters ────────────────────────────────────────────────────
|
||||
self.declare_parameter("log_dir", os.path.expanduser("~/uwb_logs"))
|
||||
self.declare_parameter("enable_csv", True)
|
||||
self.declare_parameter("default_n_samples", 200)
|
||||
self.declare_parameter("default_timeout_s", 30.0)
|
||||
self.declare_parameter("flush_interval_s", 5.0)
|
||||
|
||||
self._log_dir = Path(self.get_parameter("log_dir").value)
|
||||
self._enable_csv = self.get_parameter("enable_csv").value
|
||||
self._default_n = self.get_parameter("default_n_samples").value
|
||||
self._default_to = self.get_parameter("default_timeout_s").value
|
||||
self._flush_iv = self.get_parameter("flush_interval_s").value
|
||||
|
||||
if self._enable_csv:
|
||||
self._log_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# ── CSV writers ───────────────────────────────────────────────────
|
||||
date_tag = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
||||
self._csv_lock = threading.Lock()
|
||||
self._csv_files = []
|
||||
self._fused_csv = self._open_csv(
|
||||
f"fused_pose_{date_tag}.csv",
|
||||
["timestamp_ns", "x_m", "y_m", "heading_rad"])
|
||||
self._uwb_csv = self._open_csv(
|
||||
f"uwb_pose_{date_tag}.csv",
|
||||
["timestamp_ns", "x_m", "y_m"])
|
||||
self._ranges_csv = self._open_csv(
|
||||
f"uwb_ranges_{date_tag}.csv",
|
||||
["timestamp_ns", "anchor_id", "range_m", "raw_mm", "rssi", "tag_id"])
|
||||
|
||||
# ── Accuracy test state ────────────────────────────────────────────
|
||||
self._test_lock = threading.Lock()
|
||||
self._test_active = False
|
||||
self._test_id = ""
|
||||
self._test_truth_x = 0.0
|
||||
self._test_truth_y = 0.0
|
||||
self._test_n_target = 0
|
||||
self._test_xs: list = []
|
||||
self._test_ys: list = []
|
||||
self._test_ranges: dict[int, RangeAccum] = {}
|
||||
self._test_t0 = 0.0
|
||||
self._test_timeout = 0.0
|
||||
self._test_done_event = threading.Event()
|
||||
|
||||
# ── Publisher ─────────────────────────────────────────────────────
|
||||
self._report_pub = self.create_publisher(
|
||||
AccuracyReport, "/saltybot/uwb/accuracy_report", 10
|
||||
)
|
||||
|
||||
# ── Subscriptions ─────────────────────────────────────────────────
|
||||
self.create_subscription(
|
||||
PoseStamped, "/saltybot/pose/fused",
|
||||
self._fused_cb, _SENSOR_QOS)
|
||||
self.create_subscription(
|
||||
PoseStamped, "/saltybot/uwb/pose",
|
||||
self._uwb_pose_cb, _SENSOR_QOS)
|
||||
self.create_subscription(
|
||||
UwbRangeArray, "/uwb/ranges",
|
||||
self._ranges_cb, _SENSOR_QOS)
|
||||
|
||||
# ── Service ───────────────────────────────────────────────────────
|
||||
self._srv = self.create_service(
|
||||
StartAccuracyTest,
|
||||
"/saltybot/uwb/start_accuracy_test",
|
||||
self._handle_start_test)
|
||||
|
||||
# ── Periodic flush ────────────────────────────────────────────────
|
||||
self.create_timer(self._flush_iv, self._flush_csv)
|
||||
|
||||
self.get_logger().info(
|
||||
f"UWB logger ready — log_dir={self._log_dir} csv={self._enable_csv}"
|
||||
)
|
||||
|
||||
def destroy_node(self) -> None:
|
||||
self._flush_csv()
|
||||
for fh in self._csv_files:
|
||||
try:
|
||||
fh.close()
|
||||
except Exception:
|
||||
pass
|
||||
super().destroy_node()
|
||||
|
||||
# ── Callbacks ─────────────────────────────────────────────────────────
|
||||
|
||||
def _fused_cb(self, msg: PoseStamped) -> None:
|
||||
ts = _stamp_ns(msg.header.stamp)
|
||||
x = msg.pose.position.x
|
||||
y = msg.pose.position.y
|
||||
q = msg.pose.orientation
|
||||
heading = 2.0 * math.atan2(float(q.z), float(q.w))
|
||||
|
||||
if self._enable_csv:
|
||||
with self._csv_lock:
|
||||
if self._fused_csv:
|
||||
self._fused_csv.writerow([ts, x, y, heading])
|
||||
|
||||
with self._test_lock:
|
||||
if self._test_active:
|
||||
self._test_xs.append(x)
|
||||
self._test_ys.append(y)
|
||||
if len(self._test_xs) >= self._test_n_target:
|
||||
self._test_active = False
|
||||
self._test_done_event.set()
|
||||
|
||||
def _uwb_pose_cb(self, msg: PoseStamped) -> None:
|
||||
ts = _stamp_ns(msg.header.stamp)
|
||||
x = msg.pose.position.x
|
||||
y = msg.pose.position.y
|
||||
if self._enable_csv:
|
||||
with self._csv_lock:
|
||||
if self._uwb_csv:
|
||||
self._uwb_csv.writerow([ts, x, y])
|
||||
|
||||
def _ranges_cb(self, msg: UwbRangeArray) -> None:
|
||||
ts = _stamp_ns(msg.header.stamp)
|
||||
for r in msg.ranges:
|
||||
if self._enable_csv:
|
||||
with self._csv_lock:
|
||||
if self._ranges_csv:
|
||||
self._ranges_csv.writerow([
|
||||
ts, r.anchor_id, r.range_m,
|
||||
r.raw_mm, r.rssi, r.tag_id,
|
||||
])
|
||||
with self._test_lock:
|
||||
if self._test_active:
|
||||
aid = int(r.anchor_id)
|
||||
if aid not in self._test_ranges:
|
||||
self._test_ranges[aid] = RangeAccum(anchor_id=aid)
|
||||
self._test_ranges[aid].add(float(r.range_m))
|
||||
|
||||
# ── Service ────────────────────────────────────────────────────────────
|
||||
|
||||
def _handle_start_test(
|
||||
self,
|
||||
request: StartAccuracyTest.Request,
|
||||
response: StartAccuracyTest.Response,
|
||||
) -> StartAccuracyTest.Response:
|
||||
|
||||
with self._test_lock:
|
||||
if self._test_active:
|
||||
response.success = False
|
||||
response.message = "Another test is already running"
|
||||
response.test_id = self._test_id
|
||||
return response
|
||||
|
||||
n = int(request.n_samples) if request.n_samples > 0 else self._default_n
|
||||
timeout = float(request.timeout_s) if request.timeout_s > 0.0 else self._default_to
|
||||
test_id = (
|
||||
request.test_id.strip()
|
||||
if request.test_id.strip()
|
||||
else datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
||||
)
|
||||
|
||||
self.get_logger().info(
|
||||
f"Accuracy test '{test_id}' — "
|
||||
f"truth=({request.truth_x_m:.3f}, {request.truth_y_m:.3f}) "
|
||||
f"n={n} timeout={timeout}s"
|
||||
)
|
||||
|
||||
with self._test_lock:
|
||||
self._test_id = test_id
|
||||
self._test_truth_x = float(request.truth_x_m)
|
||||
self._test_truth_y = float(request.truth_y_m)
|
||||
self._test_n_target = n
|
||||
self._test_xs = []
|
||||
self._test_ys = []
|
||||
self._test_ranges = {}
|
||||
self._test_t0 = time.monotonic()
|
||||
self._test_timeout = timeout
|
||||
self._test_done_event.clear()
|
||||
self._test_active = True
|
||||
|
||||
threading.Thread(
|
||||
target=self._run_test,
|
||||
args=(test_id, request.truth_x_m, request.truth_y_m, n, timeout),
|
||||
daemon=True,
|
||||
).start()
|
||||
|
||||
response.success = True
|
||||
response.message = f"Test '{test_id}' started, collecting {n} samples"
|
||||
response.test_id = test_id
|
||||
return response
|
||||
|
||||
def _run_test(
|
||||
self,
|
||||
test_id: str,
|
||||
truth_x: float,
|
||||
truth_y: float,
|
||||
n_target: int,
|
||||
timeout: float,
|
||||
) -> None:
|
||||
finished = self._test_done_event.wait(timeout=timeout)
|
||||
|
||||
with self._test_lock:
|
||||
self._test_active = False
|
||||
xs = list(self._test_xs)
|
||||
ys = list(self._test_ys)
|
||||
ranges_snap = dict(self._test_ranges)
|
||||
t0 = self._test_t0
|
||||
|
||||
duration = time.monotonic() - t0
|
||||
n_got = len(xs)
|
||||
|
||||
if n_got < 2:
|
||||
self.get_logger().error(
|
||||
f"Test '{test_id}' failed: only {n_got} samples in {duration:.1f}s"
|
||||
)
|
||||
return
|
||||
|
||||
if not finished:
|
||||
self.get_logger().warn(
|
||||
f"Test '{test_id}' timed out — {n_got}/{n_target} samples"
|
||||
)
|
||||
|
||||
try:
|
||||
stats = compute_stats(np.array(xs), np.array(ys), truth_x, truth_y)
|
||||
except ValueError as exc:
|
||||
self.get_logger().error(f"Stats failed: {exc}")
|
||||
return
|
||||
|
||||
self.get_logger().info(
|
||||
f"Test '{test_id}' — n={stats.n_samples} "
|
||||
f"CEP50={stats.cep50*100:.1f}cm CEP95={stats.cep95*100:.1f}cm "
|
||||
f"RMSE={stats.rmse*100:.1f}cm max={stats.max_error*100:.1f}cm"
|
||||
)
|
||||
|
||||
# Publish report
|
||||
report = AccuracyReport()
|
||||
report.header.stamp = self.get_clock().now().to_msg()
|
||||
report.header.frame_id = "map"
|
||||
report.truth_x_m = truth_x
|
||||
report.truth_y_m = truth_y
|
||||
report.n_samples = stats.n_samples
|
||||
report.mean_x_m = stats.mean_x
|
||||
report.mean_y_m = stats.mean_y
|
||||
report.std_x_m = stats.std_x
|
||||
report.std_y_m = stats.std_y
|
||||
report.std_2d_m = stats.std_2d
|
||||
report.bias_x_m = stats.bias_x
|
||||
report.bias_y_m = stats.bias_y
|
||||
report.cep50_m = stats.cep50
|
||||
report.cep95_m = stats.cep95
|
||||
report.max_error_m = stats.max_error
|
||||
report.rmse_m = stats.rmse
|
||||
report.test_id = test_id
|
||||
report.duration_s = duration
|
||||
|
||||
anchor_ids = sorted(ranges_snap.keys())
|
||||
report.anchor_ids = [int(a) for a in anchor_ids]
|
||||
report.anchor_range_mean_m = [ranges_snap[a].mean for a in anchor_ids]
|
||||
report.anchor_range_std_m = [ranges_snap[a].std for a in anchor_ids]
|
||||
|
||||
self._report_pub.publish(report)
|
||||
|
||||
# Write JSON
|
||||
if self._enable_csv:
|
||||
self._write_json(test_id, truth_x, truth_y, stats,
|
||||
anchor_ids, ranges_snap, duration)
|
||||
|
||||
# ── CSV / JSON helpers ─────────────────────────────────────────────────
|
||||
|
||||
def _open_csv(
|
||||
self, filename: str, headers: list
|
||||
) -> Optional[csv.writer]:
|
||||
if not self._enable_csv:
|
||||
return None
|
||||
path = self._log_dir / filename
|
||||
fh = open(path, "w", newline="", buffering=1)
|
||||
writer = csv.writer(fh)
|
||||
writer.writerow(headers)
|
||||
self._csv_files.append(fh)
|
||||
return writer
|
||||
|
||||
def _flush_csv(self) -> None:
|
||||
for fh in self._csv_files:
|
||||
try:
|
||||
fh.flush()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _write_json(self, test_id, truth_x, truth_y, stats,
|
||||
anchor_ids, ranges_snap, duration) -> None:
|
||||
path = self._log_dir / f"accuracy_{test_id}.json"
|
||||
data = {
|
||||
"test_id": test_id,
|
||||
"truth": {"x_m": truth_x, "y_m": truth_y},
|
||||
"n_samples": stats.n_samples,
|
||||
"duration_s": round(duration, 3),
|
||||
"mean": {"x_m": round(stats.mean_x, 4),
|
||||
"y_m": round(stats.mean_y, 4)},
|
||||
"bias": {"x_m": round(stats.bias_x, 4),
|
||||
"y_m": round(stats.bias_y, 4)},
|
||||
"std": {"x_m": round(stats.std_x, 4),
|
||||
"y_m": round(stats.std_y, 4),
|
||||
"2d_m": round(stats.std_2d, 4)},
|
||||
"cep50_m": round(stats.cep50, 4),
|
||||
"cep95_m": round(stats.cep95, 4),
|
||||
"rmse_m": round(stats.rmse, 4),
|
||||
"max_error_m": round(stats.max_error, 4),
|
||||
"anchors": {
|
||||
str(aid): {
|
||||
"range_mean_m": round(ranges_snap[aid].mean, 4),
|
||||
"range_std_m": round(ranges_snap[aid].std, 4),
|
||||
"n": ranges_snap[aid].count,
|
||||
}
|
||||
for aid in anchor_ids
|
||||
},
|
||||
}
|
||||
try:
|
||||
with open(path, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
self.get_logger().info(f"Report written: {path}")
|
||||
except OSError as exc:
|
||||
self.get_logger().error(f"JSON write failed: {exc}")
|
||||
|
||||
|
||||
# ── Helpers ────────────────────────────────────────────────────────────────
|
||||
|
||||
def _stamp_ns(stamp) -> int:
|
||||
return stamp.sec * 1_000_000_000 + stamp.nanosec
|
||||
|
||||
|
||||
def main(args=None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = UwbLoggerNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.try_shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
4
jetson/ros2_ws/src/saltybot_uwb_logger/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_uwb_logger/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_uwb_logger
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_uwb_logger
|
||||
29
jetson/ros2_ws/src/saltybot_uwb_logger/setup.py
Normal file
29
jetson/ros2_ws/src/saltybot_uwb_logger/setup.py
Normal file
@ -0,0 +1,29 @@
|
||||
import os
|
||||
from glob import glob
|
||||
from setuptools import setup
|
||||
|
||||
package_name = "saltybot_uwb_logger"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version="0.1.0",
|
||||
packages=[package_name],
|
||||
data_files=[
|
||||
("share/ament_index/resource_index/packages",
|
||||
[f"resource/{package_name}"]),
|
||||
(f"share/{package_name}", ["package.xml"]),
|
||||
(os.path.join("share", package_name, "launch"), glob("launch/*.py")),
|
||||
(os.path.join("share", package_name, "config"), glob("config/*.yaml")),
|
||||
],
|
||||
install_requires=["setuptools"],
|
||||
zip_safe=True,
|
||||
maintainer="sl-uwb",
|
||||
maintainer_email="sl-uwb@saltylab.local",
|
||||
description="UWB position logger and accuracy analyzer (Issue #634)",
|
||||
license="Apache-2.0",
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"uwb_logger = saltybot_uwb_logger.logger_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
Binary file not shown.
@ -0,0 +1,116 @@
|
||||
"""Unit tests for saltybot_uwb_logger.accuracy_stats (Issue #634). No ROS2 needed."""
|
||||
|
||||
import math
|
||||
import sys
|
||||
import os
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
from saltybot_uwb_logger.accuracy_stats import compute_stats, RangeAccum
|
||||
|
||||
|
||||
def _circle(n, r, cx=0.0, cy=0.0):
|
||||
a = np.linspace(0, 2 * math.pi, n, endpoint=False)
|
||||
return cx + r * np.cos(a), cy + r * np.sin(a)
|
||||
|
||||
|
||||
class TestComputeStats:
|
||||
def test_zero_error(self):
|
||||
xs = np.full(5, 2.0); ys = np.full(5, 3.0)
|
||||
s = compute_stats(xs, ys, 2.0, 3.0)
|
||||
assert abs(s.cep50) < 1e-9
|
||||
assert abs(s.rmse) < 1e-9
|
||||
assert abs(s.max_error) < 1e-9
|
||||
|
||||
def test_known_bias(self):
|
||||
rng = np.random.default_rng(0)
|
||||
xs = rng.normal(4.0, 0.01, 100); ys = rng.normal(2.0, 0.01, 100)
|
||||
s = compute_stats(xs, ys, 3.0, 2.0)
|
||||
assert abs(s.bias_x - 1.0) < 0.05
|
||||
assert abs(s.bias_y) < 0.05
|
||||
|
||||
def test_cep50_below_cep95(self):
|
||||
rng = np.random.default_rng(1)
|
||||
xs = rng.normal(0, 0.1, 500); ys = rng.normal(0, 0.1, 500)
|
||||
s = compute_stats(xs, ys, 0.0, 0.0)
|
||||
assert s.cep50 < s.cep95
|
||||
|
||||
def test_cep50_is_median(self):
|
||||
rng = np.random.default_rng(2)
|
||||
xs = rng.normal(0, 0.15, 1000); ys = rng.normal(0, 0.15, 1000)
|
||||
errors = np.sqrt(xs**2 + ys**2)
|
||||
s = compute_stats(xs, ys, 0.0, 0.0)
|
||||
assert abs(s.cep50 - float(np.median(errors))) < 1e-9
|
||||
|
||||
def test_rmse(self):
|
||||
xs = np.array([0.1, -0.1, 0.2, -0.2]); ys = np.zeros(4)
|
||||
s = compute_stats(xs, ys, 0.0, 0.0)
|
||||
assert abs(s.rmse - math.sqrt(np.mean(xs**2))) < 1e-9
|
||||
|
||||
def test_max_error(self):
|
||||
xs = np.array([0.0, 0.0, 1.0]); ys = np.zeros(3)
|
||||
s = compute_stats(xs, ys, 0.0, 0.0)
|
||||
assert abs(s.max_error - 1.0) < 1e-9
|
||||
|
||||
def test_circle_symmetry(self):
|
||||
xs, ys = _circle(360, 0.12)
|
||||
s = compute_stats(xs, ys, 0.0, 0.0)
|
||||
assert abs(s.bias_x) < 1e-6
|
||||
assert abs(s.cep50 - 0.12) < 1e-6
|
||||
|
||||
def test_std_2d_pythagorean(self):
|
||||
rng = np.random.default_rng(3)
|
||||
xs = rng.normal(0, 0.1, 200); ys = rng.normal(0, 0.2, 200)
|
||||
s = compute_stats(xs, ys, 0.0, 0.0)
|
||||
assert abs(s.std_2d - math.sqrt(s.std_x**2 + s.std_y**2)) < 1e-9
|
||||
|
||||
def test_n_samples(self):
|
||||
xs = np.ones(42); ys = np.zeros(42)
|
||||
s = compute_stats(xs, ys, 0.0, 0.0)
|
||||
assert s.n_samples == 42
|
||||
|
||||
def test_too_few_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
compute_stats(np.array([1.0]), np.array([1.0]), 0.0, 0.0)
|
||||
|
||||
def test_length_mismatch_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
compute_stats(np.array([1.0, 2.0]), np.array([1.0]), 0.0, 0.0)
|
||||
|
||||
|
||||
class TestRealistic:
|
||||
def test_10cm_cep50(self):
|
||||
rng = np.random.default_rng(42)
|
||||
xs = rng.normal(3.0, 0.07, 500); ys = rng.normal(2.0, 0.07, 500)
|
||||
s = compute_stats(xs, ys, 3.0, 2.0)
|
||||
assert 0.05 < s.cep50 < 0.20
|
||||
|
||||
def test_biased_uwb(self):
|
||||
rng = np.random.default_rng(7)
|
||||
xs = rng.normal(3.15, 0.05, 300); ys = rng.normal(2.0, 0.05, 300)
|
||||
s = compute_stats(xs, ys, 3.0, 2.0)
|
||||
assert abs(s.bias_x - 0.15) < 0.02
|
||||
|
||||
|
||||
class TestRangeAccum:
|
||||
def test_empty(self):
|
||||
a = RangeAccum(0)
|
||||
assert a.mean == 0.0 and a.std == 0.0 and a.count == 0
|
||||
|
||||
def test_single_std_zero(self):
|
||||
a = RangeAccum(0); a.add(2.0)
|
||||
assert a.std == 0.0
|
||||
|
||||
def test_mean_std(self):
|
||||
a = RangeAccum(0)
|
||||
for v in [1.0, 2.0, 3.0, 4.0, 5.0]:
|
||||
a.add(v)
|
||||
assert abs(a.mean - 3.0) < 1e-9
|
||||
assert abs(a.std - float(np.std([1,2,3,4,5], ddof=1))) < 1e-9
|
||||
assert a.count == 5
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
15
jetson/ros2_ws/src/saltybot_uwb_logger_msgs/CMakeLists.txt
Normal file
15
jetson/ros2_ws/src/saltybot_uwb_logger_msgs/CMakeLists.txt
Normal file
@ -0,0 +1,15 @@
|
||||
cmake_minimum_required(VERSION 3.8)
|
||||
project(saltybot_uwb_logger_msgs)
|
||||
|
||||
find_package(ament_cmake REQUIRED)
|
||||
find_package(std_msgs REQUIRED)
|
||||
find_package(rosidl_default_generators REQUIRED)
|
||||
|
||||
rosidl_generate_interfaces(${PROJECT_NAME}
|
||||
"msg/AccuracyReport.msg"
|
||||
"srv/StartAccuracyTest.srv"
|
||||
DEPENDENCIES std_msgs
|
||||
)
|
||||
|
||||
ament_export_dependencies(rosidl_default_runtime)
|
||||
ament_package()
|
||||
@ -0,0 +1,32 @@
|
||||
# AccuracyReport.msg — UWB positioning accuracy test results (Issue #634)
|
||||
#
|
||||
# Published on /saltybot/uwb/accuracy_report after StartAccuracyTest completes.
|
||||
|
||||
std_msgs/Header header
|
||||
|
||||
float64 truth_x_m
|
||||
float64 truth_y_m
|
||||
|
||||
uint32 n_samples
|
||||
|
||||
float64 mean_x_m
|
||||
float64 mean_y_m
|
||||
|
||||
float64 std_x_m
|
||||
float64 std_y_m
|
||||
float64 std_2d_m
|
||||
|
||||
float64 bias_x_m
|
||||
float64 bias_y_m
|
||||
|
||||
float64 cep50_m
|
||||
float64 cep95_m
|
||||
float64 max_error_m
|
||||
float64 rmse_m
|
||||
|
||||
uint8[] anchor_ids
|
||||
float64[] anchor_range_mean_m
|
||||
float64[] anchor_range_std_m
|
||||
|
||||
string test_id
|
||||
float64 duration_s
|
||||
21
jetson/ros2_ws/src/saltybot_uwb_logger_msgs/package.xml
Normal file
21
jetson/ros2_ws/src/saltybot_uwb_logger_msgs/package.xml
Normal file
@ -0,0 +1,21 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_uwb_logger_msgs</name>
|
||||
<version>0.1.0</version>
|
||||
<description>Message and service definitions for UWB logger (Issue #634).</description>
|
||||
<maintainer email="sl-uwb@saltylab.local">sl-uwb</maintainer>
|
||||
<license>Apache-2.0</license>
|
||||
|
||||
<buildtool_depend>ament_cmake</buildtool_depend>
|
||||
<buildtool_depend>rosidl_default_generators</buildtool_depend>
|
||||
|
||||
<depend>std_msgs</depend>
|
||||
<exec_depend>rosidl_default_runtime</exec_depend>
|
||||
|
||||
<member_of_group>rosidl_interface_packages</member_of_group>
|
||||
|
||||
<export>
|
||||
<build_type>ament_cmake</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,15 @@
|
||||
# StartAccuracyTest.srv — trigger UWB accuracy test at a known position (Issue #634)
|
||||
|
||||
# ── Request ────────────────────────────────────────────────────────────────
|
||||
float64 truth_x_m
|
||||
float64 truth_y_m
|
||||
uint32 n_samples
|
||||
float64 timeout_s
|
||||
string test_id
|
||||
|
||||
---
|
||||
|
||||
# ── Response ───────────────────────────────────────────────────────────────
|
||||
bool success
|
||||
string message
|
||||
string test_id
|
||||
Loading…
x
Reference in New Issue
Block a user