feat(bringup): visual odometry drift detector (Issue #260) #265

Merged
sl-jetson merged 1 commits from sl-perception/issue-260-vo-drift into main 2026-03-02 14:13:35 -05:00
4 changed files with 599 additions and 0 deletions
Showing only changes of commit 9d12805843 - Show all commits

View File

@ -0,0 +1,150 @@
"""
_vo_drift.py Visual odometry drift detector helpers (no ROS2 deps).
Algorithm
---------
Two independent odometry streams (visual and wheel) are compared over a
sliding time window. Drift is measured as the absolute difference in
cumulative path length travelled by each source over that window:
drift_m = |path_length(vo_window) path_length(wheel_window)|
Using cumulative path length (sum of inter-sample Euclidean steps) rather
than straight-line displacement makes the measure robust to circular motion
where start and end positions are the same.
Drift is flagged when drift_m drift_threshold_m.
Public API
----------
OdomSample namedtuple(t, x, y)
OdomBuffer deque of OdomSamples with time-window trimming
compute_drift() compare two OdomBuffers and return DriftResult
DriftResult namedtuple(drift_m, vo_path_m, wheel_path_m,
is_drifting, window_s, n_vo, n_wheel)
"""
from __future__ import annotations
import math
from collections import deque
from typing import NamedTuple, Sequence
class OdomSample(NamedTuple):
t: float # monotonic timestamp (seconds)
x: float # position x (metres)
y: float # position y (metres)
class DriftResult(NamedTuple):
drift_m: float # |vo_path wheel_path| (metres)
vo_path_m: float # cumulative path of VO source over window (metres)
wheel_path_m: float # cumulative path of wheel source over window (metres)
is_drifting: bool # True when drift_m >= threshold
window_s: float # actual time span of data used (seconds)
n_vo: int # number of VO samples in window
n_wheel: int # number of wheel samples in window
class OdomBuffer:
"""
Rolling buffer of OdomSamples trimmed to the last `max_age_s` seconds.
Parameters
----------
max_age_s : float samples older than this are discarded (seconds)
"""
def __init__(self, max_age_s: float = 10.0) -> None:
self._max_age = max_age_s
self._buf: deque[OdomSample] = deque()
# ── Public ────────────────────────────────────────────────────────────────
def push(self, sample: OdomSample) -> None:
"""Append a sample and evict anything older than max_age_s."""
self._buf.append(sample)
self._trim(sample.t)
def window(self, window_s: float, now: float) -> list[OdomSample]:
"""Return samples within the last window_s seconds of `now`."""
cutoff = now - window_s
return [s for s in self._buf if s.t >= cutoff]
def clear(self) -> None:
self._buf.clear()
def __len__(self) -> int:
return len(self._buf)
# ── Internal ──────────────────────────────────────────────────────────────
def _trim(self, now: float) -> None:
cutoff = now - self._max_age
while self._buf and self._buf[0].t < cutoff:
self._buf.popleft()
# ── Core computation ──────────────────────────────────────────────────────────
def compute_drift(
vo_buf: OdomBuffer,
wheel_buf: OdomBuffer,
window_s: float,
drift_threshold_m: float,
now: float,
) -> DriftResult:
"""
Compare VO and wheel odometry path lengths over the last `window_s`.
Parameters
----------
vo_buf : OdomBuffer of visual odometry samples
wheel_buf : OdomBuffer of wheel odometry samples
window_s : comparison window width (seconds)
drift_threshold_m : drift_m threshold for is_drifting flag
now : current time (same scale as OdomSample.t)
Returns
-------
DriftResult zero drift if either buffer has fewer than 2 samples.
"""
vo_samples = vo_buf.window(window_s, now)
wheel_samples = wheel_buf.window(window_s, now)
if len(vo_samples) < 2 or len(wheel_samples) < 2:
return DriftResult(
drift_m=0.0, vo_path_m=0.0, wheel_path_m=0.0,
is_drifting=False,
window_s=0.0, n_vo=len(vo_samples), n_wheel=len(wheel_samples),
)
vo_path = _path_length(vo_samples)
wheel_path = _path_length(wheel_samples)
drift_m = abs(vo_path - wheel_path)
# Actual data span = latest timestamp earliest across both buffers
t_min = min(vo_samples[0].t, wheel_samples[0].t)
t_max = max(vo_samples[-1].t, wheel_samples[-1].t)
actual_window = t_max - t_min
return DriftResult(
drift_m=drift_m,
vo_path_m=vo_path,
wheel_path_m=wheel_path,
is_drifting=drift_m >= drift_threshold_m,
window_s=actual_window,
n_vo=len(vo_samples),
n_wheel=len(wheel_samples),
)
def _path_length(samples: Sequence[OdomSample]) -> float:
"""Sum of Euclidean inter-sample distances."""
total = 0.0
for i in range(1, len(samples)):
dx = samples[i].x - samples[i - 1].x
dy = samples[i].y - samples[i - 1].y
total += math.sqrt(dx * dx + dy * dy)
return total

View File

@ -0,0 +1,150 @@
"""
vo_drift_node.py Visual odometry drift detector (Issue #260).
Compares the cumulative path lengths of visual odometry and wheel odometry
over a sliding window. When the absolute difference exceeds the configured
threshold the node flags drift, allowing the system to warn operators,
inflate VO covariance, or fall back to wheel-only localisation.
Subscribes (BEST_EFFORT):
/camera/odom nav_msgs/Odometry visual odometry
/odom nav_msgs/Odometry wheel odometry
For this robot remap to /saltybot/visual_odom + /saltybot/rover_odom.
Publishes:
/saltybot/vo_drift_detected std_msgs/Bool True while drifting
/saltybot/vo_drift_magnitude std_msgs/Float32 drift magnitude (metres)
Parameters
----------
vo_topic str /camera/odom Visual odometry source topic
wheel_topic str /odom Wheel odometry source topic
drift_threshold_m float 0.5 Drift flag threshold (metres)
window_s float 10.0 Comparison window (seconds)
publish_hz float 2.0 Output publication rate (Hz)
max_buffer_age_s float 30.0 Max age of stored samples (s)
"""
from __future__ import annotations
import time
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from nav_msgs.msg import Odometry
from std_msgs.msg import Bool, Float32
from ._vo_drift import OdomBuffer, OdomSample, compute_drift
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=4,
)
class VoDriftNode(Node):
def __init__(self) -> None:
super().__init__('vo_drift_node')
self.declare_parameter('vo_topic', '/camera/odom')
self.declare_parameter('wheel_topic', '/odom')
self.declare_parameter('drift_threshold_m', 0.5)
self.declare_parameter('window_s', 10.0)
self.declare_parameter('publish_hz', 2.0)
self.declare_parameter('max_buffer_age_s', 30.0)
vo_topic = self.get_parameter('vo_topic').value
wheel_topic = self.get_parameter('wheel_topic').value
self._thresh = self.get_parameter('drift_threshold_m').value
self._window_s = self.get_parameter('window_s').value
publish_hz = self.get_parameter('publish_hz').value
max_age = self.get_parameter('max_buffer_age_s').value
self._vo_buf = OdomBuffer(max_age_s=max_age)
self._wheel_buf = OdomBuffer(max_age_s=max_age)
self.create_subscription(
Odometry, vo_topic, self._on_vo, _SENSOR_QOS)
self.create_subscription(
Odometry, wheel_topic, self._on_wheel, _SENSOR_QOS)
self._pub_detected = self.create_publisher(
Bool, '/saltybot/vo_drift_detected', 10)
self._pub_magnitude = self.create_publisher(
Float32, '/saltybot/vo_drift_magnitude', 10)
self.create_timer(1.0 / publish_hz, self._tick)
self.get_logger().info(
f'vo_drift_node ready — '
f'vo={vo_topic} wheel={wheel_topic} '
f'threshold={self._thresh}m window={self._window_s}s'
)
# ── Callbacks ─────────────────────────────────────────────────────────────
def _on_vo(self, msg: Odometry) -> None:
s = _odom_to_sample(msg)
self._vo_buf.push(s)
def _on_wheel(self, msg: Odometry) -> None:
s = _odom_to_sample(msg)
self._wheel_buf.push(s)
# ── Publish tick ──────────────────────────────────────────────────────────
def _tick(self) -> None:
now = time.monotonic()
result = compute_drift(
self._vo_buf, self._wheel_buf,
window_s=self._window_s,
drift_threshold_m=self._thresh,
now=now,
)
if result.is_drifting:
self.get_logger().warn(
f'VO drift detected: {result.drift_m:.3f}m '
f'(vo={result.vo_path_m:.3f}m wheel={result.wheel_path_m:.3f}m '
f'over {result.window_s:.1f}s)',
throttle_duration_sec=5.0,
)
det_msg = Bool()
det_msg.data = result.is_drifting
self._pub_detected.publish(det_msg)
mag_msg = Float32()
mag_msg.data = float(result.drift_m)
self._pub_magnitude.publish(mag_msg)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _odom_to_sample(msg: Odometry) -> OdomSample:
"""Convert nav_msgs/Odometry to OdomSample using monotonic clock."""
return OdomSample(
t=time.monotonic(),
x=msg.pose.pose.position.x,
y=msg.pose.pose.position.y,
)
def main(args=None) -> None:
rclpy.init(args=args)
node = VoDriftNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -35,6 +35,8 @@ setup(
'lidar_clustering = saltybot_bringup.lidar_clustering_node:main',
# Floor surface type classifier (Issue #249)
'floor_classifier = saltybot_bringup.floor_classifier_node:main',
# Visual odometry drift detector (Issue #260)
'vo_drift_detector = saltybot_bringup.vo_drift_node:main',
],
},
)

