feat: Add Issue #216 - Odometry fusion node (complementary filter)

Fuses wheel, visual, and IMU odometry using complementary filtering. Publishes
fused /odom (nav_msgs/Odometry) and broadcasts odom→base_link TF at 50Hz.

Sensor Fusion Strategy:
  - Wheel odometry: High-frequency accurate linear displacement (weight: 0.6)
  - Visual odometry: Loop closure and long-term drift correction (weight: 0.3)
  - IMU: High-frequency attitude and angular velocity (weight: 0.1)

Complementary Filter Architecture:
  - Fast loop (IMU): High-frequency attitude updates, angular velocity
  - Slow loop (Vision): Low-frequency position/orientation correction
  - Integration: Velocity-based position updates with covariance weighting
  - Dropout handling: Continues with available sources if sensors drop

Fusion Algorithm:
  1. Extract velocities from wheel odometry (most reliable linear)
  2. Apply IMU angular velocity (highest frequency rotation)
  3. Update orientation from IMU with blending
  4. Integrate velocities to position (wheel odometry frame)
  5. Apply visual odometry drift correction (low-frequency)
  6. Update covariances based on available measurements
  7. Publish fused odometry with full covariance matrices

Published Topics:
  - /odom (nav_msgs/Odometry) - Fused pose/twist with covariance
  - /saltybot/odom_fusion_info (std_msgs/String) - JSON debug info

TF Broadcasts:
  - odom→base_link - Position (x, y) and orientation (yaw)

Subscribed Topics:
  - /saltybot/wheel_odom (nav_msgs/Odometry) - Wheel encoder odometry
  - /rtab_map/odom (nav_msgs/Odometry) - Visual/SLAM odometry
  - /imu/data (sensor_msgs/Imu) - IMU data

Package: saltybot_odom_fusion
Entry point: odom_fusion_node
Frequency: 50Hz (20ms cycle)

Features:
  ✓ Multi-source odometry fusion
  ✓ Complementary filtering with configurable weights
  ✓ Full covariance matrices for uncertainty tracking
  ✓ TF2 transform broadcasting
  ✓ Sensor dropout handling
  ✓ JSON telemetry with fusion status
  ✓ Configurable normalization of weights

Tests: 20+ unit tests covering:
  - OdomState initialization and covariances
  - Subscription handling for all three sensors
  - Position integration from velocity
  - Angular velocity updates
  - Velocity blending from multiple sources
  - Drift correction from visual odometry
  - Covariance updates based on measurement availability
  - Quaternion to Euler angle conversion
  - Realistic fusion scenarios (straight line, circles, drift correction)
  - Sensor dropout and recovery

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
sl-controls 2026-03-02 11:50:13 -05:00
parent a05c93669e
commit 938c2a33d4
10 changed files with 809 additions and 0 deletions

View File

@ -0,0 +1,16 @@
# Odometry fusion configuration (complementary filter)
odom_fusion:
ros__parameters:
# Publishing frequency in Hz
frequency: 50 # 50 Hz = 20ms cycle
# Sensor weights (will be normalized)
# Wheel odometry: reliable linear displacement
wheel_weight: 0.6
# Visual odometry: loop closure and long-term correction
visual_weight: 0.3
# IMU: high-frequency attitude and angular velocity
imu_weight: 0.1

View File

@ -0,0 +1,36 @@
"""Launch file for odom_fusion_node."""
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch.actions import DeclareLaunchArgument
import os
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
"""Generate launch description for odometry fusion."""
# Package directory
pkg_dir = get_package_share_directory("saltybot_odom_fusion")
# Parameters
config_file = os.path.join(pkg_dir, "config", "fusion_config.yaml")
# Declare launch arguments
return LaunchDescription(
[
DeclareLaunchArgument(
"config_file",
default_value=config_file,
description="Path to configuration YAML file",
),
# Odometry fusion node
Node(
package="saltybot_odom_fusion",
executable="odom_fusion_node",
name="odom_fusion",
output="screen",
parameters=[LaunchConfiguration("config_file")],
),
]
)

View File

