feat(bringup): visual odometry drift detector (Issue #260) #265
@ -0,0 +1,150 @@
|
||||
"""
|
||||
_vo_drift.py — Visual odometry drift detector helpers (no ROS2 deps).
|
||||
|
||||
Algorithm
|
||||
---------
|
||||
Two independent odometry streams (visual and wheel) are compared over a
|
||||
sliding time window. Drift is measured as the absolute difference in
|
||||
cumulative path length travelled by each source over that window:
|
||||
|
||||
drift_m = |path_length(vo_window) − path_length(wheel_window)|
|
||||
|
||||
Using cumulative path length (sum of inter-sample Euclidean steps) rather
|
||||
than straight-line displacement makes the measure robust to circular motion
|
||||
where start and end positions are the same.
|
||||
|
||||
Drift is flagged when drift_m ≥ drift_threshold_m.
|
||||
|
||||
Public API
|
||||
----------
|
||||
OdomSample — namedtuple(t, x, y)
|
||||
OdomBuffer — deque of OdomSamples with time-window trimming
|
||||
compute_drift() — compare two OdomBuffers and return DriftResult
|
||||
DriftResult — namedtuple(drift_m, vo_path_m, wheel_path_m,
|
||||
is_drifting, window_s, n_vo, n_wheel)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from collections import deque
|
||||
from typing import NamedTuple, Sequence
|
||||
|
||||
|
||||
class OdomSample(NamedTuple):
|
||||
t: float # monotonic timestamp (seconds)
|
||||
x: float # position x (metres)
|
||||
y: float # position y (metres)
|
||||
|
||||
|
||||
class DriftResult(NamedTuple):
|
||||
drift_m: float # |vo_path − wheel_path| (metres)
|
||||
vo_path_m: float # cumulative path of VO source over window (metres)
|
||||
wheel_path_m: float # cumulative path of wheel source over window (metres)
|
||||
is_drifting: bool # True when drift_m >= threshold
|
||||
window_s: float # actual time span of data used (seconds)
|
||||
n_vo: int # number of VO samples in window
|
||||
n_wheel: int # number of wheel samples in window
|
||||
|
||||
|
||||
class OdomBuffer:
|
||||
"""
|
||||
Rolling buffer of OdomSamples trimmed to the last `max_age_s` seconds.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
max_age_s : float — samples older than this are discarded (seconds)
|
||||
"""
|
||||
|
||||
def __init__(self, max_age_s: float = 10.0) -> None:
|
||||
self._max_age = max_age_s
|
||||
self._buf: deque[OdomSample] = deque()
|
||||
|
||||
# ── Public ────────────────────────────────────────────────────────────────
|
||||
|
||||
def push(self, sample: OdomSample) -> None:
|
||||
"""Append a sample and evict anything older than max_age_s."""
|
||||
self._buf.append(sample)
|
||||
self._trim(sample.t)
|
||||
|
||||
def window(self, window_s: float, now: float) -> list[OdomSample]:
|
||||
"""Return samples within the last window_s seconds of `now`."""
|
||||
cutoff = now - window_s
|
||||
return [s for s in self._buf if s.t >= cutoff]
|
||||
|
||||
def clear(self) -> None:
|
||||
self._buf.clear()
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self._buf)
|
||||
|
||||
# ── Internal ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _trim(self, now: float) -> None:
|
||||
cutoff = now - self._max_age
|
||||
while self._buf and self._buf[0].t < cutoff:
|
||||
self._buf.popleft()
|
||||
|
||||
|
||||
# ── Core computation ──────────────────────────────────────────────────────────
|
||||
|
||||
def compute_drift(
|
||||
vo_buf: OdomBuffer,
|
||||
wheel_buf: OdomBuffer,
|
||||
window_s: float,
|
||||
drift_threshold_m: float,
|
||||
now: float,
|
||||
) -> DriftResult:
|
||||
"""
|
||||
Compare VO and wheel odometry path lengths over the last `window_s`.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
vo_buf : OdomBuffer of visual odometry samples
|
||||
wheel_buf : OdomBuffer of wheel odometry samples
|
||||
window_s : comparison window width (seconds)
|
||||
drift_threshold_m : drift_m threshold for is_drifting flag
|
||||
now : current time (same scale as OdomSample.t)
|
||||
|
||||
Returns
|
||||
-------
|
||||
DriftResult — zero drift if either buffer has fewer than 2 samples.
|
||||
"""
|
||||
vo_samples = vo_buf.window(window_s, now)
|
||||
wheel_samples = wheel_buf.window(window_s, now)
|
||||
|
||||
if len(vo_samples) < 2 or len(wheel_samples) < 2:
|
||||
return DriftResult(
|
||||
drift_m=0.0, vo_path_m=0.0, wheel_path_m=0.0,
|
||||
is_drifting=False,
|
||||
window_s=0.0, n_vo=len(vo_samples), n_wheel=len(wheel_samples),
|
||||
)
|
||||
|
||||
vo_path = _path_length(vo_samples)
|
||||
wheel_path = _path_length(wheel_samples)
|
||||
drift_m = abs(vo_path - wheel_path)
|
||||
|
||||
# Actual data span = latest timestamp − earliest across both buffers
|
||||
t_min = min(vo_samples[0].t, wheel_samples[0].t)
|
||||
t_max = max(vo_samples[-1].t, wheel_samples[-1].t)
|
||||
actual_window = t_max - t_min
|
||||
|
||||
return DriftResult(
|
||||
drift_m=drift_m,
|
||||
vo_path_m=vo_path,
|
||||
wheel_path_m=wheel_path,
|
||||
is_drifting=drift_m >= drift_threshold_m,
|
||||
window_s=actual_window,
|
||||
n_vo=len(vo_samples),
|
||||
n_wheel=len(wheel_samples),
|
||||
)
|
||||
|
||||
|
||||
def _path_length(samples: Sequence[OdomSample]) -> float:
|
||||
"""Sum of Euclidean inter-sample distances."""
|
||||
total = 0.0
|
||||
for i in range(1, len(samples)):
|
||||
dx = samples[i].x - samples[i - 1].x
|
||||
dy = samples[i].y - samples[i - 1].y
|
||||
total += math.sqrt(dx * dx + dy * dy)
|
||||
return total
|
||||
@ -0,0 +1,150 @@
|
||||
"""
|
||||
vo_drift_node.py — Visual odometry drift detector (Issue #260).
|
||||
|
||||
Compares the cumulative path lengths of visual odometry and wheel odometry
|
||||
over a sliding window. When the absolute difference exceeds the configured
|
||||
threshold the node flags drift, allowing the system to warn operators,
|
||||
inflate VO covariance, or fall back to wheel-only localisation.
|
||||
|
||||
Subscribes (BEST_EFFORT):
|
||||
/camera/odom nav_msgs/Odometry visual odometry
|
||||
/odom nav_msgs/Odometry wheel odometry
|
||||
|
||||
→ For this robot remap to /saltybot/visual_odom + /saltybot/rover_odom.
|
||||
|
||||
Publishes:
|
||||
/saltybot/vo_drift_detected std_msgs/Bool True while drifting
|
||||
/saltybot/vo_drift_magnitude std_msgs/Float32 drift magnitude (metres)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
vo_topic str /camera/odom Visual odometry source topic
|
||||
wheel_topic str /odom Wheel odometry source topic
|
||||
drift_threshold_m float 0.5 Drift flag threshold (metres)
|
||||
window_s float 10.0 Comparison window (seconds)
|
||||
publish_hz float 2.0 Output publication rate (Hz)
|
||||
max_buffer_age_s float 30.0 Max age of stored samples (s)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||
|
||||
from nav_msgs.msg import Odometry
|
||||
from std_msgs.msg import Bool, Float32
|
||||
|
||||
from ._vo_drift import OdomBuffer, OdomSample, compute_drift
|
||||
|
||||
|
||||
_SENSOR_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=4,
|
||||
)
|
||||
|
||||
|
||||
class VoDriftNode(Node):
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__('vo_drift_node')
|
||||
|
||||
self.declare_parameter('vo_topic', '/camera/odom')
|
||||
self.declare_parameter('wheel_topic', '/odom')
|
||||
self.declare_parameter('drift_threshold_m', 0.5)
|
||||
self.declare_parameter('window_s', 10.0)
|
||||
self.declare_parameter('publish_hz', 2.0)
|
||||
self.declare_parameter('max_buffer_age_s', 30.0)
|
||||
|
||||
vo_topic = self.get_parameter('vo_topic').value
|
||||
wheel_topic = self.get_parameter('wheel_topic').value
|
||||
self._thresh = self.get_parameter('drift_threshold_m').value
|
||||
self._window_s = self.get_parameter('window_s').value
|
||||
publish_hz = self.get_parameter('publish_hz').value
|
||||
max_age = self.get_parameter('max_buffer_age_s').value
|
||||
|
||||
self._vo_buf = OdomBuffer(max_age_s=max_age)
|
||||
self._wheel_buf = OdomBuffer(max_age_s=max_age)
|
||||
|
||||
self.create_subscription(
|
||||
Odometry, vo_topic, self._on_vo, _SENSOR_QOS)
|
||||
self.create_subscription(
|
||||
Odometry, wheel_topic, self._on_wheel, _SENSOR_QOS)
|
||||
|
||||
self._pub_detected = self.create_publisher(
|
||||
Bool, '/saltybot/vo_drift_detected', 10)
|
||||
self._pub_magnitude = self.create_publisher(
|
||||
Float32, '/saltybot/vo_drift_magnitude', 10)
|
||||
|
||||
self.create_timer(1.0 / publish_hz, self._tick)
|
||||
|
||||
self.get_logger().info(
|
||||
f'vo_drift_node ready — '
|
||||
f'vo={vo_topic} wheel={wheel_topic} '
|
||||
f'threshold={self._thresh}m window={self._window_s}s'
|
||||
)
|
||||
|
||||
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||
|
||||
def _on_vo(self, msg: Odometry) -> None:
|
||||
s = _odom_to_sample(msg)
|
||||
self._vo_buf.push(s)
|
||||
|
||||
def _on_wheel(self, msg: Odometry) -> None:
|
||||
s = _odom_to_sample(msg)
|
||||
self._wheel_buf.push(s)
|
||||
|
||||
# ── Publish tick ──────────────────────────────────────────────────────────
|
||||
|
||||
def _tick(self) -> None:
|
||||
now = time.monotonic()
|
||||
result = compute_drift(
|
||||
self._vo_buf, self._wheel_buf,
|
||||
window_s=self._window_s,
|
||||
drift_threshold_m=self._thresh,
|
||||
now=now,
|
||||
)
|
||||
|
||||
if result.is_drifting:
|
||||
self.get_logger().warn(
|
||||
f'VO drift detected: {result.drift_m:.3f}m '
|
||||
f'(vo={result.vo_path_m:.3f}m wheel={result.wheel_path_m:.3f}m '
|
||||
f'over {result.window_s:.1f}s)',
|
||||
throttle_duration_sec=5.0,
|
||||
)
|
||||
|
||||
det_msg = Bool()
|
||||
det_msg.data = result.is_drifting
|
||||
self._pub_detected.publish(det_msg)
|
||||
|
||||
mag_msg = Float32()
|
||||
mag_msg.data = float(result.drift_m)
|
||||
self._pub_magnitude.publish(mag_msg)
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _odom_to_sample(msg: Odometry) -> OdomSample:
|
||||
"""Convert nav_msgs/Odometry to OdomSample using monotonic clock."""
|
||||
return OdomSample(
|
||||
t=time.monotonic(),
|
||||
x=msg.pose.pose.position.x,
|
||||
y=msg.pose.pose.position.y,
|
||||
)
|
||||
|
||||
|
||||
def main(args=None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = VoDriftNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@ -35,6 +35,8 @@ setup(
|
||||
'lidar_clustering = saltybot_bringup.lidar_clustering_node:main',
|
||||
# Floor surface type classifier (Issue #249)
|
||||
'floor_classifier = saltybot_bringup.floor_classifier_node:main',
|
||||
# Visual odometry drift detector (Issue #260)
|
||||
'vo_drift_detector = saltybot_bringup.vo_drift_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
297
jetson/ros2_ws/src/saltybot_bringup/test/test_vo_drift.py
Normal file
297
jetson/ros2_ws/src/saltybot_bringup/test/test_vo_drift.py
Normal file
@ -0,0 +1,297 @@
|
||||
"""
|
||||
test_vo_drift.py — Unit tests for VO drift detector helpers (no ROS2 required).
|
||||
|
||||
Covers:
|
||||
OdomBuffer:
|
||||
- push/len
|
||||
- window returns only samples within cutoff
|
||||
- old samples are evicted beyond max_age_s
|
||||
- clear empties the buffer
|
||||
- window on empty buffer returns empty list
|
||||
|
||||
_path_length (via compute_drift with crafted samples):
|
||||
- stationary source → path = 0
|
||||
- straight-line motion → path = total distance
|
||||
- L-shaped path → path = sum of two legs
|
||||
|
||||
compute_drift:
|
||||
- both empty → DriftResult with zeros, is_drifting=False
|
||||
- one buffer < 2 samples → zero drift
|
||||
- both move same distance → drift ≈ 0, not drifting
|
||||
- VO moves 1m, wheel moves 0.5m → drift = 0.5m
|
||||
- drift == threshold → is_drifting=True (>=)
|
||||
- drift < threshold → is_drifting=False
|
||||
- drift > threshold → is_drifting=True
|
||||
- path lengths in result match expectation
|
||||
- n_vo / n_wheel counts correct
|
||||
- samples outside window ignored
|
||||
- window_s in result reflects actual data span
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import math
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
from saltybot_bringup._vo_drift import (
|
||||
OdomSample,
|
||||
OdomBuffer,
|
||||
DriftResult,
|
||||
compute_drift,
|
||||
_path_length,
|
||||
)
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _s(t, x, y) -> OdomSample:
|
||||
return OdomSample(t=t, x=x, y=y)
|
||||
|
||||
|
||||
def _straight_buf(n=5, speed=0.1, t_start=0.0, dt=1.0,
|
||||
max_age_s=30.0) -> OdomBuffer:
|
||||
"""n samples moving along +x at `speed` m/s."""
|
||||
buf = OdomBuffer(max_age_s=max_age_s)
|
||||
for i in range(n):
|
||||
buf.push(_s(t_start + i * dt, x=i * speed * dt, y=0.0))
|
||||
return buf
|
||||
|
||||
|
||||
def _stationary_buf(n=5, t_start=0.0, dt=1.0,
|
||||
max_age_s=30.0) -> OdomBuffer:
|
||||
buf = OdomBuffer(max_age_s=max_age_s)
|
||||
for i in range(n):
|
||||
buf.push(_s(t_start + i * dt, x=0.0, y=0.0))
|
||||
return buf
|
||||
|
||||
|
||||
# ── OdomBuffer ────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestOdomBuffer:
|
||||
|
||||
def test_push_increases_len(self):
|
||||
buf = OdomBuffer()
|
||||
assert len(buf) == 0
|
||||
buf.push(_s(0.0, 0.0, 0.0))
|
||||
assert len(buf) == 1
|
||||
|
||||
def test_window_returns_all_within_cutoff(self):
|
||||
buf = OdomBuffer(max_age_s=30.0)
|
||||
for t in [0.0, 5.0, 10.0]:
|
||||
buf.push(_s(t, 0.0, 0.0))
|
||||
samples = buf.window(window_s=10.0, now=10.0)
|
||||
assert len(samples) == 3
|
||||
|
||||
def test_window_excludes_old_samples(self):
|
||||
buf = OdomBuffer(max_age_s=30.0)
|
||||
for t in [0.0, 5.0, 15.0]:
|
||||
buf.push(_s(t, 0.0, 0.0))
|
||||
# window=5s from now=15 → only t=15 qualifies (t>=10)
|
||||
samples = buf.window(window_s=5.0, now=15.0)
|
||||
assert len(samples) == 1
|
||||
assert samples[0].t == 15.0
|
||||
|
||||
def test_evicts_samples_beyond_max_age(self):
|
||||
buf = OdomBuffer(max_age_s=5.0)
|
||||
buf.push(_s(0.0, 0.0, 0.0))
|
||||
buf.push(_s(10.0, 1.0, 0.0)) # now=10 → t=0 is 10s old > 5s max
|
||||
assert len(buf) == 1
|
||||
|
||||
def test_clear_empties_buffer(self):
|
||||
buf = _straight_buf(n=5)
|
||||
buf.clear()
|
||||
assert len(buf) == 0
|
||||
|
||||
def test_window_on_empty_buffer(self):
|
||||
buf = OdomBuffer()
|
||||
assert buf.window(window_s=10.0, now=100.0) == []
|
||||
|
||||
def test_window_boundary_inclusive(self):
|
||||
"""Sample exactly at window cutoff (t == now - window_s) is included."""
|
||||
buf = OdomBuffer(max_age_s=30.0)
|
||||
buf.push(_s(0.0, 0.0, 0.0))
|
||||
# window=10, now=10 → cutoff=0.0, sample at t=0.0 should be included
|
||||
samples = buf.window(window_s=10.0, now=10.0)
|
||||
assert len(samples) == 1
|
||||
|
||||
|
||||
# ── _path_length ──────────────────────────────────────────────────────────────
|
||||
|
||||
class TestPathLength:
|
||||
|
||||
def test_stationary_path_zero(self):
|
||||
samples = [_s(i, 0.0, 0.0) for i in range(5)]
|
||||
assert _path_length(samples) == pytest.approx(0.0)
|
||||
|
||||
def test_unit_step_path(self):
|
||||
samples = [_s(0, 0.0, 0.0), _s(1, 1.0, 0.0)]
|
||||
assert _path_length(samples) == pytest.approx(1.0)
|
||||
|
||||
def test_two_unit_steps(self):
|
||||
samples = [_s(0, 0.0, 0.0), _s(1, 1.0, 0.0), _s(2, 2.0, 0.0)]
|
||||
assert _path_length(samples) == pytest.approx(2.0)
|
||||
|
||||
def test_diagonal_step(self):
|
||||
# (0,0) → (1,1): distance = sqrt(2)
|
||||
samples = [_s(0, 0.0, 0.0), _s(1, 1.0, 1.0)]
|
||||
assert _path_length(samples) == pytest.approx(math.sqrt(2))
|
||||
|
||||
def test_l_shaped_path(self):
|
||||
# Right 3m then up 4m → total path = 7m (not hypotenuse)
|
||||
samples = [_s(0, 0.0, 0.0), _s(1, 3.0, 0.0), _s(2, 3.0, 4.0)]
|
||||
assert _path_length(samples) == pytest.approx(7.0)
|
||||
|
||||
def test_single_sample_returns_zero(self):
|
||||
assert _path_length([_s(0, 5.0, 5.0)]) == pytest.approx(0.0)
|
||||
|
||||
def test_empty_returns_zero(self):
|
||||
assert _path_length([]) == pytest.approx(0.0)
|
||||
|
||||
|
||||
# ── compute_drift ─────────────────────────────────────────────────────────────
|
||||
|
||||
class TestComputeDrift:
|
||||
|
||||
def test_both_empty_returns_zero_drift(self):
|
||||
result = compute_drift(
|
||||
OdomBuffer(), OdomBuffer(),
|
||||
window_s=10.0, drift_threshold_m=0.5, now=10.0)
|
||||
assert result.drift_m == pytest.approx(0.0)
|
||||
assert not result.is_drifting
|
||||
|
||||
def test_one_buffer_empty_returns_zero(self):
|
||||
vo = _straight_buf(n=5, speed=0.1)
|
||||
result = compute_drift(
|
||||
vo, OdomBuffer(),
|
||||
window_s=10.0, drift_threshold_m=0.5, now=5.0)
|
||||
assert result.drift_m == pytest.approx(0.0)
|
||||
assert not result.is_drifting
|
||||
|
||||
def test_one_buffer_single_sample_returns_zero(self):
|
||||
vo = _straight_buf(n=5, speed=0.1)
|
||||
wheel = OdomBuffer()
|
||||
wheel.push(_s(0.0, 0.0, 0.0)) # only 1 sample
|
||||
result = compute_drift(
|
||||
vo, wheel,
|
||||
window_s=10.0, drift_threshold_m=0.5, now=5.0)
|
||||
assert result.drift_m == pytest.approx(0.0)
|
||||
assert not result.is_drifting
|
||||
|
||||
def test_both_move_same_distance_zero_drift(self):
|
||||
# Both move 0.1 m/s for 4 steps → 0.4 m each
|
||||
vo = _straight_buf(n=5, speed=0.1, dt=1.0)
|
||||
wheel = _straight_buf(n=5, speed=0.1, dt=1.0)
|
||||
result = compute_drift(
|
||||
vo, wheel,
|
||||
window_s=10.0, drift_threshold_m=0.5, now=5.0)
|
||||
assert result.drift_m == pytest.approx(0.0, abs=1e-9)
|
||||
assert not result.is_drifting
|
||||
|
||||
def test_both_stationary_zero_drift(self):
|
||||
vo = _stationary_buf(n=5)
|
||||
wheel = _stationary_buf(n=5)
|
||||
result = compute_drift(
|
||||
vo, wheel,
|
||||
window_s=10.0, drift_threshold_m=0.5, now=5.0)
|
||||
assert result.drift_m == pytest.approx(0.0)
|
||||
assert not result.is_drifting
|
||||
|
||||
def test_drift_equals_path_length_difference(self):
|
||||
# VO moves 1.0 m total, wheel moves 0.5 m total
|
||||
vo = _straight_buf(n=11, speed=0.1, dt=1.0) # 10 steps × 0.1 = 1.0m
|
||||
wheel = _straight_buf(n=11, speed=0.05, dt=1.0) # 10 steps × 0.05 = 0.5m
|
||||
result = compute_drift(
|
||||
vo, wheel,
|
||||
window_s=15.0, drift_threshold_m=0.5, now=11.0)
|
||||
assert result.vo_path_m == pytest.approx(1.0, abs=1e-9)
|
||||
assert result.wheel_path_m == pytest.approx(0.5, abs=1e-9)
|
||||
assert result.drift_m == pytest.approx(0.5, abs=1e-9)
|
||||
|
||||
def test_drift_at_threshold_is_drifting(self):
|
||||
# drift == 0.5 → is_drifting = True (>= threshold)
|
||||
vo = _straight_buf(n=11, speed=0.1, dt=1.0)
|
||||
wheel = _straight_buf(n=11, speed=0.05, dt=1.0)
|
||||
result = compute_drift(
|
||||
vo, wheel,
|
||||
window_s=15.0, drift_threshold_m=0.5, now=11.0)
|
||||
assert result.is_drifting
|
||||
|
||||
def test_drift_below_threshold_not_drifting(self):
|
||||
vo = _straight_buf(n=11, speed=0.1, dt=1.0)
|
||||
wheel = _straight_buf(n=11, speed=0.08, dt=1.0)
|
||||
result = compute_drift(
|
||||
vo, wheel,
|
||||
window_s=15.0, drift_threshold_m=0.5, now=11.0)
|
||||
# drift = |1.0 - 0.8| = 0.2
|
||||
assert result.drift_m == pytest.approx(0.2, abs=1e-9)
|
||||
assert not result.is_drifting
|
||||
|
||||
def test_drift_above_threshold_is_drifting(self):
|
||||
vo = _straight_buf(n=11, speed=0.1, dt=1.0)
|
||||
wheel = _stationary_buf(n=11, dt=1.0)
|
||||
result = compute_drift(
|
||||
vo, wheel,
|
||||
window_s=15.0, drift_threshold_m=0.5, now=11.0)
|
||||
# drift = |1.0 - 0.0| = 1.0 > 0.5
|
||||
assert result.drift_m > 0.5
|
||||
assert result.is_drifting
|
||||
|
||||
def test_n_vo_n_wheel_counts(self):
|
||||
vo = _straight_buf(n=8, dt=1.0)
|
||||
wheel = _straight_buf(n=5, dt=1.0)
|
||||
result = compute_drift(
|
||||
vo, wheel,
|
||||
window_s=15.0, drift_threshold_m=0.5, now=8.0)
|
||||
assert result.n_vo == 8
|
||||
assert result.n_wheel == 5
|
||||
|
||||
def test_samples_outside_window_ignored(self):
|
||||
# Push old samples far in the past; should not contribute to window
|
||||
vo = OdomBuffer(max_age_s=60.0)
|
||||
wheel = OdomBuffer(max_age_s=60.0)
|
||||
# Old samples outside window (t=0..4, window is last 3s from now=10)
|
||||
for t in range(5):
|
||||
vo.push(_s(float(t), x=float(t), y=0.0))
|
||||
wheel.push(_s(float(t), x=float(t), y=0.0))
|
||||
# Recent samples inside window (t=7..10)
|
||||
for t in range(7, 11):
|
||||
vo.push(_s(float(t), x=float(t) * 0.1, y=0.0))
|
||||
wheel.push(_s(float(t), x=float(t) * 0.1, y=0.0))
|
||||
|
||||
result = compute_drift(
|
||||
vo, wheel,
|
||||
window_s=3.0, drift_threshold_m=0.5, now=10.0)
|
||||
# Both sources move identically inside window → zero drift
|
||||
assert result.drift_m == pytest.approx(0.0, abs=1e-9)
|
||||
# Only the 4 recent samples (t=7,8,9,10) in window
|
||||
assert result.n_vo == 4
|
||||
assert result.n_wheel == 4
|
||||
|
||||
def test_result_is_namedtuple(self):
|
||||
result = compute_drift(
|
||||
_straight_buf(), _straight_buf(),
|
||||
window_s=10.0, drift_threshold_m=0.5, now=5.0)
|
||||
assert hasattr(result, 'drift_m')
|
||||
assert hasattr(result, 'vo_path_m')
|
||||
assert hasattr(result, 'wheel_path_m')
|
||||
assert hasattr(result, 'is_drifting')
|
||||
assert hasattr(result, 'window_s')
|
||||
assert hasattr(result, 'n_vo')
|
||||
assert hasattr(result, 'n_wheel')
|
||||
|
||||
def test_wheel_faster_than_vo_still_drifts(self):
|
||||
"""Drift is absolute difference — direction doesn't matter."""
|
||||
vo = _stationary_buf(n=11, dt=1.0)
|
||||
wheel = _straight_buf(n=11, speed=0.1, dt=1.0)
|
||||
result = compute_drift(
|
||||
vo, wheel,
|
||||
window_s=15.0, drift_threshold_m=0.5, now=11.0)
|
||||
assert result.drift_m == pytest.approx(1.0, abs=1e-9)
|
||||
assert result.is_drifting
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
Loading…
x
Reference in New Issue
Block a user