View File

@ -0,0 +1,297 @@
"""
test_vo_drift.py Unit tests for VO drift detector helpers (no ROS2 required).
Covers:
OdomBuffer:
- push/len
- window returns only samples within cutoff
- old samples are evicted beyond max_age_s
- clear empties the buffer
- window on empty buffer returns empty list
_path_length (via compute_drift with crafted samples):
- stationary source path = 0
- straight-line motion path = total distance
- L-shaped path path = sum of two legs
compute_drift:
- both empty DriftResult with zeros, is_drifting=False
- one buffer < 2 samples zero drift
- both move same distance drift 0, not drifting
- VO moves 1m, wheel moves 0.5m drift = 0.5m
- drift == threshold is_drifting=True (>=)
- drift < threshold is_drifting=False
- drift > threshold is_drifting=True
- path lengths in result match expectation
- n_vo / n_wheel counts correct
- samples outside window ignored
- window_s in result reflects actual data span
"""
import sys
import os
import math
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_bringup._vo_drift import (
OdomSample,
OdomBuffer,
DriftResult,
compute_drift,
_path_length,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _s(t, x, y) -> OdomSample:
return OdomSample(t=t, x=x, y=y)
def _straight_buf(n=5, speed=0.1, t_start=0.0, dt=1.0,
max_age_s=30.0) -> OdomBuffer:
"""n samples moving along +x at `speed` m/s."""
buf = OdomBuffer(max_age_s=max_age_s)
for i in range(n):
buf.push(_s(t_start + i * dt, x=i * speed * dt, y=0.0))
return buf
def _stationary_buf(n=5, t_start=0.0, dt=1.0,
max_age_s=30.0) -> OdomBuffer:
buf = OdomBuffer(max_age_s=max_age_s)
for i in range(n):
buf.push(_s(t_start + i * dt, x=0.0, y=0.0))
return buf
# ── OdomBuffer ────────────────────────────────────────────────────────────────
class TestOdomBuffer:
def test_push_increases_len(self):
buf = OdomBuffer()
assert len(buf) == 0
buf.push(_s(0.0, 0.0, 0.0))
assert len(buf) == 1
def test_window_returns_all_within_cutoff(self):
buf = OdomBuffer(max_age_s=30.0)
for t in [0.0, 5.0, 10.0]:
buf.push(_s(t, 0.0, 0.0))
samples = buf.window(window_s=10.0, now=10.0)
assert len(samples) == 3
def test_window_excludes_old_samples(self):
buf = OdomBuffer(max_age_s=30.0)
for t in [0.0, 5.0, 15.0]:
buf.push(_s(t, 0.0, 0.0))
# window=5s from now=15 → only t=15 qualifies (t>=10)
samples = buf.window(window_s=5.0, now=15.0)
assert len(samples) == 1
assert samples[0].t == 15.0
def test_evicts_samples_beyond_max_age(self):
buf = OdomBuffer(max_age_s=5.0)
buf.push(_s(0.0, 0.0, 0.0))
buf.push(_s(10.0, 1.0, 0.0)) # now=10 → t=0 is 10s old > 5s max
assert len(buf) == 1
def test_clear_empties_buffer(self):
buf = _straight_buf(n=5)
buf.clear()
assert len(buf) == 0
def test_window_on_empty_buffer(self):
buf = OdomBuffer()
assert buf.window(window_s=10.0, now=100.0) == []
def test_window_boundary_inclusive(self):
"""Sample exactly at window cutoff (t == now - window_s) is included."""
buf = OdomBuffer(max_age_s=30.0)
buf.push(_s(0.0, 0.0, 0.0))
# window=10, now=10 → cutoff=0.0, sample at t=0.0 should be included
samples = buf.window(window_s=10.0, now=10.0)
assert len(samples) == 1
# ── _path_length ──────────────────────────────────────────────────────────────
class TestPathLength:
def test_stationary_path_zero(self):
samples = [_s(i, 0.0, 0.0) for i in range(5)]
assert _path_length(samples) == pytest.approx(0.0)
def test_unit_step_path(self):
samples = [_s(0, 0.0, 0.0), _s(1, 1.0, 0.0)]
assert _path_length(samples) == pytest.approx(1.0)
def test_two_unit_steps(self):
samples = [_s(0, 0.0, 0.0), _s(1, 1.0, 0.0), _s(2, 2.0, 0.0)]
assert _path_length(samples) == pytest.approx(2.0)
def test_diagonal_step(self):
# (0,0) → (1,1): distance = sqrt(2)
samples = [_s(0, 0.0, 0.0), _s(1, 1.0, 1.0)]
assert _path_length(samples) == pytest.approx(math.sqrt(2))
def test_l_shaped_path(self):
# Right 3m then up 4m → total path = 7m (not hypotenuse)
samples = [_s(0, 0.0, 0.0), _s(1, 3.0, 0.0), _s(2, 3.0, 4.0)]
assert _path_length(samples) == pytest.approx(7.0)
def test_single_sample_returns_zero(self):
assert _path_length([_s(0, 5.0, 5.0)]) == pytest.approx(0.0)
def test_empty_returns_zero(self):
assert _path_length([]) == pytest.approx(0.0)
# ── compute_drift ─────────────────────────────────────────────────────────────
class TestComputeDrift:
def test_both_empty_returns_zero_drift(self):
result = compute_drift(
OdomBuffer(), OdomBuffer(),
window_s=10.0, drift_threshold_m=0.5, now=10.0)
assert result.drift_m == pytest.approx(0.0)
assert not result.is_drifting
def test_one_buffer_empty_returns_zero(self):
vo = _straight_buf(n=5, speed=0.1)
result = compute_drift(
vo, OdomBuffer(),
window_s=10.0, drift_threshold_m=0.5, now=5.0)
assert result.drift_m == pytest.approx(0.0)
assert not result.is_drifting
def test_one_buffer_single_sample_returns_zero(self):
vo = _straight_buf(n=5, speed=0.1)
wheel = OdomBuffer()
wheel.push(_s(0.0, 0.0, 0.0)) # only 1 sample
result = compute_drift(
vo, wheel,
window_s=10.0, drift_threshold_m=0.5, now=5.0)
assert result.drift_m == pytest.approx(0.0)
assert not result.is_drifting
def test_both_move_same_distance_zero_drift(self):
# Both move 0.1 m/s for 4 steps → 0.4 m each
vo = _straight_buf(n=5, speed=0.1, dt=1.0)
wheel = _straight_buf(n=5, speed=0.1, dt=1.0)
result = compute_drift(
vo, wheel,
window_s=10.0, drift_threshold_m=0.5, now=5.0)
assert result.drift_m == pytest.approx(0.0, abs=1e-9)
assert not result.is_drifting
def test_both_stationary_zero_drift(self):
vo = _stationary_buf(n=5)
wheel = _stationary_buf(n=5)
result = compute_drift(
vo, wheel,
window_s=10.0, drift_threshold_m=0.5, now=5.0)
assert result.drift_m == pytest.approx(0.0)
assert not result.is_drifting
def test_drift_equals_path_length_difference(self):
# VO moves 1.0 m total, wheel moves 0.5 m total
vo = _straight_buf(n=11, speed=0.1, dt=1.0) # 10 steps × 0.1 = 1.0m
wheel = _straight_buf(n=11, speed=0.05, dt=1.0) # 10 steps × 0.05 = 0.5m
result = compute_drift(
vo, wheel,
window_s=15.0, drift_threshold_m=0.5, now=11.0)
assert result.vo_path_m == pytest.approx(1.0, abs=1e-9)
assert result.wheel_path_m == pytest.approx(0.5, abs=1e-9)
assert result.drift_m == pytest.approx(0.5, abs=1e-9)
def test_drift_at_threshold_is_drifting(self):
# drift == 0.5 → is_drifting = True (>= threshold)
vo = _straight_buf(n=11, speed=0.1, dt=1.0)
wheel = _straight_buf(n=11, speed=0.05, dt=1.0)
result = compute_drift(
vo, wheel,
window_s=15.0, drift_threshold_m=0.5, now=11.0)
assert result.is_drifting
def test_drift_below_threshold_not_drifting(self):
vo = _straight_buf(n=11, speed=0.1, dt=1.0)
wheel = _straight_buf(n=11, speed=0.08, dt=1.0)
result = compute_drift(
vo, wheel,
window_s=15.0, drift_threshold_m=0.5, now=11.0)
# drift = |1.0 - 0.8| = 0.2
assert result.drift_m == pytest.approx(0.2, abs=1e-9)
assert not result.is_drifting
def test_drift_above_threshold_is_drifting(self):
vo = _straight_buf(n=11, speed=0.1, dt=1.0)
wheel = _stationary_buf(n=11, dt=1.0)
result = compute_drift(
vo, wheel,
window_s=15.0, drift_threshold_m=0.5, now=11.0)
# drift = |1.0 - 0.0| = 1.0 > 0.5
assert result.drift_m > 0.5
assert result.is_drifting
def test_n_vo_n_wheel_counts(self):
vo = _straight_buf(n=8, dt=1.0)
wheel = _straight_buf(n=5, dt=1.0)
result = compute_drift(
vo, wheel,
window_s=15.0, drift_threshold_m=0.5, now=8.0)
assert result.n_vo == 8
assert result.n_wheel == 5
def test_samples_outside_window_ignored(self):
# Push old samples far in the past; should not contribute to window
vo = OdomBuffer(max_age_s=60.0)
wheel = OdomBuffer(max_age_s=60.0)
# Old samples outside window (t=0..4, window is last 3s from now=10)
for t in range(5):
vo.push(_s(float(t), x=float(t), y=0.0))
wheel.push(_s(float(t), x=float(t), y=0.0))
# Recent samples inside window (t=7..10)
for t in range(7, 11):
vo.push(_s(float(t), x=float(t) * 0.1, y=0.0))
wheel.push(_s(float(t), x=float(t) * 0.1, y=0.0))
result = compute_drift(
vo, wheel,
window_s=3.0, drift_threshold_m=0.5, now=10.0)
# Both sources move identically inside window → zero drift
assert result.drift_m == pytest.approx(0.0, abs=1e-9)
# Only the 4 recent samples (t=7,8,9,10) in window
assert result.n_vo == 4
assert result.n_wheel == 4
def test_result_is_namedtuple(self):
result = compute_drift(
_straight_buf(), _straight_buf(),
window_s=10.0, drift_threshold_m=0.5, now=5.0)
assert hasattr(result, 'drift_m')
assert hasattr(result, 'vo_path_m')
assert hasattr(result, 'wheel_path_m')
assert hasattr(result, 'is_drifting')
assert hasattr(result, 'window_s')
assert hasattr(result, 'n_vo')
assert hasattr(result, 'n_wheel')
def test_wheel_faster_than_vo_still_drifts(self):
"""Drift is absolute difference — direction doesn't matter."""
vo = _stationary_buf(n=11, dt=1.0)
wheel = _straight_buf(n=11, speed=0.1, dt=1.0)
result = compute_drift(
vo, wheel,
window_s=15.0, drift_threshold_m=0.5, now=11.0)
assert result.drift_m == pytest.approx(1.0, abs=1e-9)
assert result.is_drifting
if __name__ == '__main__':
pytest.main([__file__, '-v'])