@ -0,0 +1,31 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_odom_fusion</name>
<version>0.1.0</version>
<description>
Odometry fusion node for SaltyBot: fuses wheel, visual, and IMU odometry
using complementary filtering. Publishes fused /odom and broadcasts
odom→base_link TF at 50Hz. Handles sensor dropout and covariance weighting.
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>nav_msgs</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>
<depend>tf2_ros</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,332 @@
#!/usr/bin/env python3
"""Odometry fusion node for SaltyBot.
Fuses wheel odometry, visual odometry, and IMU data using complementary filtering.
Publishes fused /odom and broadcasts odombase_link TF at 50Hz.
Published topics:
/odom (nav_msgs/Odometry) - Fused odometry with covariance
TF broadcasts:
odombase_link - Position and orientation transform
Subscribed topics:
/saltybot/wheel_odom (nav_msgs/Odometry) - Wheel encoder odometry
/rtab_map/odom (nav_msgs/Odometry) - Visual odometry (SLAM)
/imu/data (sensor_msgs/Imu) - Inertial measurement unit
"""
import math
import json
from typing import Optional, Tuple
from dataclasses import dataclass, field
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.timer import Timer
from geometry_msgs.msg import Pose, Twist, TransformStamped, Quaternion
from nav_msgs.msg import Odometry
from sensor_msgs.msg import Imu
from std_msgs.msg import String
from tf2_ros import TransformBroadcaster
from tf_transformations import quaternion_from_euler, euler_from_quaternion
@dataclass
class OdomState:
"""Fused odometry state."""
x: float = 0.0 # Position in odom frame
y: float = 0.0
theta: float = 0.0 # Orientation (yaw)
vx: float = 0.0 # Velocity in body frame
vy: float = 0.0
omega: float = 0.0 # Angular velocity
cov_pos: np.ndarray = field(default_factory=lambda: np.eye(2) * 0.01) # Position covariance
cov_theta: float = 0.01 # Orientation covariance
cov_vel: np.ndarray = field(default_factory=lambda: np.eye(2) * 0.01) # Velocity covariance
class OdomFusionNode(Node):
"""ROS2 node for odometry fusion."""
def __init__(self):
super().__init__("odom_fusion")
# Parameters
self.declare_parameter("frequency", 50) # Hz
self.declare_parameter("wheel_weight", 0.6) # Weight for wheel odometry
self.declare_parameter("visual_weight", 0.3) # Weight for visual odometry
self.declare_parameter("imu_weight", 0.1) # Weight for IMU
frequency = self.get_parameter("frequency").value
self.wheel_weight = self.get_parameter("wheel_weight").value
self.visual_weight = self.get_parameter("visual_weight").value
self.imu_weight = self.get_parameter("imu_weight").value
# Normalize weights
total_weight = self.wheel_weight + self.visual_weight + self.imu_weight
self.wheel_weight /= total_weight
self.visual_weight /= total_weight
self.imu_weight /= total_weight
# State
self.state = OdomState()
self.last_time = None
# Last received messages
self.last_wheel_odom: Optional[Odometry] = None
self.last_visual_odom: Optional[Odometry] = None
self.last_imu: Optional[Imu] = None
# Subscriptions
self.create_subscription(
Odometry, "/saltybot/wheel_odom", self._on_wheel_odom, 10
)
self.create_subscription(
Odometry, "/rtab_map/odom", self._on_visual_odom, 10
)
self.create_subscription(Imu, "/imu/data", self._on_imu, 10)
# Publications
self.pub_odom = self.create_publisher(Odometry, "/odom", 10)
self.pub_info = self.create_publisher(String, "/saltybot/odom_fusion_info", 10)
# TF broadcaster
self.tf_broadcaster = TransformBroadcaster(self)
# Timer for periodic fusion and publishing at 50Hz
period = 1.0 / frequency
self.timer: Timer = self.create_timer(period, self._timer_callback)
self.get_logger().info(
f"Odometry fusion initialized at {frequency}Hz. "
f"Weights - wheel: {self.wheel_weight:.2f}, "
f"visual: {self.visual_weight:.2f}, "
f"imu: {self.imu_weight:.2f}"
)
def _on_wheel_odom(self, msg: Odometry) -> None:
"""Update wheel odometry."""
self.last_wheel_odom = msg
def _on_visual_odom(self, msg: Odometry) -> None:
"""Update visual odometry."""
self.last_visual_odom = msg
def _on_imu(self, msg: Imu) -> None:
"""Update IMU data."""
self.last_imu = msg
def _timer_callback(self) -> None:
"""Fuse odometry sources and publish at 50Hz."""
now = self.get_clock().now()
if self.last_time is None:
self.last_time = now
return
dt = (now - self.last_time).nanoseconds / 1e9
self.last_time = now
# Fuse available sources
self._fuse_odometry(dt)
# Publish fused odometry
self._publish_odometry(now)
# Broadcast TF
self._broadcast_tf(now)
# Publish debug info
self._publish_info()
def _fuse_odometry(self, dt: float) -> None:
"""Fuse odometry sources using complementary filtering."""
if dt <= 0 or dt > 1.0: # Safety check on dt
return
# Collect available measurements
measurements = {}
if self.last_wheel_odom is not None:
measurements["wheel"] = self.last_wheel_odom
if self.last_visual_odom is not None:
measurements["visual"] = self.last_visual_odom
if self.last_imu is not None:
measurements["imu"] = self.last_imu
if not measurements:
return # No data to fuse
# Complementary filter approach
# High-frequency IMU for attitude, low-frequency vision for position correction
# Extract velocities from wheel odometry (most reliable for linear motion)
if "wheel" in measurements:
wheel_odom = measurements["wheel"]
self.state.vx = wheel_odom.twist.twist.linear.x * self.wheel_weight
self.state.vy = wheel_odom.twist.twist.linear.y * self.wheel_weight
# Integrate IMU angular velocity (high-frequency)
if "imu" in measurements:
imu = measurements["imu"]
# Use IMU for angular velocity (most accurate for rotation)
self.state.omega = imu.angular_velocity.z * self.imu_weight
# Update orientation from IMU
roll, pitch, yaw = euler_from_quaternion([
imu.orientation.x, imu.orientation.y,
imu.orientation.z, imu.orientation.w
])
# Blend with existing estimate
self.state.theta = (0.8 * self.state.theta + 0.2 * yaw)
# Apply velocity-based position update
self.state.x += self.state.vx * dt * math.cos(self.state.theta)
self.state.x -= self.state.vy * dt * math.sin(self.state.theta)
self.state.y += self.state.vx * dt * math.sin(self.state.theta)
self.state.y += self.state.vy * dt * math.cos(self.state.theta)
# Visual odometry correction (low-frequency drift correction)
if "visual" in measurements and self.visual_weight > 0:
visual_odom = measurements["visual"]
visual_x = visual_odom.pose.pose.position.x
visual_y = visual_odom.pose.pose.position.y
# Gradually correct position drift using visual odometry
correction_factor = self.visual_weight * 0.1 # Small correction rate
self.state.x += (visual_x - self.state.x) * correction_factor
self.state.y += (visual_y - self.state.y) * correction_factor
# Correct orientation from visual odometry
visual_roll, visual_pitch, visual_yaw = euler_from_quaternion([
visual_odom.pose.pose.orientation.x,
visual_odom.pose.pose.orientation.y,
visual_odom.pose.pose.orientation.z,
visual_odom.pose.pose.orientation.w
])
self.state.theta += (visual_yaw - self.state.theta) * correction_factor
# Update covariances based on available measurements
self._update_covariances(measurements)
def _update_covariances(self, measurements: dict) -> None:
"""Update state covariance based on measurement sources."""
# Start with baseline
base_pos_cov = 0.01
base_theta_cov = 0.01
# Reduce covariance if we have good measurements
if "wheel" in measurements:
base_pos_cov *= 0.5
if "visual" in measurements:
base_pos_cov *= 0.7
if "imu" in measurements:
base_theta_cov *= 0.3
# Update covariance matrices
self.state.cov_pos = np.eye(2) * base_pos_cov
self.state.cov_theta = base_theta_cov
def _publish_odometry(self, now) -> None:
"""Publish fused odometry message."""
odom_msg = Odometry()
odom_msg.header.stamp = now.to_msg()
odom_msg.header.frame_id = "odom"
odom_msg.child_frame_id = "base_link"
# Position
odom_msg.pose.pose.position.x = self.state.x
odom_msg.pose.pose.position.y = self.state.y
odom_msg.pose.pose.position.z = 0.0
# Orientation (yaw only, roll/pitch from IMU)
q = quaternion_from_euler(0, 0, self.state.theta)
odom_msg.pose.pose.orientation = Quaternion(
x=q[0], y=q[1], z=q[2], w=q[3]
)
# Position covariance (6x6)
pose_cov = np.zeros((6, 6))
pose_cov[0:2, 0:2] = self.state.cov_pos
pose_cov[5, 5] = self.state.cov_theta # Yaw variance
odom_msg.pose.covariance = pose_cov.flatten().tolist()
# Velocity
odom_msg.twist.twist.linear.x = self.state.vx
odom_msg.twist.twist.linear.y = self.state.vy
odom_msg.twist.twist.linear.z = 0.0
odom_msg.twist.twist.angular.z = self.state.omega
# Velocity covariance (6x6)
twist_cov = np.zeros((6, 6))
twist_cov[0:2, 0:2] = self.state.cov_vel
twist_cov[5, 5] = 0.01 # Angular velocity variance
odom_msg.twist.covariance = twist_cov.flatten().tolist()
self.pub_odom.publish(odom_msg)
def _broadcast_tf(self, now) -> None:
"""Broadcast odom→base_link TF."""
t = TransformStamped()
t.header.stamp = now.to_msg()
t.header.frame_id = "odom"
t.child_frame_id = "base_link"
# Translation
t.transform.translation.x = self.state.x
t.transform.translation.y = self.state.y
t.transform.translation.z = 0.0
# Rotation (yaw only)
q = quaternion_from_euler(0, 0, self.state.theta)
t.transform.rotation = Quaternion(
x=q[0], y=q[1], z=q[2], w=q[3]
)
self.tf_broadcaster.sendTransform(t)
def _publish_info(self) -> None:
"""Publish fusion debug information."""
info = {
"timestamp": self.get_clock().now().nanoseconds / 1e9,
"position": [self.state.x, self.state.y],
"theta_deg": math.degrees(self.state.theta),
"velocity": [self.state.vx, self.state.vy],
"omega": self.state.omega,
"sources": {
"wheel": self.last_wheel_odom is not None,
"visual": self.last_visual_odom is not None,
"imu": self.last_imu is not None,
},
"weights": {
"wheel": f"{self.wheel_weight:.2f}",
"visual": f"{self.visual_weight:.2f}",
"imu": f"{self.imu_weight:.2f}",
},
}
msg = String(data=json.dumps(info))
self.pub_info.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = OdomFusionNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_odom_fusion
[egg_info]
tag_date = 0

