diff --git a/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_vo_drift.py b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_vo_drift.py new file mode 100644 index 0000000..d23f1d7 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_vo_drift.py @@ -0,0 +1,150 @@ +""" +_vo_drift.py — Visual odometry drift detector helpers (no ROS2 deps). + +Algorithm +--------- +Two independent odometry streams (visual and wheel) are compared over a +sliding time window. Drift is measured as the absolute difference in +cumulative path length travelled by each source over that window: + + drift_m = |path_length(vo_window) − path_length(wheel_window)| + +Using cumulative path length (sum of inter-sample Euclidean steps) rather +than straight-line displacement makes the measure robust to circular motion +where start and end positions are the same. + +Drift is flagged when drift_m ≥ drift_threshold_m. + +Public API +---------- + OdomSample — namedtuple(t, x, y) + OdomBuffer — deque of OdomSamples with time-window trimming + compute_drift() — compare two OdomBuffers and return DriftResult + DriftResult — namedtuple(drift_m, vo_path_m, wheel_path_m, + is_drifting, window_s, n_vo, n_wheel) +""" + +from __future__ import annotations + +import math +from collections import deque +from typing import NamedTuple, Sequence + + +class OdomSample(NamedTuple): + t: float # monotonic timestamp (seconds) + x: float # position x (metres) + y: float # position y (metres) + + +class DriftResult(NamedTuple): + drift_m: float # |vo_path − wheel_path| (metres) + vo_path_m: float # cumulative path of VO source over window (metres) + wheel_path_m: float # cumulative path of wheel source over window (metres) + is_drifting: bool # True when drift_m >= threshold + window_s: float # actual time span of data used (seconds) + n_vo: int # number of VO samples in window + n_wheel: int # number of wheel samples in window + + +class OdomBuffer: + """ + Rolling buffer of OdomSamples trimmed to the last `max_age_s` seconds. + + Parameters + ---------- + max_age_s : float — samples older than this are discarded (seconds) + """ + + def __init__(self, max_age_s: float = 10.0) -> None: + self._max_age = max_age_s + self._buf: deque[OdomSample] = deque() + + # ── Public ──────────────────────────────────────────────────────────────── + + def push(self, sample: OdomSample) -> None: + """Append a sample and evict anything older than max_age_s.""" + self._buf.append(sample) + self._trim(sample.t) + + def window(self, window_s: float, now: float) -> list[OdomSample]: + """Return samples within the last window_s seconds of `now`.""" + cutoff = now - window_s + return [s for s in self._buf if s.t >= cutoff] + + def clear(self) -> None: + self._buf.clear() + + def __len__(self) -> int: + return len(self._buf) + + # ── Internal ────────────────────────────────────────────────────────────── + + def _trim(self, now: float) -> None: + cutoff = now - self._max_age + while self._buf and self._buf[0].t < cutoff: + self._buf.popleft() + + +# ── Core computation ────────────────────────────────────────────────────────── + +def compute_drift( + vo_buf: OdomBuffer, + wheel_buf: OdomBuffer, + window_s: float, + drift_threshold_m: float, + now: float, +) -> DriftResult: + """ + Compare VO and wheel odometry path lengths over the last `window_s`. + + Parameters + ---------- + vo_buf : OdomBuffer of visual odometry samples + wheel_buf : OdomBuffer of wheel odometry samples + window_s : comparison window width (seconds) + drift_threshold_m : drift_m threshold for is_drifting flag + now : current time (same scale as OdomSample.t) + + Returns + ------- + DriftResult — zero drift if either buffer has fewer than 2 samples. + """ + vo_samples = vo_buf.window(window_s, now) + wheel_samples = wheel_buf.window(window_s, now) + + if len(vo_samples) < 2 or len(wheel_samples) < 2: + return DriftResult( + drift_m=0.0, vo_path_m=0.0, wheel_path_m=0.0, + is_drifting=False, + window_s=0.0, n_vo=len(vo_samples), n_wheel=len(wheel_samples), + ) + + vo_path = _path_length(vo_samples) + wheel_path = _path_length(wheel_samples) + drift_m = abs(vo_path - wheel_path) + + # Actual data span = latest timestamp − earliest across both buffers + t_min = min(vo_samples[0].t, wheel_samples[0].t) + t_max = max(vo_samples[-1].t, wheel_samples[-1].t) + actual_window = t_max - t_min + + return DriftResult( + drift_m=drift_m, + vo_path_m=vo_path, + wheel_path_m=wheel_path, + is_drifting=drift_m >= drift_threshold_m, + window_s=actual_window, + n_vo=len(vo_samples), + n_wheel=len(wheel_samples), + ) + + +def _path_length(samples: Sequence[OdomSample]) -> float: + """Sum of Euclidean inter-sample distances.""" + total = 0.0 + for i in range(1, len(samples)): + dx = samples[i].x - samples[i - 1].x + dy = samples[i].y - samples[i - 1].y + total += math.sqrt(dx * dx + dy * dy) + return total diff --git a/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/vo_drift_node.py b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/vo_drift_node.py new file mode 100644 index 0000000..aae8735 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/vo_drift_node.py @@ -0,0 +1,150 @@ +""" +vo_drift_node.py — Visual odometry drift detector (Issue #260). + +Compares the cumulative path lengths of visual odometry and wheel odometry +over a sliding window. When the absolute difference exceeds the configured +threshold the node flags drift, allowing the system to warn operators, +inflate VO covariance, or fall back to wheel-only localisation. + +Subscribes (BEST_EFFORT): + /camera/odom nav_msgs/Odometry visual odometry + /odom nav_msgs/Odometry wheel odometry + + → For this robot remap to /saltybot/visual_odom + /saltybot/rover_odom. + +Publishes: + /saltybot/vo_drift_detected std_msgs/Bool True while drifting + /saltybot/vo_drift_magnitude std_msgs/Float32 drift magnitude (metres) + +Parameters +---------- +vo_topic str /camera/odom Visual odometry source topic +wheel_topic str /odom Wheel odometry source topic +drift_threshold_m float 0.5 Drift flag threshold (metres) +window_s float 10.0 Comparison window (seconds) +publish_hz float 2.0 Output publication rate (Hz) +max_buffer_age_s float 30.0 Max age of stored samples (s) +""" + +from __future__ import annotations + +import time + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy + +from nav_msgs.msg import Odometry +from std_msgs.msg import Bool, Float32 + +from ._vo_drift import OdomBuffer, OdomSample, compute_drift + + +_SENSOR_QOS = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=4, +) + + +class VoDriftNode(Node): + + def __init__(self) -> None: + super().__init__('vo_drift_node') + + self.declare_parameter('vo_topic', '/camera/odom') + self.declare_parameter('wheel_topic', '/odom') + self.declare_parameter('drift_threshold_m', 0.5) + self.declare_parameter('window_s', 10.0) + self.declare_parameter('publish_hz', 2.0) + self.declare_parameter('max_buffer_age_s', 30.0) + + vo_topic = self.get_parameter('vo_topic').value + wheel_topic = self.get_parameter('wheel_topic').value + self._thresh = self.get_parameter('drift_threshold_m').value + self._window_s = self.get_parameter('window_s').value + publish_hz = self.get_parameter('publish_hz').value + max_age = self.get_parameter('max_buffer_age_s').value + + self._vo_buf = OdomBuffer(max_age_s=max_age) + self._wheel_buf = OdomBuffer(max_age_s=max_age) + + self.create_subscription( + Odometry, vo_topic, self._on_vo, _SENSOR_QOS) + self.create_subscription( + Odometry, wheel_topic, self._on_wheel, _SENSOR_QOS) + + self._pub_detected = self.create_publisher( + Bool, '/saltybot/vo_drift_detected', 10) + self._pub_magnitude = self.create_publisher( + Float32, '/saltybot/vo_drift_magnitude', 10) + + self.create_timer(1.0 / publish_hz, self._tick) + + self.get_logger().info( + f'vo_drift_node ready — ' + f'vo={vo_topic} wheel={wheel_topic} ' + f'threshold={self._thresh}m window={self._window_s}s' + ) + + # ── Callbacks ───────────────────────────────────────────────────────────── + + def _on_vo(self, msg: Odometry) -> None: + s = _odom_to_sample(msg) + self._vo_buf.push(s) + + def _on_wheel(self, msg: Odometry) -> None: + s = _odom_to_sample(msg) + self._wheel_buf.push(s) + + # ── Publish tick ────────────────────────────────────────────────────────── + + def _tick(self) -> None: + now = time.monotonic() + result = compute_drift( + self._vo_buf, self._wheel_buf, + window_s=self._window_s, + drift_threshold_m=self._thresh, + now=now, + ) + + if result.is_drifting: + self.get_logger().warn( + f'VO drift detected: {result.drift_m:.3f}m ' + f'(vo={result.vo_path_m:.3f}m wheel={result.wheel_path_m:.3f}m ' + f'over {result.window_s:.1f}s)', + throttle_duration_sec=5.0, + ) + + det_msg = Bool() + det_msg.data = result.is_drifting + self._pub_detected.publish(det_msg) + + mag_msg = Float32() + mag_msg.data = float(result.drift_m) + self._pub_magnitude.publish(mag_msg) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _odom_to_sample(msg: Odometry) -> OdomSample: + """Convert nav_msgs/Odometry to OdomSample using monotonic clock.""" + return OdomSample( + t=time.monotonic(), + x=msg.pose.pose.position.x, + y=msg.pose.pose.position.y, + ) + + +def main(args=None) -> None: + rclpy.init(args=args) + node = VoDriftNode() + try: + rclpy.spin(node) + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/jetson/ros2_ws/src/saltybot_bringup/setup.py b/jetson/ros2_ws/src/saltybot_bringup/setup.py index 3b2b609..01a3191 100644 --- a/jetson/ros2_ws/src/saltybot_bringup/setup.py +++ b/jetson/ros2_ws/src/saltybot_bringup/setup.py @@ -35,6 +35,8 @@ setup( 'lidar_clustering = saltybot_bringup.lidar_clustering_node:main', # Floor surface type classifier (Issue #249) 'floor_classifier = saltybot_bringup.floor_classifier_node:main', + # Visual odometry drift detector (Issue #260) + 'vo_drift_detector = saltybot_bringup.vo_drift_node:main', ], }, ) diff --git a/jetson/ros2_ws/src/saltybot_bringup/test/test_vo_drift.py b/jetson/ros2_ws/src/saltybot_bringup/test/test_vo_drift.py new file mode 100644 index 0000000..d89f2bb --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/test/test_vo_drift.py @@ -0,0 +1,297 @@ +""" +test_vo_drift.py — Unit tests for VO drift detector helpers (no ROS2 required). + +Covers: + OdomBuffer: + - push/len + - window returns only samples within cutoff + - old samples are evicted beyond max_age_s + - clear empties the buffer + - window on empty buffer returns empty list + + _path_length (via compute_drift with crafted samples): + - stationary source → path = 0 + - straight-line motion → path = total distance + - L-shaped path → path = sum of two legs + + compute_drift: + - both empty → DriftResult with zeros, is_drifting=False + - one buffer < 2 samples → zero drift + - both move same distance → drift ≈ 0, not drifting + - VO moves 1m, wheel moves 0.5m → drift = 0.5m + - drift == threshold → is_drifting=True (>=) + - drift < threshold → is_drifting=False + - drift > threshold → is_drifting=True + - path lengths in result match expectation + - n_vo / n_wheel counts correct + - samples outside window ignored + - window_s in result reflects actual data span +""" + +import sys +import os +import math + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from saltybot_bringup._vo_drift import ( + OdomSample, + OdomBuffer, + DriftResult, + compute_drift, + _path_length, +) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _s(t, x, y) -> OdomSample: + return OdomSample(t=t, x=x, y=y) + + +def _straight_buf(n=5, speed=0.1, t_start=0.0, dt=1.0, + max_age_s=30.0) -> OdomBuffer: + """n samples moving along +x at `speed` m/s.""" + buf = OdomBuffer(max_age_s=max_age_s) + for i in range(n): + buf.push(_s(t_start + i * dt, x=i * speed * dt, y=0.0)) + return buf + + +def _stationary_buf(n=5, t_start=0.0, dt=1.0, + max_age_s=30.0) -> OdomBuffer: + buf = OdomBuffer(max_age_s=max_age_s) + for i in range(n): + buf.push(_s(t_start + i * dt, x=0.0, y=0.0)) + return buf + + +# ── OdomBuffer ──────────────────────────────────────────────────────────────── + +class TestOdomBuffer: + + def test_push_increases_len(self): + buf = OdomBuffer() + assert len(buf) == 0 + buf.push(_s(0.0, 0.0, 0.0)) + assert len(buf) == 1 + + def test_window_returns_all_within_cutoff(self): + buf = OdomBuffer(max_age_s=30.0) + for t in [0.0, 5.0, 10.0]: + buf.push(_s(t, 0.0, 0.0)) + samples = buf.window(window_s=10.0, now=10.0) + assert len(samples) == 3 + + def test_window_excludes_old_samples(self): + buf = OdomBuffer(max_age_s=30.0) + for t in [0.0, 5.0, 15.0]: + buf.push(_s(t, 0.0, 0.0)) + # window=5s from now=15 → only t=15 qualifies (t>=10) + samples = buf.window(window_s=5.0, now=15.0) + assert len(samples) == 1 + assert samples[0].t == 15.0 + + def test_evicts_samples_beyond_max_age(self): + buf = OdomBuffer(max_age_s=5.0) + buf.push(_s(0.0, 0.0, 0.0)) + buf.push(_s(10.0, 1.0, 0.0)) # now=10 → t=0 is 10s old > 5s max + assert len(buf) == 1 + + def test_clear_empties_buffer(self): + buf = _straight_buf(n=5) + buf.clear() + assert len(buf) == 0 + + def test_window_on_empty_buffer(self): + buf = OdomBuffer() + assert buf.window(window_s=10.0, now=100.0) == [] + + def test_window_boundary_inclusive(self): + """Sample exactly at window cutoff (t == now - window_s) is included.""" + buf = OdomBuffer(max_age_s=30.0) + buf.push(_s(0.0, 0.0, 0.0)) + # window=10, now=10 → cutoff=0.0, sample at t=0.0 should be included + samples = buf.window(window_s=10.0, now=10.0) + assert len(samples) == 1 + + +# ── _path_length ────────────────────────────────────────────────────────────── + +class TestPathLength: + + def test_stationary_path_zero(self): + samples = [_s(i, 0.0, 0.0) for i in range(5)] + assert _path_length(samples) == pytest.approx(0.0) + + def test_unit_step_path(self): + samples = [_s(0, 0.0, 0.0), _s(1, 1.0, 0.0)] + assert _path_length(samples) == pytest.approx(1.0) + + def test_two_unit_steps(self): + samples = [_s(0, 0.0, 0.0), _s(1, 1.0, 0.0), _s(2, 2.0, 0.0)] + assert _path_length(samples) == pytest.approx(2.0) + + def test_diagonal_step(self): + # (0,0) → (1,1): distance = sqrt(2) + samples = [_s(0, 0.0, 0.0), _s(1, 1.0, 1.0)] + assert _path_length(samples) == pytest.approx(math.sqrt(2)) + + def test_l_shaped_path(self): + # Right 3m then up 4m → total path = 7m (not hypotenuse) + samples = [_s(0, 0.0, 0.0), _s(1, 3.0, 0.0), _s(2, 3.0, 4.0)] + assert _path_length(samples) == pytest.approx(7.0) + + def test_single_sample_returns_zero(self): + assert _path_length([_s(0, 5.0, 5.0)]) == pytest.approx(0.0) + + def test_empty_returns_zero(self): + assert _path_length([]) == pytest.approx(0.0) + + +# ── compute_drift ───────────────────────────────────────────────────────────── + +class TestComputeDrift: + + def test_both_empty_returns_zero_drift(self): + result = compute_drift( + OdomBuffer(), OdomBuffer(), + window_s=10.0, drift_threshold_m=0.5, now=10.0) + assert result.drift_m == pytest.approx(0.0) + assert not result.is_drifting + + def test_one_buffer_empty_returns_zero(self): + vo = _straight_buf(n=5, speed=0.1) + result = compute_drift( + vo, OdomBuffer(), + window_s=10.0, drift_threshold_m=0.5, now=5.0) + assert result.drift_m == pytest.approx(0.0) + assert not result.is_drifting + + def test_one_buffer_single_sample_returns_zero(self): + vo = _straight_buf(n=5, speed=0.1) + wheel = OdomBuffer() + wheel.push(_s(0.0, 0.0, 0.0)) # only 1 sample + result = compute_drift( + vo, wheel, + window_s=10.0, drift_threshold_m=0.5, now=5.0) + assert result.drift_m == pytest.approx(0.0) + assert not result.is_drifting + + def test_both_move_same_distance_zero_drift(self): + # Both move 0.1 m/s for 4 steps → 0.4 m each + vo = _straight_buf(n=5, speed=0.1, dt=1.0) + wheel = _straight_buf(n=5, speed=0.1, dt=1.0) + result = compute_drift( + vo, wheel, + window_s=10.0, drift_threshold_m=0.5, now=5.0) + assert result.drift_m == pytest.approx(0.0, abs=1e-9) + assert not result.is_drifting + + def test_both_stationary_zero_drift(self): + vo = _stationary_buf(n=5) + wheel = _stationary_buf(n=5) + result = compute_drift( + vo, wheel, + window_s=10.0, drift_threshold_m=0.5, now=5.0) + assert result.drift_m == pytest.approx(0.0) + assert not result.is_drifting + + def test_drift_equals_path_length_difference(self): + # VO moves 1.0 m total, wheel moves 0.5 m total + vo = _straight_buf(n=11, speed=0.1, dt=1.0) # 10 steps × 0.1 = 1.0m + wheel = _straight_buf(n=11, speed=0.05, dt=1.0) # 10 steps × 0.05 = 0.5m + result = compute_drift( + vo, wheel, + window_s=15.0, drift_threshold_m=0.5, now=11.0) + assert result.vo_path_m == pytest.approx(1.0, abs=1e-9) + assert result.wheel_path_m == pytest.approx(0.5, abs=1e-9) + assert result.drift_m == pytest.approx(0.5, abs=1e-9) + + def test_drift_at_threshold_is_drifting(self): + # drift == 0.5 → is_drifting = True (>= threshold) + vo = _straight_buf(n=11, speed=0.1, dt=1.0) + wheel = _straight_buf(n=11, speed=0.05, dt=1.0) + result = compute_drift( + vo, wheel, + window_s=15.0, drift_threshold_m=0.5, now=11.0) + assert result.is_drifting + + def test_drift_below_threshold_not_drifting(self): + vo = _straight_buf(n=11, speed=0.1, dt=1.0) + wheel = _straight_buf(n=11, speed=0.08, dt=1.0) + result = compute_drift( + vo, wheel, + window_s=15.0, drift_threshold_m=0.5, now=11.0) + # drift = |1.0 - 0.8| = 0.2 + assert result.drift_m == pytest.approx(0.2, abs=1e-9) + assert not result.is_drifting + + def test_drift_above_threshold_is_drifting(self): + vo = _straight_buf(n=11, speed=0.1, dt=1.0) + wheel = _stationary_buf(n=11, dt=1.0) + result = compute_drift( + vo, wheel, + window_s=15.0, drift_threshold_m=0.5, now=11.0) + # drift = |1.0 - 0.0| = 1.0 > 0.5 + assert result.drift_m > 0.5 + assert result.is_drifting + + def test_n_vo_n_wheel_counts(self): + vo = _straight_buf(n=8, dt=1.0) + wheel = _straight_buf(n=5, dt=1.0) + result = compute_drift( + vo, wheel, + window_s=15.0, drift_threshold_m=0.5, now=8.0) + assert result.n_vo == 8 + assert result.n_wheel == 5 + + def test_samples_outside_window_ignored(self): + # Push old samples far in the past; should not contribute to window + vo = OdomBuffer(max_age_s=60.0) + wheel = OdomBuffer(max_age_s=60.0) + # Old samples outside window (t=0..4, window is last 3s from now=10) + for t in range(5): + vo.push(_s(float(t), x=float(t), y=0.0)) + wheel.push(_s(float(t), x=float(t), y=0.0)) + # Recent samples inside window (t=7..10) + for t in range(7, 11): + vo.push(_s(float(t), x=float(t) * 0.1, y=0.0)) + wheel.push(_s(float(t), x=float(t) * 0.1, y=0.0)) + + result = compute_drift( + vo, wheel, + window_s=3.0, drift_threshold_m=0.5, now=10.0) + # Both sources move identically inside window → zero drift + assert result.drift_m == pytest.approx(0.0, abs=1e-9) + # Only the 4 recent samples (t=7,8,9,10) in window + assert result.n_vo == 4 + assert result.n_wheel == 4 + + def test_result_is_namedtuple(self): + result = compute_drift( + _straight_buf(), _straight_buf(), + window_s=10.0, drift_threshold_m=0.5, now=5.0) + assert hasattr(result, 'drift_m') + assert hasattr(result, 'vo_path_m') + assert hasattr(result, 'wheel_path_m') + assert hasattr(result, 'is_drifting') + assert hasattr(result, 'window_s') + assert hasattr(result, 'n_vo') + assert hasattr(result, 'n_wheel') + + def test_wheel_faster_than_vo_still_drifts(self): + """Drift is absolute difference — direction doesn't matter.""" + vo = _stationary_buf(n=11, dt=1.0) + wheel = _straight_buf(n=11, speed=0.1, dt=1.0) + result = compute_drift( + vo, wheel, + window_s=15.0, drift_threshold_m=0.5, now=11.0) + assert result.drift_m == pytest.approx(1.0, abs=1e-9) + assert result.is_drifting + + +if __name__ == '__main__': + pytest.main([__file__, '-v'])