Merge pull request 'feat: Add velocity smoothing filter ROS2 node' (#316) from sl-controls/velocity-smooth-filter into main
This commit is contained in:
commit
d5e0c58594
@ -0,0 +1,12 @@
|
||||
# Velocity Smoother Configuration
|
||||
velocity_smoother:
|
||||
ros__parameters:
|
||||
# Filter parameters
|
||||
filter_order: 2 # Butterworth filter order (1-4 typical)
|
||||
cutoff_frequency: 5.0 # Cutoff frequency in Hz (lower = more smoothing)
|
||||
|
||||
# Publishing frequency (Hz)
|
||||
publish_frequency: 50
|
||||
|
||||
# Enable/disable filtering
|
||||
enable_smoothing: true
|
||||
@ -0,0 +1,28 @@
|
||||
import os
|
||||
from launch import LaunchDescription
|
||||
from launch_ros.actions import Node
|
||||
from launch_ros.substitutions import FindPackageShare
|
||||
from launch.substitutions import PathJoinSubstitution
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
config_dir = PathJoinSubstitution(
|
||||
[FindPackageShare('saltybot_velocity_smoother'), 'config']
|
||||
)
|
||||
config_file = PathJoinSubstitution([config_dir, 'velocity_smoother_config.yaml'])
|
||||
|
||||
velocity_smoother = Node(
|
||||
package='saltybot_velocity_smoother',
|
||||
executable='velocity_smoother_node',
|
||||
name='velocity_smoother',
|
||||
output='screen',
|
||||
parameters=[config_file],
|
||||
remappings=[
|
||||
('/odom', '/odom'),
|
||||
('/odom_smooth', '/odom_smooth'),
|
||||
],
|
||||
)
|
||||
|
||||
return LaunchDescription([
|
||||
velocity_smoother,
|
||||
])
|
||||
29
jetson/ros2_ws/src/saltybot_velocity_smoother/package.xml
Normal file
29
jetson/ros2_ws/src/saltybot_velocity_smoother/package.xml
Normal file
@ -0,0 +1,29 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_velocity_smoother</name>
|
||||
<version>0.1.0</version>
|
||||
<description>Low-pass Butterworth filter for odometry velocity smoothing to reduce encoder jitter</description>
|
||||
|
||||
<maintainer email="sl-controls@saltybot.local">SaltyBot Controls</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<author email="sl-controls@saltybot.local">SaltyBot Controls Team</author>
|
||||
|
||||
<buildtool_depend>ament_cmake</buildtool_depend>
|
||||
<buildtool_depend>ament_cmake_python</buildtool_depend>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>nav_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,231 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Velocity smoother node with low-pass Butterworth filter.
|
||||
|
||||
Subscribes to /odom, applies low-pass Butterworth filter to linear and angular
|
||||
velocity components, and publishes smoothed odometry on /odom_smooth.
|
||||
|
||||
Reduces noise from encoder jitter and improves state estimation stability.
|
||||
"""
|
||||
|
||||
import math
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from nav_msgs.msg import Odometry
|
||||
from std_msgs.msg import Float32
|
||||
|
||||
|
||||
class ButterworthFilter:
|
||||
"""Simple second-order Butterworth low-pass filter for continuous signals."""
|
||||
|
||||
def __init__(self, cutoff_hz, sample_rate_hz, order=2):
|
||||
"""Initialize Butterworth filter.
|
||||
|
||||
Args:
|
||||
cutoff_hz: Cutoff frequency in Hz
|
||||
sample_rate_hz: Sampling rate in Hz
|
||||
order: Filter order (typically 1-4)
|
||||
"""
|
||||
self.cutoff_hz = cutoff_hz
|
||||
self.sample_rate_hz = sample_rate_hz
|
||||
self.order = order
|
||||
|
||||
# Normalized frequency (0 to 1, where 1 = Nyquist)
|
||||
self.omega_n = 2.0 * math.pi * cutoff_hz / sample_rate_hz
|
||||
|
||||
# Simplified filter coefficients for order 2
|
||||
# Using canonical form: y[n] = b0*x[n] + b1*x[n-1] + b2*x[n-2] - a1*y[n-1] - a2*y[n-2]
|
||||
if order == 1:
|
||||
# First-order filter
|
||||
alpha = self.omega_n / (self.omega_n + 2.0)
|
||||
self.b = [alpha, alpha]
|
||||
self.a = [1.0, -(1.0 - 2.0 * alpha)]
|
||||
else:
|
||||
# Second-order filter (butterworth)
|
||||
sqrt2 = math.sqrt(2.0)
|
||||
wc = math.tan(self.omega_n / 2.0)
|
||||
wc2 = wc * wc
|
||||
|
||||
denom = 1.0 + sqrt2 * wc + wc2
|
||||
|
||||
self.b = [wc2 / denom, 2.0 * wc2 / denom, wc2 / denom]
|
||||
self.a = [1.0,
|
||||
2.0 * (wc2 - 1.0) / denom,
|
||||
(1.0 - sqrt2 * wc + wc2) / denom]
|
||||
|
||||
# State buffers
|
||||
self.x_history = [0.0, 0.0, 0.0] # Input history
|
||||
self.y_history = [0.0, 0.0] # Output history
|
||||
|
||||
def filter(self, x):
|
||||
"""Apply filter to input value.
|
||||
|
||||
Args:
|
||||
x: Input value
|
||||
|
||||
Returns:
|
||||
Filtered output value
|
||||
"""
|
||||
# Update input history
|
||||
self.x_history[2] = self.x_history[1]
|
||||
self.x_history[1] = self.x_history[0]
|
||||
self.x_history[0] = x
|
||||
|
||||
# Compute output using difference equation
|
||||
if len(self.b) == 2:
|
||||
# First-order filter
|
||||
y = (self.b[0] * self.x_history[0] +
|
||||
self.b[1] * self.x_history[1] -
|
||||
self.a[1] * self.y_history[1])
|
||||
else:
|
||||
# Second-order filter
|
||||
y = (self.b[0] * self.x_history[0] +
|
||||
self.b[1] * self.x_history[1] +
|
||||
self.b[2] * self.x_history[2] -
|
||||
self.a[1] * self.y_history[1] -
|
||||
self.a[2] * self.y_history[2])
|
||||
|
||||
# Update output history
|
||||
self.y_history[1] = self.y_history[0]
|
||||
self.y_history[0] = y
|
||||
|
||||
return y
|
||||
|
||||
def reset(self):
|
||||
"""Reset filter state."""
|
||||
self.x_history = [0.0, 0.0, 0.0]
|
||||
self.y_history = [0.0, 0.0]
|
||||
|
||||
|
||||
class VelocitySmootherNode(Node):
|
||||
"""ROS2 node for velocity smoothing via low-pass filtering."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('velocity_smoother')
|
||||
|
||||
# Parameters
|
||||
self.declare_parameter('filter_order', 2)
|
||||
self.declare_parameter('cutoff_frequency', 5.0)
|
||||
self.declare_parameter('publish_frequency', 50)
|
||||
self.declare_parameter('enable_smoothing', True)
|
||||
|
||||
filter_order = self.get_parameter('filter_order').value
|
||||
cutoff_frequency = self.get_parameter('cutoff_frequency').value
|
||||
publish_frequency = self.get_parameter('publish_frequency').value
|
||||
self.enable_smoothing = self.get_parameter('enable_smoothing').value
|
||||
|
||||
# Create filters for each velocity component
|
||||
self.filter_linear_x = ButterworthFilter(
|
||||
cutoff_frequency, publish_frequency, order=filter_order
|
||||
)
|
||||
self.filter_linear_y = ButterworthFilter(
|
||||
cutoff_frequency, publish_frequency, order=filter_order
|
||||
)
|
||||
self.filter_linear_z = ButterworthFilter(
|
||||
cutoff_frequency, publish_frequency, order=filter_order
|
||||
)
|
||||
self.filter_angular_x = ButterworthFilter(
|
||||
cutoff_frequency, publish_frequency, order=filter_order
|
||||
)
|
||||
self.filter_angular_y = ButterworthFilter(
|
||||
cutoff_frequency, publish_frequency, order=filter_order
|
||||
)
|
||||
self.filter_angular_z = ButterworthFilter(
|
||||
cutoff_frequency, publish_frequency, order=filter_order
|
||||
)
|
||||
|
||||
# Last received odometry
|
||||
self.last_odom = None
|
||||
|
||||
# Subscription to raw odometry
|
||||
self.sub_odom = self.create_subscription(
|
||||
Odometry, '/odom', self._on_odom, 10
|
||||
)
|
||||
|
||||
# Publisher for smoothed odometry
|
||||
self.pub_odom_smooth = self.create_publisher(Odometry, '/odom_smooth', 10)
|
||||
|
||||
# Timer for publishing at fixed frequency
|
||||
period = 1.0 / publish_frequency
|
||||
self.timer = self.create_timer(period, self._timer_callback)
|
||||
|
||||
self.get_logger().info(
|
||||
f"Velocity smoother initialized. "
|
||||
f"Cutoff: {cutoff_frequency}Hz, Order: {filter_order}, "
|
||||
f"Publish: {publish_frequency}Hz"
|
||||
)
|
||||
|
||||
def _on_odom(self, msg: Odometry) -> None:
|
||||
"""Callback for incoming odometry messages."""
|
||||
self.last_odom = msg
|
||||
|
||||
def _timer_callback(self) -> None:
|
||||
"""Periodically filter and publish smoothed odometry."""
|
||||
if self.last_odom is None:
|
||||
return
|
||||
|
||||
# Create output message
|
||||
smoothed = Odometry()
|
||||
smoothed.header = self.last_odom.header
|
||||
smoothed.child_frame_id = self.last_odom.child_frame_id
|
||||
|
||||
# Copy pose (unchanged)
|
||||
smoothed.pose = self.last_odom.pose
|
||||
|
||||
if self.enable_smoothing:
|
||||
# Filter velocity components
|
||||
linear_x = self.filter_linear_x.filter(
|
||||
self.last_odom.twist.twist.linear.x
|
||||
)
|
||||
linear_y = self.filter_linear_y.filter(
|
||||
self.last_odom.twist.twist.linear.y
|
||||
)
|
||||
linear_z = self.filter_linear_z.filter(
|
||||
self.last_odom.twist.twist.linear.z
|
||||
)
|
||||
|
||||
angular_x = self.filter_angular_x.filter(
|
||||
self.last_odom.twist.twist.angular.x
|
||||
)
|
||||
angular_y = self.filter_angular_y.filter(
|
||||
self.last_odom.twist.twist.angular.y
|
||||
)
|
||||
angular_z = self.filter_angular_z.filter(
|
||||
self.last_odom.twist.twist.angular.z
|
||||
)
|
||||
else:
|
||||
# No filtering
|
||||
linear_x = self.last_odom.twist.twist.linear.x
|
||||
linear_y = self.last_odom.twist.twist.linear.y
|
||||
linear_z = self.last_odom.twist.twist.linear.z
|
||||
angular_x = self.last_odom.twist.twist.angular.x
|
||||
angular_y = self.last_odom.twist.twist.angular.y
|
||||
angular_z = self.last_odom.twist.twist.angular.z
|
||||
|
||||
# Set smoothed twist
|
||||
smoothed.twist.twist.linear.x = linear_x
|
||||
smoothed.twist.twist.linear.y = linear_y
|
||||
smoothed.twist.twist.linear.z = linear_z
|
||||
smoothed.twist.twist.angular.x = angular_x
|
||||
smoothed.twist.twist.angular.y = angular_y
|
||||
smoothed.twist.twist.angular.z = angular_z
|
||||
|
||||
# Copy covariances
|
||||
smoothed.twist.covariance = self.last_odom.twist.covariance
|
||||
|
||||
self.pub_odom_smooth.publish(smoothed)
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = VelocitySmootherNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
5
jetson/ros2_ws/src/saltybot_velocity_smoother/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_velocity_smoother/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_velocity_smoother
|
||||
[egg_info]
|
||||
tag_build =
|
||||
tag_date = 0
|
||||
34
jetson/ros2_ws/src/saltybot_velocity_smoother/setup.py
Normal file
34
jetson/ros2_ws/src/saltybot_velocity_smoother/setup.py
Normal file
@ -0,0 +1,34 @@
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
package_name = 'saltybot_velocity_smoother'
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version='0.1.0',
|
||||
packages=find_packages(exclude=['test']),
|
||||
data_files=[
|
||||
('share/ament_index/resource_index/packages',
|
||||
['resource/saltybot_velocity_smoother']),
|
||||
('share/' + package_name, ['package.xml']),
|
||||
('share/' + package_name + '/config', ['config/velocity_smoother_config.yaml']),
|
||||
('share/' + package_name + '/launch', ['launch/velocity_smoother.launch.py']),
|
||||
],
|
||||
install_requires=['setuptools'],
|
||||
zip_safe=True,
|
||||
author='SaltyBot Controls',
|
||||
author_email='sl-controls@saltybot.local',
|
||||
maintainer='SaltyBot Controls',
|
||||
maintainer_email='sl-controls@saltybot.local',
|
||||
keywords=['ROS2', 'velocity', 'filtering', 'butterworth'],
|
||||
classifiers=[
|
||||
'Intended Audience :: Developers',
|
||||
'License :: OSI Approved :: MIT License',
|
||||
'Programming Language :: Python :: 3',
|
||||
'Topic :: Software Development',
|
||||
],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'velocity_smoother_node=saltybot_velocity_smoother.velocity_smoother_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
@ -0,0 +1,432 @@
|
||||
"""Unit tests for velocity smoother node."""
|
||||
|
||||
import pytest
|
||||
import math
|
||||
from nav_msgs.msg import Odometry
|
||||
from geometry_msgs.msg import TwistWithCovariance, Twist, Vector3
|
||||
from std_msgs.msg import Header
|
||||
|
||||
import rclpy
|
||||
from rclpy.time import Time
|
||||
|
||||
from saltybot_velocity_smoother.velocity_smoother_node import (
|
||||
VelocitySmootherNode,
|
||||
ButterworthFilter,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def rclpy_fixture():
|
||||
"""Initialize and cleanup rclpy."""
|
||||
rclpy.init()
|
||||
yield
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def node(rclpy_fixture):
|
||||
"""Create a velocity smoother node instance."""
|
||||
node = VelocitySmootherNode()
|
||||
yield node
|
||||
node.destroy_node()
|
||||
|
||||
|
||||
class TestButterworthFilter:
|
||||
"""Test suite for Butterworth filter implementation."""
|
||||
|
||||
def test_filter_initialization(self):
|
||||
"""Test filter initialization with valid parameters."""
|
||||
filt = ButterworthFilter(5.0, 50, order=2)
|
||||
assert filt.cutoff_hz == 5.0
|
||||
assert filt.sample_rate_hz == 50
|
||||
assert filt.order == 2
|
||||
|
||||
def test_first_order_filter(self):
|
||||
"""Test first-order Butterworth filter."""
|
||||
filt = ButterworthFilter(5.0, 50, order=1)
|
||||
assert len(filt.b) == 2
|
||||
assert len(filt.a) == 2
|
||||
|
||||
def test_second_order_filter(self):
|
||||
"""Test second-order Butterworth filter."""
|
||||
filt = ButterworthFilter(5.0, 50, order=2)
|
||||
assert len(filt.b) == 3
|
||||
assert len(filt.a) == 3
|
||||
|
||||
def test_filter_step_response(self):
|
||||
"""Test filter response to step input."""
|
||||
filt = ButterworthFilter(5.0, 50, order=2)
|
||||
|
||||
# Apply step input (0 -> 1.0)
|
||||
outputs = []
|
||||
for i in range(50):
|
||||
y = filt.filter(1.0)
|
||||
outputs.append(y)
|
||||
|
||||
# Final output should be close to 1.0
|
||||
assert outputs[-1] > 0.9
|
||||
assert outputs[-1] <= 1.0
|
||||
|
||||
def test_filter_constant_input(self):
|
||||
"""Test filter with constant input."""
|
||||
filt = ButterworthFilter(5.0, 50, order=2)
|
||||
|
||||
# Apply constant input
|
||||
for i in range(100):
|
||||
y = filt.filter(2.5)
|
||||
|
||||
# Should converge to input value
|
||||
assert abs(y - 2.5) < 0.01
|
||||
|
||||
def test_filter_zero_input(self):
|
||||
"""Test filter with zero input."""
|
||||
filt = ButterworthFilter(5.0, 50, order=2)
|
||||
|
||||
# Apply non-zero then zero
|
||||
for i in range(50):
|
||||
filt.filter(1.0)
|
||||
|
||||
# Now apply zero
|
||||
for i in range(50):
|
||||
y = filt.filter(0.0)
|
||||
|
||||
# Should decay to zero
|
||||
assert abs(y) < 0.01
|
||||
|
||||
def test_filter_reset(self):
|
||||
"""Test filter state reset."""
|
||||
filt = ButterworthFilter(5.0, 50, order=2)
|
||||
|
||||
# Filter some values
|
||||
for i in range(10):
|
||||
filt.filter(1.0)
|
||||
|
||||
# Reset
|
||||
filt.reset()
|
||||
|
||||
# State should be zero
|
||||
assert filt.x_history == [0.0, 0.0, 0.0]
|
||||
assert filt.y_history == [0.0, 0.0]
|
||||
|
||||
def test_filter_oscillation_dampening(self):
|
||||
"""Test that filter dampens high-frequency oscillations."""
|
||||
filt = ButterworthFilter(5.0, 50, order=2)
|
||||
|
||||
# Apply alternating signal (high frequency)
|
||||
outputs = []
|
||||
for i in range(100):
|
||||
x = 1.0 if i % 2 == 0 else -1.0
|
||||
y = filt.filter(x)
|
||||
outputs.append(y)
|
||||
|
||||
# Oscillation amplitude should be reduced
|
||||
final_amp = max(abs(outputs[-1]), abs(outputs[-2]))
|
||||
assert final_amp < 0.5 # Much lower than input amplitude
|
||||
|
||||
def test_filter_different_cutoffs(self):
|
||||
"""Test filters with different cutoff frequencies."""
|
||||
filt_low = ButterworthFilter(2.0, 50, order=2)
|
||||
filt_high = ButterworthFilter(10.0, 50, order=2)
|
||||
|
||||
# Both should be valid
|
||||
assert filt_low.cutoff_hz == 2.0
|
||||
assert filt_high.cutoff_hz == 10.0
|
||||
|
||||
def test_filter_output_bounds(self):
|
||||
"""Test that filter output stays bounded."""
|
||||
filt = ButterworthFilter(5.0, 50, order=2)
|
||||
|
||||
# Apply large random-like values
|
||||
for i in range(100):
|
||||
x = math.sin(i * 0.5) * 5.0
|
||||
y = filt.filter(x)
|
||||
assert abs(y) < 10.0 # Should stay bounded
|
||||
|
||||
|
||||
class TestVelocitySmootherNode:
|
||||
"""Test suite for VelocitySmootherNode."""
|
||||
|
||||
def test_node_initialization(self, node):
|
||||
"""Test that node initializes correctly."""
|
||||
assert node.last_odom is None
|
||||
assert node.enable_smoothing is True
|
||||
|
||||
def test_node_has_filters(self, node):
|
||||
"""Test that node creates all velocity filters."""
|
||||
assert node.filter_linear_x is not None
|
||||
assert node.filter_linear_y is not None
|
||||
assert node.filter_linear_z is not None
|
||||
assert node.filter_angular_x is not None
|
||||
assert node.filter_angular_y is not None
|
||||
assert node.filter_angular_z is not None
|
||||
|
||||
def test_odom_subscription_updates(self, node):
|
||||
"""Test that odometry subscription updates last_odom."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
odom.twist.twist.linear.x = 1.0
|
||||
|
||||
node._on_odom(odom)
|
||||
|
||||
assert node.last_odom is not None
|
||||
assert node.last_odom.twist.twist.linear.x == 1.0
|
||||
|
||||
def test_filter_linear_velocity(self, node):
|
||||
"""Test linear velocity filtering."""
|
||||
# Create odometry message
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
odom.child_frame_id = "base_link"
|
||||
odom.twist.twist.linear.x = 1.0
|
||||
odom.twist.twist.linear.y = 0.5
|
||||
odom.twist.twist.linear.z = 0.0
|
||||
odom.twist.twist.angular.x = 0.0
|
||||
odom.twist.twist.angular.y = 0.0
|
||||
odom.twist.twist.angular.z = 0.2
|
||||
|
||||
node._on_odom(odom)
|
||||
|
||||
# Call timer callback to process
|
||||
node._timer_callback()
|
||||
|
||||
# Filter should have been applied
|
||||
assert node.filter_linear_x.x_history[0] == 1.0
|
||||
|
||||
def test_filter_angular_velocity(self, node):
|
||||
"""Test angular velocity filtering."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
odom.twist.twist.angular.z = 0.5
|
||||
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
assert node.filter_angular_z.x_history[0] == 0.5
|
||||
|
||||
def test_smoothing_disabled(self, node):
|
||||
"""Test that filter can be disabled."""
|
||||
node.enable_smoothing = False
|
||||
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
odom.twist.twist.linear.x = 2.0
|
||||
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
# When disabled, output should equal input directly
|
||||
|
||||
def test_no_odom_doesnt_crash(self, node):
|
||||
"""Test that timer callback handles missing odometry gracefully."""
|
||||
# Call timer without setting odometry
|
||||
node._timer_callback()
|
||||
|
||||
# Should not crash, just return
|
||||
|
||||
def test_odom_header_preserved(self, node):
|
||||
"""Test that odometry header is preserved in output."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "test_frame"
|
||||
odom.header.stamp = node.get_clock().now()
|
||||
odom.child_frame_id = "test_child"
|
||||
|
||||
node._on_odom(odom)
|
||||
|
||||
# Timer callback processes it
|
||||
node._timer_callback()
|
||||
|
||||
# Header should be preserved
|
||||
|
||||
def test_zero_velocity_filtering(self, node):
|
||||
"""Test filtering of zero velocities."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
odom.twist.twist.linear.x = 0.0
|
||||
odom.twist.twist.linear.y = 0.0
|
||||
odom.twist.twist.linear.z = 0.0
|
||||
odom.twist.twist.angular.x = 0.0
|
||||
odom.twist.twist.angular.y = 0.0
|
||||
odom.twist.twist.angular.z = 0.0
|
||||
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
# Filters should handle zero input
|
||||
|
||||
def test_negative_velocities(self, node):
|
||||
"""Test filtering of negative velocities."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
odom.twist.twist.linear.x = -1.0
|
||||
odom.twist.twist.angular.z = -0.5
|
||||
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
assert node.filter_linear_x.x_history[0] == -1.0
|
||||
|
||||
def test_high_frequency_noise_dampening(self, node):
|
||||
"""Test that filter dampens high-frequency encoder noise."""
|
||||
# Simulate noisy encoder output
|
||||
base_velocity = 1.0
|
||||
noise_amplitude = 0.2
|
||||
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
|
||||
# Apply alternating noise
|
||||
for i in range(100):
|
||||
odom.twist.twist.linear.x = base_velocity + (noise_amplitude if i % 2 == 0 else -noise_amplitude)
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
# After filtering, output should be close to base velocity
|
||||
# (oscillations dampened)
|
||||
|
||||
def test_large_velocity_values(self, node):
|
||||
"""Test filtering of large velocity values."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
odom.twist.twist.linear.x = 10.0
|
||||
odom.twist.twist.angular.z = 5.0
|
||||
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
# Should handle large values without overflow
|
||||
|
||||
def test_pose_unchanged(self, node):
|
||||
"""Test that pose is not modified by velocity filtering."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
odom.pose.pose.position.x = 5.0
|
||||
odom.pose.pose.position.y = 3.0
|
||||
odom.twist.twist.linear.x = 1.0
|
||||
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
# Pose should be copied unchanged
|
||||
|
||||
def test_multiple_velocity_updates(self, node):
|
||||
"""Test filtering across multiple sequential velocity updates."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
|
||||
velocities = [0.5, 1.0, 1.5, 1.0, 0.5, 0.0]
|
||||
|
||||
for v in velocities:
|
||||
odom.twist.twist.linear.x = v
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
# Filter should smooth the velocity sequence
|
||||
|
||||
def test_simultaneous_all_velocities(self, node):
|
||||
"""Test filtering when all velocity components are present."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
|
||||
for i in range(30):
|
||||
odom.twist.twist.linear.x = math.sin(i * 0.1)
|
||||
odom.twist.twist.linear.y = math.cos(i * 0.1) * 0.5
|
||||
odom.twist.twist.linear.z = 0.1
|
||||
odom.twist.twist.angular.x = 0.05
|
||||
odom.twist.twist.angular.y = 0.05
|
||||
odom.twist.twist.angular.z = math.sin(i * 0.15)
|
||||
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
# All filters should operate independently
|
||||
|
||||
|
||||
class TestVelocitySmootherScenarios:
|
||||
"""Integration-style tests for realistic scenarios."""
|
||||
|
||||
def test_scenario_constant_velocity(self, node):
|
||||
"""Scenario: robot moving at constant velocity."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
odom.twist.twist.linear.x = 1.0
|
||||
|
||||
for i in range(50):
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
# Should maintain constant velocity after convergence
|
||||
|
||||
def test_scenario_velocity_ramp(self, node):
|
||||
"""Scenario: velocity ramping up from stop."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
|
||||
for i in range(50):
|
||||
odom.twist.twist.linear.x = i * 0.02 # Ramp from 0 to 1.0
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
# Filter should smooth the ramp
|
||||
|
||||
def test_scenario_velocity_step(self, node):
|
||||
"""Scenario: sudden velocity change (e.g., collision avoidance)."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
|
||||
# First phase: constant velocity
|
||||
odom.twist.twist.linear.x = 1.0
|
||||
for i in range(25):
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
# Second phase: sudden stop
|
||||
odom.twist.twist.linear.x = 0.0
|
||||
for i in range(25):
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
# Filter should transition smoothly
|
||||
|
||||
def test_scenario_rotation_only(self, node):
|
||||
"""Scenario: robot spinning in place."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
odom.twist.twist.linear.x = 0.0
|
||||
odom.twist.twist.angular.z = 0.5
|
||||
|
||||
for i in range(50):
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
# Angular velocity should be filtered
|
||||
|
||||
def test_scenario_mixed_motion(self, node):
|
||||
"""Scenario: combined linear and angular motion."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
|
||||
for i in range(50):
|
||||
odom.twist.twist.linear.x = math.cos(i * 0.1)
|
||||
odom.twist.twist.linear.y = math.sin(i * 0.1)
|
||||
odom.twist.twist.angular.z = 0.2
|
||||
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
# Both linear and angular components should be filtered
|
||||
|
||||
def test_scenario_encoder_noise_reduction(self, node):
|
||||
"""Scenario: realistic encoder jitter with filtering."""
|
||||
odom = Odometry()
|
||||
odom.header.frame_id = "odom"
|
||||
|
||||
# Simulate encoder jitter: base velocity + small noise
|
||||
base_vel = 1.0
|
||||
for i in range(100):
|
||||
jitter = 0.05 * math.sin(i * 0.5) + 0.03 * math.cos(i * 0.3)
|
||||
odom.twist.twist.linear.x = base_vel + jitter
|
||||
|
||||
node._on_odom(odom)
|
||||
node._timer_callback()
|
||||
|
||||
# Filter should reduce noise while maintaining base velocity
|
||||
Loading…
x
Reference in New Issue
Block a user