feat(controls): Smooth acceleration rate limiter (Issue #241) #246

Merged
sl-jetson merged 1 commits from sl-controls/issue-241-smooth-accel into main 2026-03-02 12:46:22 -05:00
10 changed files with 700 additions and 0 deletions
Showing only changes of commit 60d8d19342 - Show all commits

View File

@ -0,0 +1,14 @@
# Acceleration limiter configuration
accel_limiter:
ros__parameters:
# Update frequency (Hz)
frequency: 50 # 50 Hz = 20ms cycle
# Linear acceleration limit (m/s²)
# Prevents jerky forward/backward and lateral motion
linear_accel_limit: 0.5
# Angular acceleration limit (rad/s²)
# Prevents jerky rotation
angular_accel_limit: 1.0

View File

@ -0,0 +1,36 @@
"""Launch file for accel_limiter_node."""
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch.actions import DeclareLaunchArgument
import os
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
"""Generate launch description for acceleration limiter node."""
# Package directory
pkg_dir = get_package_share_directory("saltybot_accel_limiter")
# Parameters
config_file = os.path.join(pkg_dir, "config", "accel_limiter_config.yaml")
# Declare launch arguments
return LaunchDescription(
[
DeclareLaunchArgument(
"config_file",
default_value=config_file,
description="Path to configuration YAML file",
),
# Acceleration limiter node
Node(
package="saltybot_accel_limiter",
executable="accel_limiter_node",
name="accel_limiter",
output="screen",
parameters=[LaunchConfiguration("config_file")],
),
]
)

View File

@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_accel_limiter</name>
<version>0.1.0</version>
<description>Smooth acceleration and deceleration limiter for velocity commands.</description>
<maintainer email="seb@vayrette.com">Seb</maintainer>
<license>Apache-2.0</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,156 @@
#!/usr/bin/env python3
"""Smooth acceleration limiter for velocity commands.
Applies configurable acceleration and deceleration rate limits to prevent jerky motion.
Subscribed topics:
/cmd_vel (geometry_msgs/Twist) - Target velocity command
Published topics:
/cmd_vel_smooth (geometry_msgs/Twist) - Rate-limited velocity command
Prevents sudden velocity changes by limiting rate of change to:
- linear: 0.5 m/ (configurable)
- angular: 1.0 rad/ (configurable)
"""
from typing import Optional
import rclpy
from rclpy.node import Node
from rclpy.timer import Timer
from geometry_msgs.msg import Twist
class AccelLimiterNode(Node):
"""ROS2 node for smooth acceleration limiting."""
def __init__(self):
super().__init__("accel_limiter")
# Parameters
self.declare_parameter("frequency", 50) # Hz
frequency = self.get_parameter("frequency").value
self.declare_parameter("linear_accel_limit", 0.5) # m/s²
self.declare_parameter("angular_accel_limit", 1.0) # rad/s²
self.linear_accel_limit = self.get_parameter("linear_accel_limit").value
self.angular_accel_limit = self.get_parameter("angular_accel_limit").value
self.period = 1.0 / frequency
# Latest target velocity from subscription
self.target_vel: Optional[Twist] = None
# Current smoothed velocity (published)
self.current_vel = Twist()
# Subscription to target velocity
self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10)
# Publisher for smoothed velocity
self.pub_smooth = self.create_publisher(Twist, "/cmd_vel_smooth", 10)
# Timer for rate limiting at configured frequency
self.timer: Timer = self.create_timer(self.period, self._timer_callback)
self.get_logger().info(
f"Acceleration limiter initialized at {frequency}Hz. "
f"Linear accel limit: {self.linear_accel_limit} m/s². "
f"Angular accel limit: {self.angular_accel_limit} rad/s²"
)
def _on_cmd_vel(self, msg: Twist) -> None:
"""Update target velocity from cmd_vel subscription."""
self.target_vel = msg
def _timer_callback(self) -> None:
"""Apply rate limiting and publish smoothed velocity."""
if self.target_vel is None:
# No target yet, publish zero velocity
self.pub_smooth.publish(self.current_vel)
return
# Create new velocity with rate-limited changes
limited_vel = Twist()
# Rate limit linear components
limited_vel.linear.x = self._limit_component(
self.current_vel.linear.x,
self.target_vel.linear.x,
self.linear_accel_limit,
)
limited_vel.linear.y = self._limit_component(
self.current_vel.linear.y,
self.target_vel.linear.y,
self.linear_accel_limit,
)
limited_vel.linear.z = self._limit_component(
self.current_vel.linear.z,
self.target_vel.linear.z,
self.linear_accel_limit,
)
# Rate limit angular components
limited_vel.angular.x = self._limit_component(
self.current_vel.angular.x,
self.target_vel.angular.x,
self.angular_accel_limit,
)
limited_vel.angular.y = self._limit_component(
self.current_vel.angular.y,
self.target_vel.angular.y,
self.angular_accel_limit,
)
limited_vel.angular.z = self._limit_component(
self.current_vel.angular.z,
self.target_vel.angular.z,
self.angular_accel_limit,
)
# Update current velocity and publish
self.current_vel = limited_vel
self.pub_smooth.publish(self.current_vel)
def _limit_component(
self, current: float, target: float, accel_limit: float
) -> float:
"""Apply acceleration limit to a single velocity component.
Args:
current: Current velocity value (m/s or rad/s)
target: Target velocity value (m/s or rad/s)
accel_limit: Maximum acceleration (m/ or rad/)
Returns:
Rate-limited velocity value
"""
# Compute desired change
change = target - current
# Maximum change allowed in this time period: accel_limit * dt
max_change = accel_limit * self.period
# Clamp change to limits
if change > max_change:
return current + max_change
elif change < -max_change:
return current - max_change
else:
return target
def main(args=None):
rclpy.init(args=args)
node = AccelLimiterNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script-dir=$base/lib/saltybot_accel_limiter
[install]
install-scripts=$base/lib/saltybot_accel_limiter

