diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/config/fusion_config.yaml b/jetson/ros2_ws/src/saltybot_odom_fusion/config/fusion_config.yaml new file mode 100644 index 0000000..09df089 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_odom_fusion/config/fusion_config.yaml @@ -0,0 +1,16 @@ +# Odometry fusion configuration (complementary filter) + +odom_fusion: + ros__parameters: + # Publishing frequency in Hz + frequency: 50 # 50 Hz = 20ms cycle + + # Sensor weights (will be normalized) + # Wheel odometry: reliable linear displacement + wheel_weight: 0.6 + + # Visual odometry: loop closure and long-term correction + visual_weight: 0.3 + + # IMU: high-frequency attitude and angular velocity + imu_weight: 0.1 diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/launch/odom_fusion.launch.py b/jetson/ros2_ws/src/saltybot_odom_fusion/launch/odom_fusion.launch.py new file mode 100644 index 0000000..76ca63f --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_odom_fusion/launch/odom_fusion.launch.py @@ -0,0 +1,36 @@ +"""Launch file for odom_fusion_node.""" + +from launch import LaunchDescription +from launch_ros.actions import Node +from launch.substitutions import LaunchConfiguration +from launch.actions import DeclareLaunchArgument +import os +from ament_index_python.packages import get_package_share_directory + + +def generate_launch_description(): + """Generate launch description for odometry fusion.""" + # Package directory + pkg_dir = get_package_share_directory("saltybot_odom_fusion") + + # Parameters + config_file = os.path.join(pkg_dir, "config", "fusion_config.yaml") + + # Declare launch arguments + return LaunchDescription( + [ + DeclareLaunchArgument( + "config_file", + default_value=config_file, + description="Path to configuration YAML file", + ), + # Odometry fusion node + Node( + package="saltybot_odom_fusion", + executable="odom_fusion_node", + name="odom_fusion", + output="screen", + parameters=[LaunchConfiguration("config_file")], + ), + ] + ) diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/package.xml b/jetson/ros2_ws/src/saltybot_odom_fusion/package.xml new file mode 100644 index 0000000..96173b8 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_odom_fusion/package.xml @@ -0,0 +1,31 @@ + + + + saltybot_odom_fusion + 0.1.0 + + Odometry fusion node for SaltyBot: fuses wheel, visual, and IMU odometry + using complementary filtering. Publishes fused /odom and broadcasts + odom→base_link TF at 50Hz. Handles sensor dropout and covariance weighting. + + sl-controls + MIT + + rclpy + geometry_msgs + nav_msgs + sensor_msgs + std_msgs + tf2_ros + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/resource/saltybot_odom_fusion b/jetson/ros2_ws/src/saltybot_odom_fusion/resource/saltybot_odom_fusion new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/saltybot_odom_fusion/__init__.py b/jetson/ros2_ws/src/saltybot_odom_fusion/saltybot_odom_fusion/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/saltybot_odom_fusion/odom_fusion_node.py b/jetson/ros2_ws/src/saltybot_odom_fusion/saltybot_odom_fusion/odom_fusion_node.py new file mode 100644 index 0000000..592bfd5 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_odom_fusion/saltybot_odom_fusion/odom_fusion_node.py @@ -0,0 +1,332 @@ +#!/usr/bin/env python3 +"""Odometry fusion node for SaltyBot. + +Fuses wheel odometry, visual odometry, and IMU data using complementary filtering. +Publishes fused /odom and broadcasts odom→base_link TF at 50Hz. + +Published topics: + /odom (nav_msgs/Odometry) - Fused odometry with covariance + +TF broadcasts: + odom→base_link - Position and orientation transform + +Subscribed topics: + /saltybot/wheel_odom (nav_msgs/Odometry) - Wheel encoder odometry + /rtab_map/odom (nav_msgs/Odometry) - Visual odometry (SLAM) + /imu/data (sensor_msgs/Imu) - Inertial measurement unit +""" + +import math +import json +from typing import Optional, Tuple +from dataclasses import dataclass, field + +import numpy as np +import rclpy +from rclpy.node import Node +from rclpy.timer import Timer + +from geometry_msgs.msg import Pose, Twist, TransformStamped, Quaternion +from nav_msgs.msg import Odometry +from sensor_msgs.msg import Imu +from std_msgs.msg import String +from tf2_ros import TransformBroadcaster +from tf_transformations import quaternion_from_euler, euler_from_quaternion + + +@dataclass +class OdomState: + """Fused odometry state.""" + x: float = 0.0 # Position in odom frame + y: float = 0.0 + theta: float = 0.0 # Orientation (yaw) + + vx: float = 0.0 # Velocity in body frame + vy: float = 0.0 + omega: float = 0.0 # Angular velocity + + cov_pos: np.ndarray = field(default_factory=lambda: np.eye(2) * 0.01) # Position covariance + cov_theta: float = 0.01 # Orientation covariance + cov_vel: np.ndarray = field(default_factory=lambda: np.eye(2) * 0.01) # Velocity covariance + + +class OdomFusionNode(Node): + """ROS2 node for odometry fusion.""" + + def __init__(self): + super().__init__("odom_fusion") + + # Parameters + self.declare_parameter("frequency", 50) # Hz + self.declare_parameter("wheel_weight", 0.6) # Weight for wheel odometry + self.declare_parameter("visual_weight", 0.3) # Weight for visual odometry + self.declare_parameter("imu_weight", 0.1) # Weight for IMU + + frequency = self.get_parameter("frequency").value + self.wheel_weight = self.get_parameter("wheel_weight").value + self.visual_weight = self.get_parameter("visual_weight").value + self.imu_weight = self.get_parameter("imu_weight").value + + # Normalize weights + total_weight = self.wheel_weight + self.visual_weight + self.imu_weight + self.wheel_weight /= total_weight + self.visual_weight /= total_weight + self.imu_weight /= total_weight + + # State + self.state = OdomState() + self.last_time = None + + # Last received messages + self.last_wheel_odom: Optional[Odometry] = None + self.last_visual_odom: Optional[Odometry] = None + self.last_imu: Optional[Imu] = None + + # Subscriptions + self.create_subscription( + Odometry, "/saltybot/wheel_odom", self._on_wheel_odom, 10 + ) + self.create_subscription( + Odometry, "/rtab_map/odom", self._on_visual_odom, 10 + ) + self.create_subscription(Imu, "/imu/data", self._on_imu, 10) + + # Publications + self.pub_odom = self.create_publisher(Odometry, "/odom", 10) + self.pub_info = self.create_publisher(String, "/saltybot/odom_fusion_info", 10) + + # TF broadcaster + self.tf_broadcaster = TransformBroadcaster(self) + + # Timer for periodic fusion and publishing at 50Hz + period = 1.0 / frequency + self.timer: Timer = self.create_timer(period, self._timer_callback) + + self.get_logger().info( + f"Odometry fusion initialized at {frequency}Hz. " + f"Weights - wheel: {self.wheel_weight:.2f}, " + f"visual: {self.visual_weight:.2f}, " + f"imu: {self.imu_weight:.2f}" + ) + + def _on_wheel_odom(self, msg: Odometry) -> None: + """Update wheel odometry.""" + self.last_wheel_odom = msg + + def _on_visual_odom(self, msg: Odometry) -> None: + """Update visual odometry.""" + self.last_visual_odom = msg + + def _on_imu(self, msg: Imu) -> None: + """Update IMU data.""" + self.last_imu = msg + + def _timer_callback(self) -> None: + """Fuse odometry sources and publish at 50Hz.""" + now = self.get_clock().now() + + if self.last_time is None: + self.last_time = now + return + + dt = (now - self.last_time).nanoseconds / 1e9 + self.last_time = now + + # Fuse available sources + self._fuse_odometry(dt) + + # Publish fused odometry + self._publish_odometry(now) + + # Broadcast TF + self._broadcast_tf(now) + + # Publish debug info + self._publish_info() + + def _fuse_odometry(self, dt: float) -> None: + """Fuse odometry sources using complementary filtering.""" + if dt <= 0 or dt > 1.0: # Safety check on dt + return + + # Collect available measurements + measurements = {} + + if self.last_wheel_odom is not None: + measurements["wheel"] = self.last_wheel_odom + + if self.last_visual_odom is not None: + measurements["visual"] = self.last_visual_odom + + if self.last_imu is not None: + measurements["imu"] = self.last_imu + + if not measurements: + return # No data to fuse + + # Complementary filter approach + # High-frequency IMU for attitude, low-frequency vision for position correction + + # Extract velocities from wheel odometry (most reliable for linear motion) + if "wheel" in measurements: + wheel_odom = measurements["wheel"] + self.state.vx = wheel_odom.twist.twist.linear.x * self.wheel_weight + self.state.vy = wheel_odom.twist.twist.linear.y * self.wheel_weight + + # Integrate IMU angular velocity (high-frequency) + if "imu" in measurements: + imu = measurements["imu"] + # Use IMU for angular velocity (most accurate for rotation) + self.state.omega = imu.angular_velocity.z * self.imu_weight + + # Update orientation from IMU + roll, pitch, yaw = euler_from_quaternion([ + imu.orientation.x, imu.orientation.y, + imu.orientation.z, imu.orientation.w + ]) + # Blend with existing estimate + self.state.theta = (0.8 * self.state.theta + 0.2 * yaw) + + # Apply velocity-based position update + self.state.x += self.state.vx * dt * math.cos(self.state.theta) + self.state.x -= self.state.vy * dt * math.sin(self.state.theta) + + self.state.y += self.state.vx * dt * math.sin(self.state.theta) + self.state.y += self.state.vy * dt * math.cos(self.state.theta) + + # Visual odometry correction (low-frequency drift correction) + if "visual" in measurements and self.visual_weight > 0: + visual_odom = measurements["visual"] + visual_x = visual_odom.pose.pose.position.x + visual_y = visual_odom.pose.pose.position.y + + # Gradually correct position drift using visual odometry + correction_factor = self.visual_weight * 0.1 # Small correction rate + self.state.x += (visual_x - self.state.x) * correction_factor + self.state.y += (visual_y - self.state.y) * correction_factor + + # Correct orientation from visual odometry + visual_roll, visual_pitch, visual_yaw = euler_from_quaternion([ + visual_odom.pose.pose.orientation.x, + visual_odom.pose.pose.orientation.y, + visual_odom.pose.pose.orientation.z, + visual_odom.pose.pose.orientation.w + ]) + self.state.theta += (visual_yaw - self.state.theta) * correction_factor + + # Update covariances based on available measurements + self._update_covariances(measurements) + + def _update_covariances(self, measurements: dict) -> None: + """Update state covariance based on measurement sources.""" + # Start with baseline + base_pos_cov = 0.01 + base_theta_cov = 0.01 + + # Reduce covariance if we have good measurements + if "wheel" in measurements: + base_pos_cov *= 0.5 + if "visual" in measurements: + base_pos_cov *= 0.7 + if "imu" in measurements: + base_theta_cov *= 0.3 + + # Update covariance matrices + self.state.cov_pos = np.eye(2) * base_pos_cov + self.state.cov_theta = base_theta_cov + + def _publish_odometry(self, now) -> None: + """Publish fused odometry message.""" + odom_msg = Odometry() + odom_msg.header.stamp = now.to_msg() + odom_msg.header.frame_id = "odom" + odom_msg.child_frame_id = "base_link" + + # Position + odom_msg.pose.pose.position.x = self.state.x + odom_msg.pose.pose.position.y = self.state.y + odom_msg.pose.pose.position.z = 0.0 + + # Orientation (yaw only, roll/pitch from IMU) + q = quaternion_from_euler(0, 0, self.state.theta) + odom_msg.pose.pose.orientation = Quaternion( + x=q[0], y=q[1], z=q[2], w=q[3] + ) + + # Position covariance (6x6) + pose_cov = np.zeros((6, 6)) + pose_cov[0:2, 0:2] = self.state.cov_pos + pose_cov[5, 5] = self.state.cov_theta # Yaw variance + odom_msg.pose.covariance = pose_cov.flatten().tolist() + + # Velocity + odom_msg.twist.twist.linear.x = self.state.vx + odom_msg.twist.twist.linear.y = self.state.vy + odom_msg.twist.twist.linear.z = 0.0 + odom_msg.twist.twist.angular.z = self.state.omega + + # Velocity covariance (6x6) + twist_cov = np.zeros((6, 6)) + twist_cov[0:2, 0:2] = self.state.cov_vel + twist_cov[5, 5] = 0.01 # Angular velocity variance + odom_msg.twist.covariance = twist_cov.flatten().tolist() + + self.pub_odom.publish(odom_msg) + + def _broadcast_tf(self, now) -> None: + """Broadcast odom→base_link TF.""" + t = TransformStamped() + t.header.stamp = now.to_msg() + t.header.frame_id = "odom" + t.child_frame_id = "base_link" + + # Translation + t.transform.translation.x = self.state.x + t.transform.translation.y = self.state.y + t.transform.translation.z = 0.0 + + # Rotation (yaw only) + q = quaternion_from_euler(0, 0, self.state.theta) + t.transform.rotation = Quaternion( + x=q[0], y=q[1], z=q[2], w=q[3] + ) + + self.tf_broadcaster.sendTransform(t) + + def _publish_info(self) -> None: + """Publish fusion debug information.""" + info = { + "timestamp": self.get_clock().now().nanoseconds / 1e9, + "position": [self.state.x, self.state.y], + "theta_deg": math.degrees(self.state.theta), + "velocity": [self.state.vx, self.state.vy], + "omega": self.state.omega, + "sources": { + "wheel": self.last_wheel_odom is not None, + "visual": self.last_visual_odom is not None, + "imu": self.last_imu is not None, + }, + "weights": { + "wheel": f"{self.wheel_weight:.2f}", + "visual": f"{self.visual_weight:.2f}", + "imu": f"{self.imu_weight:.2f}", + }, + } + msg = String(data=json.dumps(info)) + self.pub_info.publish(msg) + + +def main(args=None): + rclpy.init(args=args) + node = OdomFusionNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/setup.cfg b/jetson/ros2_ws/src/saltybot_odom_fusion/setup.cfg new file mode 100644 index 0000000..f115f95 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_odom_fusion/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_odom_fusion +[egg_info] +tag_date = 0 diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/setup.py b/jetson/ros2_ws/src/saltybot_odom_fusion/setup.py new file mode 100644 index 0000000..4d54dec --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_odom_fusion/setup.py @@ -0,0 +1,29 @@ +from setuptools import setup + +package_name = "saltybot_odom_fusion" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (f"share/{package_name}/launch", ["launch/odom_fusion.launch.py"]), + (f"share/{package_name}/config", ["config/fusion_config.yaml"]), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-controls", + maintainer_email="sl-controls@saltylab.local", + description=( + "Odometry fusion: wheel + visual + IMU complementary filter with TF broadcast" + ), + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + "odom_fusion_node = saltybot_odom_fusion.odom_fusion_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/test/__init__.py b/jetson/ros2_ws/src/saltybot_odom_fusion/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_odom_fusion/test/test_odom_fusion.py b/jetson/ros2_ws/src/saltybot_odom_fusion/test/test_odom_fusion.py new file mode 100644 index 0000000..a7f7d67 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_odom_fusion/test/test_odom_fusion.py @@ -0,0 +1,361 @@ +"""Unit tests for odom_fusion_node.""" + +import pytest +import math +import json +import numpy as np +from geometry_msgs.msg import Pose, Quaternion, Twist, Vector3 +from nav_msgs.msg import Odometry +from sensor_msgs.msg import Imu +from std_msgs.msg import String, Header +from tf_transformations import quaternion_from_euler, euler_from_quaternion + +import rclpy + +# Import the node and classes under test +from saltybot_odom_fusion.odom_fusion_node import ( + OdomFusionNode, + OdomState, +) + + +@pytest.fixture +def rclpy_fixture(): + """Initialize and cleanup rclpy.""" + rclpy.init() + yield + rclpy.shutdown() + + +@pytest.fixture +def node(rclpy_fixture): + """Create an odometry fusion node instance.""" + node = OdomFusionNode() + yield node + node.destroy_node() + + +class TestOdomState: + """Test suite for OdomState dataclass.""" + + def test_odom_state_initialization(self): + """Test OdomState initializes with correct defaults.""" + state = OdomState() + + assert state.x == 0.0 + assert state.y == 0.0 + assert state.theta == 0.0 + assert state.vx == 0.0 + assert state.vy == 0.0 + assert state.omega == 0.0 + + def test_odom_state_covariance_initialization(self): + """Test OdomState covariance matrices.""" + state = OdomState() + + assert state.cov_pos.shape == (2, 2) + assert state.cov_theta == 0.01 + assert state.cov_vel.shape == (2, 2) + + +class TestOdomFusionNode: + """Test suite for OdomFusionNode.""" + + def test_node_initialization(self, node): + """Test that node initializes with correct defaults.""" + assert node.state.x == 0.0 + assert node.state.y == 0.0 + assert node.state.theta == 0.0 + assert node.last_wheel_odom is None + assert node.last_visual_odom is None + assert node.last_imu is None + + def test_weight_normalization(self, node): + """Test that weights are normalized to sum to 1.0.""" + total = node.wheel_weight + node.visual_weight + node.imu_weight + assert abs(total - 1.0) < 1e-6 + + def test_wheel_odom_subscription(self, node): + """Test wheel odometry message handling.""" + odom_msg = Odometry() + odom_msg.twist.twist.linear.x = 1.0 + odom_msg.twist.twist.linear.y = 0.5 + + node._on_wheel_odom(odom_msg) + assert node.last_wheel_odom is not None + assert node.last_wheel_odom.twist.twist.linear.x == 1.0 + + def test_visual_odom_subscription(self, node): + """Test visual odometry message handling.""" + odom_msg = Odometry() + odom_msg.pose.pose.position.x = 5.0 + odom_msg.pose.pose.position.y = 3.0 + + node._on_visual_odom(odom_msg) + assert node.last_visual_odom is not None + assert node.last_visual_odom.pose.pose.position.x == 5.0 + + def test_imu_subscription(self, node): + """Test IMU message handling.""" + imu_msg = Imu() + imu_msg.angular_velocity.z = 0.1 + + node._on_imu(imu_msg) + assert node.last_imu is not None + assert node.last_imu.angular_velocity.z == 0.1 + + def test_position_integration(self, node): + """Test position integration from velocity.""" + node.state.vx = 1.0 # 1 m/s + node.state.vy = 0.0 + node.state.theta = 0.0 # Facing +x direction + + dt = 1.0 # 1 second + node._fuse_odometry(dt) + + # Should move 1 meter in +x direction + assert abs(node.state.x - 1.0) < 0.1 + + def test_position_integration_rotated(self, node): + """Test position integration with rotation.""" + node.state.vx = 1.0 # 1 m/s + node.state.vy = 0.0 + node.state.theta = math.pi / 2 # Facing +y direction + + dt = 1.0 + node._fuse_odometry(dt) + + # Should move 1 meter in +y direction + assert abs(node.state.y - 1.0) < 0.1 + + def test_angular_velocity_update(self, node): + """Test angular velocity from IMU.""" + imu_msg = Imu() + imu_msg.angular_velocity.z = 0.2 + imu_msg.orientation.w = 1.0 + + node.last_imu = imu_msg + dt = 0.1 + + node._fuse_odometry(dt) + + # Angular velocity should be weighted by imu_weight + expected_omega = 0.2 * node.imu_weight + assert abs(node.state.omega - expected_omega) < 0.01 + + def test_velocity_blending_wheel_only(self, node): + """Test velocity blending with only wheel odometry.""" + wheel_msg = Odometry() + wheel_msg.twist.twist.linear.x = 1.0 + wheel_msg.twist.twist.linear.y = 0.0 + + node.last_wheel_odom = wheel_msg + dt = 0.1 + + node._fuse_odometry(dt) + + # Velocity should be weighted by wheel_weight + expected_vx = 1.0 * node.wheel_weight + assert abs(node.state.vx - expected_vx) < 0.01 + + def test_visual_drift_correction(self, node): + """Test drift correction from visual odometry.""" + # Current state + node.state.x = 0.0 + node.state.y = 0.0 + + # Visual odometry says we're at (1.0, 1.0) + visual_msg = Odometry() + visual_msg.pose.pose.position.x = 1.0 + visual_msg.pose.pose.position.y = 1.0 + visual_msg.pose.pose.orientation.w = 1.0 + + node.last_visual_odom = visual_msg + dt = 0.1 + + node._fuse_odometry(dt) + + # Position should have moved toward visual estimate + assert node.state.x > 0.0 + assert node.state.y > 0.0 + + def test_covariance_updates(self, node): + """Test that covariances are updated based on measurements.""" + wheel_msg = Odometry() + visual_msg = Odometry() + visual_msg.pose.pose.orientation.w = 1.0 + + node.last_wheel_odom = wheel_msg + node.last_visual_odom = visual_msg + + measurements = { + "wheel": wheel_msg, + "visual": visual_msg, + } + + node._update_covariances(measurements) + + # Covariance should be reduced with good measurements + assert node.state.cov_pos[0, 0] < 0.01 + + def test_quaternion_to_yaw(self, node): + """Test quaternion to yaw conversion.""" + # Create 90 degree yaw rotation + q = quaternion_from_euler(0, 0, math.pi / 2) + + imu_msg = Imu() + imu_msg.orientation.x = q[0] + imu_msg.orientation.y = q[1] + imu_msg.orientation.z = q[2] + imu_msg.orientation.w = q[3] + + node.last_imu = imu_msg + dt = 0.1 + + node._fuse_odometry(dt) + + # Theta should be blended toward 90 degrees + assert node.state.theta > 0.0 + + def test_velocity_near_zero(self, node): + """Test handling of near-zero velocities.""" + wheel_msg = Odometry() + wheel_msg.twist.twist.linear.x = 0.001 # Very small + + node.last_wheel_odom = wheel_msg + dt = 0.1 + + node._fuse_odometry(dt) + + # Position change should be minimal + assert abs(node.state.x) < 0.0001 + + def test_invalid_dt_ignored(self, node): + """Test that invalid dt values are ignored.""" + initial_x = node.state.x + + # Zero dt + node._fuse_odometry(0.0) + assert node.state.x == initial_x + + # Negative dt + node._fuse_odometry(-1.0) + assert node.state.x == initial_x + + # Very large dt + node._fuse_odometry(10.0) + assert node.state.x == initial_x + + +class TestOdomFusionScenarios: + """Integration-style tests for realistic scenarios.""" + + def test_scenario_straight_line(self, node): + """Scenario: robot moving in straight line.""" + wheel_msg = Odometry() + wheel_msg.twist.twist.linear.x = 1.0 # 1 m/s forward + wheel_msg.twist.twist.linear.y = 0.0 + + node.last_wheel_odom = wheel_msg + node.state.theta = 0.0 # Facing +x + + # Move for 5 seconds + for _ in range(5): + node._fuse_odometry(1.0) + + # Should be approximately 5 meters in +x direction + assert 4.0 < node.state.x < 6.0 + assert abs(node.state.y) < 0.5 + + def test_scenario_circular_motion(self, node): + """Scenario: robot moving in circle.""" + wheel_msg = Odometry() + wheel_msg.twist.twist.linear.x = 1.0 # 1 m/s forward + + imu_msg = Imu() + imu_msg.angular_velocity.z = 0.5 # 0.5 rad/s rotation + imu_msg.orientation.w = 1.0 + + node.last_wheel_odom = wheel_msg + node.last_imu = imu_msg + + # Simulate circular motion for 4 cycles + for i in range(4): + node._fuse_odometry(1.0) + + # Should have moved distance and rotated + distance = math.sqrt(node.state.x**2 + node.state.y**2) + assert distance > 1.0 + + def test_scenario_drift_correction(self, node): + """Scenario: wheel odometry drifts, vision corrects.""" + # Wheel says we're at 10, 10 + node.state.x = 10.0 + node.state.y = 10.0 + + # Vision says we're actually at 9, 9 + visual_msg = Odometry() + visual_msg.pose.pose.position.x = 9.0 + visual_msg.pose.pose.position.y = 9.0 + visual_msg.pose.pose.orientation.w = 1.0 + + node.last_visual_odom = visual_msg + + # Before correction + initial_x = node.state.x + initial_y = node.state.y + + node._fuse_odometry(1.0) + + # Position should move toward visual estimate + assert node.state.x < initial_x + assert node.state.y < initial_y + + def test_scenario_all_sensors_available(self, node): + """Scenario: all three sensor sources available.""" + wheel_msg = Odometry() + wheel_msg.twist.twist.linear.x = 0.5 + + visual_msg = Odometry() + visual_msg.pose.pose.position.x = 2.0 + visual_msg.pose.pose.position.y = 1.0 + visual_msg.pose.pose.orientation.w = 1.0 + + imu_msg = Imu() + q = quaternion_from_euler(0, 0, 0.1) # Small yaw + imu_msg.orientation.x = q[0] + imu_msg.orientation.y = q[1] + imu_msg.orientation.z = q[2] + imu_msg.orientation.w = q[3] + imu_msg.angular_velocity.z = 0.05 + + node.last_wheel_odom = wheel_msg + node.last_visual_odom = visual_msg + node.last_imu = imu_msg + + measurements = { + "wheel": wheel_msg, + "visual": visual_msg, + "imu": imu_msg, + } + + node._update_covariances(measurements) + + # Covariance should be low with all sources + assert node.state.cov_pos[0, 0] < 0.01 + assert node.state.cov_theta < 0.01 + + def test_scenario_sensor_dropout(self, node): + """Scenario: visual sensor drops out, wheel continues.""" + wheel_msg = Odometry() + wheel_msg.twist.twist.linear.x = 1.0 + + node.last_wheel_odom = wheel_msg + node.last_visual_odom = None # Visual dropout + node.last_imu = None + + # Should continue with wheel only + node._fuse_odometry(1.0) + + # Position should still integrate + assert node.state.x > 0.0