Compare commits
1 Commits
eceda99bb5
...
b9d01762e2
| Author | SHA1 | Date | |
|---|---|---|---|
| b9d01762e2 |
@ -1,16 +0,0 @@
|
|||||||
# Odometry fusion configuration (complementary filter)
|
|
||||||
|
|
||||||
odom_fusion:
|
|
||||||
ros__parameters:
|
|
||||||
# Publishing frequency in Hz
|
|
||||||
frequency: 50 # 50 Hz = 20ms cycle
|
|
||||||
|
|
||||||
# Sensor weights (will be normalized)
|
|
||||||
# Wheel odometry: reliable linear displacement
|
|
||||||
wheel_weight: 0.6
|
|
||||||
|
|
||||||
# Visual odometry: loop closure and long-term correction
|
|
||||||
visual_weight: 0.3
|
|
||||||
|
|
||||||
# IMU: high-frequency attitude and angular velocity
|
|
||||||
imu_weight: 0.1
|
|
||||||
@ -1,36 +0,0 @@
|
|||||||
"""Launch file for odom_fusion_node."""
|
|
||||||
|
|
||||||
from launch import LaunchDescription
|
|
||||||
from launch_ros.actions import Node
|
|
||||||
from launch.substitutions import LaunchConfiguration
|
|
||||||
from launch.actions import DeclareLaunchArgument
|
|
||||||
import os
|
|
||||||
from ament_index_python.packages import get_package_share_directory
|
|
||||||
|
|
||||||
|
|
||||||
def generate_launch_description():
|
|
||||||
"""Generate launch description for odometry fusion."""
|
|
||||||
# Package directory
|
|
||||||
pkg_dir = get_package_share_directory("saltybot_odom_fusion")
|
|
||||||
|
|
||||||
# Parameters
|
|
||||||
config_file = os.path.join(pkg_dir, "config", "fusion_config.yaml")
|
|
||||||
|
|
||||||
# Declare launch arguments
|
|
||||||
return LaunchDescription(
|
|
||||||
[
|
|
||||||
DeclareLaunchArgument(
|
|
||||||
"config_file",
|
|
||||||
default_value=config_file,
|
|
||||||
description="Path to configuration YAML file",
|
|
||||||
),
|
|
||||||
# Odometry fusion node
|
|
||||||
Node(
|
|
||||||
package="saltybot_odom_fusion",
|
|
||||||
executable="odom_fusion_node",
|
|
||||||
name="odom_fusion",
|
|
||||||
output="screen",
|
|
||||||
parameters=[LaunchConfiguration("config_file")],
|
|
||||||
),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
@ -1,31 +0,0 @@
|
|||||||
<?xml version="1.0"?>
|
|
||||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
|
||||||
<package format="3">
|
|
||||||
<name>saltybot_odom_fusion</name>
|
|
||||||
<version>0.1.0</version>
|
|
||||||
<description>
|
|
||||||
Odometry fusion node for SaltyBot: fuses wheel, visual, and IMU odometry
|
|
||||||
using complementary filtering. Publishes fused /odom and broadcasts
|
|
||||||
odom→base_link TF at 50Hz. Handles sensor dropout and covariance weighting.
|
|
||||||
</description>
|
|
||||||
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
|
||||||
<license>MIT</license>
|
|
||||||
|
|
||||||
<depend>rclpy</depend>
|
|
||||||
<depend>geometry_msgs</depend>
|
|
||||||
<depend>nav_msgs</depend>
|
|
||||||
<depend>sensor_msgs</depend>
|
|
||||||
<depend>std_msgs</depend>
|
|
||||||
<depend>tf2_ros</depend>
|
|
||||||
|
|
||||||
<buildtool_depend>ament_python</buildtool_depend>
|
|
||||||
|
|
||||||
<test_depend>ament_copyright</test_depend>
|
|
||||||
<test_depend>ament_flake8</test_depend>
|
|
||||||
<test_depend>ament_pep257</test_depend>
|
|
||||||
<test_depend>python3-pytest</test_depend>
|
|
||||||
|
|
||||||
<export>
|
|
||||||
<build_type>ament_python</build_type>
|
|
||||||
</export>
|
|
||||||
</package>
|
|
||||||
@ -1,332 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
"""Odometry fusion node for SaltyBot.
|
|
||||||
|
|
||||||
Fuses wheel odometry, visual odometry, and IMU data using complementary filtering.
|
|
||||||
Publishes fused /odom and broadcasts odom→base_link TF at 50Hz.
|
|
||||||
|
|
||||||
Published topics:
|
|
||||||
/odom (nav_msgs/Odometry) - Fused odometry with covariance
|
|
||||||
|
|
||||||
TF broadcasts:
|
|
||||||
odom→base_link - Position and orientation transform
|
|
||||||
|
|
||||||
Subscribed topics:
|
|
||||||
/saltybot/wheel_odom (nav_msgs/Odometry) - Wheel encoder odometry
|
|
||||||
/rtab_map/odom (nav_msgs/Odometry) - Visual odometry (SLAM)
|
|
||||||
/imu/data (sensor_msgs/Imu) - Inertial measurement unit
|
|
||||||
"""
|
|
||||||
|
|
||||||
import math
|
|
||||||
import json
|
|
||||||
from typing import Optional, Tuple
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
import rclpy
|
|
||||||
from rclpy.node import Node
|
|
||||||
from rclpy.timer import Timer
|
|
||||||
|
|
||||||
from geometry_msgs.msg import Pose, Twist, TransformStamped, Quaternion
|
|
||||||
from nav_msgs.msg import Odometry
|
|
||||||
from sensor_msgs.msg import Imu
|
|
||||||
from std_msgs.msg import String
|
|
||||||
from tf2_ros import TransformBroadcaster
|
|
||||||
from tf_transformations import quaternion_from_euler, euler_from_quaternion
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class OdomState:
|
|
||||||
"""Fused odometry state."""
|
|
||||||
x: float = 0.0 # Position in odom frame
|
|
||||||
y: float = 0.0
|
|
||||||
theta: float = 0.0 # Orientation (yaw)
|
|
||||||
|
|
||||||
vx: float = 0.0 # Velocity in body frame
|
|
||||||
vy: float = 0.0
|
|
||||||
omega: float = 0.0 # Angular velocity
|
|
||||||
|
|
||||||
cov_pos: np.ndarray = field(default_factory=lambda: np.eye(2) * 0.01) # Position covariance
|
|
||||||
cov_theta: float = 0.01 # Orientation covariance
|
|
||||||
cov_vel: np.ndarray = field(default_factory=lambda: np.eye(2) * 0.01) # Velocity covariance
|
|
||||||
|
|
||||||
|
|
||||||
class OdomFusionNode(Node):
|
|
||||||
"""ROS2 node for odometry fusion."""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
super().__init__("odom_fusion")
|
|
||||||
|
|
||||||
# Parameters
|
|
||||||
self.declare_parameter("frequency", 50) # Hz
|
|
||||||
self.declare_parameter("wheel_weight", 0.6) # Weight for wheel odometry
|
|
||||||
self.declare_parameter("visual_weight", 0.3) # Weight for visual odometry
|
|
||||||
self.declare_parameter("imu_weight", 0.1) # Weight for IMU
|
|
||||||
|
|
||||||
frequency = self.get_parameter("frequency").value
|
|
||||||
self.wheel_weight = self.get_parameter("wheel_weight").value
|
|
||||||
self.visual_weight = self.get_parameter("visual_weight").value
|
|
||||||
self.imu_weight = self.get_parameter("imu_weight").value
|
|
||||||
|
|
||||||
# Normalize weights
|
|
||||||
total_weight = self.wheel_weight + self.visual_weight + self.imu_weight
|
|
||||||
self.wheel_weight /= total_weight
|
|
||||||
self.visual_weight /= total_weight
|
|
||||||
self.imu_weight /= total_weight
|
|
||||||
|
|
||||||
# State
|
|
||||||
self.state = OdomState()
|
|
||||||
self.last_time = None
|
|
||||||
|
|
||||||
# Last received messages
|
|
||||||
self.last_wheel_odom: Optional[Odometry] = None
|
|
||||||
self.last_visual_odom: Optional[Odometry] = None
|
|
||||||
self.last_imu: Optional[Imu] = None
|
|
||||||
|
|
||||||
# Subscriptions
|
|
||||||
self.create_subscription(
|
|
||||||
Odometry, "/saltybot/wheel_odom", self._on_wheel_odom, 10
|
|
||||||
)
|
|
||||||
self.create_subscription(
|
|
||||||
Odometry, "/rtab_map/odom", self._on_visual_odom, 10
|
|
||||||
)
|
|
||||||
self.create_subscription(Imu, "/imu/data", self._on_imu, 10)
|
|
||||||
|
|
||||||
# Publications
|
|
||||||
self.pub_odom = self.create_publisher(Odometry, "/odom", 10)
|
|
||||||
self.pub_info = self.create_publisher(String, "/saltybot/odom_fusion_info", 10)
|
|
||||||
|
|
||||||
# TF broadcaster
|
|
||||||
self.tf_broadcaster = TransformBroadcaster(self)
|
|
||||||
|
|
||||||
# Timer for periodic fusion and publishing at 50Hz
|
|
||||||
period = 1.0 / frequency
|
|
||||||
self.timer: Timer = self.create_timer(period, self._timer_callback)
|
|
||||||
|
|
||||||
self.get_logger().info(
|
|
||||||
f"Odometry fusion initialized at {frequency}Hz. "
|
|
||||||
f"Weights - wheel: {self.wheel_weight:.2f}, "
|
|
||||||
f"visual: {self.visual_weight:.2f}, "
|
|
||||||
f"imu: {self.imu_weight:.2f}"
|
|
||||||
)
|
|
||||||
|
|
||||||
def _on_wheel_odom(self, msg: Odometry) -> None:
|
|
||||||
"""Update wheel odometry."""
|
|
||||||
self.last_wheel_odom = msg
|
|
||||||
|
|
||||||
def _on_visual_odom(self, msg: Odometry) -> None:
|
|
||||||
"""Update visual odometry."""
|
|
||||||
self.last_visual_odom = msg
|
|
||||||
|
|
||||||
def _on_imu(self, msg: Imu) -> None:
|
|
||||||
"""Update IMU data."""
|
|
||||||
self.last_imu = msg
|
|
||||||
|
|
||||||
def _timer_callback(self) -> None:
|
|
||||||
"""Fuse odometry sources and publish at 50Hz."""
|
|
||||||
now = self.get_clock().now()
|
|
||||||
|
|
||||||
if self.last_time is None:
|
|
||||||
self.last_time = now
|
|
||||||
return
|
|
||||||
|
|
||||||
dt = (now - self.last_time).nanoseconds / 1e9
|
|
||||||
self.last_time = now
|
|
||||||
|
|
||||||
# Fuse available sources
|
|
||||||
self._fuse_odometry(dt)
|
|
||||||
|
|
||||||
# Publish fused odometry
|
|
||||||
self._publish_odometry(now)
|
|
||||||
|
|
||||||
# Broadcast TF
|
|
||||||
self._broadcast_tf(now)
|
|
||||||
|
|
||||||
# Publish debug info
|
|
||||||
self._publish_info()
|
|
||||||
|
|
||||||
def _fuse_odometry(self, dt: float) -> None:
|
|
||||||
"""Fuse odometry sources using complementary filtering."""
|
|
||||||
if dt <= 0 or dt > 1.0: # Safety check on dt
|
|
||||||
return
|
|
||||||
|
|
||||||
# Collect available measurements
|
|
||||||
measurements = {}
|
|
||||||
|
|
||||||
if self.last_wheel_odom is not None:
|
|
||||||
measurements["wheel"] = self.last_wheel_odom
|
|
||||||
|
|
||||||
if self.last_visual_odom is not None:
|
|
||||||
measurements["visual"] = self.last_visual_odom
|
|
||||||
|
|
||||||
if self.last_imu is not None:
|
|
||||||
measurements["imu"] = self.last_imu
|
|
||||||
|
|
||||||
if not measurements:
|
|
||||||
return # No data to fuse
|
|
||||||
|
|
||||||
# Complementary filter approach
|
|
||||||
# High-frequency IMU for attitude, low-frequency vision for position correction
|
|
||||||
|
|
||||||
# Extract velocities from wheel odometry (most reliable for linear motion)
|
|
||||||
if "wheel" in measurements:
|
|
||||||
wheel_odom = measurements["wheel"]
|
|
||||||
self.state.vx = wheel_odom.twist.twist.linear.x * self.wheel_weight
|
|
||||||
self.state.vy = wheel_odom.twist.twist.linear.y * self.wheel_weight
|
|
||||||
|
|
||||||
# Integrate IMU angular velocity (high-frequency)
|
|
||||||
if "imu" in measurements:
|
|
||||||
imu = measurements["imu"]
|
|
||||||
# Use IMU for angular velocity (most accurate for rotation)
|
|
||||||
self.state.omega = imu.angular_velocity.z * self.imu_weight
|
|
||||||
|
|
||||||
# Update orientation from IMU
|
|
||||||
roll, pitch, yaw = euler_from_quaternion([
|
|
||||||
imu.orientation.x, imu.orientation.y,
|
|
||||||
imu.orientation.z, imu.orientation.w
|
|
||||||
])
|
|
||||||
# Blend with existing estimate
|
|
||||||
self.state.theta = (0.8 * self.state.theta + 0.2 * yaw)
|
|
||||||
|
|
||||||
# Apply velocity-based position update
|
|
||||||
self.state.x += self.state.vx * dt * math.cos(self.state.theta)
|
|
||||||
self.state.x -= self.state.vy * dt * math.sin(self.state.theta)
|
|
||||||
|
|
||||||
self.state.y += self.state.vx * dt * math.sin(self.state.theta)
|
|
||||||
self.state.y += self.state.vy * dt * math.cos(self.state.theta)
|
|
||||||
|
|
||||||
# Visual odometry correction (low-frequency drift correction)
|
|
||||||
if "visual" in measurements and self.visual_weight > 0:
|
|
||||||
visual_odom = measurements["visual"]
|
|
||||||
visual_x = visual_odom.pose.pose.position.x
|
|
||||||
visual_y = visual_odom.pose.pose.position.y
|
|
||||||
|
|
||||||
# Gradually correct position drift using visual odometry
|
|
||||||
correction_factor = self.visual_weight * 0.1 # Small correction rate
|
|
||||||
self.state.x += (visual_x - self.state.x) * correction_factor
|
|
||||||
self.state.y += (visual_y - self.state.y) * correction_factor
|
|
||||||
|
|
||||||
# Correct orientation from visual odometry
|
|
||||||
visual_roll, visual_pitch, visual_yaw = euler_from_quaternion([
|
|
||||||
visual_odom.pose.pose.orientation.x,
|
|
||||||
visual_odom.pose.pose.orientation.y,
|
|
||||||
visual_odom.pose.pose.orientation.z,
|
|
||||||
visual_odom.pose.pose.orientation.w
|
|
||||||
])
|
|
||||||
self.state.theta += (visual_yaw - self.state.theta) * correction_factor
|
|
||||||
|
|
||||||
# Update covariances based on available measurements
|
|
||||||
self._update_covariances(measurements)
|
|
||||||
|
|
||||||
def _update_covariances(self, measurements: dict) -> None:
|
|
||||||
"""Update state covariance based on measurement sources."""
|
|
||||||
# Start with baseline
|
|
||||||
base_pos_cov = 0.01
|
|
||||||
base_theta_cov = 0.01
|
|
||||||
|
|
||||||
# Reduce covariance if we have good measurements
|
|
||||||
if "wheel" in measurements:
|
|
||||||
base_pos_cov *= 0.5
|
|
||||||
if "visual" in measurements:
|
|
||||||
base_pos_cov *= 0.7
|
|
||||||
if "imu" in measurements:
|
|
||||||
base_theta_cov *= 0.3
|
|
||||||
|
|
||||||
# Update covariance matrices
|
|
||||||
self.state.cov_pos = np.eye(2) * base_pos_cov
|
|
||||||
self.state.cov_theta = base_theta_cov
|
|
||||||
|
|
||||||
def _publish_odometry(self, now) -> None:
|
|
||||||
"""Publish fused odometry message."""
|
|
||||||
odom_msg = Odometry()
|
|
||||||
odom_msg.header.stamp = now.to_msg()
|
|
||||||
odom_msg.header.frame_id = "odom"
|
|
||||||
odom_msg.child_frame_id = "base_link"
|
|
||||||
|
|
||||||
# Position
|
|
||||||
odom_msg.pose.pose.position.x = self.state.x
|
|
||||||
odom_msg.pose.pose.position.y = self.state.y
|
|
||||||
odom_msg.pose.pose.position.z = 0.0
|
|
||||||
|
|
||||||
# Orientation (yaw only, roll/pitch from IMU)
|
|
||||||
q = quaternion_from_euler(0, 0, self.state.theta)
|
|
||||||
odom_msg.pose.pose.orientation = Quaternion(
|
|
||||||
x=q[0], y=q[1], z=q[2], w=q[3]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Position covariance (6x6)
|
|
||||||
pose_cov = np.zeros((6, 6))
|
|
||||||
pose_cov[0:2, 0:2] = self.state.cov_pos
|
|
||||||
pose_cov[5, 5] = self.state.cov_theta # Yaw variance
|
|
||||||
odom_msg.pose.covariance = pose_cov.flatten().tolist()
|
|
||||||
|
|
||||||
# Velocity
|
|
||||||
odom_msg.twist.twist.linear.x = self.state.vx
|
|
||||||
odom_msg.twist.twist.linear.y = self.state.vy
|
|
||||||
odom_msg.twist.twist.linear.z = 0.0
|
|
||||||
odom_msg.twist.twist.angular.z = self.state.omega
|
|
||||||
|
|
||||||
# Velocity covariance (6x6)
|
|
||||||
twist_cov = np.zeros((6, 6))
|
|
||||||
twist_cov[0:2, 0:2] = self.state.cov_vel
|
|
||||||
twist_cov[5, 5] = 0.01 # Angular velocity variance
|
|
||||||
odom_msg.twist.covariance = twist_cov.flatten().tolist()
|
|
||||||
|
|
||||||
self.pub_odom.publish(odom_msg)
|
|
||||||
|
|
||||||
def _broadcast_tf(self, now) -> None:
|
|
||||||
"""Broadcast odom→base_link TF."""
|
|
||||||
t = TransformStamped()
|
|
||||||
t.header.stamp = now.to_msg()
|
|
||||||
t.header.frame_id = "odom"
|
|
||||||
t.child_frame_id = "base_link"
|
|
||||||
|
|
||||||
# Translation
|
|
||||||
t.transform.translation.x = self.state.x
|
|
||||||
t.transform.translation.y = self.state.y
|
|
||||||
t.transform.translation.z = 0.0
|
|
||||||
|
|
||||||
# Rotation (yaw only)
|
|
||||||
q = quaternion_from_euler(0, 0, self.state.theta)
|
|
||||||
t.transform.rotation = Quaternion(
|
|
||||||
x=q[0], y=q[1], z=q[2], w=q[3]
|
|
||||||
)
|
|
||||||
|
|
||||||
self.tf_broadcaster.sendTransform(t)
|
|
||||||
|
|
||||||
def _publish_info(self) -> None:
|
|
||||||
"""Publish fusion debug information."""
|
|
||||||
info = {
|
|
||||||
"timestamp": self.get_clock().now().nanoseconds / 1e9,
|
|
||||||
"position": [self.state.x, self.state.y],
|
|
||||||
"theta_deg": math.degrees(self.state.theta),
|
|
||||||
"velocity": [self.state.vx, self.state.vy],
|
|
||||||
"omega": self.state.omega,
|
|
||||||
"sources": {
|
|
||||||
"wheel": self.last_wheel_odom is not None,
|
|
||||||
"visual": self.last_visual_odom is not None,
|
|
||||||
"imu": self.last_imu is not None,
|
|
||||||
},
|
|
||||||
"weights": {
|
|
||||||
"wheel": f"{self.wheel_weight:.2f}",
|
|
||||||
"visual": f"{self.visual_weight:.2f}",
|
|
||||||
"imu": f"{self.imu_weight:.2f}",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
msg = String(data=json.dumps(info))
|
|
||||||
self.pub_info.publish(msg)
|
|
||||||
|
|
||||||
|
|
||||||
def main(args=None):
|
|
||||||
rclpy.init(args=args)
|
|
||||||
node = OdomFusionNode()
|
|
||||||
try:
|
|
||||||
rclpy.spin(node)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
finally:
|
|
||||||
node.destroy_node()
|
|
||||||
rclpy.shutdown()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
@ -1,4 +0,0 @@
|
|||||||
[develop]
|
|
||||||
script_dir=$base/lib/saltybot_odom_fusion
|
|
||||||
[egg_info]
|
|
||||||
tag_date = 0
|
|
||||||
@ -1,29 +0,0 @@
|
|||||||
from setuptools import setup
|
|
||||||
|
|
||||||
package_name = "saltybot_odom_fusion"
|
|
||||||
|
|
||||||
setup(
|
|
||||||
name=package_name,
|
|
||||||
version="0.1.0",
|
|
||||||
packages=[package_name],
|
|
||||||
data_files=[
|
|
||||||
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
|
||||||
(f"share/{package_name}", ["package.xml"]),
|
|
||||||
(f"share/{package_name}/launch", ["launch/odom_fusion.launch.py"]),
|
|
||||||
(f"share/{package_name}/config", ["config/fusion_config.yaml"]),
|
|
||||||
],
|
|
||||||
install_requires=["setuptools"],
|
|
||||||
zip_safe=True,
|
|
||||||
maintainer="sl-controls",
|
|
||||||
maintainer_email="sl-controls@saltylab.local",
|
|
||||||
description=(
|
|
||||||
"Odometry fusion: wheel + visual + IMU complementary filter with TF broadcast"
|
|
||||||
),
|
|
||||||
license="MIT",
|
|
||||||
tests_require=["pytest"],
|
|
||||||
entry_points={
|
|
||||||
"console_scripts": [
|
|
||||||
"odom_fusion_node = saltybot_odom_fusion.odom_fusion_node:main",
|
|
||||||
],
|
|
||||||
},
|
|
||||||
)
|
|
||||||
@ -1,361 +0,0 @@
|
|||||||
"""Unit tests for odom_fusion_node."""
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
import math
|
|
||||||
import json
|
|
||||||
import numpy as np
|
|
||||||
from geometry_msgs.msg import Pose, Quaternion, Twist, Vector3
|
|
||||||
from nav_msgs.msg import Odometry
|
|
||||||
from sensor_msgs.msg import Imu
|
|
||||||
from std_msgs.msg import String, Header
|
|
||||||
from tf_transformations import quaternion_from_euler, euler_from_quaternion
|
|
||||||
|
|
||||||
import rclpy
|
|
||||||
|
|
||||||
# Import the node and classes under test
|
|
||||||
from saltybot_odom_fusion.odom_fusion_node import (
|
|
||||||
OdomFusionNode,
|
|
||||||
OdomState,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def rclpy_fixture():
|
|
||||||
"""Initialize and cleanup rclpy."""
|
|
||||||
rclpy.init()
|
|
||||||
yield
|
|
||||||
rclpy.shutdown()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def node(rclpy_fixture):
|
|
||||||
"""Create an odometry fusion node instance."""
|
|
||||||
node = OdomFusionNode()
|
|
||||||
yield node
|
|
||||||
node.destroy_node()
|
|
||||||
|
|
||||||
|
|
||||||
class TestOdomState:
|
|
||||||
"""Test suite for OdomState dataclass."""
|
|
||||||
|
|
||||||
def test_odom_state_initialization(self):
|
|
||||||
"""Test OdomState initializes with correct defaults."""
|
|
||||||
state = OdomState()
|
|
||||||
|
|
||||||
assert state.x == 0.0
|
|
||||||
assert state.y == 0.0
|
|
||||||
assert state.theta == 0.0
|
|
||||||
assert state.vx == 0.0
|
|
||||||
assert state.vy == 0.0
|
|
||||||
assert state.omega == 0.0
|
|
||||||
|
|
||||||
def test_odom_state_covariance_initialization(self):
|
|
||||||
"""Test OdomState covariance matrices."""
|
|
||||||
state = OdomState()
|
|
||||||
|
|
||||||
assert state.cov_pos.shape == (2, 2)
|
|
||||||
assert state.cov_theta == 0.01
|
|
||||||
assert state.cov_vel.shape == (2, 2)
|
|
||||||
|
|
||||||
|
|
||||||
class TestOdomFusionNode:
|
|
||||||
"""Test suite for OdomFusionNode."""
|
|
||||||
|
|
||||||
def test_node_initialization(self, node):
|
|
||||||
"""Test that node initializes with correct defaults."""
|
|
||||||
assert node.state.x == 0.0
|
|
||||||
assert node.state.y == 0.0
|
|
||||||
assert node.state.theta == 0.0
|
|
||||||
assert node.last_wheel_odom is None
|
|
||||||
assert node.last_visual_odom is None
|
|
||||||
assert node.last_imu is None
|
|
||||||
|
|
||||||
def test_weight_normalization(self, node):
|
|
||||||
"""Test that weights are normalized to sum to 1.0."""
|
|
||||||
total = node.wheel_weight + node.visual_weight + node.imu_weight
|
|
||||||
assert abs(total - 1.0) < 1e-6
|
|
||||||
|
|
||||||
def test_wheel_odom_subscription(self, node):
|
|
||||||
"""Test wheel odometry message handling."""
|
|
||||||
odom_msg = Odometry()
|
|
||||||
odom_msg.twist.twist.linear.x = 1.0
|
|
||||||
odom_msg.twist.twist.linear.y = 0.5
|
|
||||||
|
|
||||||
node._on_wheel_odom(odom_msg)
|
|
||||||
assert node.last_wheel_odom is not None
|
|
||||||
assert node.last_wheel_odom.twist.twist.linear.x == 1.0
|
|
||||||
|
|
||||||
def test_visual_odom_subscription(self, node):
|
|
||||||
"""Test visual odometry message handling."""
|
|
||||||
odom_msg = Odometry()
|
|
||||||
odom_msg.pose.pose.position.x = 5.0
|
|
||||||
odom_msg.pose.pose.position.y = 3.0
|
|
||||||
|
|
||||||
node._on_visual_odom(odom_msg)
|
|
||||||
assert node.last_visual_odom is not None
|
|
||||||
assert node.last_visual_odom.pose.pose.position.x == 5.0
|
|
||||||
|
|
||||||
def test_imu_subscription(self, node):
|
|
||||||
"""Test IMU message handling."""
|
|
||||||
imu_msg = Imu()
|
|
||||||
imu_msg.angular_velocity.z = 0.1
|
|
||||||
|
|
||||||
node._on_imu(imu_msg)
|
|
||||||
assert node.last_imu is not None
|
|
||||||
assert node.last_imu.angular_velocity.z == 0.1
|
|
||||||
|
|
||||||
def test_position_integration(self, node):
|
|
||||||
"""Test position integration from velocity."""
|
|
||||||
node.state.vx = 1.0 # 1 m/s
|
|
||||||
node.state.vy = 0.0
|
|
||||||
node.state.theta = 0.0 # Facing +x direction
|
|
||||||
|
|
||||||
dt = 1.0 # 1 second
|
|
||||||
node._fuse_odometry(dt)
|
|
||||||
|
|
||||||
# Should move 1 meter in +x direction
|
|
||||||
assert abs(node.state.x - 1.0) < 0.1
|
|
||||||
|
|
||||||
def test_position_integration_rotated(self, node):
|
|
||||||
"""Test position integration with rotation."""
|
|
||||||
node.state.vx = 1.0 # 1 m/s
|
|
||||||
node.state.vy = 0.0
|
|
||||||
node.state.theta = math.pi / 2 # Facing +y direction
|
|
||||||
|
|
||||||
dt = 1.0
|
|
||||||
node._fuse_odometry(dt)
|
|
||||||
|
|
||||||
# Should move 1 meter in +y direction
|
|
||||||
assert abs(node.state.y - 1.0) < 0.1
|
|
||||||
|
|
||||||
def test_angular_velocity_update(self, node):
|
|
||||||
"""Test angular velocity from IMU."""
|
|
||||||
imu_msg = Imu()
|
|
||||||
imu_msg.angular_velocity.z = 0.2
|
|
||||||
imu_msg.orientation.w = 1.0
|
|
||||||
|
|
||||||
node.last_imu = imu_msg
|
|
||||||
dt = 0.1
|
|
||||||
|
|
||||||
node._fuse_odometry(dt)
|
|
||||||
|
|
||||||
# Angular velocity should be weighted by imu_weight
|
|
||||||
expected_omega = 0.2 * node.imu_weight
|
|
||||||
assert abs(node.state.omega - expected_omega) < 0.01
|
|
||||||
|
|
||||||
def test_velocity_blending_wheel_only(self, node):
|
|
||||||
"""Test velocity blending with only wheel odometry."""
|
|
||||||
wheel_msg = Odometry()
|
|
||||||
wheel_msg.twist.twist.linear.x = 1.0
|
|
||||||
wheel_msg.twist.twist.linear.y = 0.0
|
|
||||||
|
|
||||||
node.last_wheel_odom = wheel_msg
|
|
||||||
dt = 0.1
|
|
||||||
|
|
||||||
node._fuse_odometry(dt)
|
|
||||||
|
|
||||||
# Velocity should be weighted by wheel_weight
|
|
||||||
expected_vx = 1.0 * node.wheel_weight
|
|
||||||
assert abs(node.state.vx - expected_vx) < 0.01
|
|
||||||
|
|
||||||
def test_visual_drift_correction(self, node):
|
|
||||||
"""Test drift correction from visual odometry."""
|
|
||||||
# Current state
|
|
||||||
node.state.x = 0.0
|
|
||||||
node.state.y = 0.0
|
|
||||||
|
|
||||||
# Visual odometry says we're at (1.0, 1.0)
|
|
||||||
visual_msg = Odometry()
|
|
||||||
visual_msg.pose.pose.position.x = 1.0
|
|
||||||
visual_msg.pose.pose.position.y = 1.0
|
|
||||||
visual_msg.pose.pose.orientation.w = 1.0
|
|
||||||
|
|
||||||
node.last_visual_odom = visual_msg
|
|
||||||
dt = 0.1
|
|
||||||
|
|
||||||
node._fuse_odometry(dt)
|
|
||||||
|
|
||||||
# Position should have moved toward visual estimate
|
|
||||||
assert node.state.x > 0.0
|
|
||||||
assert node.state.y > 0.0
|
|
||||||
|
|
||||||
def test_covariance_updates(self, node):
|
|
||||||
"""Test that covariances are updated based on measurements."""
|
|
||||||
wheel_msg = Odometry()
|
|
||||||
visual_msg = Odometry()
|
|
||||||
visual_msg.pose.pose.orientation.w = 1.0
|
|
||||||
|
|
||||||
node.last_wheel_odom = wheel_msg
|
|
||||||
node.last_visual_odom = visual_msg
|
|
||||||
|
|
||||||
measurements = {
|
|
||||||
"wheel": wheel_msg,
|
|
||||||
"visual": visual_msg,
|
|
||||||
}
|
|
||||||
|
|
||||||
node._update_covariances(measurements)
|
|
||||||
|
|
||||||
# Covariance should be reduced with good measurements
|
|
||||||
assert node.state.cov_pos[0, 0] < 0.01
|
|
||||||
|
|
||||||
def test_quaternion_to_yaw(self, node):
|
|
||||||
"""Test quaternion to yaw conversion."""
|
|
||||||
# Create 90 degree yaw rotation
|
|
||||||
q = quaternion_from_euler(0, 0, math.pi / 2)
|
|
||||||
|
|
||||||
imu_msg = Imu()
|
|
||||||
imu_msg.orientation.x = q[0]
|
|
||||||
imu_msg.orientation.y = q[1]
|
|
||||||
imu_msg.orientation.z = q[2]
|
|
||||||
imu_msg.orientation.w = q[3]
|
|
||||||
|
|
||||||
node.last_imu = imu_msg
|
|
||||||
dt = 0.1
|
|
||||||
|
|
||||||
node._fuse_odometry(dt)
|
|
||||||
|
|
||||||
# Theta should be blended toward 90 degrees
|
|
||||||
assert node.state.theta > 0.0
|
|
||||||
|
|
||||||
def test_velocity_near_zero(self, node):
|
|
||||||
"""Test handling of near-zero velocities."""
|
|
||||||
wheel_msg = Odometry()
|
|
||||||
wheel_msg.twist.twist.linear.x = 0.001 # Very small
|
|
||||||
|
|
||||||
node.last_wheel_odom = wheel_msg
|
|
||||||
dt = 0.1
|
|
||||||
|
|
||||||
node._fuse_odometry(dt)
|
|
||||||
|
|
||||||
# Position change should be minimal
|
|
||||||
assert abs(node.state.x) < 0.0001
|
|
||||||
|
|
||||||
def test_invalid_dt_ignored(self, node):
|
|
||||||
"""Test that invalid dt values are ignored."""
|
|
||||||
initial_x = node.state.x
|
|
||||||
|
|
||||||
# Zero dt
|
|
||||||
node._fuse_odometry(0.0)
|
|
||||||
assert node.state.x == initial_x
|
|
||||||
|
|
||||||
# Negative dt
|
|
||||||
node._fuse_odometry(-1.0)
|
|
||||||
assert node.state.x == initial_x
|
|
||||||
|
|
||||||
# Very large dt
|
|
||||||
node._fuse_odometry(10.0)
|
|
||||||
assert node.state.x == initial_x
|
|
||||||
|
|
||||||
|
|
||||||
class TestOdomFusionScenarios:
|
|
||||||
"""Integration-style tests for realistic scenarios."""
|
|
||||||
|
|
||||||
def test_scenario_straight_line(self, node):
|
|
||||||
"""Scenario: robot moving in straight line."""
|
|
||||||
wheel_msg = Odometry()
|
|
||||||
wheel_msg.twist.twist.linear.x = 1.0 # 1 m/s forward
|
|
||||||
wheel_msg.twist.twist.linear.y = 0.0
|
|
||||||
|
|
||||||
node.last_wheel_odom = wheel_msg
|
|
||||||
node.state.theta = 0.0 # Facing +x
|
|
||||||
|
|
||||||
# Move for 5 seconds
|
|
||||||
for _ in range(5):
|
|
||||||
node._fuse_odometry(1.0)
|
|
||||||
|
|
||||||
# Should be approximately 5 meters in +x direction
|
|
||||||
assert 4.0 < node.state.x < 6.0
|
|
||||||
assert abs(node.state.y) < 0.5
|
|
||||||
|
|
||||||
def test_scenario_circular_motion(self, node):
|
|
||||||
"""Scenario: robot moving in circle."""
|
|
||||||
wheel_msg = Odometry()
|
|
||||||
wheel_msg.twist.twist.linear.x = 1.0 # 1 m/s forward
|
|
||||||
|
|
||||||
imu_msg = Imu()
|
|
||||||
imu_msg.angular_velocity.z = 0.5 # 0.5 rad/s rotation
|
|
||||||
imu_msg.orientation.w = 1.0
|
|
||||||
|
|
||||||
node.last_wheel_odom = wheel_msg
|
|
||||||
node.last_imu = imu_msg
|
|
||||||
|
|
||||||
# Simulate circular motion for 4 cycles
|
|
||||||
for i in range(4):
|
|
||||||
node._fuse_odometry(1.0)
|
|
||||||
|
|
||||||
# Should have moved distance and rotated
|
|
||||||
distance = math.sqrt(node.state.x**2 + node.state.y**2)
|
|
||||||
assert distance > 1.0
|
|
||||||
|
|
||||||
def test_scenario_drift_correction(self, node):
|
|
||||||
"""Scenario: wheel odometry drifts, vision corrects."""
|
|
||||||
# Wheel says we're at 10, 10
|
|
||||||
node.state.x = 10.0
|
|
||||||
node.state.y = 10.0
|
|
||||||
|
|
||||||
# Vision says we're actually at 9, 9
|
|
||||||
visual_msg = Odometry()
|
|
||||||
visual_msg.pose.pose.position.x = 9.0
|
|
||||||
visual_msg.pose.pose.position.y = 9.0
|
|
||||||
visual_msg.pose.pose.orientation.w = 1.0
|
|
||||||
|
|
||||||
node.last_visual_odom = visual_msg
|
|
||||||
|
|
||||||
# Before correction
|
|
||||||
initial_x = node.state.x
|
|
||||||
initial_y = node.state.y
|
|
||||||
|
|
||||||
node._fuse_odometry(1.0)
|
|
||||||
|
|
||||||
# Position should move toward visual estimate
|
|
||||||
assert node.state.x < initial_x
|
|
||||||
assert node.state.y < initial_y
|
|
||||||
|
|
||||||
def test_scenario_all_sensors_available(self, node):
|
|
||||||
"""Scenario: all three sensor sources available."""
|
|
||||||
wheel_msg = Odometry()
|
|
||||||
wheel_msg.twist.twist.linear.x = 0.5
|
|
||||||
|
|
||||||
visual_msg = Odometry()
|
|
||||||
visual_msg.pose.pose.position.x = 2.0
|
|
||||||
visual_msg.pose.pose.position.y = 1.0
|
|
||||||
visual_msg.pose.pose.orientation.w = 1.0
|
|
||||||
|
|
||||||
imu_msg = Imu()
|
|
||||||
q = quaternion_from_euler(0, 0, 0.1) # Small yaw
|
|
||||||
imu_msg.orientation.x = q[0]
|
|
||||||
imu_msg.orientation.y = q[1]
|
|
||||||
imu_msg.orientation.z = q[2]
|
|
||||||
imu_msg.orientation.w = q[3]
|
|
||||||
imu_msg.angular_velocity.z = 0.05
|
|
||||||
|
|
||||||
node.last_wheel_odom = wheel_msg
|
|
||||||
node.last_visual_odom = visual_msg
|
|
||||||
node.last_imu = imu_msg
|
|
||||||
|
|
||||||
measurements = {
|
|
||||||
"wheel": wheel_msg,
|
|
||||||
"visual": visual_msg,
|
|
||||||
"imu": imu_msg,
|
|
||||||
}
|
|
||||||
|
|
||||||
node._update_covariances(measurements)
|
|
||||||
|
|
||||||
# Covariance should be low with all sources
|
|
||||||
assert node.state.cov_pos[0, 0] < 0.01
|
|
||||||
assert node.state.cov_theta < 0.01
|
|
||||||
|
|
||||||
def test_scenario_sensor_dropout(self, node):
|
|
||||||
"""Scenario: visual sensor drops out, wheel continues."""
|
|
||||||
wheel_msg = Odometry()
|
|
||||||
wheel_msg.twist.twist.linear.x = 1.0
|
|
||||||
|
|
||||||
node.last_wheel_odom = wheel_msg
|
|
||||||
node.last_visual_odom = None # Visual dropout
|
|
||||||
node.last_imu = None
|
|
||||||
|
|
||||||
# Should continue with wheel only
|
|
||||||
node._fuse_odometry(1.0)
|
|
||||||
|
|
||||||
# Position should still integrate
|
|
||||||
assert node.state.x > 0.0
|
|
||||||
Loading…
x
Reference in New Issue
Block a user