View File

@ -0,0 +1,27 @@
from setuptools import find_packages, setup
package_name = "saltybot_accel_limiter"
setup(
name=package_name,
version="0.1.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
("share/" + package_name + "/launch", ["launch/accel_limiter.launch.py"]),
("share/" + package_name + "/config", ["config/accel_limiter_config.yaml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="Seb",
maintainer_email="seb@vayrette.com",
description="Smooth acceleration and deceleration rate limiting for velocity commands",
license="Apache-2.0",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"accel_limiter_node = saltybot_accel_limiter.accel_limiter_node:main",
],
},
)

View File

@ -0,0 +1,442 @@
"""Unit tests for accel_limiter_node."""
import pytest
import math
from geometry_msgs.msg import Twist, Vector3
import rclpy
# Import the node under test
from saltybot_accel_limiter.accel_limiter_node import AccelLimiterNode
@pytest.fixture
def rclpy_fixture():
"""Initialize and cleanup rclpy."""
rclpy.init()
yield
rclpy.shutdown()
@pytest.fixture
def node(rclpy_fixture):
"""Create an acceleration limiter node instance."""
node = AccelLimiterNode()
yield node
node.destroy_node()
class TestNodeInitialization:
"""Test suite for node initialization."""
def test_node_initialization(self, node):
"""Test that node initializes with correct defaults."""
assert node.linear_accel_limit == 0.5
assert node.angular_accel_limit == 1.0
assert node.target_vel is None
assert node.current_vel.linear.x == 0.0
assert node.current_vel.linear.y == 0.0
assert node.current_vel.linear.z == 0.0
assert node.current_vel.angular.x == 0.0
assert node.current_vel.angular.y == 0.0
assert node.current_vel.angular.z == 0.0
def test_frequency_parameter(self, node):
"""Test frequency parameter is set correctly."""
frequency = node.get_parameter("frequency").value
assert frequency == 50
def test_linear_accel_parameter(self, node):
"""Test linear acceleration limit parameter."""
limit = node.get_parameter("linear_accel_limit").value
assert limit == 0.5
def test_angular_accel_parameter(self, node):
"""Test angular acceleration limit parameter."""
limit = node.get_parameter("angular_accel_limit").value
assert limit == 1.0
def test_period_calculation(self, node):
"""Test that time period is correctly calculated from frequency."""
# 50 Hz = 20ms = 0.02s
assert node.period == pytest.approx(0.02)
class TestLimitComponent:
"""Test suite for _limit_component method."""
def test_no_change_needed(self, node):
"""Test when target equals current (no acceleration needed)."""
result = node._limit_component(1.0, 1.0, 0.5)
assert result == pytest.approx(1.0)
def test_zero_acceleration(self, node):
"""Test when current is zero and target is zero."""
result = node._limit_component(0.0, 0.0, 0.5)
assert result == pytest.approx(0.0)
def test_small_change_within_limit(self, node):
"""Test small change that is within acceleration limit."""
# Max change in 20ms at 0.5 m/s²: 0.5 * 0.02 = 0.01 m/s
# Desired change: 0.005 m/s (less than limit)
result = node._limit_component(0.0, 0.005, 0.5)
assert result == pytest.approx(0.005)
def test_acceleration_exceeds_limit(self, node):
"""Test acceleration that exceeds the limit."""
# Max change in 20ms at 0.5 m/s²: 0.5 * 0.02 = 0.01 m/s
# Desired change: 0.1 m/s (much greater than limit)
result = node._limit_component(0.0, 0.1, 0.5)
assert result == pytest.approx(0.01)
def test_deceleration_exceeds_limit(self, node):
"""Test deceleration that exceeds the limit."""
# Max change in 20ms at 0.5 m/s²: -0.5 * 0.02 = -0.01 m/s
# Desired change: -0.1 m/s (much more negative than limit)
result = node._limit_component(0.1, 0.0, 0.5)
assert result == pytest.approx(0.09)
def test_negative_to_positive_acceleration(self, node):
"""Test acceleration from negative to positive velocity."""
result = node._limit_component(-0.1, 0.1, 0.5)
# Max change: 0.01, so -0.1 + 0.01 = -0.09
assert result == pytest.approx(-0.09)
def test_positive_to_negative_deceleration(self, node):
"""Test deceleration from positive to negative velocity."""
result = node._limit_component(0.1, -0.1, 0.5)
# Max change: -0.01, so 0.1 - 0.01 = 0.09
assert result == pytest.approx(0.09)
def test_angular_acceleration_limit(self, node):
"""Test with angular acceleration limit."""
# Max change in 20ms at 1.0 rad/s²: 1.0 * 0.02 = 0.02 rad/s
# Desired change: 0.1 rad/s (exceeds limit)
result = node._limit_component(0.0, 0.1, 1.0)
assert result == pytest.approx(0.02)
def test_large_acceleration_value(self, node):
"""Test with large acceleration limit."""
# Max change in 20ms at 10.0 m/s²: 10.0 * 0.02 = 0.2 m/s
# Desired change: 0.1 m/s (within limit)
result = node._limit_component(0.0, 0.1, 10.0)
assert result == pytest.approx(0.1)
class TestSubscriptionHandling:
"""Test suite for cmd_vel subscription."""
def test_cmd_vel_subscription(self, node):
"""Test that cmd_vel subscription updates target velocity."""
cmd = Twist()
cmd.linear.x = 1.0
cmd.linear.y = 0.5
cmd.angular.z = 0.2
node._on_cmd_vel(cmd)
assert node.target_vel is cmd
assert node.target_vel.linear.x == 1.0
assert node.target_vel.linear.y == 0.5
assert node.target_vel.angular.z == 0.2
def test_multiple_cmd_vel_updates(self, node):
"""Test that subscription updates replace previous command."""
cmd1 = Twist()
cmd1.linear.x = 1.0
cmd2 = Twist()
cmd2.linear.x = 2.0
node._on_cmd_vel(cmd1)
assert node.target_vel.linear.x == 1.0
node._on_cmd_vel(cmd2)
assert node.target_vel.linear.x == 2.0
class TestTimerCallback:
"""Test suite for timer callback and rate limiting."""
def test_timer_callback_no_target(self, node):
"""Test timer callback when no target velocity received yet."""
# Should publish current velocity (zero)
node._timer_callback()
assert node.current_vel.linear.x == 0.0
assert node.current_vel.angular.z == 0.0
def test_timer_callback_linear_acceleration(self, node):
"""Test linear acceleration limiting."""
# Target: 1.0 m/s
target = Twist()
target.linear.x = 1.0
node._on_cmd_vel(target)
# First callback: current should move towards target, limited
node._timer_callback()
# Max acceleration: 0.5 m/s², period: 0.02s = 0.01 m/s change
assert node.current_vel.linear.x == pytest.approx(0.01)
# Second callback: should continue accelerating
node._timer_callback()
assert node.current_vel.linear.x == pytest.approx(0.02)
def test_timer_callback_linear_deceleration(self, node):
"""Test linear deceleration limiting."""
# Start with velocity
node.current_vel.linear.x = 1.0
# Target: 0 m/s (deceleration)
target = Twist()
target.linear.x = 0.0
node._on_cmd_vel(target)
# Callback should decelerate limited by rate limit
node._timer_callback()
# Max deceleration: -0.01 m/s, so 1.0 - 0.01 = 0.99
assert node.current_vel.linear.x == pytest.approx(0.99)
def test_timer_callback_angular_acceleration(self, node):
"""Test angular acceleration limiting."""
target = Twist()
target.angular.z = 1.0
node._on_cmd_vel(target)
node._timer_callback()
# Max angular acceleration: 1.0 rad/s², period: 0.02s = 0.02 rad/s
assert node.current_vel.angular.z == pytest.approx(0.02)
def test_timer_callback_all_components(self, node):
"""Test rate limiting across all velocity components."""
target = Twist()
target.linear.x = 1.0
target.linear.y = 0.5
target.linear.z = 0.2
target.angular.x = 0.1
target.angular.y = 0.2
target.angular.z = 1.0
node._on_cmd_vel(target)
node._timer_callback()
# All linear components should be limited by linear_accel_limit
assert node.current_vel.linear.x == pytest.approx(0.01)
assert node.current_vel.linear.y == pytest.approx(0.005)
assert node.current_vel.linear.z == pytest.approx(0.002)
# All angular components should be limited by angular_accel_limit
assert node.current_vel.angular.x == pytest.approx(0.002)
assert node.current_vel.angular.y == pytest.approx(0.004)
assert node.current_vel.angular.z == pytest.approx(0.02)
def test_timer_reaches_target(self, node):
"""Test that velocity eventually reaches target after multiple callbacks."""
target = Twist()
target.linear.x = 0.1
node._on_cmd_vel(target)
# Max change per callback: 0.01
# Target: 0.1, so needs 10 callbacks to reach it
for _ in range(5):
node._timer_callback()
# After 5 callbacks: 0.05
assert node.current_vel.linear.x == pytest.approx(0.05)
# Continue for 5 more callbacks
for _ in range(5):
node._timer_callback()
# Should be at target now or very close
assert node.current_vel.linear.x == pytest.approx(0.1)
def test_timer_stops_at_target(self, node):
"""Test that velocity stops changing once target is reached."""
target = Twist()
target.linear.x = 0.01
node._on_cmd_vel(target)
node._timer_callback()
assert node.current_vel.linear.x == pytest.approx(0.01)
# Next callback should stay at target
node._timer_callback()
assert node.current_vel.linear.x == pytest.approx(0.01)
class TestRateLimit:
"""Test suite for rate limiting behavior."""
def test_linear_vs_angular_different_limits(self, node):
"""Test that linear and angular limits are independent."""
target = Twist()
target.linear.x = 1.0
target.angular.z = 1.0
node._on_cmd_vel(target)
node._timer_callback()
# Linear limited by 0.5 m/s²: 0.5 * 0.02 = 0.01
assert node.current_vel.linear.x == pytest.approx(0.01)
# Angular limited by 1.0 rad/s²: 1.0 * 0.02 = 0.02
assert node.current_vel.angular.z == pytest.approx(0.02)
def test_emergency_stop(self, node):
"""Test emergency deceleration from high speed."""
# Start at high speed
node.current_vel.linear.x = 2.0
# Sudden stop command
target = Twist()
target.linear.x = 0.0
node._on_cmd_vel(target)
# Should decelerate gradually, not instantly
node._timer_callback()
assert node.current_vel.linear.x == pytest.approx(1.99)
assert node.current_vel.linear.x > 0.0 # Not zero yet
def test_alternating_direction(self, node):
"""Test rapid direction changes are smoothed."""
# Accelerate forward
target1 = Twist()
target1.linear.x = 0.5
node._on_cmd_vel(target1)
for _ in range(10):
node._timer_callback()
vel_forward = node.current_vel.linear.x
# Suddenly reverse
target2 = Twist()
target2.linear.x = -0.5
node._on_cmd_vel(target2)
node._timer_callback()
vel_after_reverse = node.current_vel.linear.x
# Should still be positive but decreasing
assert vel_after_reverse < vel_forward
assert vel_after_reverse > -0.5
class TestScenarios:
"""Integration-style tests for realistic scenarios."""
def test_scenario_gradual_acceleration(self, node):
"""Scenario: Smooth acceleration from stop to cruising speed."""
target = Twist()
target.linear.x = 0.5 # Cruise at 0.5 m/s
node._on_cmd_vel(target)
velocities = []
for _ in range(100):
node._timer_callback()
velocities.append(node.current_vel.linear.x)
# Velocities should be monotonically increasing
for i in range(1, len(velocities)):
assert velocities[i] >= velocities[i - 1]
# Should reach target
assert velocities[-1] == pytest.approx(0.5)
def test_scenario_smooth_stop(self, node):
"""Scenario: Smooth deceleration to stop."""
# Start at cruising speed
node.current_vel.linear.x = 0.5
# Command to stop
target = Twist()
target.linear.x = 0.0
node._on_cmd_vel(target)
velocities = []
for _ in range(100):
node._timer_callback()
velocities.append(node.current_vel.linear.x)
# Velocities should be monotonically decreasing
for i in range(1, len(velocities)):
assert velocities[i] <= velocities[i - 1]
# Should reach zero
assert velocities[-1] == pytest.approx(0.0)
def test_scenario_turn_while_moving(self, node):
"""Scenario: Robot turns while moving forward."""
node.current_vel.linear.x = 0.5
node.current_vel.linear.y = 0.0
target = Twist()
target.linear.x = 0.5
target.linear.y = 0.2
target.angular.z = 0.5
node._on_cmd_vel(target)
# Multiple iterations to see smooth change
for _ in range(50):
node._timer_callback()
# Forward velocity unchanged (same target)
assert node.current_vel.linear.x == pytest.approx(0.5)
# Lateral velocity increased toward target
assert node.current_vel.linear.y > 0.0
assert node.current_vel.linear.y <= 0.2
# Angular velocity increased
assert node.current_vel.angular.z > 0.0
assert node.current_vel.angular.z <= 0.5
def test_scenario_obstacle_avoidance(self, node):
"""Scenario: Quick lateral movement due to obstacle."""
target = Twist()
target.linear.x = 0.5
target.linear.y = 0.3 # Quick dodge sideways
node._on_cmd_vel(target)
# Y velocity should increase gradually, not jump to 0.3
accelerations = []
for _ in range(10):
node._timer_callback()
accelerations.append(node.current_vel.linear.y)
# Each step should be limited
for i in range(1, len(accelerations)):
delta = accelerations[i] - accelerations[i - 1]
max_delta = 0.5 * 0.02 # linear_accel_limit * period
assert abs(delta) <= max_delta + 1e-6 # Small tolerance
def test_scenario_continuous_motion(self, node):
"""Scenario: Continuous velocity updates (tracking)."""
for i in range(100):
# Gradually increasing target
target = Twist()
target.linear.x = i * 0.001
node._on_cmd_vel(target)
node._timer_callback()
# Current velocity should lag behind target
assert node.current_vel.linear.x < 0.1
def test_scenario_oscillating_command(self, node):
"""Scenario: Target velocity oscillates."""
oscillations = []
for i in range(200):
# Sinusoidal target
target = Twist()
target.linear.x = 0.5 * (1 + math.sin(i * 0.05))
node._on_cmd_vel(target)
node._timer_callback()
oscillations.append(node.current_vel.linear.x)
# Velocity should lag behind target but not diverge
assert all(0.0 <= v <= 1.0 for v in oscillations)