feat: Add velocity smoothing filter ROS2 node #316

Merged
sl-jetson merged 1 commits from sl-controls/velocity-smooth-filter into main 2026-03-03 00:41:17 -05:00
10 changed files with 771 additions and 0 deletions

View File

@ -0,0 +1,12 @@
# Velocity Smoother Configuration
velocity_smoother:
ros__parameters:
# Filter parameters
filter_order: 2 # Butterworth filter order (1-4 typical)
cutoff_frequency: 5.0 # Cutoff frequency in Hz (lower = more smoothing)
# Publishing frequency (Hz)
publish_frequency: 50
# Enable/disable filtering
enable_smoothing: true

View File

@ -0,0 +1,28 @@
import os
from launch import LaunchDescription
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
from launch.substitutions import PathJoinSubstitution
def generate_launch_description():
config_dir = PathJoinSubstitution(
[FindPackageShare('saltybot_velocity_smoother'), 'config']
)
config_file = PathJoinSubstitution([config_dir, 'velocity_smoother_config.yaml'])
velocity_smoother = Node(
package='saltybot_velocity_smoother',
executable='velocity_smoother_node',
name='velocity_smoother',
output='screen',
parameters=[config_file],
remappings=[
('/odom', '/odom'),
('/odom_smooth', '/odom_smooth'),
],
)
return LaunchDescription([
velocity_smoother,
])

View File

