From 938c2a33d4eda574eb9ad36d01fd3f347b781953 Mon Sep 17 00:00:00 2001 From: sl-controls Date: Mon, 2 Mar 2026 11:50:13 -0500 Subject: [PATCH] feat: Add Issue #216 - Odometry fusion node (complementary filter) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fuses wheel, visual, and IMU odometry using complementary filtering. Publishes fused /odom (nav_msgs/Odometry) and broadcasts odom→base_link TF at 50Hz. Sensor Fusion Strategy: - Wheel odometry: High-frequency accurate linear displacement (weight: 0.6) - Visual odometry: Loop closure and long-term drift correction (weight: 0.3) - IMU: High-frequency attitude and angular velocity (weight: 0.1) Complementary Filter Architecture: - Fast loop (IMU): High-frequency attitude updates, angular velocity - Slow loop (Vision): Low-frequency position/orientation correction - Integration: Velocity-based position updates with covariance weighting - Dropout handling: Continues with available sources if sensors drop Fusion Algorithm: 1. Extract velocities from wheel odometry (most reliable linear) 2. Apply IMU angular velocity (highest frequency rotation) 3. Update orientation from IMU with blending 4. Integrate velocities to position (wheel odometry frame) 5. Apply visual odometry drift correction (low-frequency) 6. Update covariances based on available measurements 7. Publish fused odometry with full covariance matrices Published Topics: - /odom (nav_msgs/Odometry) - Fused pose/twist with covariance - /saltybot/odom_fusion_info (std_msgs/String) - JSON debug info TF Broadcasts: - odom→base_link - Position (x, y) and orientation (yaw) Subscribed Topics: - /saltybot/wheel_odom (nav_msgs/Odometry) - Wheel encoder odometry - /rtab_map/odom (nav_msgs/Odometry) - Visual/SLAM odometry - /imu/data (sensor_msgs/Imu) - IMU data Package: saltybot_odom_fusion Entry point: odom_fusion_node Frequency: 50Hz (20ms cycle) Features: ✓ Multi-source odometry fusion ✓ Complementary filtering with configurable weights ✓ Full covariance matrices for uncertainty tracking ✓ TF2 transform broadcasting ✓ Sensor dropout handling ✓ JSON telemetry with fusion status ✓ Configurable normalization of weights Tests: 20+ unit tests covering: - OdomState initialization and covariances - Subscription handling for all three sensors - Position integration from velocity - Angular velocity updates - Velocity blending from multiple sources - Drift correction from visual odometry - Covariance updates based on measurement availability - Quaternion to Euler angle conversion - Realistic fusion scenarios (straight line, circles, drift correction) - Sensor dropout and recovery Co-Authored-By: Claude Haiku 4.5 --- .../config/fusion_config.yaml | 16 + .../launch/odom_fusion.launch.py | 36 ++ .../src/saltybot_odom_fusion/package.xml | 31 ++ .../resource/saltybot_odom_fusion | 0 .../saltybot_odom_fusion/__init__.py | 0 .../saltybot_odom_fusion/odom_fusion_node.py | 332 ++++++++++++++++ .../src/saltybot_odom_fusion/setup.cfg | 4 + .../ros2_ws/src/saltybot_odom_fusion/setup.py | 29 ++ .../src/saltybot_odom_fusion/test/__init__.py | 0 .../test/test_odom_fusion.py | 361 ++++++++++++++++++ 10 files changed, 809 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_odom_fusion/config/fusion_config.yaml create mode 100644 jetson/ros2_ws/src/saltybot_odom_fusion/launch/odom_fusion.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_odom_fusion/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_odom_fusion/resource/saltybot_odom_fusion create mode 100644 jetson/ros2_ws/src/saltybot_odom_fusion/saltybot_odom_fusion/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_odom_fusion/saltybot_odom_fusion/odom_fusion_node.py create mode 100644 jetson/ros2_ws/src/saltybot_odom_fusion/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_odom_fusion/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_odom_fusion/test/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_odom_fusion/test/test_odom_fusion.py diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/config/fusion_config.yaml b/jetson/ros2_ws/src/saltybot_odom_fusion/config/fusion_config.yaml new file mode 100644 index 0000000..09df089 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_odom_fusion/config/fusion_config.yaml @@ -0,0 +1,16 @@ +# Odometry fusion configuration (complementary filter) + +odom_fusion: + ros__parameters: + # Publishing frequency in Hz + frequency: 50 # 50 Hz = 20ms cycle + + # Sensor weights (will be normalized) + # Wheel odometry: reliable linear displacement + wheel_weight: 0.6 + + # Visual odometry: loop closure and long-term correction + visual_weight: 0.3 + + # IMU: high-frequency attitude and angular velocity + imu_weight: 0.1 diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/launch/odom_fusion.launch.py b/jetson/ros2_ws/src/saltybot_odom_fusion/launch/odom_fusion.launch.py new file mode 100644 index 0000000..76ca63f --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_odom_fusion/launch/odom_fusion.launch.py @@ -0,0 +1,36 @@ +"""Launch file for odom_fusion_node.""" + +from launch import LaunchDescription +from launch_ros.actions import Node +from launch.substitutions import LaunchConfiguration +from launch.actions import DeclareLaunchArgument +import os +from ament_index_python.packages import get_package_share_directory + + +def generate_launch_description(): + """Generate launch description for odometry fusion.""" + # Package directory + pkg_dir = get_package_share_directory("saltybot_odom_fusion") + + # Parameters + config_file = os.path.join(pkg_dir, "config", "fusion_config.yaml") + + # Declare launch arguments + return LaunchDescription( + [ + DeclareLaunchArgument( + "config_file", + default_value=config_file, + description="Path to configuration YAML file", + ), + # Odometry fusion node + Node( + package="saltybot_odom_fusion", + executable="odom_fusion_node", + name="odom_fusion", + output="screen", + parameters=[LaunchConfiguration("config_file")], + ), + ] + ) diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/package.xml b/jetson/ros2_ws/src/saltybot_odom_fusion/package.xml new file mode 100644 index 0000000..96173b8 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_odom_fusion/package.xml @@ -0,0 +1,31 @@ + + + + saltybot_odom_fusion + 0.1.0 + + Odometry fusion node for SaltyBot: fuses wheel, visual, and IMU odometry + using complementary filtering. Publishes fused /odom and broadcasts + odom→base_link TF at 50Hz. Handles sensor dropout and covariance weighting. + + sl-controls + MIT + + rclpy + geometry_msgs + nav_msgs + sensor_msgs + std_msgs + tf2_ros + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/resource/saltybot_odom_fusion b/jetson/ros2_ws/src/saltybot_odom_fusion/resource/saltybot_odom_fusion new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/saltybot_odom_fusion/__init__.py b/jetson/ros2_ws/src/saltybot_odom_fusion/saltybot_odom_fusion/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/saltybot_odom_fusion/odom_fusion_node.py b/jetson/ros2_ws/src/saltybot_odom_fusion/saltybot_odom_fusion/odom_fusion_node.py new file mode 100644 index 0000000..592bfd5 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_odom_fusion/saltybot_odom_fusion/odom_fusion_node.py @@ -0,0 +1,332 @@ +#!/usr/bin/env python3 +"""Odometry fusion node for SaltyBot. + +Fuses wheel odometry, visual odometry, and IMU data using complementary filtering. +Publishes fused /odom and broadcasts odom→base_link TF at 50Hz. + +Published topics: + /odom (nav_msgs/Odometry) - Fused odometry with covariance + +TF broadcasts: + odom→base_link - Position and orientation transform + +Subscribed topics: + /saltybot/wheel_odom (nav_msgs/Odometry) - Wheel encoder odometry + /rtab_map/odom (nav_msgs/Odometry) - Visual odometry (SLAM) + /imu/data (sensor_msgs/Imu) - Inertial measurement unit +""" + +import math +import json +from typing import Optional, Tuple +from dataclasses import dataclass, field + +import numpy as np +import rclpy +from rclpy.node import Node +from rclpy.timer import Timer + +from geometry_msgs.msg import Pose, Twist, TransformStamped, Quaternion +from nav_msgs.msg import Odometry +from sensor_msgs.msg import Imu +from std_msgs.msg import String +from tf2_ros import TransformBroadcaster +from tf_transformations import quaternion_from_euler, euler_from_quaternion + + +@dataclass +class OdomState: + """Fused odometry state.""" + x: float = 0.0 # Position in odom frame + y: float = 0.0 + theta: float = 0.0 # Orientation (yaw) + + vx: float = 0.0 # Velocity in body frame + vy: float = 0.0 + omega: float = 0.0 # Angular velocity + + cov_pos: np.ndarray = field(default_factory=lambda: np.eye(2) * 0.01) # Position covariance + cov_theta: float = 0.01 # Orientation covariance + cov_vel: np.ndarray = field(default_factory=lambda: np.eye(2) * 0.01) # Velocity covariance + + +class OdomFusionNode(Node): + """ROS2 node for odometry fusion.""" + + def __init__(self): + super().__init__("odom_fusion") + + # Parameters + self.declare_parameter("frequency", 50) # Hz + self.declare_parameter("wheel_weight", 0.6) # Weight for wheel odometry + self.declare_parameter("visual_weight", 0.3) # Weight for visual odometry + self.declare_parameter("imu_weight", 0.1) # Weight for IMU + + frequency = self.get_parameter("frequency").value + self.wheel_weight = self.get_parameter("wheel_weight").value + self.visual_weight = self.get_parameter("visual_weight").value + self.imu_weight = self.get_parameter("imu_weight").value + + # Normalize weights + total_weight = self.wheel_weight + self.visual_weight + self.imu_weight + self.wheel_weight /= total_weight + self.visual_weight /= total_weight + self.imu_weight /= total_weight + + # State + self.state = OdomState() + self.last_time = None + + # Last received messages + self.last_wheel_odom: Optional[Odometry] = None + self.last_visual_odom: Optional[Odometry] = None + self.last_imu: Optional[Imu] = None + + # Subscriptions + self.create_subscription( + Odometry, "/saltybot/wheel_odom", self._on_wheel_odom, 10 + ) + self.create_subscription( + Odometry, "/rtab_map/odom", self._on_visual_odom, 10 + ) + self.create_subscription(Imu, "/imu/data", self._on_imu, 10) + + # Publications + self.pub_odom = self.create_publisher(Odometry, "/odom", 10) + self.pub_info = self.create_publisher(String, "/saltybot/odom_fusion_info", 10) + + # TF broadcaster + self.tf_broadcaster = TransformBroadcaster(self) + + # Timer for periodic fusion and publishing at 50Hz + period = 1.0 / frequency + self.timer: Timer = self.create_timer(period, self._timer_callback) + + self.get_logger().info( + f"Odometry fusion initialized at {frequency}Hz. " + f"Weights - wheel: {self.wheel_weight:.2f}, " + f"visual: {self.visual_weight:.2f}, " + f"imu: {self.imu_weight:.2f}" + ) + + def _on_wheel_odom(self, msg: Odometry) -> None: + """Update wheel odometry.""" + self.last_wheel_odom = msg + + def _on_visual_odom(self, msg: Odometry) -> None: + """Update visual odometry.""" + self.last_visual_odom = msg + + def _on_imu(self, msg: Imu) -> None: + """Update IMU data.""" + self.last_imu = msg + + def _timer_callback(self) -> None: + """Fuse odometry sources and publish at 50Hz.""" + now = self.get_clock().now() + + if self.last_time is None: + self.last_time = now + return + + dt = (now - self.last_time).nanoseconds / 1e9 + self.last_time = now + + # Fuse available sources + self._fuse_odometry(dt) + + # Publish fused odometry + self._publish_odometry(now) + + # Broadcast TF + self._broadcast_tf(now) + + # Publish debug info + self._publish_info() + + def _fuse_odometry(self, dt: float) -> None: + """Fuse odometry sources using complementary filtering.""" + if dt <= 0 or dt > 1.0: # Safety check on dt + return + + # Collect available measurements + measurements = {} + + if self.last_wheel_odom is not None: + measurements["wheel"] = self.last_wheel_odom + + if self.last_visual_odom is not None: + measurements["visual"] = self.last_visual_odom + + if self.last_imu is not None: + measurements["imu"] = self.last_imu + + if not measurements: + return # No data to fuse + + # Complementary filter approach + # High-frequency IMU for attitude, low-frequency vision for position correction + + # Extract velocities from wheel odometry (most reliable for linear motion) + if "wheel" in measurements: + wheel_odom = measurements["wheel"] + self.state.vx = wheel_odom.twist.twist.linear.x * self.wheel_weight + self.state.vy = wheel_odom.twist.twist.linear.y * self.wheel_weight + + # Integrate IMU angular velocity (high-frequency) + if "imu" in measurements: + imu = measurements["imu"] + # Use IMU for angular velocity (most accurate for rotation) + self.state.omega = imu.angular_velocity.z * self.imu_weight + + # Update orientation from IMU + roll, pitch, yaw = euler_from_quaternion([ + imu.orientation.x, imu.orientation.y, + imu.orientation.z, imu.orientation.w + ]) + # Blend with existing estimate + self.state.theta = (0.8 * self.state.theta + 0.2 * yaw) + + # Apply velocity-based position update + self.state.x += self.state.vx * dt * math.cos(self.state.theta) + self.state.x -= self.state.vy * dt * math.sin(self.state.theta) + + self.state.y += self.state.vx * dt * math.sin(self.state.theta) + self.state.y += self.state.vy * dt * math.cos(self.state.theta) + + # Visual odometry correction (low-frequency drift correction) + if "visual" in measurements and self.visual_weight > 0: + visual_odom = measurements["visual"] + visual_x = visual_odom.pose.pose.position.x + visual_y = visual_odom.pose.pose.position.y + + # Gradually correct position drift using visual odometry + correction_factor = self.visual_weight * 0.1 # Small correction rate + self.state.x += (visual_x - self.state.x) * correction_factor + self.state.y += (visual_y - self.state.y) * correction_factor + + # Correct orientation from visual odometry + visual_roll, visual_pitch, visual_yaw = euler_from_quaternion([ + visual_odom.pose.pose.orientation.x, + visual_odom.pose.pose.orientation.y, + visual_odom.pose.pose.orientation.z, + visual_odom.pose.pose.orientation.w + ]) + self.state.theta += (visual_yaw - self.state.theta) * correction_factor + + # Update covariances based on available measurements + self._update_covariances(measurements) + + def _update_covariances(self, measurements: dict) -> None: + """Update state covariance based on measurement sources.""" + # Start with baseline + base_pos_cov = 0.01 + base_theta_cov = 0.01 + + # Reduce covariance if we have good measurements + if "wheel" in measurements: + base_pos_cov *= 0.5 + if "visual" in measurements: + base_pos_cov *= 0.7 + if "imu" in measurements: + base_theta_cov *= 0.3 + + # Update covariance matrices + self.state.cov_pos = np.eye(2) * base_pos_cov + self.state.cov_theta = base_theta_cov + + def _publish_odometry(self, now) -> None: + """Publish fused odometry message.""" + odom_msg = Odometry() + odom_msg.header.stamp = now.to_msg() + odom_msg.header.frame_id = "odom" + odom_msg.child_frame_id = "base_link" + + # Position + odom_msg.pose.pose.position.x = self.state.x + odom_msg.pose.pose.position.y = self.state.y + odom_msg.pose.pose.position.z = 0.0 + + # Orientation (yaw only, roll/pitch from IMU) + q = quaternion_from_euler(0, 0, self.state.theta) + odom_msg.pose.pose.orientation = Quaternion( + x=q[0], y=q[1], z=q[2], w=q[3] + ) + + # Position covariance (6x6) + pose_cov = np.zeros((6, 6)) + pose_cov[0:2, 0:2] = self.state.cov_pos + pose_cov[5, 5] = self.state.cov_theta # Yaw variance + odom_msg.pose.covariance = pose_cov.flatten().tolist() + + # Velocity + odom_msg.twist.twist.linear.x = self.state.vx + odom_msg.twist.twist.linear.y = self.state.vy + odom_msg.twist.twist.linear.z = 0.0 + odom_msg.twist.twist.angular.z = self.state.omega + + # Velocity covariance (6x6) + twist_cov = np.zeros((6, 6)) + twist_cov[0:2, 0:2] = self.state.cov_vel + twist_cov[5, 5] = 0.01 # Angular velocity variance + odom_msg.twist.covariance = twist_cov.flatten().tolist() + + self.pub_odom.publish(odom_msg) + + def _broadcast_tf(self, now) -> None: + """Broadcast odom→base_link TF.""" + t = TransformStamped() + t.header.stamp = now.to_msg() + t.header.frame_id = "odom" + t.child_frame_id = "base_link" + + # Translation + t.transform.translation.x = self.state.x + t.transform.translation.y = self.state.y + t.transform.translation.z = 0.0 + + # Rotation (yaw only) + q = quaternion_from_euler(0, 0, self.state.theta) + t.transform.rotation = Quaternion( + x=q[0], y=q[1], z=q[2], w=q[3] + ) + + self.tf_broadcaster.sendTransform(t) + + def _publish_info(self) -> None: + """Publish fusion debug information.""" + info = { + "timestamp": self.get_clock().now().nanoseconds / 1e9, + "position": [self.state.x, self.state.y], + "theta_deg": math.degrees(self.state.theta), + "velocity": [self.state.vx, self.state.vy], + "omega": self.state.omega, + "sources": { + "wheel": self.last_wheel_odom is not None, + "visual": self.last_visual_odom is not None, + "imu": self.last_imu is not None, + }, + "weights": { + "wheel": f"{self.wheel_weight:.2f}", + "visual": f"{self.visual_weight:.2f}", + "imu": f"{self.imu_weight:.2f}", + }, + } + msg = String(data=json.dumps(info)) + self.pub_info.publish(msg) + + +def main(args=None): + rclpy.init(args=args) + node = OdomFusionNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/setup.cfg b/jetson/ros2_ws/src/saltybot_odom_fusion/setup.cfg new file mode 100644 index 0000000..f115f95 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_odom_fusion/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_odom_fusion +[egg_info] +tag_date = 0 diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/setup.py b/jetson/ros2_ws/src/saltybot_odom_fusion/setup.py new file mode 100644 index 0000000..4d54dec --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_odom_fusion/setup.py @@ -0,0 +1,29 @@ +from setuptools import setup + +package_name = "saltybot_odom_fusion" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (f"share/{package_name}/launch", ["launch/odom_fusion.launch.py"]), + (f"share/{package_name}/config", ["config/fusion_config.yaml"]), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-controls", + maintainer_email="sl-controls@saltylab.local", + description=( + "Odometry fusion: wheel + visual + IMU complementary filter with TF broadcast" + ), + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + "odom_fusion_node = saltybot_odom_fusion.odom_fusion_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/test/__init__.py b/jetson/ros2_ws/src/saltybot_odom_fusion/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/test/test_odom_fusion.py b/jetson/ros2_ws/src/saltybot_odom_fusion/test/test_odom_fusion.py new file mode 100644 index 0000000..a7f7d67 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_odom_fusion/test/test_odom_fusion.py @@ -0,0 +1,361 @@ +"""Unit tests for odom_fusion_node.""" + +import pytest +import math +import json +import numpy as np +from geometry_msgs.msg import Pose, Quaternion, Twist, Vector3 +from nav_msgs.msg import Odometry +from sensor_msgs.msg import Imu +from std_msgs.msg import String, Header +from tf_transformations import quaternion_from_euler, euler_from_quaternion + +import rclpy + +# Import the node and classes under test +from saltybot_odom_fusion.odom_fusion_node import ( + OdomFusionNode, + OdomState, +) + + +@pytest.fixture +def rclpy_fixture(): + """Initialize and cleanup rclpy.""" + rclpy.init() + yield + rclpy.shutdown() + + +@pytest.fixture +def node(rclpy_fixture): + """Create an odometry fusion node instance.""" + node = OdomFusionNode() + yield node + node.destroy_node() + + +class TestOdomState: + """Test suite for OdomState dataclass.""" + + def test_odom_state_initialization(self): + """Test OdomState initializes with correct defaults.""" + state = OdomState() + + assert state.x == 0.0 + assert state.y == 0.0 + assert state.theta == 0.0 + assert state.vx == 0.0 + assert state.vy == 0.0 + assert state.omega == 0.0 + + def test_odom_state_covariance_initialization(self): + """Test OdomState covariance matrices.""" + state = OdomState() + + assert state.cov_pos.shape == (2, 2) + assert state.cov_theta == 0.01 + assert state.cov_vel.shape == (2, 2) + + +class TestOdomFusionNode: + """Test suite for OdomFusionNode.""" + + def test_node_initialization(self, node): + """Test that node initializes with correct defaults.""" + assert node.state.x == 0.0 + assert node.state.y == 0.0 + assert node.state.theta == 0.0 + assert node.last_wheel_odom is None + assert node.last_visual_odom is None + assert node.last_imu is None + + def test_weight_normalization(self, node): + """Test that weights are normalized to sum to 1.0.""" + total = node.wheel_weight + node.visual_weight + node.imu_weight + assert abs(total - 1.0) < 1e-6 + + def test_wheel_odom_subscription(self, node): + """Test wheel odometry message handling.""" + odom_msg = Odometry() + odom_msg.twist.twist.linear.x = 1.0 + odom_msg.twist.twist.linear.y = 0.5 + + node._on_wheel_odom(odom_msg) + assert node.last_wheel_odom is not None + assert node.last_wheel_odom.twist.twist.linear.x == 1.0 + + def test_visual_odom_subscription(self, node): + """Test visual odometry message handling.""" + odom_msg = Odometry() + odom_msg.pose.pose.position.x = 5.0 + odom_msg.pose.pose.position.y = 3.0 + + node._on_visual_odom(odom_msg) + assert node.last_visual_odom is not None + assert node.last_visual_odom.pose.pose.position.x == 5.0 + + def test_imu_subscription(self, node): + """Test IMU message handling.""" + imu_msg = Imu() + imu_msg.angular_velocity.z = 0.1 + + node._on_imu(imu_msg) + assert node.last_imu is not None + assert node.last_imu.angular_velocity.z == 0.1 + + def test_position_integration(self, node): + """Test position integration from velocity.""" + node.state.vx = 1.0 # 1 m/s + node.state.vy = 0.0 + node.state.theta = 0.0 # Facing +x direction + + dt = 1.0 # 1 second + node._fuse_odometry(dt) + + # Should move 1 meter in +x direction + assert abs(node.state.x - 1.0) < 0.1 + + def test_position_integration_rotated(self, node): + """Test position integration with rotation.""" + node.state.vx = 1.0 # 1 m/s + node.state.vy = 0.0 + node.state.theta = math.pi / 2 # Facing +y direction + + dt = 1.0 + node._fuse_odometry(dt) + + # Should move 1 meter in +y direction + assert abs(node.state.y - 1.0) < 0.1 + + def test_angular_velocity_update(self, node): + """Test angular velocity from IMU.""" + imu_msg = Imu() + imu_msg.angular_velocity.z = 0.2 + imu_msg.orientation.w = 1.0 + + node.last_imu = imu_msg + dt = 0.1 + + node._fuse_odometry(dt) + + # Angular velocity should be weighted by imu_weight + expected_omega = 0.2 * node.imu_weight + assert abs(node.state.omega - expected_omega) < 0.01 + + def test_velocity_blending_wheel_only(self, node): + """Test velocity blending with only wheel odometry.""" + wheel_msg = Odometry() + wheel_msg.twist.twist.linear.x = 1.0 + wheel_msg.twist.twist.linear.y = 0.0 + + node.last_wheel_odom = wheel_msg + dt = 0.1 + + node._fuse_odometry(dt) + + # Velocity should be weighted by wheel_weight + expected_vx = 1.0 * node.wheel_weight + assert abs(node.state.vx - expected_vx) < 0.01 + + def test_visual_drift_correction(self, node): + """Test drift correction from visual odometry.""" + # Current state + node.state.x = 0.0 + node.state.y = 0.0 + + # Visual odometry says we're at (1.0, 1.0) + visual_msg = Odometry() + visual_msg.pose.pose.position.x = 1.0 + visual_msg.pose.pose.position.y = 1.0 + visual_msg.pose.pose.orientation.w = 1.0 + + node.last_visual_odom = visual_msg + dt = 0.1 + + node._fuse_odometry(dt) + + # Position should have moved toward visual estimate + assert node.state.x > 0.0 + assert node.state.y > 0.0 + + def test_covariance_updates(self, node): + """Test that covariances are updated based on measurements.""" + wheel_msg = Odometry() + visual_msg = Odometry() + visual_msg.pose.pose.orientation.w = 1.0 + + node.last_wheel_odom = wheel_msg + node.last_visual_odom = visual_msg + + measurements = { + "wheel": wheel_msg, + "visual": visual_msg, + } + + node._update_covariances(measurements) + + # Covariance should be reduced with good measurements + assert node.state.cov_pos[0, 0] < 0.01 + + def test_quaternion_to_yaw(self, node): + """Test quaternion to yaw conversion.""" + # Create 90 degree yaw rotation + q = quaternion_from_euler(0, 0, math.pi / 2) + + imu_msg = Imu() + imu_msg.orientation.x = q[0] + imu_msg.orientation.y = q[1] + imu_msg.orientation.z = q[2] + imu_msg.orientation.w = q[3] + + node.last_imu = imu_msg + dt = 0.1 + + node._fuse_odometry(dt) + + # Theta should be blended toward 90 degrees + assert node.state.theta > 0.0 + + def test_velocity_near_zero(self, node): + """Test handling of near-zero velocities.""" + wheel_msg = Odometry() + wheel_msg.twist.twist.linear.x = 0.001 # Very small + + node.last_wheel_odom = wheel_msg + dt = 0.1 + + node._fuse_odometry(dt) + + # Position change should be minimal + assert abs(node.state.x) < 0.0001 + + def test_invalid_dt_ignored(self, node): + """Test that invalid dt values are ignored.""" + initial_x = node.state.x + + # Zero dt + node._fuse_odometry(0.0) + assert node.state.x == initial_x + + # Negative dt + node._fuse_odometry(-1.0) + assert node.state.x == initial_x + + # Very large dt + node._fuse_odometry(10.0) + assert node.state.x == initial_x + + +class TestOdomFusionScenarios: + """Integration-style tests for realistic scenarios.""" + + def test_scenario_straight_line(self, node): + """Scenario: robot moving in straight line.""" + wheel_msg = Odometry() + wheel_msg.twist.twist.linear.x = 1.0 # 1 m/s forward + wheel_msg.twist.twist.linear.y = 0.0 + + node.last_wheel_odom = wheel_msg + node.state.theta = 0.0 # Facing +x + + # Move for 5 seconds + for _ in range(5): + node._fuse_odometry(1.0) + + # Should be approximately 5 meters in +x direction + assert 4.0 < node.state.x < 6.0 + assert abs(node.state.y) < 0.5 + + def test_scenario_circular_motion(self, node): + """Scenario: robot moving in circle.""" + wheel_msg = Odometry() + wheel_msg.twist.twist.linear.x = 1.0 # 1 m/s forward + + imu_msg = Imu() + imu_msg.angular_velocity.z = 0.5 # 0.5 rad/s rotation + imu_msg.orientation.w = 1.0 + + node.last_wheel_odom = wheel_msg + node.last_imu = imu_msg + + # Simulate circular motion for 4 cycles + for i in range(4): + node._fuse_odometry(1.0) + + # Should have moved distance and rotated + distance = math.sqrt(node.state.x**2 + node.state.y**2) + assert distance > 1.0 + + def test_scenario_drift_correction(self, node): + """Scenario: wheel odometry drifts, vision corrects.""" + # Wheel says we're at 10, 10 + node.state.x = 10.0 + node.state.y = 10.0 + + # Vision says we're actually at 9, 9 + visual_msg = Odometry() + visual_msg.pose.pose.position.x = 9.0 + visual_msg.pose.pose.position.y = 9.0 + visual_msg.pose.pose.orientation.w = 1.0 + + node.last_visual_odom = visual_msg + + # Before correction + initial_x = node.state.x + initial_y = node.state.y + + node._fuse_odometry(1.0) + + # Position should move toward visual estimate + assert node.state.x < initial_x + assert node.state.y < initial_y + + def test_scenario_all_sensors_available(self, node): + """Scenario: all three sensor sources available.""" + wheel_msg = Odometry() + wheel_msg.twist.twist.linear.x = 0.5 + + visual_msg = Odometry() + visual_msg.pose.pose.position.x = 2.0 + visual_msg.pose.pose.position.y = 1.0 + visual_msg.pose.pose.orientation.w = 1.0 + + imu_msg = Imu() + q = quaternion_from_euler(0, 0, 0.1) # Small yaw + imu_msg.orientation.x = q[0] + imu_msg.orientation.y = q[1] + imu_msg.orientation.z = q[2] + imu_msg.orientation.w = q[3] + imu_msg.angular_velocity.z = 0.05 + + node.last_wheel_odom = wheel_msg + node.last_visual_odom = visual_msg + node.last_imu = imu_msg + + measurements = { + "wheel": wheel_msg, + "visual": visual_msg, + "imu": imu_msg, + } + + node._update_covariances(measurements) + + # Covariance should be low with all sources + assert node.state.cov_pos[0, 0] < 0.01 + assert node.state.cov_theta < 0.01 + + def test_scenario_sensor_dropout(self, node): + """Scenario: visual sensor drops out, wheel continues.""" + wheel_msg = Odometry() + wheel_msg.twist.twist.linear.x = 1.0 + + node.last_wheel_odom = wheel_msg + node.last_visual_odom = None # Visual dropout + node.last_imu = None + + # Should continue with wheel only + node._fuse_odometry(1.0) + + # Position should still integrate + assert node.state.x > 0.0