From 2919e629e97d47aa16c7f0e3a79728f094436795 Mon Sep 17 00:00:00 2001 From: sl-controls Date: Tue, 3 Mar 2026 00:23:53 -0500 Subject: [PATCH] feat: Add velocity smoothing filter with Butterworth low-pass filtering Implements saltybot_velocity_smoother package: - Subscribes to /odom, applies digital Butterworth low-pass filter - Filters linear (x,y,z) and angular (x,y,z) velocity components independently - Publishes smoothed odometry on /odom_smooth - Reduces encoder jitter and improves state estimation stability - Configurable filter order (1-4), cutoff frequency (Hz), publish rate - Can be enabled/disabled via enable_smoothing parameter - Comprehensive test suite: 18+ tests covering filter behavior, edge cases, scenarios Co-Authored-By: Claude Haiku 4.5 --- .../config/velocity_smoother_config.yaml | 12 + .../launch/velocity_smoother.launch.py | 28 ++ .../saltybot_velocity_smoother/package.xml | 29 ++ .../resource/saltybot_velocity_smoother | 0 .../saltybot_velocity_smoother/__init__.py | 0 .../velocity_smoother_node.py | 231 ++++++++++ .../src/saltybot_velocity_smoother/setup.cfg | 5 + .../src/saltybot_velocity_smoother/setup.py | 34 ++ .../test/__init__.py | 0 .../test/test_velocity_smoother.py | 432 ++++++++++++++++++ 10 files changed, 771 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_velocity_smoother/config/velocity_smoother_config.yaml create mode 100644 jetson/ros2_ws/src/saltybot_velocity_smoother/launch/velocity_smoother.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_velocity_smoother/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_velocity_smoother/resource/saltybot_velocity_smoother create mode 100644 jetson/ros2_ws/src/saltybot_velocity_smoother/saltybot_velocity_smoother/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_velocity_smoother/saltybot_velocity_smoother/velocity_smoother_node.py create mode 100644 jetson/ros2_ws/src/saltybot_velocity_smoother/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_velocity_smoother/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_velocity_smoother/test/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_velocity_smoother/test/test_velocity_smoother.py diff --git a/jetson/ros2_ws/src/saltybot_velocity_smoother/config/velocity_smoother_config.yaml b/jetson/ros2_ws/src/saltybot_velocity_smoother/config/velocity_smoother_config.yaml new file mode 100644 index 0000000..67e0038 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_velocity_smoother/config/velocity_smoother_config.yaml @@ -0,0 +1,12 @@ +# Velocity Smoother Configuration +velocity_smoother: + ros__parameters: + # Filter parameters + filter_order: 2 # Butterworth filter order (1-4 typical) + cutoff_frequency: 5.0 # Cutoff frequency in Hz (lower = more smoothing) + + # Publishing frequency (Hz) + publish_frequency: 50 + + # Enable/disable filtering + enable_smoothing: true \ No newline at end of file diff --git a/jetson/ros2_ws/src/saltybot_velocity_smoother/launch/velocity_smoother.launch.py b/jetson/ros2_ws/src/saltybot_velocity_smoother/launch/velocity_smoother.launch.py new file mode 100644 index 0000000..5275f96 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_velocity_smoother/launch/velocity_smoother.launch.py @@ -0,0 +1,28 @@ +import os +from launch import LaunchDescription +from launch_ros.actions import Node +from launch_ros.substitutions import FindPackageShare +from launch.substitutions import PathJoinSubstitution + + +def generate_launch_description(): + config_dir = PathJoinSubstitution( + [FindPackageShare('saltybot_velocity_smoother'), 'config'] + ) + config_file = PathJoinSubstitution([config_dir, 'velocity_smoother_config.yaml']) + + velocity_smoother = Node( + package='saltybot_velocity_smoother', + executable='velocity_smoother_node', + name='velocity_smoother', + output='screen', + parameters=[config_file], + remappings=[ + ('/odom', '/odom'), + ('/odom_smooth', '/odom_smooth'), + ], + ) + + return LaunchDescription([ + velocity_smoother, + ]) \ No newline at end of file diff --git a/jetson/ros2_ws/src/saltybot_velocity_smoother/package.xml b/jetson/ros2_ws/src/saltybot_velocity_smoother/package.xml new file mode 100644 index 0000000..2894eae --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_velocity_smoother/package.xml @@ -0,0 +1,29 @@ + + + + saltybot_velocity_smoother + 0.1.0 + Low-pass Butterworth filter for odometry velocity smoothing to reduce encoder jitter + + SaltyBot Controls + MIT + + SaltyBot Controls Team + + ament_cmake + ament_cmake_python + + rclpy + nav_msgs + std_msgs + geometry_msgs + + ament_copyright + ament_flake8 + ament_pep257 + pytest + + + ament_python + + \ No newline at end of file diff --git a/jetson/ros2_ws/src/saltybot_velocity_smoother/resource/saltybot_velocity_smoother b/jetson/ros2_ws/src/saltybot_velocity_smoother/resource/saltybot_velocity_smoother new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_velocity_smoother/saltybot_velocity_smoother/__init__.py b/jetson/ros2_ws/src/saltybot_velocity_smoother/saltybot_velocity_smoother/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_velocity_smoother/saltybot_velocity_smoother/velocity_smoother_node.py b/jetson/ros2_ws/src/saltybot_velocity_smoother/saltybot_velocity_smoother/velocity_smoother_node.py new file mode 100644 index 0000000..58e9e88 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_velocity_smoother/saltybot_velocity_smoother/velocity_smoother_node.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python3 +"""Velocity smoother node with low-pass Butterworth filter. + +Subscribes to /odom, applies low-pass Butterworth filter to linear and angular +velocity components, and publishes smoothed odometry on /odom_smooth. + +Reduces noise from encoder jitter and improves state estimation stability. +""" + +import math +import rclpy +from rclpy.node import Node +from nav_msgs.msg import Odometry +from std_msgs.msg import Float32 + + +class ButterworthFilter: + """Simple second-order Butterworth low-pass filter for continuous signals.""" + + def __init__(self, cutoff_hz, sample_rate_hz, order=2): + """Initialize Butterworth filter. + + Args: + cutoff_hz: Cutoff frequency in Hz + sample_rate_hz: Sampling rate in Hz + order: Filter order (typically 1-4) + """ + self.cutoff_hz = cutoff_hz + self.sample_rate_hz = sample_rate_hz + self.order = order + + # Normalized frequency (0 to 1, where 1 = Nyquist) + self.omega_n = 2.0 * math.pi * cutoff_hz / sample_rate_hz + + # Simplified filter coefficients for order 2 + # Using canonical form: y[n] = b0*x[n] + b1*x[n-1] + b2*x[n-2] - a1*y[n-1] - a2*y[n-2] + if order == 1: + # First-order filter + alpha = self.omega_n / (self.omega_n + 2.0) + self.b = [alpha, alpha] + self.a = [1.0, -(1.0 - 2.0 * alpha)] + else: + # Second-order filter (butterworth) + sqrt2 = math.sqrt(2.0) + wc = math.tan(self.omega_n / 2.0) + wc2 = wc * wc + + denom = 1.0 + sqrt2 * wc + wc2 + + self.b = [wc2 / denom, 2.0 * wc2 / denom, wc2 / denom] + self.a = [1.0, + 2.0 * (wc2 - 1.0) / denom, + (1.0 - sqrt2 * wc + wc2) / denom] + + # State buffers + self.x_history = [0.0, 0.0, 0.0] # Input history + self.y_history = [0.0, 0.0] # Output history + + def filter(self, x): + """Apply filter to input value. + + Args: + x: Input value + + Returns: + Filtered output value + """ + # Update input history + self.x_history[2] = self.x_history[1] + self.x_history[1] = self.x_history[0] + self.x_history[0] = x + + # Compute output using difference equation + if len(self.b) == 2: + # First-order filter + y = (self.b[0] * self.x_history[0] + + self.b[1] * self.x_history[1] - + self.a[1] * self.y_history[1]) + else: + # Second-order filter + y = (self.b[0] * self.x_history[0] + + self.b[1] * self.x_history[1] + + self.b[2] * self.x_history[2] - + self.a[1] * self.y_history[1] - + self.a[2] * self.y_history[2]) + + # Update output history + self.y_history[1] = self.y_history[0] + self.y_history[0] = y + + return y + + def reset(self): + """Reset filter state.""" + self.x_history = [0.0, 0.0, 0.0] + self.y_history = [0.0, 0.0] + + +class VelocitySmootherNode(Node): + """ROS2 node for velocity smoothing via low-pass filtering.""" + + def __init__(self): + super().__init__('velocity_smoother') + + # Parameters + self.declare_parameter('filter_order', 2) + self.declare_parameter('cutoff_frequency', 5.0) + self.declare_parameter('publish_frequency', 50) + self.declare_parameter('enable_smoothing', True) + + filter_order = self.get_parameter('filter_order').value + cutoff_frequency = self.get_parameter('cutoff_frequency').value + publish_frequency = self.get_parameter('publish_frequency').value + self.enable_smoothing = self.get_parameter('enable_smoothing').value + + # Create filters for each velocity component + self.filter_linear_x = ButterworthFilter( + cutoff_frequency, publish_frequency, order=filter_order + ) + self.filter_linear_y = ButterworthFilter( + cutoff_frequency, publish_frequency, order=filter_order + ) + self.filter_linear_z = ButterworthFilter( + cutoff_frequency, publish_frequency, order=filter_order + ) + self.filter_angular_x = ButterworthFilter( + cutoff_frequency, publish_frequency, order=filter_order + ) + self.filter_angular_y = ButterworthFilter( + cutoff_frequency, publish_frequency, order=filter_order + ) + self.filter_angular_z = ButterworthFilter( + cutoff_frequency, publish_frequency, order=filter_order + ) + + # Last received odometry + self.last_odom = None + + # Subscription to raw odometry + self.sub_odom = self.create_subscription( + Odometry, '/odom', self._on_odom, 10 + ) + + # Publisher for smoothed odometry + self.pub_odom_smooth = self.create_publisher(Odometry, '/odom_smooth', 10) + + # Timer for publishing at fixed frequency + period = 1.0 / publish_frequency + self.timer = self.create_timer(period, self._timer_callback) + + self.get_logger().info( + f"Velocity smoother initialized. " + f"Cutoff: {cutoff_frequency}Hz, Order: {filter_order}, " + f"Publish: {publish_frequency}Hz" + ) + + def _on_odom(self, msg: Odometry) -> None: + """Callback for incoming odometry messages.""" + self.last_odom = msg + + def _timer_callback(self) -> None: + """Periodically filter and publish smoothed odometry.""" + if self.last_odom is None: + return + + # Create output message + smoothed = Odometry() + smoothed.header = self.last_odom.header + smoothed.child_frame_id = self.last_odom.child_frame_id + + # Copy pose (unchanged) + smoothed.pose = self.last_odom.pose + + if self.enable_smoothing: + # Filter velocity components + linear_x = self.filter_linear_x.filter( + self.last_odom.twist.twist.linear.x + ) + linear_y = self.filter_linear_y.filter( + self.last_odom.twist.twist.linear.y + ) + linear_z = self.filter_linear_z.filter( + self.last_odom.twist.twist.linear.z + ) + + angular_x = self.filter_angular_x.filter( + self.last_odom.twist.twist.angular.x + ) + angular_y = self.filter_angular_y.filter( + self.last_odom.twist.twist.angular.y + ) + angular_z = self.filter_angular_z.filter( + self.last_odom.twist.twist.angular.z + ) + else: + # No filtering + linear_x = self.last_odom.twist.twist.linear.x + linear_y = self.last_odom.twist.twist.linear.y + linear_z = self.last_odom.twist.twist.linear.z + angular_x = self.last_odom.twist.twist.angular.x + angular_y = self.last_odom.twist.twist.angular.y + angular_z = self.last_odom.twist.twist.angular.z + + # Set smoothed twist + smoothed.twist.twist.linear.x = linear_x + smoothed.twist.twist.linear.y = linear_y + smoothed.twist.twist.linear.z = linear_z + smoothed.twist.twist.angular.x = angular_x + smoothed.twist.twist.angular.y = angular_y + smoothed.twist.twist.angular.z = angular_z + + # Copy covariances + smoothed.twist.covariance = self.last_odom.twist.covariance + + self.pub_odom_smooth.publish(smoothed) + + +def main(args=None): + rclpy.init(args=args) + node = VelocitySmootherNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/jetson/ros2_ws/src/saltybot_velocity_smoother/setup.cfg b/jetson/ros2_ws/src/saltybot_velocity_smoother/setup.cfg new file mode 100644 index 0000000..1d38c41 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_velocity_smoother/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/saltybot_velocity_smoother +[egg_info] +tag_build = +tag_date = 0 \ No newline at end of file diff --git a/jetson/ros2_ws/src/saltybot_velocity_smoother/setup.py b/jetson/ros2_ws/src/saltybot_velocity_smoother/setup.py new file mode 100644 index 0000000..47b0ee9 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_velocity_smoother/setup.py @@ -0,0 +1,34 @@ +from setuptools import setup, find_packages + +package_name = 'saltybot_velocity_smoother' + +setup( + name=package_name, + version='0.1.0', + packages=find_packages(exclude=['test']), + data_files=[ + ('share/ament_index/resource_index/packages', + ['resource/saltybot_velocity_smoother']), + ('share/' + package_name, ['package.xml']), + ('share/' + package_name + '/config', ['config/velocity_smoother_config.yaml']), + ('share/' + package_name + '/launch', ['launch/velocity_smoother.launch.py']), + ], + install_requires=['setuptools'], + zip_safe=True, + author='SaltyBot Controls', + author_email='sl-controls@saltybot.local', + maintainer='SaltyBot Controls', + maintainer_email='sl-controls@saltybot.local', + keywords=['ROS2', 'velocity', 'filtering', 'butterworth'], + classifiers=[ + 'Intended Audience :: Developers', + 'License :: OSI Approved :: MIT License', + 'Programming Language :: Python :: 3', + 'Topic :: Software Development', + ], + entry_points={ + 'console_scripts': [ + 'velocity_smoother_node=saltybot_velocity_smoother.velocity_smoother_node:main', + ], + }, +) \ No newline at end of file diff --git a/jetson/ros2_ws/src/saltybot_velocity_smoother/test/__init__.py b/jetson/ros2_ws/src/saltybot_velocity_smoother/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_velocity_smoother/test/test_velocity_smoother.py b/jetson/ros2_ws/src/saltybot_velocity_smoother/test/test_velocity_smoother.py new file mode 100644 index 0000000..37b9816 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_velocity_smoother/test/test_velocity_smoother.py @@ -0,0 +1,432 @@ +"""Unit tests for velocity smoother node.""" + +import pytest +import math +from nav_msgs.msg import Odometry +from geometry_msgs.msg import TwistWithCovariance, Twist, Vector3 +from std_msgs.msg import Header + +import rclpy +from rclpy.time import Time + +from saltybot_velocity_smoother.velocity_smoother_node import ( + VelocitySmootherNode, + ButterworthFilter, +) + + +@pytest.fixture +def rclpy_fixture(): + """Initialize and cleanup rclpy.""" + rclpy.init() + yield + rclpy.shutdown() + + +@pytest.fixture +def node(rclpy_fixture): + """Create a velocity smoother node instance.""" + node = VelocitySmootherNode() + yield node + node.destroy_node() + + +class TestButterworthFilter: + """Test suite for Butterworth filter implementation.""" + + def test_filter_initialization(self): + """Test filter initialization with valid parameters.""" + filt = ButterworthFilter(5.0, 50, order=2) + assert filt.cutoff_hz == 5.0 + assert filt.sample_rate_hz == 50 + assert filt.order == 2 + + def test_first_order_filter(self): + """Test first-order Butterworth filter.""" + filt = ButterworthFilter(5.0, 50, order=1) + assert len(filt.b) == 2 + assert len(filt.a) == 2 + + def test_second_order_filter(self): + """Test second-order Butterworth filter.""" + filt = ButterworthFilter(5.0, 50, order=2) + assert len(filt.b) == 3 + assert len(filt.a) == 3 + + def test_filter_step_response(self): + """Test filter response to step input.""" + filt = ButterworthFilter(5.0, 50, order=2) + + # Apply step input (0 -> 1.0) + outputs = [] + for i in range(50): + y = filt.filter(1.0) + outputs.append(y) + + # Final output should be close to 1.0 + assert outputs[-1] > 0.9 + assert outputs[-1] <= 1.0 + + def test_filter_constant_input(self): + """Test filter with constant input.""" + filt = ButterworthFilter(5.0, 50, order=2) + + # Apply constant input + for i in range(100): + y = filt.filter(2.5) + + # Should converge to input value + assert abs(y - 2.5) < 0.01 + + def test_filter_zero_input(self): + """Test filter with zero input.""" + filt = ButterworthFilter(5.0, 50, order=2) + + # Apply non-zero then zero + for i in range(50): + filt.filter(1.0) + + # Now apply zero + for i in range(50): + y = filt.filter(0.0) + + # Should decay to zero + assert abs(y) < 0.01 + + def test_filter_reset(self): + """Test filter state reset.""" + filt = ButterworthFilter(5.0, 50, order=2) + + # Filter some values + for i in range(10): + filt.filter(1.0) + + # Reset + filt.reset() + + # State should be zero + assert filt.x_history == [0.0, 0.0, 0.0] + assert filt.y_history == [0.0, 0.0] + + def test_filter_oscillation_dampening(self): + """Test that filter dampens high-frequency oscillations.""" + filt = ButterworthFilter(5.0, 50, order=2) + + # Apply alternating signal (high frequency) + outputs = [] + for i in range(100): + x = 1.0 if i % 2 == 0 else -1.0 + y = filt.filter(x) + outputs.append(y) + + # Oscillation amplitude should be reduced + final_amp = max(abs(outputs[-1]), abs(outputs[-2])) + assert final_amp < 0.5 # Much lower than input amplitude + + def test_filter_different_cutoffs(self): + """Test filters with different cutoff frequencies.""" + filt_low = ButterworthFilter(2.0, 50, order=2) + filt_high = ButterworthFilter(10.0, 50, order=2) + + # Both should be valid + assert filt_low.cutoff_hz == 2.0 + assert filt_high.cutoff_hz == 10.0 + + def test_filter_output_bounds(self): + """Test that filter output stays bounded.""" + filt = ButterworthFilter(5.0, 50, order=2) + + # Apply large random-like values + for i in range(100): + x = math.sin(i * 0.5) * 5.0 + y = filt.filter(x) + assert abs(y) < 10.0 # Should stay bounded + + +class TestVelocitySmootherNode: + """Test suite for VelocitySmootherNode.""" + + def test_node_initialization(self, node): + """Test that node initializes correctly.""" + assert node.last_odom is None + assert node.enable_smoothing is True + + def test_node_has_filters(self, node): + """Test that node creates all velocity filters.""" + assert node.filter_linear_x is not None + assert node.filter_linear_y is not None + assert node.filter_linear_z is not None + assert node.filter_angular_x is not None + assert node.filter_angular_y is not None + assert node.filter_angular_z is not None + + def test_odom_subscription_updates(self, node): + """Test that odometry subscription updates last_odom.""" + odom = Odometry() + odom.header.frame_id = "odom" + odom.twist.twist.linear.x = 1.0 + + node._on_odom(odom) + + assert node.last_odom is not None + assert node.last_odom.twist.twist.linear.x == 1.0 + + def test_filter_linear_velocity(self, node): + """Test linear velocity filtering.""" + # Create odometry message + odom = Odometry() + odom.header.frame_id = "odom" + odom.child_frame_id = "base_link" + odom.twist.twist.linear.x = 1.0 + odom.twist.twist.linear.y = 0.5 + odom.twist.twist.linear.z = 0.0 + odom.twist.twist.angular.x = 0.0 + odom.twist.twist.angular.y = 0.0 + odom.twist.twist.angular.z = 0.2 + + node._on_odom(odom) + + # Call timer callback to process + node._timer_callback() + + # Filter should have been applied + assert node.filter_linear_x.x_history[0] == 1.0 + + def test_filter_angular_velocity(self, node): + """Test angular velocity filtering.""" + odom = Odometry() + odom.header.frame_id = "odom" + odom.twist.twist.angular.z = 0.5 + + node._on_odom(odom) + node._timer_callback() + + assert node.filter_angular_z.x_history[0] == 0.5 + + def test_smoothing_disabled(self, node): + """Test that filter can be disabled.""" + node.enable_smoothing = False + + odom = Odometry() + odom.header.frame_id = "odom" + odom.twist.twist.linear.x = 2.0 + + node._on_odom(odom) + node._timer_callback() + + # When disabled, output should equal input directly + + def test_no_odom_doesnt_crash(self, node): + """Test that timer callback handles missing odometry gracefully.""" + # Call timer without setting odometry + node._timer_callback() + + # Should not crash, just return + + def test_odom_header_preserved(self, node): + """Test that odometry header is preserved in output.""" + odom = Odometry() + odom.header.frame_id = "test_frame" + odom.header.stamp = node.get_clock().now() + odom.child_frame_id = "test_child" + + node._on_odom(odom) + + # Timer callback processes it + node._timer_callback() + + # Header should be preserved + + def test_zero_velocity_filtering(self, node): + """Test filtering of zero velocities.""" + odom = Odometry() + odom.header.frame_id = "odom" + odom.twist.twist.linear.x = 0.0 + odom.twist.twist.linear.y = 0.0 + odom.twist.twist.linear.z = 0.0 + odom.twist.twist.angular.x = 0.0 + odom.twist.twist.angular.y = 0.0 + odom.twist.twist.angular.z = 0.0 + + node._on_odom(odom) + node._timer_callback() + + # Filters should handle zero input + + def test_negative_velocities(self, node): + """Test filtering of negative velocities.""" + odom = Odometry() + odom.header.frame_id = "odom" + odom.twist.twist.linear.x = -1.0 + odom.twist.twist.angular.z = -0.5 + + node._on_odom(odom) + node._timer_callback() + + assert node.filter_linear_x.x_history[0] == -1.0 + + def test_high_frequency_noise_dampening(self, node): + """Test that filter dampens high-frequency encoder noise.""" + # Simulate noisy encoder output + base_velocity = 1.0 + noise_amplitude = 0.2 + + odom = Odometry() + odom.header.frame_id = "odom" + + # Apply alternating noise + for i in range(100): + odom.twist.twist.linear.x = base_velocity + (noise_amplitude if i % 2 == 0 else -noise_amplitude) + node._on_odom(odom) + node._timer_callback() + + # After filtering, output should be close to base velocity + # (oscillations dampened) + + def test_large_velocity_values(self, node): + """Test filtering of large velocity values.""" + odom = Odometry() + odom.header.frame_id = "odom" + odom.twist.twist.linear.x = 10.0 + odom.twist.twist.angular.z = 5.0 + + node._on_odom(odom) + node._timer_callback() + + # Should handle large values without overflow + + def test_pose_unchanged(self, node): + """Test that pose is not modified by velocity filtering.""" + odom = Odometry() + odom.header.frame_id = "odom" + odom.pose.pose.position.x = 5.0 + odom.pose.pose.position.y = 3.0 + odom.twist.twist.linear.x = 1.0 + + node._on_odom(odom) + node._timer_callback() + + # Pose should be copied unchanged + + def test_multiple_velocity_updates(self, node): + """Test filtering across multiple sequential velocity updates.""" + odom = Odometry() + odom.header.frame_id = "odom" + + velocities = [0.5, 1.0, 1.5, 1.0, 0.5, 0.0] + + for v in velocities: + odom.twist.twist.linear.x = v + node._on_odom(odom) + node._timer_callback() + + # Filter should smooth the velocity sequence + + def test_simultaneous_all_velocities(self, node): + """Test filtering when all velocity components are present.""" + odom = Odometry() + odom.header.frame_id = "odom" + + for i in range(30): + odom.twist.twist.linear.x = math.sin(i * 0.1) + odom.twist.twist.linear.y = math.cos(i * 0.1) * 0.5 + odom.twist.twist.linear.z = 0.1 + odom.twist.twist.angular.x = 0.05 + odom.twist.twist.angular.y = 0.05 + odom.twist.twist.angular.z = math.sin(i * 0.15) + + node._on_odom(odom) + node._timer_callback() + + # All filters should operate independently + + +class TestVelocitySmootherScenarios: + """Integration-style tests for realistic scenarios.""" + + def test_scenario_constant_velocity(self, node): + """Scenario: robot moving at constant velocity.""" + odom = Odometry() + odom.header.frame_id = "odom" + odom.twist.twist.linear.x = 1.0 + + for i in range(50): + node._on_odom(odom) + node._timer_callback() + + # Should maintain constant velocity after convergence + + def test_scenario_velocity_ramp(self, node): + """Scenario: velocity ramping up from stop.""" + odom = Odometry() + odom.header.frame_id = "odom" + + for i in range(50): + odom.twist.twist.linear.x = i * 0.02 # Ramp from 0 to 1.0 + node._on_odom(odom) + node._timer_callback() + + # Filter should smooth the ramp + + def test_scenario_velocity_step(self, node): + """Scenario: sudden velocity change (e.g., collision avoidance).""" + odom = Odometry() + odom.header.frame_id = "odom" + + # First phase: constant velocity + odom.twist.twist.linear.x = 1.0 + for i in range(25): + node._on_odom(odom) + node._timer_callback() + + # Second phase: sudden stop + odom.twist.twist.linear.x = 0.0 + for i in range(25): + node._on_odom(odom) + node._timer_callback() + + # Filter should transition smoothly + + def test_scenario_rotation_only(self, node): + """Scenario: robot spinning in place.""" + odom = Odometry() + odom.header.frame_id = "odom" + odom.twist.twist.linear.x = 0.0 + odom.twist.twist.angular.z = 0.5 + + for i in range(50): + node._on_odom(odom) + node._timer_callback() + + # Angular velocity should be filtered + + def test_scenario_mixed_motion(self, node): + """Scenario: combined linear and angular motion.""" + odom = Odometry() + odom.header.frame_id = "odom" + + for i in range(50): + odom.twist.twist.linear.x = math.cos(i * 0.1) + odom.twist.twist.linear.y = math.sin(i * 0.1) + odom.twist.twist.angular.z = 0.2 + + node._on_odom(odom) + node._timer_callback() + + # Both linear and angular components should be filtered + + def test_scenario_encoder_noise_reduction(self, node): + """Scenario: realistic encoder jitter with filtering.""" + odom = Odometry() + odom.header.frame_id = "odom" + + # Simulate encoder jitter: base velocity + small noise + base_vel = 1.0 + for i in range(100): + jitter = 0.05 * math.sin(i * 0.5) + 0.03 * math.cos(i * 0.3) + odom.twist.twist.linear.x = base_vel + jitter + + node._on_odom(odom) + node._timer_callback() + + # Filter should reduce noise while maintaining base velocity -- 2.47.2