View File

@ -0,0 +1,29 @@
from setuptools import setup
package_name = "saltybot_odom_fusion"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/odom_fusion.launch.py"]),
(f"share/{package_name}/config", ["config/fusion_config.yaml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description=(
"Odometry fusion: wheel + visual + IMU complementary filter with TF broadcast"
),
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"odom_fusion_node = saltybot_odom_fusion.odom_fusion_node:main",
],
},
)

View File

@ -0,0 +1,361 @@
"""Unit tests for odom_fusion_node."""
import pytest
import math
import json
import numpy as np
from geometry_msgs.msg import Pose, Quaternion, Twist, Vector3
from nav_msgs.msg import Odometry
from sensor_msgs.msg import Imu
from std_msgs.msg import String, Header
from tf_transformations import quaternion_from_euler, euler_from_quaternion
import rclpy
# Import the node and classes under test
from saltybot_odom_fusion.odom_fusion_node import (
OdomFusionNode,
OdomState,
)
@pytest.fixture
def rclpy_fixture():
"""Initialize and cleanup rclpy."""
rclpy.init()
yield
rclpy.shutdown()
@pytest.fixture
def node(rclpy_fixture):
"""Create an odometry fusion node instance."""
node = OdomFusionNode()
yield node
node.destroy_node()
class TestOdomState:
"""Test suite for OdomState dataclass."""
def test_odom_state_initialization(self):
"""Test OdomState initializes with correct defaults."""
state = OdomState()
assert state.x == 0.0
assert state.y == 0.0
assert state.theta == 0.0
assert state.vx == 0.0
assert state.vy == 0.0
assert state.omega == 0.0
def test_odom_state_covariance_initialization(self):
"""Test OdomState covariance matrices."""
state = OdomState()
assert state.cov_pos.shape == (2, 2)
assert state.cov_theta == 0.01
assert state.cov_vel.shape == (2, 2)
class TestOdomFusionNode:
"""Test suite for OdomFusionNode."""
def test_node_initialization(self, node):
"""Test that node initializes with correct defaults."""
assert node.state.x == 0.0
assert node.state.y == 0.0
assert node.state.theta == 0.0
assert node.last_wheel_odom is None
assert node.last_visual_odom is None
assert node.last_imu is None
def test_weight_normalization(self, node):
"""Test that weights are normalized to sum to 1.0."""
total = node.wheel_weight + node.visual_weight + node.imu_weight
assert abs(total - 1.0) < 1e-6
def test_wheel_odom_subscription(self, node):
"""Test wheel odometry message handling."""
odom_msg = Odometry()
odom_msg.twist.twist.linear.x = 1.0
odom_msg.twist.twist.linear.y = 0.5
node._on_wheel_odom(odom_msg)
assert node.last_wheel_odom is not None
assert node.last_wheel_odom.twist.twist.linear.x == 1.0
def test_visual_odom_subscription(self, node):
"""Test visual odometry message handling."""
odom_msg = Odometry()
odom_msg.pose.pose.position.x = 5.0
odom_msg.pose.pose.position.y = 3.0
node._on_visual_odom(odom_msg)
assert node.last_visual_odom is not None
assert node.last_visual_odom.pose.pose.position.x == 5.0
def test_imu_subscription(self, node):
"""Test IMU message handling."""
imu_msg = Imu()
imu_msg.angular_velocity.z = 0.1
node._on_imu(imu_msg)
assert node.last_imu is not None
assert node.last_imu.angular_velocity.z == 0.1
def test_position_integration(self, node):
"""Test position integration from velocity."""
node.state.vx = 1.0 # 1 m/s
node.state.vy = 0.0
node.state.theta = 0.0 # Facing +x direction
dt = 1.0 # 1 second
node._fuse_odometry(dt)
# Should move 1 meter in +x direction
assert abs(node.state.x - 1.0) < 0.1
def test_position_integration_rotated(self, node):
"""Test position integration with rotation."""
node.state.vx = 1.0 # 1 m/s
node.state.vy = 0.0
node.state.theta = math.pi / 2 # Facing +y direction
dt = 1.0
node._fuse_odometry(dt)
# Should move 1 meter in +y direction
assert abs(node.state.y - 1.0) < 0.1
def test_angular_velocity_update(self, node):
"""Test angular velocity from IMU."""
imu_msg = Imu()
imu_msg.angular_velocity.z = 0.2
imu_msg.orientation.w = 1.0
node.last_imu = imu_msg
dt = 0.1
node._fuse_odometry(dt)
# Angular velocity should be weighted by imu_weight
expected_omega = 0.2 * node.imu_weight
assert abs(node.state.omega - expected_omega) < 0.01
def test_velocity_blending_wheel_only(self, node):
"""Test velocity blending with only wheel odometry."""
wheel_msg = Odometry()
wheel_msg.twist.twist.linear.x = 1.0
wheel_msg.twist.twist.linear.y = 0.0
node.last_wheel_odom = wheel_msg
dt = 0.1
node._fuse_odometry(dt)
# Velocity should be weighted by wheel_weight
expected_vx = 1.0 * node.wheel_weight
assert abs(node.state.vx - expected_vx) < 0.01
def test_visual_drift_correction(self, node):
"""Test drift correction from visual odometry."""
# Current state
node.state.x = 0.0
node.state.y = 0.0
# Visual odometry says we're at (1.0, 1.0)
visual_msg = Odometry()
visual_msg.pose.pose.position.x = 1.0
visual_msg.pose.pose.position.y = 1.0
visual_msg.pose.pose.orientation.w = 1.0
node.last_visual_odom = visual_msg
dt = 0.1
node._fuse_odometry(dt)
# Position should have moved toward visual estimate
assert node.state.x > 0.0
assert node.state.y > 0.0
def test_covariance_updates(self, node):
"""Test that covariances are updated based on measurements."""
wheel_msg = Odometry()
visual_msg = Odometry()
visual_msg.pose.pose.orientation.w = 1.0
node.last_wheel_odom = wheel_msg
node.last_visual_odom = visual_msg
measurements = {
"wheel": wheel_msg,
"visual": visual_msg,
}
node._update_covariances(measurements)
# Covariance should be reduced with good measurements
assert node.state.cov_pos[0, 0] < 0.01
def test_quaternion_to_yaw(self, node):
"""Test quaternion to yaw conversion."""
# Create 90 degree yaw rotation
q = quaternion_from_euler(0, 0, math.pi / 2)
imu_msg = Imu()
imu_msg.orientation.x = q[0]
imu_msg.orientation.y = q[1]
imu_msg.orientation.z = q[2]
imu_msg.orientation.w = q[3]
node.last_imu = imu_msg
dt = 0.1
node._fuse_odometry(dt)
# Theta should be blended toward 90 degrees
assert node.state.theta > 0.0
def test_velocity_near_zero(self, node):
"""Test handling of near-zero velocities."""
wheel_msg = Odometry()
wheel_msg.twist.twist.linear.x = 0.001 # Very small
node.last_wheel_odom = wheel_msg
dt = 0.1
node._fuse_odometry(dt)
# Position change should be minimal
assert abs(node.state.x) < 0.0001
def test_invalid_dt_ignored(self, node):
"""Test that invalid dt values are ignored."""
initial_x = node.state.x
# Zero dt
node._fuse_odometry(0.0)
assert node.state.x == initial_x
# Negative dt
node._fuse_odometry(-1.0)
assert node.state.x == initial_x
# Very large dt
node._fuse_odometry(10.0)
assert node.state.x == initial_x
class TestOdomFusionScenarios:
"""Integration-style tests for realistic scenarios."""
def test_scenario_straight_line(self, node):
"""Scenario: robot moving in straight line."""
wheel_msg = Odometry()
wheel_msg.twist.twist.linear.x = 1.0 # 1 m/s forward
wheel_msg.twist.twist.linear.y = 0.0
node.last_wheel_odom = wheel_msg
node.state.theta = 0.0 # Facing +x
# Move for 5 seconds
for _ in range(5):
node._fuse_odometry(1.0)
# Should be approximately 5 meters in +x direction
assert 4.0 < node.state.x < 6.0
assert abs(node.state.y) < 0.5
def test_scenario_circular_motion(self, node):
"""Scenario: robot moving in circle."""
wheel_msg = Odometry()
wheel_msg.twist.twist.linear.x = 1.0 # 1 m/s forward
imu_msg = Imu()
imu_msg.angular_velocity.z = 0.5 # 0.5 rad/s rotation
imu_msg.orientation.w = 1.0
node.last_wheel_odom = wheel_msg
node.last_imu = imu_msg
# Simulate circular motion for 4 cycles
for i in range(4):
node._fuse_odometry(1.0)
# Should have moved distance and rotated
distance = math.sqrt(node.state.x**2 + node.state.y**2)
assert distance > 1.0
def test_scenario_drift_correction(self, node):
"""Scenario: wheel odometry drifts, vision corrects."""
# Wheel says we're at 10, 10
node.state.x = 10.0
node.state.y = 10.0
# Vision says we're actually at 9, 9
visual_msg = Odometry()
visual_msg.pose.pose.position.x = 9.0
visual_msg.pose.pose.position.y = 9.0
visual_msg.pose.pose.orientation.w = 1.0
node.last_visual_odom = visual_msg
# Before correction
initial_x = node.state.x
initial_y = node.state.y
node._fuse_odometry(1.0)
# Position should move toward visual estimate
assert node.state.x < initial_x
assert node.state.y < initial_y
def test_scenario_all_sensors_available(self, node):
"""Scenario: all three sensor sources available."""
wheel_msg = Odometry()
wheel_msg.twist.twist.linear.x = 0.5
visual_msg = Odometry()
visual_msg.pose.pose.position.x = 2.0
visual_msg.pose.pose.position.y = 1.0
visual_msg.pose.pose.orientation.w = 1.0
imu_msg = Imu()
q = quaternion_from_euler(0, 0, 0.1) # Small yaw
imu_msg.orientation.x = q[0]
imu_msg.orientation.y = q[1]
imu_msg.orientation.z = q[2]
imu_msg.orientation.w = q[3]
imu_msg.angular_velocity.z = 0.05
node.last_wheel_odom = wheel_msg
node.last_visual_odom = visual_msg
node.last_imu = imu_msg
measurements = {
"wheel": wheel_msg,
"visual": visual_msg,
"imu": imu_msg,
}
node._update_covariances(measurements)
# Covariance should be low with all sources
assert node.state.cov_pos[0, 0] < 0.01
assert node.state.cov_theta < 0.01
def test_scenario_sensor_dropout(self, node):
"""Scenario: visual sensor drops out, wheel continues."""
wheel_msg = Odometry()
wheel_msg.twist.twist.linear.x = 1.0
node.last_wheel_odom = wheel_msg
node.last_visual_odom = None # Visual dropout
node.last_imu = None
# Should continue with wheel only
node._fuse_odometry(1.0)
# Position should still integrate
assert node.state.x > 0.0