Merge pull request 'feat(controls): Smooth acceleration rate limiter (Issue #241)' (#246) from sl-controls/issue-241-smooth-accel into main
This commit is contained in:
commit
3c276b0bce
@ -0,0 +1,14 @@
|
|||||||
|
# Acceleration limiter configuration
|
||||||
|
|
||||||
|
accel_limiter:
|
||||||
|
ros__parameters:
|
||||||
|
# Update frequency (Hz)
|
||||||
|
frequency: 50 # 50 Hz = 20ms cycle
|
||||||
|
|
||||||
|
# Linear acceleration limit (m/s²)
|
||||||
|
# Prevents jerky forward/backward and lateral motion
|
||||||
|
linear_accel_limit: 0.5
|
||||||
|
|
||||||
|
# Angular acceleration limit (rad/s²)
|
||||||
|
# Prevents jerky rotation
|
||||||
|
angular_accel_limit: 1.0
|
||||||
@ -0,0 +1,36 @@
|
|||||||
|
"""Launch file for accel_limiter_node."""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
"""Generate launch description for acceleration limiter node."""
|
||||||
|
# Package directory
|
||||||
|
pkg_dir = get_package_share_directory("saltybot_accel_limiter")
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
config_file = os.path.join(pkg_dir, "config", "accel_limiter_config.yaml")
|
||||||
|
|
||||||
|
# Declare launch arguments
|
||||||
|
return LaunchDescription(
|
||||||
|
[
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"config_file",
|
||||||
|
default_value=config_file,
|
||||||
|
description="Path to configuration YAML file",
|
||||||
|
),
|
||||||
|
# Acceleration limiter node
|
||||||
|
Node(
|
||||||
|
package="saltybot_accel_limiter",
|
||||||
|
executable="accel_limiter_node",
|
||||||
|
name="accel_limiter",
|
||||||
|
output="screen",
|
||||||
|
parameters=[LaunchConfiguration("config_file")],
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
21
jetson/ros2_ws/src/saltybot_accel_limiter/package.xml
Normal file
21
jetson/ros2_ws/src/saltybot_accel_limiter/package.xml
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_accel_limiter</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>Smooth acceleration and deceleration limiter for velocity commands.</description>
|
||||||
|
<maintainer email="seb@vayrette.com">Seb</maintainer>
|
||||||
|
<license>Apache-2.0</license>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
|
||||||
|
<test_depend>pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,156 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Smooth acceleration limiter for velocity commands.
|
||||||
|
|
||||||
|
Applies configurable acceleration and deceleration rate limits to prevent jerky motion.
|
||||||
|
|
||||||
|
Subscribed topics:
|
||||||
|
/cmd_vel (geometry_msgs/Twist) - Target velocity command
|
||||||
|
|
||||||
|
Published topics:
|
||||||
|
/cmd_vel_smooth (geometry_msgs/Twist) - Rate-limited velocity command
|
||||||
|
|
||||||
|
Prevents sudden velocity changes by limiting rate of change to:
|
||||||
|
- linear: 0.5 m/s² (configurable)
|
||||||
|
- angular: 1.0 rad/s² (configurable)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.timer import Timer
|
||||||
|
from geometry_msgs.msg import Twist
|
||||||
|
|
||||||
|
|
||||||
|
class AccelLimiterNode(Node):
|
||||||
|
"""ROS2 node for smooth acceleration limiting."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("accel_limiter")
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
self.declare_parameter("frequency", 50) # Hz
|
||||||
|
frequency = self.get_parameter("frequency").value
|
||||||
|
self.declare_parameter("linear_accel_limit", 0.5) # m/s²
|
||||||
|
self.declare_parameter("angular_accel_limit", 1.0) # rad/s²
|
||||||
|
|
||||||
|
self.linear_accel_limit = self.get_parameter("linear_accel_limit").value
|
||||||
|
self.angular_accel_limit = self.get_parameter("angular_accel_limit").value
|
||||||
|
|
||||||
|
self.period = 1.0 / frequency
|
||||||
|
|
||||||
|
# Latest target velocity from subscription
|
||||||
|
self.target_vel: Optional[Twist] = None
|
||||||
|
|
||||||
|
# Current smoothed velocity (published)
|
||||||
|
self.current_vel = Twist()
|
||||||
|
|
||||||
|
# Subscription to target velocity
|
||||||
|
self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10)
|
||||||
|
|
||||||
|
# Publisher for smoothed velocity
|
||||||
|
self.pub_smooth = self.create_publisher(Twist, "/cmd_vel_smooth", 10)
|
||||||
|
|
||||||
|
# Timer for rate limiting at configured frequency
|
||||||
|
self.timer: Timer = self.create_timer(self.period, self._timer_callback)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Acceleration limiter initialized at {frequency}Hz. "
|
||||||
|
f"Linear accel limit: {self.linear_accel_limit} m/s². "
|
||||||
|
f"Angular accel limit: {self.angular_accel_limit} rad/s²"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _on_cmd_vel(self, msg: Twist) -> None:
|
||||||
|
"""Update target velocity from cmd_vel subscription."""
|
||||||
|
self.target_vel = msg
|
||||||
|
|
||||||
|
def _timer_callback(self) -> None:
|
||||||
|
"""Apply rate limiting and publish smoothed velocity."""
|
||||||
|
if self.target_vel is None:
|
||||||
|
# No target yet, publish zero velocity
|
||||||
|
self.pub_smooth.publish(self.current_vel)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Create new velocity with rate-limited changes
|
||||||
|
limited_vel = Twist()
|
||||||
|
|
||||||
|
# Rate limit linear components
|
||||||
|
limited_vel.linear.x = self._limit_component(
|
||||||
|
self.current_vel.linear.x,
|
||||||
|
self.target_vel.linear.x,
|
||||||
|
self.linear_accel_limit,
|
||||||
|
)
|
||||||
|
limited_vel.linear.y = self._limit_component(
|
||||||
|
self.current_vel.linear.y,
|
||||||
|
self.target_vel.linear.y,
|
||||||
|
self.linear_accel_limit,
|
||||||
|
)
|
||||||
|
limited_vel.linear.z = self._limit_component(
|
||||||
|
self.current_vel.linear.z,
|
||||||
|
self.target_vel.linear.z,
|
||||||
|
self.linear_accel_limit,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Rate limit angular components
|
||||||
|
limited_vel.angular.x = self._limit_component(
|
||||||
|
self.current_vel.angular.x,
|
||||||
|
self.target_vel.angular.x,
|
||||||
|
self.angular_accel_limit,
|
||||||
|
)
|
||||||
|
limited_vel.angular.y = self._limit_component(
|
||||||
|
self.current_vel.angular.y,
|
||||||
|
self.target_vel.angular.y,
|
||||||
|
self.angular_accel_limit,
|
||||||
|
)
|
||||||
|
limited_vel.angular.z = self._limit_component(
|
||||||
|
self.current_vel.angular.z,
|
||||||
|
self.target_vel.angular.z,
|
||||||
|
self.angular_accel_limit,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Update current velocity and publish
|
||||||
|
self.current_vel = limited_vel
|
||||||
|
self.pub_smooth.publish(self.current_vel)
|
||||||
|
|
||||||
|
def _limit_component(
|
||||||
|
self, current: float, target: float, accel_limit: float
|
||||||
|
) -> float:
|
||||||
|
"""Apply acceleration limit to a single velocity component.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
current: Current velocity value (m/s or rad/s)
|
||||||
|
target: Target velocity value (m/s or rad/s)
|
||||||
|
accel_limit: Maximum acceleration (m/s² or rad/s²)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Rate-limited velocity value
|
||||||
|
"""
|
||||||
|
# Compute desired change
|
||||||
|
change = target - current
|
||||||
|
|
||||||
|
# Maximum change allowed in this time period: accel_limit * dt
|
||||||
|
max_change = accel_limit * self.period
|
||||||
|
|
||||||
|
# Clamp change to limits
|
||||||
|
if change > max_change:
|
||||||
|
return current + max_change
|
||||||
|
elif change < -max_change:
|
||||||
|
return current - max_change
|
||||||
|
else:
|
||||||
|
return target
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = AccelLimiterNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
4
jetson/ros2_ws/src/saltybot_accel_limiter/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_accel_limiter/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script-dir=$base/lib/saltybot_accel_limiter
|
||||||
|
[install]
|
||||||
|
install-scripts=$base/lib/saltybot_accel_limiter
|
||||||
27
jetson/ros2_ws/src/saltybot_accel_limiter/setup.py
Normal file
27
jetson/ros2_ws/src/saltybot_accel_limiter/setup.py
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
from setuptools import find_packages, setup
|
||||||
|
|
||||||
|
package_name = "saltybot_accel_limiter"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=find_packages(exclude=["test"]),
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
|
||||||
|
("share/" + package_name, ["package.xml"]),
|
||||||
|
("share/" + package_name + "/launch", ["launch/accel_limiter.launch.py"]),
|
||||||
|
("share/" + package_name + "/config", ["config/accel_limiter_config.yaml"]),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="Seb",
|
||||||
|
maintainer_email="seb@vayrette.com",
|
||||||
|
description="Smooth acceleration and deceleration rate limiting for velocity commands",
|
||||||
|
license="Apache-2.0",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"accel_limiter_node = saltybot_accel_limiter.accel_limiter_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
@ -0,0 +1,442 @@
|
|||||||
|
"""Unit tests for accel_limiter_node."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import math
|
||||||
|
from geometry_msgs.msg import Twist, Vector3
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
|
||||||
|
# Import the node under test
|
||||||
|
from saltybot_accel_limiter.accel_limiter_node import AccelLimiterNode
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def rclpy_fixture():
|
||||||
|
"""Initialize and cleanup rclpy."""
|
||||||
|
rclpy.init()
|
||||||
|
yield
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def node(rclpy_fixture):
|
||||||
|
"""Create an acceleration limiter node instance."""
|
||||||
|
node = AccelLimiterNode()
|
||||||
|
yield node
|
||||||
|
node.destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
class TestNodeInitialization:
|
||||||
|
"""Test suite for node initialization."""
|
||||||
|
|
||||||
|
def test_node_initialization(self, node):
|
||||||
|
"""Test that node initializes with correct defaults."""
|
||||||
|
assert node.linear_accel_limit == 0.5
|
||||||
|
assert node.angular_accel_limit == 1.0
|
||||||
|
assert node.target_vel is None
|
||||||
|
assert node.current_vel.linear.x == 0.0
|
||||||
|
assert node.current_vel.linear.y == 0.0
|
||||||
|
assert node.current_vel.linear.z == 0.0
|
||||||
|
assert node.current_vel.angular.x == 0.0
|
||||||
|
assert node.current_vel.angular.y == 0.0
|
||||||
|
assert node.current_vel.angular.z == 0.0
|
||||||
|
|
||||||
|
def test_frequency_parameter(self, node):
|
||||||
|
"""Test frequency parameter is set correctly."""
|
||||||
|
frequency = node.get_parameter("frequency").value
|
||||||
|
assert frequency == 50
|
||||||
|
|
||||||
|
def test_linear_accel_parameter(self, node):
|
||||||
|
"""Test linear acceleration limit parameter."""
|
||||||
|
limit = node.get_parameter("linear_accel_limit").value
|
||||||
|
assert limit == 0.5
|
||||||
|
|
||||||
|
def test_angular_accel_parameter(self, node):
|
||||||
|
"""Test angular acceleration limit parameter."""
|
||||||
|
limit = node.get_parameter("angular_accel_limit").value
|
||||||
|
assert limit == 1.0
|
||||||
|
|
||||||
|
def test_period_calculation(self, node):
|
||||||
|
"""Test that time period is correctly calculated from frequency."""
|
||||||
|
# 50 Hz = 20ms = 0.02s
|
||||||
|
assert node.period == pytest.approx(0.02)
|
||||||
|
|
||||||
|
|
||||||
|
class TestLimitComponent:
|
||||||
|
"""Test suite for _limit_component method."""
|
||||||
|
|
||||||
|
def test_no_change_needed(self, node):
|
||||||
|
"""Test when target equals current (no acceleration needed)."""
|
||||||
|
result = node._limit_component(1.0, 1.0, 0.5)
|
||||||
|
assert result == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_zero_acceleration(self, node):
|
||||||
|
"""Test when current is zero and target is zero."""
|
||||||
|
result = node._limit_component(0.0, 0.0, 0.5)
|
||||||
|
assert result == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_small_change_within_limit(self, node):
|
||||||
|
"""Test small change that is within acceleration limit."""
|
||||||
|
# Max change in 20ms at 0.5 m/s²: 0.5 * 0.02 = 0.01 m/s
|
||||||
|
# Desired change: 0.005 m/s (less than limit)
|
||||||
|
result = node._limit_component(0.0, 0.005, 0.5)
|
||||||
|
assert result == pytest.approx(0.005)
|
||||||
|
|
||||||
|
def test_acceleration_exceeds_limit(self, node):
|
||||||
|
"""Test acceleration that exceeds the limit."""
|
||||||
|
# Max change in 20ms at 0.5 m/s²: 0.5 * 0.02 = 0.01 m/s
|
||||||
|
# Desired change: 0.1 m/s (much greater than limit)
|
||||||
|
result = node._limit_component(0.0, 0.1, 0.5)
|
||||||
|
assert result == pytest.approx(0.01)
|
||||||
|
|
||||||
|
def test_deceleration_exceeds_limit(self, node):
|
||||||
|
"""Test deceleration that exceeds the limit."""
|
||||||
|
# Max change in 20ms at 0.5 m/s²: -0.5 * 0.02 = -0.01 m/s
|
||||||
|
# Desired change: -0.1 m/s (much more negative than limit)
|
||||||
|
result = node._limit_component(0.1, 0.0, 0.5)
|
||||||
|
assert result == pytest.approx(0.09)
|
||||||
|
|
||||||
|
def test_negative_to_positive_acceleration(self, node):
|
||||||
|
"""Test acceleration from negative to positive velocity."""
|
||||||
|
result = node._limit_component(-0.1, 0.1, 0.5)
|
||||||
|
# Max change: 0.01, so -0.1 + 0.01 = -0.09
|
||||||
|
assert result == pytest.approx(-0.09)
|
||||||
|
|
||||||
|
def test_positive_to_negative_deceleration(self, node):
|
||||||
|
"""Test deceleration from positive to negative velocity."""
|
||||||
|
result = node._limit_component(0.1, -0.1, 0.5)
|
||||||
|
# Max change: -0.01, so 0.1 - 0.01 = 0.09
|
||||||
|
assert result == pytest.approx(0.09)
|
||||||
|
|
||||||
|
def test_angular_acceleration_limit(self, node):
|
||||||
|
"""Test with angular acceleration limit."""
|
||||||
|
# Max change in 20ms at 1.0 rad/s²: 1.0 * 0.02 = 0.02 rad/s
|
||||||
|
# Desired change: 0.1 rad/s (exceeds limit)
|
||||||
|
result = node._limit_component(0.0, 0.1, 1.0)
|
||||||
|
assert result == pytest.approx(0.02)
|
||||||
|
|
||||||
|
def test_large_acceleration_value(self, node):
|
||||||
|
"""Test with large acceleration limit."""
|
||||||
|
# Max change in 20ms at 10.0 m/s²: 10.0 * 0.02 = 0.2 m/s
|
||||||
|
# Desired change: 0.1 m/s (within limit)
|
||||||
|
result = node._limit_component(0.0, 0.1, 10.0)
|
||||||
|
assert result == pytest.approx(0.1)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSubscriptionHandling:
|
||||||
|
"""Test suite for cmd_vel subscription."""
|
||||||
|
|
||||||
|
def test_cmd_vel_subscription(self, node):
|
||||||
|
"""Test that cmd_vel subscription updates target velocity."""
|
||||||
|
cmd = Twist()
|
||||||
|
cmd.linear.x = 1.0
|
||||||
|
cmd.linear.y = 0.5
|
||||||
|
cmd.angular.z = 0.2
|
||||||
|
|
||||||
|
node._on_cmd_vel(cmd)
|
||||||
|
|
||||||
|
assert node.target_vel is cmd
|
||||||
|
assert node.target_vel.linear.x == 1.0
|
||||||
|
assert node.target_vel.linear.y == 0.5
|
||||||
|
assert node.target_vel.angular.z == 0.2
|
||||||
|
|
||||||
|
def test_multiple_cmd_vel_updates(self, node):
|
||||||
|
"""Test that subscription updates replace previous command."""
|
||||||
|
cmd1 = Twist()
|
||||||
|
cmd1.linear.x = 1.0
|
||||||
|
|
||||||
|
cmd2 = Twist()
|
||||||
|
cmd2.linear.x = 2.0
|
||||||
|
|
||||||
|
node._on_cmd_vel(cmd1)
|
||||||
|
assert node.target_vel.linear.x == 1.0
|
||||||
|
|
||||||
|
node._on_cmd_vel(cmd2)
|
||||||
|
assert node.target_vel.linear.x == 2.0
|
||||||
|
|
||||||
|
|
||||||
|
class TestTimerCallback:
|
||||||
|
"""Test suite for timer callback and rate limiting."""
|
||||||
|
|
||||||
|
def test_timer_callback_no_target(self, node):
|
||||||
|
"""Test timer callback when no target velocity received yet."""
|
||||||
|
# Should publish current velocity (zero)
|
||||||
|
node._timer_callback()
|
||||||
|
assert node.current_vel.linear.x == 0.0
|
||||||
|
assert node.current_vel.angular.z == 0.0
|
||||||
|
|
||||||
|
def test_timer_callback_linear_acceleration(self, node):
|
||||||
|
"""Test linear acceleration limiting."""
|
||||||
|
# Target: 1.0 m/s
|
||||||
|
target = Twist()
|
||||||
|
target.linear.x = 1.0
|
||||||
|
node._on_cmd_vel(target)
|
||||||
|
|
||||||
|
# First callback: current should move towards target, limited
|
||||||
|
node._timer_callback()
|
||||||
|
# Max acceleration: 0.5 m/s², period: 0.02s = 0.01 m/s change
|
||||||
|
assert node.current_vel.linear.x == pytest.approx(0.01)
|
||||||
|
|
||||||
|
# Second callback: should continue accelerating
|
||||||
|
node._timer_callback()
|
||||||
|
assert node.current_vel.linear.x == pytest.approx(0.02)
|
||||||
|
|
||||||
|
def test_timer_callback_linear_deceleration(self, node):
|
||||||
|
"""Test linear deceleration limiting."""
|
||||||
|
# Start with velocity
|
||||||
|
node.current_vel.linear.x = 1.0
|
||||||
|
|
||||||
|
# Target: 0 m/s (deceleration)
|
||||||
|
target = Twist()
|
||||||
|
target.linear.x = 0.0
|
||||||
|
node._on_cmd_vel(target)
|
||||||
|
|
||||||
|
# Callback should decelerate limited by rate limit
|
||||||
|
node._timer_callback()
|
||||||
|
# Max deceleration: -0.01 m/s, so 1.0 - 0.01 = 0.99
|
||||||
|
assert node.current_vel.linear.x == pytest.approx(0.99)
|
||||||
|
|
||||||
|
def test_timer_callback_angular_acceleration(self, node):
|
||||||
|
"""Test angular acceleration limiting."""
|
||||||
|
target = Twist()
|
||||||
|
target.angular.z = 1.0
|
||||||
|
node._on_cmd_vel(target)
|
||||||
|
|
||||||
|
node._timer_callback()
|
||||||
|
# Max angular acceleration: 1.0 rad/s², period: 0.02s = 0.02 rad/s
|
||||||
|
assert node.current_vel.angular.z == pytest.approx(0.02)
|
||||||
|
|
||||||
|
def test_timer_callback_all_components(self, node):
|
||||||
|
"""Test rate limiting across all velocity components."""
|
||||||
|
target = Twist()
|
||||||
|
target.linear.x = 1.0
|
||||||
|
target.linear.y = 0.5
|
||||||
|
target.linear.z = 0.2
|
||||||
|
target.angular.x = 0.1
|
||||||
|
target.angular.y = 0.2
|
||||||
|
target.angular.z = 1.0
|
||||||
|
|
||||||
|
node._on_cmd_vel(target)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# All linear components should be limited by linear_accel_limit
|
||||||
|
assert node.current_vel.linear.x == pytest.approx(0.01)
|
||||||
|
assert node.current_vel.linear.y == pytest.approx(0.005)
|
||||||
|
assert node.current_vel.linear.z == pytest.approx(0.002)
|
||||||
|
|
||||||
|
# All angular components should be limited by angular_accel_limit
|
||||||
|
assert node.current_vel.angular.x == pytest.approx(0.002)
|
||||||
|
assert node.current_vel.angular.y == pytest.approx(0.004)
|
||||||
|
assert node.current_vel.angular.z == pytest.approx(0.02)
|
||||||
|
|
||||||
|
def test_timer_reaches_target(self, node):
|
||||||
|
"""Test that velocity eventually reaches target after multiple callbacks."""
|
||||||
|
target = Twist()
|
||||||
|
target.linear.x = 0.1
|
||||||
|
node._on_cmd_vel(target)
|
||||||
|
|
||||||
|
# Max change per callback: 0.01
|
||||||
|
# Target: 0.1, so needs 10 callbacks to reach it
|
||||||
|
for _ in range(5):
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# After 5 callbacks: 0.05
|
||||||
|
assert node.current_vel.linear.x == pytest.approx(0.05)
|
||||||
|
|
||||||
|
# Continue for 5 more callbacks
|
||||||
|
for _ in range(5):
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Should be at target now or very close
|
||||||
|
assert node.current_vel.linear.x == pytest.approx(0.1)
|
||||||
|
|
||||||
|
def test_timer_stops_at_target(self, node):
|
||||||
|
"""Test that velocity stops changing once target is reached."""
|
||||||
|
target = Twist()
|
||||||
|
target.linear.x = 0.01
|
||||||
|
node._on_cmd_vel(target)
|
||||||
|
|
||||||
|
node._timer_callback()
|
||||||
|
assert node.current_vel.linear.x == pytest.approx(0.01)
|
||||||
|
|
||||||
|
# Next callback should stay at target
|
||||||
|
node._timer_callback()
|
||||||
|
assert node.current_vel.linear.x == pytest.approx(0.01)
|
||||||
|
|
||||||
|
|
||||||
|
class TestRateLimit:
|
||||||
|
"""Test suite for rate limiting behavior."""
|
||||||
|
|
||||||
|
def test_linear_vs_angular_different_limits(self, node):
|
||||||
|
"""Test that linear and angular limits are independent."""
|
||||||
|
target = Twist()
|
||||||
|
target.linear.x = 1.0
|
||||||
|
target.angular.z = 1.0
|
||||||
|
|
||||||
|
node._on_cmd_vel(target)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Linear limited by 0.5 m/s²: 0.5 * 0.02 = 0.01
|
||||||
|
assert node.current_vel.linear.x == pytest.approx(0.01)
|
||||||
|
|
||||||
|
# Angular limited by 1.0 rad/s²: 1.0 * 0.02 = 0.02
|
||||||
|
assert node.current_vel.angular.z == pytest.approx(0.02)
|
||||||
|
|
||||||
|
def test_emergency_stop(self, node):
|
||||||
|
"""Test emergency deceleration from high speed."""
|
||||||
|
# Start at high speed
|
||||||
|
node.current_vel.linear.x = 2.0
|
||||||
|
|
||||||
|
# Sudden stop command
|
||||||
|
target = Twist()
|
||||||
|
target.linear.x = 0.0
|
||||||
|
node._on_cmd_vel(target)
|
||||||
|
|
||||||
|
# Should decelerate gradually, not instantly
|
||||||
|
node._timer_callback()
|
||||||
|
assert node.current_vel.linear.x == pytest.approx(1.99)
|
||||||
|
assert node.current_vel.linear.x > 0.0 # Not zero yet
|
||||||
|
|
||||||
|
def test_alternating_direction(self, node):
|
||||||
|
"""Test rapid direction changes are smoothed."""
|
||||||
|
# Accelerate forward
|
||||||
|
target1 = Twist()
|
||||||
|
target1.linear.x = 0.5
|
||||||
|
node._on_cmd_vel(target1)
|
||||||
|
|
||||||
|
for _ in range(10):
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
vel_forward = node.current_vel.linear.x
|
||||||
|
|
||||||
|
# Suddenly reverse
|
||||||
|
target2 = Twist()
|
||||||
|
target2.linear.x = -0.5
|
||||||
|
node._on_cmd_vel(target2)
|
||||||
|
|
||||||
|
node._timer_callback()
|
||||||
|
vel_after_reverse = node.current_vel.linear.x
|
||||||
|
|
||||||
|
# Should still be positive but decreasing
|
||||||
|
assert vel_after_reverse < vel_forward
|
||||||
|
assert vel_after_reverse > -0.5
|
||||||
|
|
||||||
|
|
||||||
|
class TestScenarios:
|
||||||
|
"""Integration-style tests for realistic scenarios."""
|
||||||
|
|
||||||
|
def test_scenario_gradual_acceleration(self, node):
|
||||||
|
"""Scenario: Smooth acceleration from stop to cruising speed."""
|
||||||
|
target = Twist()
|
||||||
|
target.linear.x = 0.5 # Cruise at 0.5 m/s
|
||||||
|
|
||||||
|
node._on_cmd_vel(target)
|
||||||
|
|
||||||
|
velocities = []
|
||||||
|
for _ in range(100):
|
||||||
|
node._timer_callback()
|
||||||
|
velocities.append(node.current_vel.linear.x)
|
||||||
|
|
||||||
|
# Velocities should be monotonically increasing
|
||||||
|
for i in range(1, len(velocities)):
|
||||||
|
assert velocities[i] >= velocities[i - 1]
|
||||||
|
|
||||||
|
# Should reach target
|
||||||
|
assert velocities[-1] == pytest.approx(0.5)
|
||||||
|
|
||||||
|
def test_scenario_smooth_stop(self, node):
|
||||||
|
"""Scenario: Smooth deceleration to stop."""
|
||||||
|
# Start at cruising speed
|
||||||
|
node.current_vel.linear.x = 0.5
|
||||||
|
|
||||||
|
# Command to stop
|
||||||
|
target = Twist()
|
||||||
|
target.linear.x = 0.0
|
||||||
|
node._on_cmd_vel(target)
|
||||||
|
|
||||||
|
velocities = []
|
||||||
|
for _ in range(100):
|
||||||
|
node._timer_callback()
|
||||||
|
velocities.append(node.current_vel.linear.x)
|
||||||
|
|
||||||
|
# Velocities should be monotonically decreasing
|
||||||
|
for i in range(1, len(velocities)):
|
||||||
|
assert velocities[i] <= velocities[i - 1]
|
||||||
|
|
||||||
|
# Should reach zero
|
||||||
|
assert velocities[-1] == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_scenario_turn_while_moving(self, node):
|
||||||
|
"""Scenario: Robot turns while moving forward."""
|
||||||
|
node.current_vel.linear.x = 0.5
|
||||||
|
node.current_vel.linear.y = 0.0
|
||||||
|
|
||||||
|
target = Twist()
|
||||||
|
target.linear.x = 0.5
|
||||||
|
target.linear.y = 0.2
|
||||||
|
target.angular.z = 0.5
|
||||||
|
|
||||||
|
node._on_cmd_vel(target)
|
||||||
|
|
||||||
|
# Multiple iterations to see smooth change
|
||||||
|
for _ in range(50):
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Forward velocity unchanged (same target)
|
||||||
|
assert node.current_vel.linear.x == pytest.approx(0.5)
|
||||||
|
|
||||||
|
# Lateral velocity increased toward target
|
||||||
|
assert node.current_vel.linear.y > 0.0
|
||||||
|
assert node.current_vel.linear.y <= 0.2
|
||||||
|
|
||||||
|
# Angular velocity increased
|
||||||
|
assert node.current_vel.angular.z > 0.0
|
||||||
|
assert node.current_vel.angular.z <= 0.5
|
||||||
|
|
||||||
|
def test_scenario_obstacle_avoidance(self, node):
|
||||||
|
"""Scenario: Quick lateral movement due to obstacle."""
|
||||||
|
target = Twist()
|
||||||
|
target.linear.x = 0.5
|
||||||
|
target.linear.y = 0.3 # Quick dodge sideways
|
||||||
|
node._on_cmd_vel(target)
|
||||||
|
|
||||||
|
# Y velocity should increase gradually, not jump to 0.3
|
||||||
|
accelerations = []
|
||||||
|
for _ in range(10):
|
||||||
|
node._timer_callback()
|
||||||
|
accelerations.append(node.current_vel.linear.y)
|
||||||
|
|
||||||
|
# Each step should be limited
|
||||||
|
for i in range(1, len(accelerations)):
|
||||||
|
delta = accelerations[i] - accelerations[i - 1]
|
||||||
|
max_delta = 0.5 * 0.02 # linear_accel_limit * period
|
||||||
|
assert abs(delta) <= max_delta + 1e-6 # Small tolerance
|
||||||
|
|
||||||
|
def test_scenario_continuous_motion(self, node):
|
||||||
|
"""Scenario: Continuous velocity updates (tracking)."""
|
||||||
|
for i in range(100):
|
||||||
|
# Gradually increasing target
|
||||||
|
target = Twist()
|
||||||
|
target.linear.x = i * 0.001
|
||||||
|
|
||||||
|
node._on_cmd_vel(target)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Current velocity should lag behind target
|
||||||
|
assert node.current_vel.linear.x < 0.1
|
||||||
|
|
||||||
|
def test_scenario_oscillating_command(self, node):
|
||||||
|
"""Scenario: Target velocity oscillates."""
|
||||||
|
oscillations = []
|
||||||
|
|
||||||
|
for i in range(200):
|
||||||
|
# Sinusoidal target
|
||||||
|
target = Twist()
|
||||||
|
target.linear.x = 0.5 * (1 + math.sin(i * 0.05))
|
||||||
|
|
||||||
|
node._on_cmd_vel(target)
|
||||||
|
node._timer_callback()
|
||||||
|
oscillations.append(node.current_vel.linear.x)
|
||||||
|
|
||||||
|
# Velocity should lag behind target but not diverge
|
||||||
|
assert all(0.0 <= v <= 1.0 for v in oscillations)
|
||||||
Loading…
x
Reference in New Issue
Block a user