diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger/config/uwb_logger_params.yaml b/jetson/ros2_ws/src/saltybot_uwb_logger/config/uwb_logger_params.yaml new file mode 100644 index 0000000..552dcc1 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_logger/config/uwb_logger_params.yaml @@ -0,0 +1,7 @@ +uwb_logger: + ros__parameters: + log_dir: ~/uwb_logs + enable_csv: true + default_n_samples: 200 + default_timeout_s: 30.0 + flush_interval_s: 5.0 diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger/launch/uwb_logger.launch.py b/jetson/ros2_ws/src/saltybot_uwb_logger/launch/uwb_logger.launch.py new file mode 100644 index 0000000..72dd954 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_logger/launch/uwb_logger.launch.py @@ -0,0 +1,28 @@ +"""uwb_logger.launch.py — Launch UWB position logger (Issue #634).""" + +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node +from ament_index_python.packages import get_package_share_directory +import os + + +def generate_launch_description() -> LaunchDescription: + pkg_dir = get_package_share_directory("saltybot_uwb_logger") + params_file = os.path.join(pkg_dir, "config", "uwb_logger_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument("log_dir", default_value="~/uwb_logs"), + Node( + package="saltybot_uwb_logger", + executable="uwb_logger", + name="uwb_logger", + parameters=[ + params_file, + {"log_dir": LaunchConfiguration("log_dir")}, + ], + output="screen", + emulate_tty=True, + ), + ]) diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger/package.xml b/jetson/ros2_ws/src/saltybot_uwb_logger/package.xml new file mode 100644 index 0000000..eb8b9e2 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_logger/package.xml @@ -0,0 +1,31 @@ + + + + saltybot_uwb_logger + 0.1.0 + + UWB position logger and accuracy analyzer (Issue #634). + Logs fused pose, raw UWB pose, per-anchor ranges to CSV. + Provides /saltybot/uwb/start_accuracy_test: collects N samples at a + known position and computes CEP50, CEP95, RMSE, max error. + + sl-uwb + Apache-2.0 + + rclpy + geometry_msgs + std_msgs + saltybot_uwb_msgs + saltybot_uwb_logger_msgs + + python3-numpy + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger/resource/saltybot_uwb_logger b/jetson/ros2_ws/src/saltybot_uwb_logger/resource/saltybot_uwb_logger new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger/saltybot_uwb_logger/__init__.py b/jetson/ros2_ws/src/saltybot_uwb_logger/saltybot_uwb_logger/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger/saltybot_uwb_logger/__pycache__/__init__.cpython-314.pyc b/jetson/ros2_ws/src/saltybot_uwb_logger/saltybot_uwb_logger/__pycache__/__init__.cpython-314.pyc new file mode 100644 index 0000000..d40fd85 Binary files /dev/null and b/jetson/ros2_ws/src/saltybot_uwb_logger/saltybot_uwb_logger/__pycache__/__init__.cpython-314.pyc differ diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger/saltybot_uwb_logger/__pycache__/accuracy_stats.cpython-314.pyc b/jetson/ros2_ws/src/saltybot_uwb_logger/saltybot_uwb_logger/__pycache__/accuracy_stats.cpython-314.pyc new file mode 100644 index 0000000..02dec32 Binary files /dev/null and b/jetson/ros2_ws/src/saltybot_uwb_logger/saltybot_uwb_logger/__pycache__/accuracy_stats.cpython-314.pyc differ diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger/saltybot_uwb_logger/accuracy_stats.py b/jetson/ros2_ws/src/saltybot_uwb_logger/saltybot_uwb_logger/accuracy_stats.py new file mode 100644 index 0000000..50d272a --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_logger/saltybot_uwb_logger/accuracy_stats.py @@ -0,0 +1,99 @@ +""" +accuracy_stats.py — UWB positioning accuracy statistics (Issue #634) + +Pure numpy, no ROS2 dependency. +""" + +from __future__ import annotations + +import math +from dataclasses import dataclass, field + +import numpy as np + + +@dataclass +class AccuracyStats: + n_samples: int + mean_x: float + mean_y: float + std_x: float + std_y: float + std_2d: float + bias_x: float + bias_y: float + cep50: float + cep95: float + max_error: float + rmse: float + + +def compute_stats( + xs: np.ndarray, + ys: np.ndarray, + truth_x: float, + truth_y: float, +) -> AccuracyStats: + """ + Compute accuracy statistics from position samples. + + Parameters + ---------- + xs, ys : 1-D arrays of measured x / y positions (metres) + truth_x/y : known ground-truth position (metres) + + Raises + ------ + ValueError if fewer than 2 samples or mismatched lengths. + """ + xs = np.asarray(xs, dtype=float) + ys = np.asarray(ys, dtype=float) + n = len(xs) + if n < 2: + raise ValueError(f"Need at least 2 samples, got {n}") + if len(ys) != n: + raise ValueError("xs and ys must have the same length") + + errors = np.sqrt((xs - truth_x) ** 2 + (ys - truth_y) ** 2) + + mean_x = float(np.mean(xs)) + mean_y = float(np.mean(ys)) + std_x = float(np.std(xs, ddof=1)) + std_y = float(np.std(ys, ddof=1)) + + return AccuracyStats( + n_samples = n, + mean_x = mean_x, + mean_y = mean_y, + std_x = std_x, + std_y = std_y, + std_2d = math.sqrt(std_x ** 2 + std_y ** 2), + bias_x = mean_x - truth_x, + bias_y = mean_y - truth_y, + cep50 = float(np.percentile(errors, 50)), + cep95 = float(np.percentile(errors, 95)), + max_error = float(np.max(errors)), + rmse = float(np.sqrt(np.mean(errors ** 2))), + ) + + +@dataclass +class RangeAccum: + """Running accumulator for per-anchor range statistics.""" + anchor_id: int + _samples: list = field(default_factory=list, repr=False) + + def add(self, range_m: float) -> None: + self._samples.append(range_m) + + @property + def count(self) -> int: + return len(self._samples) + + @property + def mean(self) -> float: + return float(np.mean(self._samples)) if self._samples else 0.0 + + @property + def std(self) -> float: + return float(np.std(self._samples, ddof=1)) if len(self._samples) >= 2 else 0.0 diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger/saltybot_uwb_logger/logger_node.py b/jetson/ros2_ws/src/saltybot_uwb_logger/saltybot_uwb_logger/logger_node.py new file mode 100644 index 0000000..1101d2f --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_logger/saltybot_uwb_logger/logger_node.py @@ -0,0 +1,409 @@ +""" +logger_node.py — UWB position logger and accuracy analyzer (Issue #634) + +Subscriptions +───────────── + /saltybot/pose/fused geometry_msgs/PoseStamped — EKF-fused pose (200 Hz) + /saltybot/uwb/pose geometry_msgs/PoseStamped — raw UWB pose (10 Hz) + /uwb/ranges saltybot_uwb_msgs/UwbRangeArray — per-anchor ranges + +Service +─────── + /saltybot/uwb/start_accuracy_test StartAccuracyTest + Collects N fused-pose samples at known position, computes accuracy + metrics, publishes AccuracyReport, writes JSON summary. + +Publishes +───────── + /saltybot/uwb/accuracy_report AccuracyReport + +CSV logging (continuous, to log_dir) +───────────────────────────────────── + fused_pose_.csv — timestamp_ns, x_m, y_m, heading_rad + uwb_pose_.csv — timestamp_ns, x_m, y_m + uwb_ranges_.csv — timestamp_ns, anchor_id, range_m, raw_mm, rssi, tag_id + accuracy_.json — accuracy test results + +Parameters +────────── + log_dir ~/uwb_logs + enable_csv true + default_n_samples 200 + default_timeout_s 30.0 + flush_interval_s 5.0 +""" + +from __future__ import annotations + +import csv +import json +import math +import os +import threading +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +import numpy as np + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy + +from geometry_msgs.msg import PoseStamped +from saltybot_uwb_msgs.msg import UwbRangeArray +from saltybot_uwb_logger_msgs.msg import AccuracyReport +from saltybot_uwb_logger_msgs.srv import StartAccuracyTest + +from saltybot_uwb_logger.accuracy_stats import compute_stats, RangeAccum + + +_SENSOR_QOS = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=10, +) + + +class UwbLoggerNode(Node): + + def __init__(self) -> None: + super().__init__("uwb_logger") + + # ── Parameters ──────────────────────────────────────────────────── + self.declare_parameter("log_dir", os.path.expanduser("~/uwb_logs")) + self.declare_parameter("enable_csv", True) + self.declare_parameter("default_n_samples", 200) + self.declare_parameter("default_timeout_s", 30.0) + self.declare_parameter("flush_interval_s", 5.0) + + self._log_dir = Path(self.get_parameter("log_dir").value) + self._enable_csv = self.get_parameter("enable_csv").value + self._default_n = self.get_parameter("default_n_samples").value + self._default_to = self.get_parameter("default_timeout_s").value + self._flush_iv = self.get_parameter("flush_interval_s").value + + if self._enable_csv: + self._log_dir.mkdir(parents=True, exist_ok=True) + + # ── CSV writers ─────────────────────────────────────────────────── + date_tag = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + self._csv_lock = threading.Lock() + self._csv_files = [] + self._fused_csv = self._open_csv( + f"fused_pose_{date_tag}.csv", + ["timestamp_ns", "x_m", "y_m", "heading_rad"]) + self._uwb_csv = self._open_csv( + f"uwb_pose_{date_tag}.csv", + ["timestamp_ns", "x_m", "y_m"]) + self._ranges_csv = self._open_csv( + f"uwb_ranges_{date_tag}.csv", + ["timestamp_ns", "anchor_id", "range_m", "raw_mm", "rssi", "tag_id"]) + + # ── Accuracy test state ──────────────────────────────────────────── + self._test_lock = threading.Lock() + self._test_active = False + self._test_id = "" + self._test_truth_x = 0.0 + self._test_truth_y = 0.0 + self._test_n_target = 0 + self._test_xs: list = [] + self._test_ys: list = [] + self._test_ranges: dict[int, RangeAccum] = {} + self._test_t0 = 0.0 + self._test_timeout = 0.0 + self._test_done_event = threading.Event() + + # ── Publisher ───────────────────────────────────────────────────── + self._report_pub = self.create_publisher( + AccuracyReport, "/saltybot/uwb/accuracy_report", 10 + ) + + # ── Subscriptions ───────────────────────────────────────────────── + self.create_subscription( + PoseStamped, "/saltybot/pose/fused", + self._fused_cb, _SENSOR_QOS) + self.create_subscription( + PoseStamped, "/saltybot/uwb/pose", + self._uwb_pose_cb, _SENSOR_QOS) + self.create_subscription( + UwbRangeArray, "/uwb/ranges", + self._ranges_cb, _SENSOR_QOS) + + # ── Service ─────────────────────────────────────────────────────── + self._srv = self.create_service( + StartAccuracyTest, + "/saltybot/uwb/start_accuracy_test", + self._handle_start_test) + + # ── Periodic flush ──────────────────────────────────────────────── + self.create_timer(self._flush_iv, self._flush_csv) + + self.get_logger().info( + f"UWB logger ready — log_dir={self._log_dir} csv={self._enable_csv}" + ) + + def destroy_node(self) -> None: + self._flush_csv() + for fh in self._csv_files: + try: + fh.close() + except Exception: + pass + super().destroy_node() + + # ── Callbacks ───────────────────────────────────────────────────────── + + def _fused_cb(self, msg: PoseStamped) -> None: + ts = _stamp_ns(msg.header.stamp) + x = msg.pose.position.x + y = msg.pose.position.y + q = msg.pose.orientation + heading = 2.0 * math.atan2(float(q.z), float(q.w)) + + if self._enable_csv: + with self._csv_lock: + if self._fused_csv: + self._fused_csv.writerow([ts, x, y, heading]) + + with self._test_lock: + if self._test_active: + self._test_xs.append(x) + self._test_ys.append(y) + if len(self._test_xs) >= self._test_n_target: + self._test_active = False + self._test_done_event.set() + + def _uwb_pose_cb(self, msg: PoseStamped) -> None: + ts = _stamp_ns(msg.header.stamp) + x = msg.pose.position.x + y = msg.pose.position.y + if self._enable_csv: + with self._csv_lock: + if self._uwb_csv: + self._uwb_csv.writerow([ts, x, y]) + + def _ranges_cb(self, msg: UwbRangeArray) -> None: + ts = _stamp_ns(msg.header.stamp) + for r in msg.ranges: + if self._enable_csv: + with self._csv_lock: + if self._ranges_csv: + self._ranges_csv.writerow([ + ts, r.anchor_id, r.range_m, + r.raw_mm, r.rssi, r.tag_id, + ]) + with self._test_lock: + if self._test_active: + aid = int(r.anchor_id) + if aid not in self._test_ranges: + self._test_ranges[aid] = RangeAccum(anchor_id=aid) + self._test_ranges[aid].add(float(r.range_m)) + + # ── Service ──────────────────────────────────────────────────────────── + + def _handle_start_test( + self, + request: StartAccuracyTest.Request, + response: StartAccuracyTest.Response, + ) -> StartAccuracyTest.Response: + + with self._test_lock: + if self._test_active: + response.success = False + response.message = "Another test is already running" + response.test_id = self._test_id + return response + + n = int(request.n_samples) if request.n_samples > 0 else self._default_n + timeout = float(request.timeout_s) if request.timeout_s > 0.0 else self._default_to + test_id = ( + request.test_id.strip() + if request.test_id.strip() + else datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + ) + + self.get_logger().info( + f"Accuracy test '{test_id}' — " + f"truth=({request.truth_x_m:.3f}, {request.truth_y_m:.3f}) " + f"n={n} timeout={timeout}s" + ) + + with self._test_lock: + self._test_id = test_id + self._test_truth_x = float(request.truth_x_m) + self._test_truth_y = float(request.truth_y_m) + self._test_n_target = n + self._test_xs = [] + self._test_ys = [] + self._test_ranges = {} + self._test_t0 = time.monotonic() + self._test_timeout = timeout + self._test_done_event.clear() + self._test_active = True + + threading.Thread( + target=self._run_test, + args=(test_id, request.truth_x_m, request.truth_y_m, n, timeout), + daemon=True, + ).start() + + response.success = True + response.message = f"Test '{test_id}' started, collecting {n} samples" + response.test_id = test_id + return response + + def _run_test( + self, + test_id: str, + truth_x: float, + truth_y: float, + n_target: int, + timeout: float, + ) -> None: + finished = self._test_done_event.wait(timeout=timeout) + + with self._test_lock: + self._test_active = False + xs = list(self._test_xs) + ys = list(self._test_ys) + ranges_snap = dict(self._test_ranges) + t0 = self._test_t0 + + duration = time.monotonic() - t0 + n_got = len(xs) + + if n_got < 2: + self.get_logger().error( + f"Test '{test_id}' failed: only {n_got} samples in {duration:.1f}s" + ) + return + + if not finished: + self.get_logger().warn( + f"Test '{test_id}' timed out — {n_got}/{n_target} samples" + ) + + try: + stats = compute_stats(np.array(xs), np.array(ys), truth_x, truth_y) + except ValueError as exc: + self.get_logger().error(f"Stats failed: {exc}") + return + + self.get_logger().info( + f"Test '{test_id}' — n={stats.n_samples} " + f"CEP50={stats.cep50*100:.1f}cm CEP95={stats.cep95*100:.1f}cm " + f"RMSE={stats.rmse*100:.1f}cm max={stats.max_error*100:.1f}cm" + ) + + # Publish report + report = AccuracyReport() + report.header.stamp = self.get_clock().now().to_msg() + report.header.frame_id = "map" + report.truth_x_m = truth_x + report.truth_y_m = truth_y + report.n_samples = stats.n_samples + report.mean_x_m = stats.mean_x + report.mean_y_m = stats.mean_y + report.std_x_m = stats.std_x + report.std_y_m = stats.std_y + report.std_2d_m = stats.std_2d + report.bias_x_m = stats.bias_x + report.bias_y_m = stats.bias_y + report.cep50_m = stats.cep50 + report.cep95_m = stats.cep95 + report.max_error_m = stats.max_error + report.rmse_m = stats.rmse + report.test_id = test_id + report.duration_s = duration + + anchor_ids = sorted(ranges_snap.keys()) + report.anchor_ids = [int(a) for a in anchor_ids] + report.anchor_range_mean_m = [ranges_snap[a].mean for a in anchor_ids] + report.anchor_range_std_m = [ranges_snap[a].std for a in anchor_ids] + + self._report_pub.publish(report) + + # Write JSON + if self._enable_csv: + self._write_json(test_id, truth_x, truth_y, stats, + anchor_ids, ranges_snap, duration) + + # ── CSV / JSON helpers ───────────────────────────────────────────────── + + def _open_csv( + self, filename: str, headers: list + ) -> Optional[csv.writer]: + if not self._enable_csv: + return None + path = self._log_dir / filename + fh = open(path, "w", newline="", buffering=1) + writer = csv.writer(fh) + writer.writerow(headers) + self._csv_files.append(fh) + return writer + + def _flush_csv(self) -> None: + for fh in self._csv_files: + try: + fh.flush() + except Exception: + pass + + def _write_json(self, test_id, truth_x, truth_y, stats, + anchor_ids, ranges_snap, duration) -> None: + path = self._log_dir / f"accuracy_{test_id}.json" + data = { + "test_id": test_id, + "truth": {"x_m": truth_x, "y_m": truth_y}, + "n_samples": stats.n_samples, + "duration_s": round(duration, 3), + "mean": {"x_m": round(stats.mean_x, 4), + "y_m": round(stats.mean_y, 4)}, + "bias": {"x_m": round(stats.bias_x, 4), + "y_m": round(stats.bias_y, 4)}, + "std": {"x_m": round(stats.std_x, 4), + "y_m": round(stats.std_y, 4), + "2d_m": round(stats.std_2d, 4)}, + "cep50_m": round(stats.cep50, 4), + "cep95_m": round(stats.cep95, 4), + "rmse_m": round(stats.rmse, 4), + "max_error_m": round(stats.max_error, 4), + "anchors": { + str(aid): { + "range_mean_m": round(ranges_snap[aid].mean, 4), + "range_std_m": round(ranges_snap[aid].std, 4), + "n": ranges_snap[aid].count, + } + for aid in anchor_ids + }, + } + try: + with open(path, "w") as f: + json.dump(data, f, indent=2) + self.get_logger().info(f"Report written: {path}") + except OSError as exc: + self.get_logger().error(f"JSON write failed: {exc}") + + +# ── Helpers ──────────────────────────────────────────────────────────────── + +def _stamp_ns(stamp) -> int: + return stamp.sec * 1_000_000_000 + stamp.nanosec + + +def main(args=None) -> None: + rclpy.init(args=args) + node = UwbLoggerNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger/setup.cfg b/jetson/ros2_ws/src/saltybot_uwb_logger/setup.cfg new file mode 100644 index 0000000..0b9e8c8 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_logger/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_uwb_logger +[install] +install_scripts=$base/lib/saltybot_uwb_logger diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger/setup.py b/jetson/ros2_ws/src/saltybot_uwb_logger/setup.py new file mode 100644 index 0000000..0d654b7 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_logger/setup.py @@ -0,0 +1,29 @@ +import os +from glob import glob +from setuptools import setup + +package_name = "saltybot_uwb_logger" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", + [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (os.path.join("share", package_name, "launch"), glob("launch/*.py")), + (os.path.join("share", package_name, "config"), glob("config/*.yaml")), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-uwb", + maintainer_email="sl-uwb@saltylab.local", + description="UWB position logger and accuracy analyzer (Issue #634)", + license="Apache-2.0", + entry_points={ + "console_scripts": [ + "uwb_logger = saltybot_uwb_logger.logger_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger/test/__pycache__/test_accuracy_stats.cpython-314-pytest-9.0.2.pyc b/jetson/ros2_ws/src/saltybot_uwb_logger/test/__pycache__/test_accuracy_stats.cpython-314-pytest-9.0.2.pyc new file mode 100644 index 0000000..2444f41 Binary files /dev/null and b/jetson/ros2_ws/src/saltybot_uwb_logger/test/__pycache__/test_accuracy_stats.cpython-314-pytest-9.0.2.pyc differ diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger/test/test_accuracy_stats.py b/jetson/ros2_ws/src/saltybot_uwb_logger/test/test_accuracy_stats.py new file mode 100644 index 0000000..8d48d10 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_logger/test/test_accuracy_stats.py @@ -0,0 +1,116 @@ +"""Unit tests for saltybot_uwb_logger.accuracy_stats (Issue #634). No ROS2 needed.""" + +import math +import sys +import os + +import numpy as np +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +from saltybot_uwb_logger.accuracy_stats import compute_stats, RangeAccum + + +def _circle(n, r, cx=0.0, cy=0.0): + a = np.linspace(0, 2 * math.pi, n, endpoint=False) + return cx + r * np.cos(a), cy + r * np.sin(a) + + +class TestComputeStats: + def test_zero_error(self): + xs = np.full(5, 2.0); ys = np.full(5, 3.0) + s = compute_stats(xs, ys, 2.0, 3.0) + assert abs(s.cep50) < 1e-9 + assert abs(s.rmse) < 1e-9 + assert abs(s.max_error) < 1e-9 + + def test_known_bias(self): + rng = np.random.default_rng(0) + xs = rng.normal(4.0, 0.01, 100); ys = rng.normal(2.0, 0.01, 100) + s = compute_stats(xs, ys, 3.0, 2.0) + assert abs(s.bias_x - 1.0) < 0.05 + assert abs(s.bias_y) < 0.05 + + def test_cep50_below_cep95(self): + rng = np.random.default_rng(1) + xs = rng.normal(0, 0.1, 500); ys = rng.normal(0, 0.1, 500) + s = compute_stats(xs, ys, 0.0, 0.0) + assert s.cep50 < s.cep95 + + def test_cep50_is_median(self): + rng = np.random.default_rng(2) + xs = rng.normal(0, 0.15, 1000); ys = rng.normal(0, 0.15, 1000) + errors = np.sqrt(xs**2 + ys**2) + s = compute_stats(xs, ys, 0.0, 0.0) + assert abs(s.cep50 - float(np.median(errors))) < 1e-9 + + def test_rmse(self): + xs = np.array([0.1, -0.1, 0.2, -0.2]); ys = np.zeros(4) + s = compute_stats(xs, ys, 0.0, 0.0) + assert abs(s.rmse - math.sqrt(np.mean(xs**2))) < 1e-9 + + def test_max_error(self): + xs = np.array([0.0, 0.0, 1.0]); ys = np.zeros(3) + s = compute_stats(xs, ys, 0.0, 0.0) + assert abs(s.max_error - 1.0) < 1e-9 + + def test_circle_symmetry(self): + xs, ys = _circle(360, 0.12) + s = compute_stats(xs, ys, 0.0, 0.0) + assert abs(s.bias_x) < 1e-6 + assert abs(s.cep50 - 0.12) < 1e-6 + + def test_std_2d_pythagorean(self): + rng = np.random.default_rng(3) + xs = rng.normal(0, 0.1, 200); ys = rng.normal(0, 0.2, 200) + s = compute_stats(xs, ys, 0.0, 0.0) + assert abs(s.std_2d - math.sqrt(s.std_x**2 + s.std_y**2)) < 1e-9 + + def test_n_samples(self): + xs = np.ones(42); ys = np.zeros(42) + s = compute_stats(xs, ys, 0.0, 0.0) + assert s.n_samples == 42 + + def test_too_few_raises(self): + with pytest.raises(ValueError): + compute_stats(np.array([1.0]), np.array([1.0]), 0.0, 0.0) + + def test_length_mismatch_raises(self): + with pytest.raises(ValueError): + compute_stats(np.array([1.0, 2.0]), np.array([1.0]), 0.0, 0.0) + + +class TestRealistic: + def test_10cm_cep50(self): + rng = np.random.default_rng(42) + xs = rng.normal(3.0, 0.07, 500); ys = rng.normal(2.0, 0.07, 500) + s = compute_stats(xs, ys, 3.0, 2.0) + assert 0.05 < s.cep50 < 0.20 + + def test_biased_uwb(self): + rng = np.random.default_rng(7) + xs = rng.normal(3.15, 0.05, 300); ys = rng.normal(2.0, 0.05, 300) + s = compute_stats(xs, ys, 3.0, 2.0) + assert abs(s.bias_x - 0.15) < 0.02 + + +class TestRangeAccum: + def test_empty(self): + a = RangeAccum(0) + assert a.mean == 0.0 and a.std == 0.0 and a.count == 0 + + def test_single_std_zero(self): + a = RangeAccum(0); a.add(2.0) + assert a.std == 0.0 + + def test_mean_std(self): + a = RangeAccum(0) + for v in [1.0, 2.0, 3.0, 4.0, 5.0]: + a.add(v) + assert abs(a.mean - 3.0) < 1e-9 + assert abs(a.std - float(np.std([1,2,3,4,5], ddof=1))) < 1e-9 + assert a.count == 5 + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger_msgs/CMakeLists.txt b/jetson/ros2_ws/src/saltybot_uwb_logger_msgs/CMakeLists.txt new file mode 100644 index 0000000..8d50687 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_logger_msgs/CMakeLists.txt @@ -0,0 +1,15 @@ +cmake_minimum_required(VERSION 3.8) +project(saltybot_uwb_logger_msgs) + +find_package(ament_cmake REQUIRED) +find_package(std_msgs REQUIRED) +find_package(rosidl_default_generators REQUIRED) + +rosidl_generate_interfaces(${PROJECT_NAME} + "msg/AccuracyReport.msg" + "srv/StartAccuracyTest.srv" + DEPENDENCIES std_msgs +) + +ament_export_dependencies(rosidl_default_runtime) +ament_package() diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger_msgs/msg/AccuracyReport.msg b/jetson/ros2_ws/src/saltybot_uwb_logger_msgs/msg/AccuracyReport.msg new file mode 100644 index 0000000..47a9531 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_logger_msgs/msg/AccuracyReport.msg @@ -0,0 +1,32 @@ +# AccuracyReport.msg — UWB positioning accuracy test results (Issue #634) +# +# Published on /saltybot/uwb/accuracy_report after StartAccuracyTest completes. + +std_msgs/Header header + +float64 truth_x_m +float64 truth_y_m + +uint32 n_samples + +float64 mean_x_m +float64 mean_y_m + +float64 std_x_m +float64 std_y_m +float64 std_2d_m + +float64 bias_x_m +float64 bias_y_m + +float64 cep50_m +float64 cep95_m +float64 max_error_m +float64 rmse_m + +uint8[] anchor_ids +float64[] anchor_range_mean_m +float64[] anchor_range_std_m + +string test_id +float64 duration_s diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger_msgs/package.xml b/jetson/ros2_ws/src/saltybot_uwb_logger_msgs/package.xml new file mode 100644 index 0000000..d5db3ea --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_logger_msgs/package.xml @@ -0,0 +1,21 @@ + + + + saltybot_uwb_logger_msgs + 0.1.0 + Message and service definitions for UWB logger (Issue #634). + sl-uwb + Apache-2.0 + + ament_cmake + rosidl_default_generators + + std_msgs + rosidl_default_runtime + + rosidl_interface_packages + + + ament_cmake + + diff --git a/jetson/ros2_ws/src/saltybot_uwb_logger_msgs/srv/StartAccuracyTest.srv b/jetson/ros2_ws/src/saltybot_uwb_logger_msgs/srv/StartAccuracyTest.srv new file mode 100644 index 0000000..df33594 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_logger_msgs/srv/StartAccuracyTest.srv @@ -0,0 +1,15 @@ +# StartAccuracyTest.srv — trigger UWB accuracy test at a known position (Issue #634) + +# ── Request ──────────────────────────────────────────────────────────────── +float64 truth_x_m +float64 truth_y_m +uint32 n_samples +float64 timeout_s +string test_id + +--- + +# ── Response ─────────────────────────────────────────────────────────────── +bool success +string message +string test_id