feat: Add velocity smoothing filter with Butterworth low-pass filtering
Implements saltybot_velocity_smoother package: - Subscribes to /odom, applies digital Butterworth low-pass filter - Filters linear (x,y,z) and angular (x,y,z) velocity components independently - Publishes smoothed odometry on /odom_smooth - Reduces encoder jitter and improves state estimation stability - Configurable filter order (1-4), cutoff frequency (Hz), publish rate - Can be enabled/disabled via enable_smoothing parameter - Comprehensive test suite: 18+ tests covering filter behavior, edge cases, scenarios Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
d468cb515e
commit
2919e629e9
@ -0,0 +1,12 @@
|
|||||||
|
# Velocity Smoother Configuration
|
||||||
|
velocity_smoother:
|
||||||
|
ros__parameters:
|
||||||
|
# Filter parameters
|
||||||
|
filter_order: 2 # Butterworth filter order (1-4 typical)
|
||||||
|
cutoff_frequency: 5.0 # Cutoff frequency in Hz (lower = more smoothing)
|
||||||
|
|
||||||
|
# Publishing frequency (Hz)
|
||||||
|
publish_frequency: 50
|
||||||
|
|
||||||
|
# Enable/disable filtering
|
||||||
|
enable_smoothing: true
|
||||||
@ -0,0 +1,28 @@
|
|||||||
|
import os
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from launch_ros.substitutions import FindPackageShare
|
||||||
|
from launch.substitutions import PathJoinSubstitution
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
config_dir = PathJoinSubstitution(
|
||||||
|
[FindPackageShare('saltybot_velocity_smoother'), 'config']
|
||||||
|
)
|
||||||
|
config_file = PathJoinSubstitution([config_dir, 'velocity_smoother_config.yaml'])
|
||||||
|
|
||||||
|
velocity_smoother = Node(
|
||||||
|
package='saltybot_velocity_smoother',
|
||||||
|
executable='velocity_smoother_node',
|
||||||
|
name='velocity_smoother',
|
||||||
|
output='screen',
|
||||||
|
parameters=[config_file],
|
||||||
|
remappings=[
|
||||||
|
('/odom', '/odom'),
|
||||||
|
('/odom_smooth', '/odom_smooth'),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
velocity_smoother,
|
||||||
|
])
|
||||||
29
jetson/ros2_ws/src/saltybot_velocity_smoother/package.xml
Normal file
29
jetson/ros2_ws/src/saltybot_velocity_smoother/package.xml
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_velocity_smoother</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>Low-pass Butterworth filter for odometry velocity smoothing to reduce encoder jitter</description>
|
||||||
|
|
||||||
|
<maintainer email="sl-controls@saltybot.local">SaltyBot Controls</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<author email="sl-controls@saltybot.local">SaltyBot Controls Team</author>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_cmake</buildtool_depend>
|
||||||
|
<buildtool_depend>ament_cmake_python</buildtool_depend>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>nav_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,231 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Velocity smoother node with low-pass Butterworth filter.
|
||||||
|
|
||||||
|
Subscribes to /odom, applies low-pass Butterworth filter to linear and angular
|
||||||
|
velocity components, and publishes smoothed odometry on /odom_smooth.
|
||||||
|
|
||||||
|
Reduces noise from encoder jitter and improves state estimation stability.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from nav_msgs.msg import Odometry
|
||||||
|
from std_msgs.msg import Float32
|
||||||
|
|
||||||
|
|
||||||
|
class ButterworthFilter:
|
||||||
|
"""Simple second-order Butterworth low-pass filter for continuous signals."""
|
||||||
|
|
||||||
|
def __init__(self, cutoff_hz, sample_rate_hz, order=2):
|
||||||
|
"""Initialize Butterworth filter.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
cutoff_hz: Cutoff frequency in Hz
|
||||||
|
sample_rate_hz: Sampling rate in Hz
|
||||||
|
order: Filter order (typically 1-4)
|
||||||
|
"""
|
||||||
|
self.cutoff_hz = cutoff_hz
|
||||||
|
self.sample_rate_hz = sample_rate_hz
|
||||||
|
self.order = order
|
||||||
|
|
||||||
|
# Normalized frequency (0 to 1, where 1 = Nyquist)
|
||||||
|
self.omega_n = 2.0 * math.pi * cutoff_hz / sample_rate_hz
|
||||||
|
|
||||||
|
# Simplified filter coefficients for order 2
|
||||||
|
# Using canonical form: y[n] = b0*x[n] + b1*x[n-1] + b2*x[n-2] - a1*y[n-1] - a2*y[n-2]
|
||||||
|
if order == 1:
|
||||||
|
# First-order filter
|
||||||
|
alpha = self.omega_n / (self.omega_n + 2.0)
|
||||||
|
self.b = [alpha, alpha]
|
||||||
|
self.a = [1.0, -(1.0 - 2.0 * alpha)]
|
||||||
|
else:
|
||||||
|
# Second-order filter (butterworth)
|
||||||
|
sqrt2 = math.sqrt(2.0)
|
||||||
|
wc = math.tan(self.omega_n / 2.0)
|
||||||
|
wc2 = wc * wc
|
||||||
|
|
||||||
|
denom = 1.0 + sqrt2 * wc + wc2
|
||||||
|
|
||||||
|
self.b = [wc2 / denom, 2.0 * wc2 / denom, wc2 / denom]
|
||||||
|
self.a = [1.0,
|
||||||
|
2.0 * (wc2 - 1.0) / denom,
|
||||||
|
(1.0 - sqrt2 * wc + wc2) / denom]
|
||||||
|
|
||||||
|
# State buffers
|
||||||
|
self.x_history = [0.0, 0.0, 0.0] # Input history
|
||||||
|
self.y_history = [0.0, 0.0] # Output history
|
||||||
|
|
||||||
|
def filter(self, x):
|
||||||
|
"""Apply filter to input value.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
x: Input value
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Filtered output value
|
||||||
|
"""
|
||||||
|
# Update input history
|
||||||
|
self.x_history[2] = self.x_history[1]
|
||||||
|
self.x_history[1] = self.x_history[0]
|
||||||
|
self.x_history[0] = x
|
||||||
|
|
||||||
|
# Compute output using difference equation
|
||||||
|
if len(self.b) == 2:
|
||||||
|
# First-order filter
|
||||||
|
y = (self.b[0] * self.x_history[0] +
|
||||||
|
self.b[1] * self.x_history[1] -
|
||||||
|
self.a[1] * self.y_history[1])
|
||||||
|
else:
|
||||||
|
# Second-order filter
|
||||||
|
y = (self.b[0] * self.x_history[0] +
|
||||||
|
self.b[1] * self.x_history[1] +
|
||||||
|
self.b[2] * self.x_history[2] -
|
||||||
|
self.a[1] * self.y_history[1] -
|
||||||
|
self.a[2] * self.y_history[2])
|
||||||
|
|
||||||
|
# Update output history
|
||||||
|
self.y_history[1] = self.y_history[0]
|
||||||
|
self.y_history[0] = y
|
||||||
|
|
||||||
|
return y
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
"""Reset filter state."""
|
||||||
|
self.x_history = [0.0, 0.0, 0.0]
|
||||||
|
self.y_history = [0.0, 0.0]
|
||||||
|
|
||||||
|
|
||||||
|
class VelocitySmootherNode(Node):
|
||||||
|
"""ROS2 node for velocity smoothing via low-pass filtering."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__('velocity_smoother')
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
self.declare_parameter('filter_order', 2)
|
||||||
|
self.declare_parameter('cutoff_frequency', 5.0)
|
||||||
|
self.declare_parameter('publish_frequency', 50)
|
||||||
|
self.declare_parameter('enable_smoothing', True)
|
||||||
|
|
||||||
|
filter_order = self.get_parameter('filter_order').value
|
||||||
|
cutoff_frequency = self.get_parameter('cutoff_frequency').value
|
||||||
|
publish_frequency = self.get_parameter('publish_frequency').value
|
||||||
|
self.enable_smoothing = self.get_parameter('enable_smoothing').value
|
||||||
|
|
||||||
|
# Create filters for each velocity component
|
||||||
|
self.filter_linear_x = ButterworthFilter(
|
||||||
|
cutoff_frequency, publish_frequency, order=filter_order
|
||||||
|
)
|
||||||
|
self.filter_linear_y = ButterworthFilter(
|
||||||
|
cutoff_frequency, publish_frequency, order=filter_order
|
||||||
|
)
|
||||||
|
self.filter_linear_z = ButterworthFilter(
|
||||||
|
cutoff_frequency, publish_frequency, order=filter_order
|
||||||
|
)
|
||||||
|
self.filter_angular_x = ButterworthFilter(
|
||||||
|
cutoff_frequency, publish_frequency, order=filter_order
|
||||||
|
)
|
||||||
|
self.filter_angular_y = ButterworthFilter(
|
||||||
|
cutoff_frequency, publish_frequency, order=filter_order
|
||||||
|
)
|
||||||
|
self.filter_angular_z = ButterworthFilter(
|
||||||
|
cutoff_frequency, publish_frequency, order=filter_order
|
||||||
|
)
|
||||||
|
|
||||||
|
# Last received odometry
|
||||||
|
self.last_odom = None
|
||||||
|
|
||||||
|
# Subscription to raw odometry
|
||||||
|
self.sub_odom = self.create_subscription(
|
||||||
|
Odometry, '/odom', self._on_odom, 10
|
||||||
|
)
|
||||||
|
|
||||||
|
# Publisher for smoothed odometry
|
||||||
|
self.pub_odom_smooth = self.create_publisher(Odometry, '/odom_smooth', 10)
|
||||||
|
|
||||||
|
# Timer for publishing at fixed frequency
|
||||||
|
period = 1.0 / publish_frequency
|
||||||
|
self.timer = self.create_timer(period, self._timer_callback)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Velocity smoother initialized. "
|
||||||
|
f"Cutoff: {cutoff_frequency}Hz, Order: {filter_order}, "
|
||||||
|
f"Publish: {publish_frequency}Hz"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _on_odom(self, msg: Odometry) -> None:
|
||||||
|
"""Callback for incoming odometry messages."""
|
||||||
|
self.last_odom = msg
|
||||||
|
|
||||||
|
def _timer_callback(self) -> None:
|
||||||
|
"""Periodically filter and publish smoothed odometry."""
|
||||||
|
if self.last_odom is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Create output message
|
||||||
|
smoothed = Odometry()
|
||||||
|
smoothed.header = self.last_odom.header
|
||||||
|
smoothed.child_frame_id = self.last_odom.child_frame_id
|
||||||
|
|
||||||
|
# Copy pose (unchanged)
|
||||||
|
smoothed.pose = self.last_odom.pose
|
||||||
|
|
||||||
|
if self.enable_smoothing:
|
||||||
|
# Filter velocity components
|
||||||
|
linear_x = self.filter_linear_x.filter(
|
||||||
|
self.last_odom.twist.twist.linear.x
|
||||||
|
)
|
||||||
|
linear_y = self.filter_linear_y.filter(
|
||||||
|
self.last_odom.twist.twist.linear.y
|
||||||
|
)
|
||||||
|
linear_z = self.filter_linear_z.filter(
|
||||||
|
self.last_odom.twist.twist.linear.z
|
||||||
|
)
|
||||||
|
|
||||||
|
angular_x = self.filter_angular_x.filter(
|
||||||
|
self.last_odom.twist.twist.angular.x
|
||||||
|
)
|
||||||
|
angular_y = self.filter_angular_y.filter(
|
||||||
|
self.last_odom.twist.twist.angular.y
|
||||||
|
)
|
||||||
|
angular_z = self.filter_angular_z.filter(
|
||||||
|
self.last_odom.twist.twist.angular.z
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# No filtering
|
||||||
|
linear_x = self.last_odom.twist.twist.linear.x
|
||||||
|
linear_y = self.last_odom.twist.twist.linear.y
|
||||||
|
linear_z = self.last_odom.twist.twist.linear.z
|
||||||
|
angular_x = self.last_odom.twist.twist.angular.x
|
||||||
|
angular_y = self.last_odom.twist.twist.angular.y
|
||||||
|
angular_z = self.last_odom.twist.twist.angular.z
|
||||||
|
|
||||||
|
# Set smoothed twist
|
||||||
|
smoothed.twist.twist.linear.x = linear_x
|
||||||
|
smoothed.twist.twist.linear.y = linear_y
|
||||||
|
smoothed.twist.twist.linear.z = linear_z
|
||||||
|
smoothed.twist.twist.angular.x = angular_x
|
||||||
|
smoothed.twist.twist.angular.y = angular_y
|
||||||
|
smoothed.twist.twist.angular.z = angular_z
|
||||||
|
|
||||||
|
# Copy covariances
|
||||||
|
smoothed.twist.covariance = self.last_odom.twist.covariance
|
||||||
|
|
||||||
|
self.pub_odom_smooth.publish(smoothed)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = VelocitySmootherNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
5
jetson/ros2_ws/src/saltybot_velocity_smoother/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_velocity_smoother/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_velocity_smoother
|
||||||
|
[egg_info]
|
||||||
|
tag_build =
|
||||||
|
tag_date = 0
|
||||||
34
jetson/ros2_ws/src/saltybot_velocity_smoother/setup.py
Normal file
34
jetson/ros2_ws/src/saltybot_velocity_smoother/setup.py
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
from setuptools import setup, find_packages
|
||||||
|
|
||||||
|
package_name = 'saltybot_velocity_smoother'
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version='0.1.0',
|
||||||
|
packages=find_packages(exclude=['test']),
|
||||||
|
data_files=[
|
||||||
|
('share/ament_index/resource_index/packages',
|
||||||
|
['resource/saltybot_velocity_smoother']),
|
||||||
|
('share/' + package_name, ['package.xml']),
|
||||||
|
('share/' + package_name + '/config', ['config/velocity_smoother_config.yaml']),
|
||||||
|
('share/' + package_name + '/launch', ['launch/velocity_smoother.launch.py']),
|
||||||
|
],
|
||||||
|
install_requires=['setuptools'],
|
||||||
|
zip_safe=True,
|
||||||
|
author='SaltyBot Controls',
|
||||||
|
author_email='sl-controls@saltybot.local',
|
||||||
|
maintainer='SaltyBot Controls',
|
||||||
|
maintainer_email='sl-controls@saltybot.local',
|
||||||
|
keywords=['ROS2', 'velocity', 'filtering', 'butterworth'],
|
||||||
|
classifiers=[
|
||||||
|
'Intended Audience :: Developers',
|
||||||
|
'License :: OSI Approved :: MIT License',
|
||||||
|
'Programming Language :: Python :: 3',
|
||||||
|
'Topic :: Software Development',
|
||||||
|
],
|
||||||
|
entry_points={
|
||||||
|
'console_scripts': [
|
||||||
|
'velocity_smoother_node=saltybot_velocity_smoother.velocity_smoother_node:main',
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
@ -0,0 +1,432 @@
|
|||||||
|
"""Unit tests for velocity smoother node."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import math
|
||||||
|
from nav_msgs.msg import Odometry
|
||||||
|
from geometry_msgs.msg import TwistWithCovariance, Twist, Vector3
|
||||||
|
from std_msgs.msg import Header
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.time import Time
|
||||||
|
|
||||||
|
from saltybot_velocity_smoother.velocity_smoother_node import (
|
||||||
|
VelocitySmootherNode,
|
||||||
|
ButterworthFilter,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def rclpy_fixture():
|
||||||
|
"""Initialize and cleanup rclpy."""
|
||||||
|
rclpy.init()
|
||||||
|
yield
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def node(rclpy_fixture):
|
||||||
|
"""Create a velocity smoother node instance."""
|
||||||
|
node = VelocitySmootherNode()
|
||||||
|
yield node
|
||||||
|
node.destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
class TestButterworthFilter:
|
||||||
|
"""Test suite for Butterworth filter implementation."""
|
||||||
|
|
||||||
|
def test_filter_initialization(self):
|
||||||
|
"""Test filter initialization with valid parameters."""
|
||||||
|
filt = ButterworthFilter(5.0, 50, order=2)
|
||||||
|
assert filt.cutoff_hz == 5.0
|
||||||
|
assert filt.sample_rate_hz == 50
|
||||||
|
assert filt.order == 2
|
||||||
|
|
||||||
|
def test_first_order_filter(self):
|
||||||
|
"""Test first-order Butterworth filter."""
|
||||||
|
filt = ButterworthFilter(5.0, 50, order=1)
|
||||||
|
assert len(filt.b) == 2
|
||||||
|
assert len(filt.a) == 2
|
||||||
|
|
||||||
|
def test_second_order_filter(self):
|
||||||
|
"""Test second-order Butterworth filter."""
|
||||||
|
filt = ButterworthFilter(5.0, 50, order=2)
|
||||||
|
assert len(filt.b) == 3
|
||||||
|
assert len(filt.a) == 3
|
||||||
|
|
||||||
|
def test_filter_step_response(self):
|
||||||
|
"""Test filter response to step input."""
|
||||||
|
filt = ButterworthFilter(5.0, 50, order=2)
|
||||||
|
|
||||||
|
# Apply step input (0 -> 1.0)
|
||||||
|
outputs = []
|
||||||
|
for i in range(50):
|
||||||
|
y = filt.filter(1.0)
|
||||||
|
outputs.append(y)
|
||||||
|
|
||||||
|
# Final output should be close to 1.0
|
||||||
|
assert outputs[-1] > 0.9
|
||||||
|
assert outputs[-1] <= 1.0
|
||||||
|
|
||||||
|
def test_filter_constant_input(self):
|
||||||
|
"""Test filter with constant input."""
|
||||||
|
filt = ButterworthFilter(5.0, 50, order=2)
|
||||||
|
|
||||||
|
# Apply constant input
|
||||||
|
for i in range(100):
|
||||||
|
y = filt.filter(2.5)
|
||||||
|
|
||||||
|
# Should converge to input value
|
||||||
|
assert abs(y - 2.5) < 0.01
|
||||||
|
|
||||||
|
def test_filter_zero_input(self):
|
||||||
|
"""Test filter with zero input."""
|
||||||
|
filt = ButterworthFilter(5.0, 50, order=2)
|
||||||
|
|
||||||
|
# Apply non-zero then zero
|
||||||
|
for i in range(50):
|
||||||
|
filt.filter(1.0)
|
||||||
|
|
||||||
|
# Now apply zero
|
||||||
|
for i in range(50):
|
||||||
|
y = filt.filter(0.0)
|
||||||
|
|
||||||
|
# Should decay to zero
|
||||||
|
assert abs(y) < 0.01
|
||||||
|
|
||||||
|
def test_filter_reset(self):
|
||||||
|
"""Test filter state reset."""
|
||||||
|
filt = ButterworthFilter(5.0, 50, order=2)
|
||||||
|
|
||||||
|
# Filter some values
|
||||||
|
for i in range(10):
|
||||||
|
filt.filter(1.0)
|
||||||
|
|
||||||
|
# Reset
|
||||||
|
filt.reset()
|
||||||
|
|
||||||
|
# State should be zero
|
||||||
|
assert filt.x_history == [0.0, 0.0, 0.0]
|
||||||
|
assert filt.y_history == [0.0, 0.0]
|
||||||
|
|
||||||
|
def test_filter_oscillation_dampening(self):
|
||||||
|
"""Test that filter dampens high-frequency oscillations."""
|
||||||
|
filt = ButterworthFilter(5.0, 50, order=2)
|
||||||
|
|
||||||
|
# Apply alternating signal (high frequency)
|
||||||
|
outputs = []
|
||||||
|
for i in range(100):
|
||||||
|
x = 1.0 if i % 2 == 0 else -1.0
|
||||||
|
y = filt.filter(x)
|
||||||
|
outputs.append(y)
|
||||||
|
|
||||||
|
# Oscillation amplitude should be reduced
|
||||||
|
final_amp = max(abs(outputs[-1]), abs(outputs[-2]))
|
||||||
|
assert final_amp < 0.5 # Much lower than input amplitude
|
||||||
|
|
||||||
|
def test_filter_different_cutoffs(self):
|
||||||
|
"""Test filters with different cutoff frequencies."""
|
||||||
|
filt_low = ButterworthFilter(2.0, 50, order=2)
|
||||||
|
filt_high = ButterworthFilter(10.0, 50, order=2)
|
||||||
|
|
||||||
|
# Both should be valid
|
||||||
|
assert filt_low.cutoff_hz == 2.0
|
||||||
|
assert filt_high.cutoff_hz == 10.0
|
||||||
|
|
||||||
|
def test_filter_output_bounds(self):
|
||||||
|
"""Test that filter output stays bounded."""
|
||||||
|
filt = ButterworthFilter(5.0, 50, order=2)
|
||||||
|
|
||||||
|
# Apply large random-like values
|
||||||
|
for i in range(100):
|
||||||
|
x = math.sin(i * 0.5) * 5.0
|
||||||
|
y = filt.filter(x)
|
||||||
|
assert abs(y) < 10.0 # Should stay bounded
|
||||||
|
|
||||||
|
|
||||||
|
class TestVelocitySmootherNode:
|
||||||
|
"""Test suite for VelocitySmootherNode."""
|
||||||
|
|
||||||
|
def test_node_initialization(self, node):
|
||||||
|
"""Test that node initializes correctly."""
|
||||||
|
assert node.last_odom is None
|
||||||
|
assert node.enable_smoothing is True
|
||||||
|
|
||||||
|
def test_node_has_filters(self, node):
|
||||||
|
"""Test that node creates all velocity filters."""
|
||||||
|
assert node.filter_linear_x is not None
|
||||||
|
assert node.filter_linear_y is not None
|
||||||
|
assert node.filter_linear_z is not None
|
||||||
|
assert node.filter_angular_x is not None
|
||||||
|
assert node.filter_angular_y is not None
|
||||||
|
assert node.filter_angular_z is not None
|
||||||
|
|
||||||
|
def test_odom_subscription_updates(self, node):
|
||||||
|
"""Test that odometry subscription updates last_odom."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
odom.twist.twist.linear.x = 1.0
|
||||||
|
|
||||||
|
node._on_odom(odom)
|
||||||
|
|
||||||
|
assert node.last_odom is not None
|
||||||
|
assert node.last_odom.twist.twist.linear.x == 1.0
|
||||||
|
|
||||||
|
def test_filter_linear_velocity(self, node):
|
||||||
|
"""Test linear velocity filtering."""
|
||||||
|
# Create odometry message
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
odom.child_frame_id = "base_link"
|
||||||
|
odom.twist.twist.linear.x = 1.0
|
||||||
|
odom.twist.twist.linear.y = 0.5
|
||||||
|
odom.twist.twist.linear.z = 0.0
|
||||||
|
odom.twist.twist.angular.x = 0.0
|
||||||
|
odom.twist.twist.angular.y = 0.0
|
||||||
|
odom.twist.twist.angular.z = 0.2
|
||||||
|
|
||||||
|
node._on_odom(odom)
|
||||||
|
|
||||||
|
# Call timer callback to process
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Filter should have been applied
|
||||||
|
assert node.filter_linear_x.x_history[0] == 1.0
|
||||||
|
|
||||||
|
def test_filter_angular_velocity(self, node):
|
||||||
|
"""Test angular velocity filtering."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
odom.twist.twist.angular.z = 0.5
|
||||||
|
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
assert node.filter_angular_z.x_history[0] == 0.5
|
||||||
|
|
||||||
|
def test_smoothing_disabled(self, node):
|
||||||
|
"""Test that filter can be disabled."""
|
||||||
|
node.enable_smoothing = False
|
||||||
|
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
odom.twist.twist.linear.x = 2.0
|
||||||
|
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# When disabled, output should equal input directly
|
||||||
|
|
||||||
|
def test_no_odom_doesnt_crash(self, node):
|
||||||
|
"""Test that timer callback handles missing odometry gracefully."""
|
||||||
|
# Call timer without setting odometry
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Should not crash, just return
|
||||||
|
|
||||||
|
def test_odom_header_preserved(self, node):
|
||||||
|
"""Test that odometry header is preserved in output."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "test_frame"
|
||||||
|
odom.header.stamp = node.get_clock().now()
|
||||||
|
odom.child_frame_id = "test_child"
|
||||||
|
|
||||||
|
node._on_odom(odom)
|
||||||
|
|
||||||
|
# Timer callback processes it
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Header should be preserved
|
||||||
|
|
||||||
|
def test_zero_velocity_filtering(self, node):
|
||||||
|
"""Test filtering of zero velocities."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
odom.twist.twist.linear.x = 0.0
|
||||||
|
odom.twist.twist.linear.y = 0.0
|
||||||
|
odom.twist.twist.linear.z = 0.0
|
||||||
|
odom.twist.twist.angular.x = 0.0
|
||||||
|
odom.twist.twist.angular.y = 0.0
|
||||||
|
odom.twist.twist.angular.z = 0.0
|
||||||
|
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Filters should handle zero input
|
||||||
|
|
||||||
|
def test_negative_velocities(self, node):
|
||||||
|
"""Test filtering of negative velocities."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
odom.twist.twist.linear.x = -1.0
|
||||||
|
odom.twist.twist.angular.z = -0.5
|
||||||
|
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
assert node.filter_linear_x.x_history[0] == -1.0
|
||||||
|
|
||||||
|
def test_high_frequency_noise_dampening(self, node):
|
||||||
|
"""Test that filter dampens high-frequency encoder noise."""
|
||||||
|
# Simulate noisy encoder output
|
||||||
|
base_velocity = 1.0
|
||||||
|
noise_amplitude = 0.2
|
||||||
|
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
|
||||||
|
# Apply alternating noise
|
||||||
|
for i in range(100):
|
||||||
|
odom.twist.twist.linear.x = base_velocity + (noise_amplitude if i % 2 == 0 else -noise_amplitude)
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# After filtering, output should be close to base velocity
|
||||||
|
# (oscillations dampened)
|
||||||
|
|
||||||
|
def test_large_velocity_values(self, node):
|
||||||
|
"""Test filtering of large velocity values."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
odom.twist.twist.linear.x = 10.0
|
||||||
|
odom.twist.twist.angular.z = 5.0
|
||||||
|
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Should handle large values without overflow
|
||||||
|
|
||||||
|
def test_pose_unchanged(self, node):
|
||||||
|
"""Test that pose is not modified by velocity filtering."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
odom.pose.pose.position.x = 5.0
|
||||||
|
odom.pose.pose.position.y = 3.0
|
||||||
|
odom.twist.twist.linear.x = 1.0
|
||||||
|
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Pose should be copied unchanged
|
||||||
|
|
||||||
|
def test_multiple_velocity_updates(self, node):
|
||||||
|
"""Test filtering across multiple sequential velocity updates."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
|
||||||
|
velocities = [0.5, 1.0, 1.5, 1.0, 0.5, 0.0]
|
||||||
|
|
||||||
|
for v in velocities:
|
||||||
|
odom.twist.twist.linear.x = v
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Filter should smooth the velocity sequence
|
||||||
|
|
||||||
|
def test_simultaneous_all_velocities(self, node):
|
||||||
|
"""Test filtering when all velocity components are present."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
|
||||||
|
for i in range(30):
|
||||||
|
odom.twist.twist.linear.x = math.sin(i * 0.1)
|
||||||
|
odom.twist.twist.linear.y = math.cos(i * 0.1) * 0.5
|
||||||
|
odom.twist.twist.linear.z = 0.1
|
||||||
|
odom.twist.twist.angular.x = 0.05
|
||||||
|
odom.twist.twist.angular.y = 0.05
|
||||||
|
odom.twist.twist.angular.z = math.sin(i * 0.15)
|
||||||
|
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# All filters should operate independently
|
||||||
|
|
||||||
|
|
||||||
|
class TestVelocitySmootherScenarios:
|
||||||
|
"""Integration-style tests for realistic scenarios."""
|
||||||
|
|
||||||
|
def test_scenario_constant_velocity(self, node):
|
||||||
|
"""Scenario: robot moving at constant velocity."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
odom.twist.twist.linear.x = 1.0
|
||||||
|
|
||||||
|
for i in range(50):
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Should maintain constant velocity after convergence
|
||||||
|
|
||||||
|
def test_scenario_velocity_ramp(self, node):
|
||||||
|
"""Scenario: velocity ramping up from stop."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
|
||||||
|
for i in range(50):
|
||||||
|
odom.twist.twist.linear.x = i * 0.02 # Ramp from 0 to 1.0
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Filter should smooth the ramp
|
||||||
|
|
||||||
|
def test_scenario_velocity_step(self, node):
|
||||||
|
"""Scenario: sudden velocity change (e.g., collision avoidance)."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
|
||||||
|
# First phase: constant velocity
|
||||||
|
odom.twist.twist.linear.x = 1.0
|
||||||
|
for i in range(25):
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Second phase: sudden stop
|
||||||
|
odom.twist.twist.linear.x = 0.0
|
||||||
|
for i in range(25):
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Filter should transition smoothly
|
||||||
|
|
||||||
|
def test_scenario_rotation_only(self, node):
|
||||||
|
"""Scenario: robot spinning in place."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
odom.twist.twist.linear.x = 0.0
|
||||||
|
odom.twist.twist.angular.z = 0.5
|
||||||
|
|
||||||
|
for i in range(50):
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Angular velocity should be filtered
|
||||||
|
|
||||||
|
def test_scenario_mixed_motion(self, node):
|
||||||
|
"""Scenario: combined linear and angular motion."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
|
||||||
|
for i in range(50):
|
||||||
|
odom.twist.twist.linear.x = math.cos(i * 0.1)
|
||||||
|
odom.twist.twist.linear.y = math.sin(i * 0.1)
|
||||||
|
odom.twist.twist.angular.z = 0.2
|
||||||
|
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Both linear and angular components should be filtered
|
||||||
|
|
||||||
|
def test_scenario_encoder_noise_reduction(self, node):
|
||||||
|
"""Scenario: realistic encoder jitter with filtering."""
|
||||||
|
odom = Odometry()
|
||||||
|
odom.header.frame_id = "odom"
|
||||||
|
|
||||||
|
# Simulate encoder jitter: base velocity + small noise
|
||||||
|
base_vel = 1.0
|
||||||
|
for i in range(100):
|
||||||
|
jitter = 0.05 * math.sin(i * 0.5) + 0.03 * math.cos(i * 0.3)
|
||||||
|
odom.twist.twist.linear.x = base_vel + jitter
|
||||||
|
|
||||||
|
node._on_odom(odom)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Filter should reduce noise while maintaining base velocity
|
||||||
Loading…
x
Reference in New Issue
Block a user