@ -0,0 +1,29 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_velocity_smoother</name>
<version>0.1.0</version>
<description>Low-pass Butterworth filter for odometry velocity smoothing to reduce encoder jitter</description>
<maintainer email="sl-controls@saltybot.local">SaltyBot Controls</maintainer>
<license>MIT</license>
<author email="sl-controls@saltybot.local">SaltyBot Controls Team</author>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>ament_cmake_python</buildtool_depend>
<depend>rclpy</depend>
<depend>nav_msgs</depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,231 @@
#!/usr/bin/env python3
"""Velocity smoother node with low-pass Butterworth filter.
Subscribes to /odom, applies low-pass Butterworth filter to linear and angular
velocity components, and publishes smoothed odometry on /odom_smooth.
Reduces noise from encoder jitter and improves state estimation stability.
"""
import math
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
from std_msgs.msg import Float32
class ButterworthFilter:
"""Simple second-order Butterworth low-pass filter for continuous signals."""
def __init__(self, cutoff_hz, sample_rate_hz, order=2):
"""Initialize Butterworth filter.
Args:
cutoff_hz: Cutoff frequency in Hz
sample_rate_hz: Sampling rate in Hz
order: Filter order (typically 1-4)
"""
self.cutoff_hz = cutoff_hz
self.sample_rate_hz = sample_rate_hz
self.order = order
# Normalized frequency (0 to 1, where 1 = Nyquist)
self.omega_n = 2.0 * math.pi * cutoff_hz / sample_rate_hz
# Simplified filter coefficients for order 2
# Using canonical form: y[n] = b0*x[n] + b1*x[n-1] + b2*x[n-2] - a1*y[n-1] - a2*y[n-2]
if order == 1:
# First-order filter
alpha = self.omega_n / (self.omega_n + 2.0)
self.b = [alpha, alpha]
self.a = [1.0, -(1.0 - 2.0 * alpha)]
else:
# Second-order filter (butterworth)
sqrt2 = math.sqrt(2.0)
wc = math.tan(self.omega_n / 2.0)
wc2 = wc * wc
denom = 1.0 + sqrt2 * wc + wc2
self.b = [wc2 / denom, 2.0 * wc2 / denom, wc2 / denom]
self.a = [1.0,
2.0 * (wc2 - 1.0) / denom,
(1.0 - sqrt2 * wc + wc2) / denom]
# State buffers
self.x_history = [0.0, 0.0, 0.0] # Input history
self.y_history = [0.0, 0.0] # Output history
def filter(self, x):
"""Apply filter to input value.
Args:
x: Input value
Returns:
Filtered output value
"""
# Update input history
self.x_history[2] = self.x_history[1]
self.x_history[1] = self.x_history[0]
self.x_history[0] = x
# Compute output using difference equation
if len(self.b) == 2:
# First-order filter
y = (self.b[0] * self.x_history[0] +
self.b[1] * self.x_history[1] -
self.a[1] * self.y_history[1])
else:
# Second-order filter
y = (self.b[0] * self.x_history[0] +
self.b[1] * self.x_history[1] +
self.b[2] * self.x_history[2] -
self.a[1] * self.y_history[1] -
self.a[2] * self.y_history[2])
# Update output history
self.y_history[1] = self.y_history[0]
self.y_history[0] = y
return y
def reset(self):
"""Reset filter state."""
self.x_history = [0.0, 0.0, 0.0]
self.y_history = [0.0, 0.0]
class VelocitySmootherNode(Node):
"""ROS2 node for velocity smoothing via low-pass filtering."""
def __init__(self):
super().__init__('velocity_smoother')
# Parameters
self.declare_parameter('filter_order', 2)
self.declare_parameter('cutoff_frequency', 5.0)
self.declare_parameter('publish_frequency', 50)
self.declare_parameter('enable_smoothing', True)
filter_order = self.get_parameter('filter_order').value
cutoff_frequency = self.get_parameter('cutoff_frequency').value
publish_frequency = self.get_parameter('publish_frequency').value
self.enable_smoothing = self.get_parameter('enable_smoothing').value
# Create filters for each velocity component
self.filter_linear_x = ButterworthFilter(
cutoff_frequency, publish_frequency, order=filter_order
)
self.filter_linear_y = ButterworthFilter(
cutoff_frequency, publish_frequency, order=filter_order
)
self.filter_linear_z = ButterworthFilter(
cutoff_frequency, publish_frequency, order=filter_order
)
self.filter_angular_x = ButterworthFilter(
cutoff_frequency, publish_frequency, order=filter_order
)
self.filter_angular_y = ButterworthFilter(
cutoff_frequency, publish_frequency, order=filter_order
)
self.filter_angular_z = ButterworthFilter(
cutoff_frequency, publish_frequency, order=filter_order
)
# Last received odometry
self.last_odom = None
# Subscription to raw odometry
self.sub_odom = self.create_subscription(
Odometry, '/odom', self._on_odom, 10
)
# Publisher for smoothed odometry
self.pub_odom_smooth = self.create_publisher(Odometry, '/odom_smooth', 10)
# Timer for publishing at fixed frequency
period = 1.0 / publish_frequency
self.timer = self.create_timer(period, self._timer_callback)
self.get_logger().info(
f"Velocity smoother initialized. "
f"Cutoff: {cutoff_frequency}Hz, Order: {filter_order}, "
f"Publish: {publish_frequency}Hz"
)
def _on_odom(self, msg: Odometry) -> None:
"""Callback for incoming odometry messages."""
self.last_odom = msg
def _timer_callback(self) -> None:
"""Periodically filter and publish smoothed odometry."""
if self.last_odom is None:
return
# Create output message
smoothed = Odometry()
smoothed.header = self.last_odom.header
smoothed.child_frame_id = self.last_odom.child_frame_id
# Copy pose (unchanged)
smoothed.pose = self.last_odom.pose
if self.enable_smoothing:
# Filter velocity components
linear_x = self.filter_linear_x.filter(
self.last_odom.twist.twist.linear.x
)
linear_y = self.filter_linear_y.filter(
self.last_odom.twist.twist.linear.y
)
linear_z = self.filter_linear_z.filter(
self.last_odom.twist.twist.linear.z
)
angular_x = self.filter_angular_x.filter(
self.last_odom.twist.twist.angular.x
)
angular_y = self.filter_angular_y.filter(
self.last_odom.twist.twist.angular.y
)
angular_z = self.filter_angular_z.filter(
self.last_odom.twist.twist.angular.z
)
else:
# No filtering
linear_x = self.last_odom.twist.twist.linear.x
linear_y = self.last_odom.twist.twist.linear.y
linear_z = self.last_odom.twist.twist.linear.z
angular_x = self.last_odom.twist.twist.angular.x
angular_y = self.last_odom.twist.twist.angular.y
angular_z = self.last_odom.twist.twist.angular.z
# Set smoothed twist
smoothed.twist.twist.linear.x = linear_x
smoothed.twist.twist.linear.y = linear_y
smoothed.twist.twist.linear.z = linear_z
smoothed.twist.twist.angular.x = angular_x
smoothed.twist.twist.angular.y = angular_y
smoothed.twist.twist.angular.z = angular_z
# Copy covariances
smoothed.twist.covariance = self.last_odom.twist.covariance
self.pub_odom_smooth.publish(smoothed)
def main(args=None):
rclpy.init(args=args)
node = VelocitySmootherNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_velocity_smoother
[egg_info]
tag_build =
tag_date = 0

View File

@ -0,0 +1,34 @@
from setuptools import setup, find_packages
package_name = 'saltybot_velocity_smoother'
setup(
name=package_name,
version='0.1.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/saltybot_velocity_smoother']),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/config', ['config/velocity_smoother_config.yaml']),
('share/' + package_name + '/launch', ['launch/velocity_smoother.launch.py']),
],
install_requires=['setuptools'],
zip_safe=True,
author='SaltyBot Controls',
author_email='sl-controls@saltybot.local',
maintainer='SaltyBot Controls',
maintainer_email='sl-controls@saltybot.local',
keywords=['ROS2', 'velocity', 'filtering', 'butterworth'],
classifiers=[
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: 3',
'Topic :: Software Development',
],
entry_points={
'console_scripts': [
'velocity_smoother_node=saltybot_velocity_smoother.velocity_smoother_node:main',
],
},
)

View File

@ -0,0 +1,432 @@
"""Unit tests for velocity smoother node."""
import pytest
import math
from nav_msgs.msg import Odometry
from geometry_msgs.msg import TwistWithCovariance, Twist, Vector3
from std_msgs.msg import Header
import rclpy
from rclpy.time import Time
from saltybot_velocity_smoother.velocity_smoother_node import (
VelocitySmootherNode,
ButterworthFilter,
)
@pytest.fixture
def rclpy_fixture():
"""Initialize and cleanup rclpy."""
rclpy.init()
yield
rclpy.shutdown()
@pytest.fixture
def node(rclpy_fixture):
"""Create a velocity smoother node instance."""
node = VelocitySmootherNode()
yield node
node.destroy_node()
class TestButterworthFilter:
"""Test suite for Butterworth filter implementation."""
def test_filter_initialization(self):
"""Test filter initialization with valid parameters."""
filt = ButterworthFilter(5.0, 50, order=2)
assert filt.cutoff_hz == 5.0
assert filt.sample_rate_hz == 50
assert filt.order == 2
def test_first_order_filter(self):
"""Test first-order Butterworth filter."""
filt = ButterworthFilter(5.0, 50, order=1)
assert len(filt.b) == 2
assert len(filt.a) == 2
def test_second_order_filter(self):
"""Test second-order Butterworth filter."""
filt = ButterworthFilter(5.0, 50, order=2)
assert len(filt.b) == 3
assert len(filt.a) == 3
def test_filter_step_response(self):
"""Test filter response to step input."""
filt = ButterworthFilter(5.0, 50, order=2)
# Apply step input (0 -> 1.0)
outputs = []
for i in range(50):
y = filt.filter(1.0)
outputs.append(y)
# Final output should be close to 1.0
assert outputs[-1] > 0.9
assert outputs[-1] <= 1.0
def test_filter_constant_input(self):
"""Test filter with constant input."""
filt = ButterworthFilter(5.0, 50, order=2)
# Apply constant input
for i in range(100):
y = filt.filter(2.5)
# Should converge to input value
assert abs(y - 2.5) < 0.01
def test_filter_zero_input(self):
"""Test filter with zero input."""
filt = ButterworthFilter(5.0, 50, order=2)
# Apply non-zero then zero
for i in range(50):
filt.filter(1.0)
# Now apply zero
for i in range(50):
y = filt.filter(0.0)
# Should decay to zero
assert abs(y) < 0.01
def test_filter_reset(self):
"""Test filter state reset."""
filt = ButterworthFilter(5.0, 50, order=2)
# Filter some values
for i in range(10):
filt.filter(1.0)
# Reset
filt.reset()
# State should be zero
assert filt.x_history == [0.0, 0.0, 0.0]
assert filt.y_history == [0.0, 0.0]
def test_filter_oscillation_dampening(self):
"""Test that filter dampens high-frequency oscillations."""
filt = ButterworthFilter(5.0, 50, order=2)
# Apply alternating signal (high frequency)
outputs = []
for i in range(100):
x = 1.0 if i % 2 == 0 else -1.0
y = filt.filter(x)
outputs.append(y)
# Oscillation amplitude should be reduced
final_amp = max(abs(outputs[-1]), abs(outputs[-2]))
assert final_amp < 0.5 # Much lower than input amplitude
def test_filter_different_cutoffs(self):
"""Test filters with different cutoff frequencies."""
filt_low = ButterworthFilter(2.0, 50, order=2)
filt_high = ButterworthFilter(10.0, 50, order=2)
# Both should be valid
assert filt_low.cutoff_hz == 2.0
assert filt_high.cutoff_hz == 10.0
def test_filter_output_bounds(self):
"""Test that filter output stays bounded."""
filt = ButterworthFilter(5.0, 50, order=2)
# Apply large random-like values
for i in range(100):
x = math.sin(i * 0.5) * 5.0
y = filt.filter(x)
assert abs(y) < 10.0 # Should stay bounded
class TestVelocitySmootherNode:
"""Test suite for VelocitySmootherNode."""
def test_node_initialization(self, node):
"""Test that node initializes correctly."""
assert node.last_odom is None
assert node.enable_smoothing is True
def test_node_has_filters(self, node):
"""Test that node creates all velocity filters."""
assert node.filter_linear_x is not None
assert node.filter_linear_y is not None
assert node.filter_linear_z is not None
assert node.filter_angular_x is not None
assert node.filter_angular_y is not None
assert node.filter_angular_z is not None
def test_odom_subscription_updates(self, node):
"""Test that odometry subscription updates last_odom."""
odom = Odometry()
odom.header.frame_id = "odom"
odom.twist.twist.linear.x = 1.0
node._on_odom(odom)
assert node.last_odom is not None
assert node.last_odom.twist.twist.linear.x == 1.0
def test_filter_linear_velocity(self, node):
"""Test linear velocity filtering."""
# Create odometry message
odom = Odometry()
odom.header.frame_id = "odom"
odom.child_frame_id = "base_link"
odom.twist.twist.linear.x = 1.0
odom.twist.twist.linear.y = 0.5
odom.twist.twist.linear.z = 0.0
odom.twist.twist.angular.x = 0.0
odom.twist.twist.angular.y = 0.0
odom.twist.twist.angular.z = 0.2
node._on_odom(odom)
# Call timer callback to process
node._timer_callback()
# Filter should have been applied
assert node.filter_linear_x.x_history[0] == 1.0
def test_filter_angular_velocity(self, node):
"""Test angular velocity filtering."""
odom = Odometry()
odom.header.frame_id = "odom"
odom.twist.twist.angular.z = 0.5
node._on_odom(odom)
node._timer_callback()
assert node.filter_angular_z.x_history[0] == 0.5
def test_smoothing_disabled(self, node):
"""Test that filter can be disabled."""
node.enable_smoothing = False
odom = Odometry()
odom.header.frame_id = "odom"
odom.twist.twist.linear.x = 2.0
node._on_odom(odom)
node._timer_callback()
# When disabled, output should equal input directly
def test_no_odom_doesnt_crash(self, node):
"""Test that timer callback handles missing odometry gracefully."""
# Call timer without setting odometry
node._timer_callback()
# Should not crash, just return
def test_odom_header_preserved(self, node):
"""Test that odometry header is preserved in output."""
odom = Odometry()
odom.header.frame_id = "test_frame"
odom.header.stamp = node.get_clock().now()
odom.child_frame_id = "test_child"
node._on_odom(odom)
# Timer callback processes it
node._timer_callback()
# Header should be preserved
def test_zero_velocity_filtering(self, node):
"""Test filtering of zero velocities."""
odom = Odometry()
odom.header.frame_id = "odom"
odom.twist.twist.linear.x = 0.0
odom.twist.twist.linear.y = 0.0
odom.twist.twist.linear.z = 0.0
odom.twist.twist.angular.x = 0.0
odom.twist.twist.angular.y = 0.0
odom.twist.twist.angular.z = 0.0
node._on_odom(odom)
node._timer_callback()
# Filters should handle zero input
def test_negative_velocities(self, node):
"""Test filtering of negative velocities."""
odom = Odometry()
odom.header.frame_id = "odom"
odom.twist.twist.linear.x = -1.0
odom.twist.twist.angular.z = -0.5
node._on_odom(odom)
node._timer_callback()
assert node.filter_linear_x.x_history[0] == -1.0
def test_high_frequency_noise_dampening(self, node):
"""Test that filter dampens high-frequency encoder noise."""
# Simulate noisy encoder output
base_velocity = 1.0
noise_amplitude = 0.2
odom = Odometry()
odom.header.frame_id = "odom"
# Apply alternating noise
for i in range(100):
odom.twist.twist.linear.x = base_velocity + (noise_amplitude if i % 2 == 0 else -noise_amplitude)
node._on_odom(odom)
node._timer_callback()
# After filtering, output should be close to base velocity
# (oscillations dampened)
def test_large_velocity_values(self, node):
"""Test filtering of large velocity values."""
odom = Odometry()
odom.header.frame_id = "odom"
odom.twist.twist.linear.x = 10.0
odom.twist.twist.angular.z = 5.0
node._on_odom(odom)
node._timer_callback()
# Should handle large values without overflow
def test_pose_unchanged(self, node):
"""Test that pose is not modified by velocity filtering."""
odom = Odometry()
odom.header.frame_id = "odom"
odom.pose.pose.position.x = 5.0
odom.pose.pose.position.y = 3.0
odom.twist.twist.linear.x = 1.0
node._on_odom(odom)
node._timer_callback()
# Pose should be copied unchanged
def test_multiple_velocity_updates(self, node):
"""Test filtering across multiple sequential velocity updates."""
odom = Odometry()
odom.header.frame_id = "odom"
velocities = [0.5, 1.0, 1.5, 1.0, 0.5, 0.0]
for v in velocities:
odom.twist.twist.linear.x = v
node._on_odom(odom)
node._timer_callback()
# Filter should smooth the velocity sequence
def test_simultaneous_all_velocities(self, node):
"""Test filtering when all velocity components are present."""
odom = Odometry()
odom.header.frame_id = "odom"
for i in range(30):
odom.twist.twist.linear.x = math.sin(i * 0.1)
odom.twist.twist.linear.y = math.cos(i * 0.1) * 0.5
odom.twist.twist.linear.z = 0.1
odom.twist.twist.angular.x = 0.05
odom.twist.twist.angular.y = 0.05
odom.twist.twist.angular.z = math.sin(i * 0.15)
node._on_odom(odom)
node._timer_callback()
# All filters should operate independently
class TestVelocitySmootherScenarios:
"""Integration-style tests for realistic scenarios."""
def test_scenario_constant_velocity(self, node):
"""Scenario: robot moving at constant velocity."""
odom = Odometry()
odom.header.frame_id = "odom"
odom.twist.twist.linear.x = 1.0
for i in range(50):
node._on_odom(odom)
node._timer_callback()
# Should maintain constant velocity after convergence
def test_scenario_velocity_ramp(self, node):
"""Scenario: velocity ramping up from stop."""
odom = Odometry()
odom.header.frame_id = "odom"
for i in range(50):
odom.twist.twist.linear.x = i * 0.02 # Ramp from 0 to 1.0
node._on_odom(odom)
node._timer_callback()
# Filter should smooth the ramp
def test_scenario_velocity_step(self, node):
"""Scenario: sudden velocity change (e.g., collision avoidance)."""
odom = Odometry()
odom.header.frame_id = "odom"
# First phase: constant velocity
odom.twist.twist.linear.x = 1.0
for i in range(25):
node._on_odom(odom)
node._timer_callback()
# Second phase: sudden stop
odom.twist.twist.linear.x = 0.0
for i in range(25):
node._on_odom(odom)
node._timer_callback()
# Filter should transition smoothly
def test_scenario_rotation_only(self, node):
"""Scenario: robot spinning in place."""
odom = Odometry()
odom.header.frame_id = "odom"
odom.twist.twist.linear.x = 0.0
odom.twist.twist.angular.z = 0.5
for i in range(50):
node._on_odom(odom)
node._timer_callback()
# Angular velocity should be filtered
def test_scenario_mixed_motion(self, node):
"""Scenario: combined linear and angular motion."""
odom = Odometry()
odom.header.frame_id = "odom"
for i in range(50):
odom.twist.twist.linear.x = math.cos(i * 0.1)
odom.twist.twist.linear.y = math.sin(i * 0.1)
odom.twist.twist.angular.z = 0.2
node._on_odom(odom)
node._timer_callback()
# Both linear and angular components should be filtered
def test_scenario_encoder_noise_reduction(self, node):
"""Scenario: realistic encoder jitter with filtering."""
odom = Odometry()
odom.header.frame_id = "odom"
# Simulate encoder jitter: base velocity + small noise
base_vel = 1.0
for i in range(100):
jitter = 0.05 * math.sin(i * 0.5) + 0.03 * math.cos(i * 0.3)
odom.twist.twist.linear.x = base_vel + jitter
node._on_odom(odom)
node._timer_callback()
# Filter should reduce noise while maintaining base velocity