Compare commits
No commits in common. "3c276b0bcec7444f72eaad62f0238b2017b0c99b" and "81786f5e4e65be8c0fd26579b997bfb529beb9d0" have entirely different histories.
3c276b0bce
...
81786f5e4e
@ -1,14 +0,0 @@
|
|||||||
# Acceleration limiter configuration
|
|
||||||
|
|
||||||
accel_limiter:
|
|
||||||
ros__parameters:
|
|
||||||
# Update frequency (Hz)
|
|
||||||
frequency: 50 # 50 Hz = 20ms cycle
|
|
||||||
|
|
||||||
# Linear acceleration limit (m/s²)
|
|
||||||
# Prevents jerky forward/backward and lateral motion
|
|
||||||
linear_accel_limit: 0.5
|
|
||||||
|
|
||||||
# Angular acceleration limit (rad/s²)
|
|
||||||
# Prevents jerky rotation
|
|
||||||
angular_accel_limit: 1.0
|
|
||||||
@ -1,36 +0,0 @@
|
|||||||
"""Launch file for accel_limiter_node."""
|
|
||||||
|
|
||||||
from launch import LaunchDescription
|
|
||||||
from launch_ros.actions import Node
|
|
||||||
from launch.substitutions import LaunchConfiguration
|
|
||||||
from launch.actions import DeclareLaunchArgument
|
|
||||||
import os
|
|
||||||
from ament_index_python.packages import get_package_share_directory
|
|
||||||
|
|
||||||
|
|
||||||
def generate_launch_description():
|
|
||||||
"""Generate launch description for acceleration limiter node."""
|
|
||||||
# Package directory
|
|
||||||
pkg_dir = get_package_share_directory("saltybot_accel_limiter")
|
|
||||||
|
|
||||||
# Parameters
|
|
||||||
config_file = os.path.join(pkg_dir, "config", "accel_limiter_config.yaml")
|
|
||||||
|
|
||||||
# Declare launch arguments
|
|
||||||
return LaunchDescription(
|
|
||||||
[
|
|
||||||
DeclareLaunchArgument(
|
|
||||||
"config_file",
|
|
||||||
default_value=config_file,
|
|
||||||
description="Path to configuration YAML file",
|
|
||||||
),
|
|
||||||
# Acceleration limiter node
|
|
||||||
Node(
|
|
||||||
package="saltybot_accel_limiter",
|
|
||||||
executable="accel_limiter_node",
|
|
||||||
name="accel_limiter",
|
|
||||||
output="screen",
|
|
||||||
parameters=[LaunchConfiguration("config_file")],
|
|
||||||
),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
@ -1,21 +0,0 @@
|
|||||||
<?xml version="1.0"?>
|
|
||||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
|
||||||
<package format="3">
|
|
||||||
<name>saltybot_accel_limiter</name>
|
|
||||||
<version>0.1.0</version>
|
|
||||||
<description>Smooth acceleration and deceleration limiter for velocity commands.</description>
|
|
||||||
<maintainer email="seb@vayrette.com">Seb</maintainer>
|
|
||||||
<license>Apache-2.0</license>
|
|
||||||
|
|
||||||
<buildtool_depend>ament_python</buildtool_depend>
|
|
||||||
|
|
||||||
<depend>rclpy</depend>
|
|
||||||
<depend>geometry_msgs</depend>
|
|
||||||
<depend>std_msgs</depend>
|
|
||||||
|
|
||||||
<test_depend>pytest</test_depend>
|
|
||||||
|
|
||||||
<export>
|
|
||||||
<build_type>ament_python</build_type>
|
|
||||||
</export>
|
|
||||||
</package>
|
|
||||||
@ -1,156 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
"""Smooth acceleration limiter for velocity commands.
|
|
||||||
|
|
||||||
Applies configurable acceleration and deceleration rate limits to prevent jerky motion.
|
|
||||||
|
|
||||||
Subscribed topics:
|
|
||||||
/cmd_vel (geometry_msgs/Twist) - Target velocity command
|
|
||||||
|
|
||||||
Published topics:
|
|
||||||
/cmd_vel_smooth (geometry_msgs/Twist) - Rate-limited velocity command
|
|
||||||
|
|
||||||
Prevents sudden velocity changes by limiting rate of change to:
|
|
||||||
- linear: 0.5 m/s² (configurable)
|
|
||||||
- angular: 1.0 rad/s² (configurable)
|
|
||||||
"""
|
|
||||||
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
import rclpy
|
|
||||||
from rclpy.node import Node
|
|
||||||
from rclpy.timer import Timer
|
|
||||||
from geometry_msgs.msg import Twist
|
|
||||||
|
|
||||||
|
|
||||||
class AccelLimiterNode(Node):
|
|
||||||
"""ROS2 node for smooth acceleration limiting."""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
super().__init__("accel_limiter")
|
|
||||||
|
|
||||||
# Parameters
|
|
||||||
self.declare_parameter("frequency", 50) # Hz
|
|
||||||
frequency = self.get_parameter("frequency").value
|
|
||||||
self.declare_parameter("linear_accel_limit", 0.5) # m/s²
|
|
||||||
self.declare_parameter("angular_accel_limit", 1.0) # rad/s²
|
|
||||||
|
|
||||||
self.linear_accel_limit = self.get_parameter("linear_accel_limit").value
|
|
||||||
self.angular_accel_limit = self.get_parameter("angular_accel_limit").value
|
|
||||||
|
|
||||||
self.period = 1.0 / frequency
|
|
||||||
|
|
||||||
# Latest target velocity from subscription
|
|
||||||
self.target_vel: Optional[Twist] = None
|
|
||||||
|
|
||||||
# Current smoothed velocity (published)
|
|
||||||
self.current_vel = Twist()
|
|
||||||
|
|
||||||
# Subscription to target velocity
|
|
||||||
self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10)
|
|
||||||
|
|
||||||
# Publisher for smoothed velocity
|
|
||||||
self.pub_smooth = self.create_publisher(Twist, "/cmd_vel_smooth", 10)
|
|
||||||
|
|
||||||
# Timer for rate limiting at configured frequency
|
|
||||||
self.timer: Timer = self.create_timer(self.period, self._timer_callback)
|
|
||||||
|
|
||||||
self.get_logger().info(
|
|
||||||
f"Acceleration limiter initialized at {frequency}Hz. "
|
|
||||||
f"Linear accel limit: {self.linear_accel_limit} m/s². "
|
|
||||||
f"Angular accel limit: {self.angular_accel_limit} rad/s²"
|
|
||||||
)
|
|
||||||
|
|
||||||
def _on_cmd_vel(self, msg: Twist) -> None:
|
|
||||||
"""Update target velocity from cmd_vel subscription."""
|
|
||||||
self.target_vel = msg
|
|
||||||
|
|
||||||
def _timer_callback(self) -> None:
|
|
||||||
"""Apply rate limiting and publish smoothed velocity."""
|
|
||||||
if self.target_vel is None:
|
|
||||||
# No target yet, publish zero velocity
|
|
||||||
self.pub_smooth.publish(self.current_vel)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Create new velocity with rate-limited changes
|
|
||||||
limited_vel = Twist()
|
|
||||||
|
|
||||||
# Rate limit linear components
|
|
||||||
limited_vel.linear.x = self._limit_component(
|
|
||||||
self.current_vel.linear.x,
|
|
||||||
self.target_vel.linear.x,
|
|
||||||
self.linear_accel_limit,
|
|
||||||
)
|
|
||||||
limited_vel.linear.y = self._limit_component(
|
|
||||||
self.current_vel.linear.y,
|
|
||||||
self.target_vel.linear.y,
|
|
||||||
self.linear_accel_limit,
|
|
||||||
)
|
|
||||||
limited_vel.linear.z = self._limit_component(
|
|
||||||
self.current_vel.linear.z,
|
|
||||||
self.target_vel.linear.z,
|
|
||||||
self.linear_accel_limit,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Rate limit angular components
|
|
||||||
limited_vel.angular.x = self._limit_component(
|
|
||||||
self.current_vel.angular.x,
|
|
||||||
self.target_vel.angular.x,
|
|
||||||
self.angular_accel_limit,
|
|
||||||
)
|
|
||||||
limited_vel.angular.y = self._limit_component(
|
|
||||||
self.current_vel.angular.y,
|
|
||||||
self.target_vel.angular.y,
|
|
||||||
self.angular_accel_limit,
|
|
||||||
)
|
|
||||||
limited_vel.angular.z = self._limit_component(
|
|
||||||
self.current_vel.angular.z,
|
|
||||||
self.target_vel.angular.z,
|
|
||||||
self.angular_accel_limit,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Update current velocity and publish
|
|
||||||
self.current_vel = limited_vel
|
|
||||||
self.pub_smooth.publish(self.current_vel)
|
|
||||||
|
|
||||||
def _limit_component(
|
|
||||||
self, current: float, target: float, accel_limit: float
|
|
||||||
) -> float:
|
|
||||||
"""Apply acceleration limit to a single velocity component.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
current: Current velocity value (m/s or rad/s)
|
|
||||||
target: Target velocity value (m/s or rad/s)
|
|
||||||
accel_limit: Maximum acceleration (m/s² or rad/s²)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Rate-limited velocity value
|
|
||||||
"""
|
|
||||||
# Compute desired change
|
|
||||||
change = target - current
|
|
||||||
|
|
||||||
# Maximum change allowed in this time period: accel_limit * dt
|
|
||||||
max_change = accel_limit * self.period
|
|
||||||
|
|
||||||
# Clamp change to limits
|
|
||||||
if change > max_change:
|
|
||||||
return current + max_change
|
|
||||||
elif change < -max_change:
|
|
||||||
return current - max_change
|
|
||||||
else:
|
|
||||||
return target
|
|
||||||
|
|
||||||
|
|
||||||
def main(args=None):
|
|
||||||
rclpy.init(args=args)
|
|
||||||
node = AccelLimiterNode()
|
|
||||||
try:
|
|
||||||
rclpy.spin(node)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
finally:
|
|
||||||
node.destroy_node()
|
|
||||||
rclpy.shutdown()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
@ -1,4 +0,0 @@
|
|||||||
[develop]
|
|
||||||
script-dir=$base/lib/saltybot_accel_limiter
|
|
||||||
[install]
|
|
||||||
install-scripts=$base/lib/saltybot_accel_limiter
|
|
||||||
@ -1,27 +0,0 @@
|
|||||||
from setuptools import find_packages, setup
|
|
||||||
|
|
||||||
package_name = "saltybot_accel_limiter"
|
|
||||||
|
|
||||||
setup(
|
|
||||||
name=package_name,
|
|
||||||
version="0.1.0",
|
|
||||||
packages=find_packages(exclude=["test"]),
|
|
||||||
data_files=[
|
|
||||||
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
|
|
||||||
("share/" + package_name, ["package.xml"]),
|
|
||||||
("share/" + package_name + "/launch", ["launch/accel_limiter.launch.py"]),
|
|
||||||
("share/" + package_name + "/config", ["config/accel_limiter_config.yaml"]),
|
|
||||||
],
|
|
||||||
install_requires=["setuptools"],
|
|
||||||
zip_safe=True,
|
|
||||||
maintainer="Seb",
|
|
||||||
maintainer_email="seb@vayrette.com",
|
|
||||||
description="Smooth acceleration and deceleration rate limiting for velocity commands",
|
|
||||||
license="Apache-2.0",
|
|
||||||
tests_require=["pytest"],
|
|
||||||
entry_points={
|
|
||||||
"console_scripts": [
|
|
||||||
"accel_limiter_node = saltybot_accel_limiter.accel_limiter_node:main",
|
|
||||||
],
|
|
||||||
},
|
|
||||||
)
|
|
||||||
@ -1,442 +0,0 @@
|
|||||||
"""Unit tests for accel_limiter_node."""
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
import math
|
|
||||||
from geometry_msgs.msg import Twist, Vector3
|
|
||||||
|
|
||||||
import rclpy
|
|
||||||
|
|
||||||
# Import the node under test
|
|
||||||
from saltybot_accel_limiter.accel_limiter_node import AccelLimiterNode
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def rclpy_fixture():
|
|
||||||
"""Initialize and cleanup rclpy."""
|
|
||||||
rclpy.init()
|
|
||||||
yield
|
|
||||||
rclpy.shutdown()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def node(rclpy_fixture):
|
|
||||||
"""Create an acceleration limiter node instance."""
|
|
||||||
node = AccelLimiterNode()
|
|
||||||
yield node
|
|
||||||
node.destroy_node()
|
|
||||||
|
|
||||||
|
|
||||||
class TestNodeInitialization:
|
|
||||||
"""Test suite for node initialization."""
|
|
||||||
|
|
||||||
def test_node_initialization(self, node):
|
|
||||||
"""Test that node initializes with correct defaults."""
|
|
||||||
assert node.linear_accel_limit == 0.5
|
|
||||||
assert node.angular_accel_limit == 1.0
|
|
||||||
assert node.target_vel is None
|
|
||||||
assert node.current_vel.linear.x == 0.0
|
|
||||||
assert node.current_vel.linear.y == 0.0
|
|
||||||
assert node.current_vel.linear.z == 0.0
|
|
||||||
assert node.current_vel.angular.x == 0.0
|
|
||||||
assert node.current_vel.angular.y == 0.0
|
|
||||||
assert node.current_vel.angular.z == 0.0
|
|
||||||
|
|
||||||
def test_frequency_parameter(self, node):
|
|
||||||
"""Test frequency parameter is set correctly."""
|
|
||||||
frequency = node.get_parameter("frequency").value
|
|
||||||
assert frequency == 50
|
|
||||||
|
|
||||||
def test_linear_accel_parameter(self, node):
|
|
||||||
"""Test linear acceleration limit parameter."""
|
|
||||||
limit = node.get_parameter("linear_accel_limit").value
|
|
||||||
assert limit == 0.5
|
|
||||||
|
|
||||||
def test_angular_accel_parameter(self, node):
|
|
||||||
"""Test angular acceleration limit parameter."""
|
|
||||||
limit = node.get_parameter("angular_accel_limit").value
|
|
||||||
assert limit == 1.0
|
|
||||||
|
|
||||||
def test_period_calculation(self, node):
|
|
||||||
"""Test that time period is correctly calculated from frequency."""
|
|
||||||
# 50 Hz = 20ms = 0.02s
|
|
||||||
assert node.period == pytest.approx(0.02)
|
|
||||||
|
|
||||||
|
|
||||||
class TestLimitComponent:
|
|
||||||
"""Test suite for _limit_component method."""
|
|
||||||
|
|
||||||
def test_no_change_needed(self, node):
|
|
||||||
"""Test when target equals current (no acceleration needed)."""
|
|
||||||
result = node._limit_component(1.0, 1.0, 0.5)
|
|
||||||
assert result == pytest.approx(1.0)
|
|
||||||
|
|
||||||
def test_zero_acceleration(self, node):
|
|
||||||
"""Test when current is zero and target is zero."""
|
|
||||||
result = node._limit_component(0.0, 0.0, 0.5)
|
|
||||||
assert result == pytest.approx(0.0)
|
|
||||||
|
|
||||||
def test_small_change_within_limit(self, node):
|
|
||||||
"""Test small change that is within acceleration limit."""
|
|
||||||
# Max change in 20ms at 0.5 m/s²: 0.5 * 0.02 = 0.01 m/s
|
|
||||||
# Desired change: 0.005 m/s (less than limit)
|
|
||||||
result = node._limit_component(0.0, 0.005, 0.5)
|
|
||||||
assert result == pytest.approx(0.005)
|
|
||||||
|
|
||||||
def test_acceleration_exceeds_limit(self, node):
|
|
||||||
"""Test acceleration that exceeds the limit."""
|
|
||||||
# Max change in 20ms at 0.5 m/s²: 0.5 * 0.02 = 0.01 m/s
|
|
||||||
# Desired change: 0.1 m/s (much greater than limit)
|
|
||||||
result = node._limit_component(0.0, 0.1, 0.5)
|
|
||||||
assert result == pytest.approx(0.01)
|
|
||||||
|
|
||||||
def test_deceleration_exceeds_limit(self, node):
|
|
||||||
"""Test deceleration that exceeds the limit."""
|
|
||||||
# Max change in 20ms at 0.5 m/s²: -0.5 * 0.02 = -0.01 m/s
|
|
||||||
# Desired change: -0.1 m/s (much more negative than limit)
|
|
||||||
result = node._limit_component(0.1, 0.0, 0.5)
|
|
||||||
assert result == pytest.approx(0.09)
|
|
||||||
|
|
||||||
def test_negative_to_positive_acceleration(self, node):
|
|
||||||
"""Test acceleration from negative to positive velocity."""
|
|
||||||
result = node._limit_component(-0.1, 0.1, 0.5)
|
|
||||||
# Max change: 0.01, so -0.1 + 0.01 = -0.09
|
|
||||||
assert result == pytest.approx(-0.09)
|
|
||||||
|
|
||||||
def test_positive_to_negative_deceleration(self, node):
|
|
||||||
"""Test deceleration from positive to negative velocity."""
|
|
||||||
result = node._limit_component(0.1, -0.1, 0.5)
|
|
||||||
# Max change: -0.01, so 0.1 - 0.01 = 0.09
|
|
||||||
assert result == pytest.approx(0.09)
|
|
||||||
|
|
||||||
def test_angular_acceleration_limit(self, node):
|
|
||||||
"""Test with angular acceleration limit."""
|
|
||||||
# Max change in 20ms at 1.0 rad/s²: 1.0 * 0.02 = 0.02 rad/s
|
|
||||||
# Desired change: 0.1 rad/s (exceeds limit)
|
|
||||||
result = node._limit_component(0.0, 0.1, 1.0)
|
|
||||||
assert result == pytest.approx(0.02)
|
|
||||||
|
|
||||||
def test_large_acceleration_value(self, node):
|
|
||||||
"""Test with large acceleration limit."""
|
|
||||||
# Max change in 20ms at 10.0 m/s²: 10.0 * 0.02 = 0.2 m/s
|
|
||||||
# Desired change: 0.1 m/s (within limit)
|
|
||||||
result = node._limit_component(0.0, 0.1, 10.0)
|
|
||||||
assert result == pytest.approx(0.1)
|
|
||||||
|
|
||||||
|
|
||||||
class TestSubscriptionHandling:
|
|
||||||
"""Test suite for cmd_vel subscription."""
|
|
||||||
|
|
||||||
def test_cmd_vel_subscription(self, node):
|
|
||||||
"""Test that cmd_vel subscription updates target velocity."""
|
|
||||||
cmd = Twist()
|
|
||||||
cmd.linear.x = 1.0
|
|
||||||
cmd.linear.y = 0.5
|
|
||||||
cmd.angular.z = 0.2
|
|
||||||
|
|
||||||
node._on_cmd_vel(cmd)
|
|
||||||
|
|
||||||
assert node.target_vel is cmd
|
|
||||||
assert node.target_vel.linear.x == 1.0
|
|
||||||
assert node.target_vel.linear.y == 0.5
|
|
||||||
assert node.target_vel.angular.z == 0.2
|
|
||||||
|
|
||||||
def test_multiple_cmd_vel_updates(self, node):
|
|
||||||
"""Test that subscription updates replace previous command."""
|
|
||||||
cmd1 = Twist()
|
|
||||||
cmd1.linear.x = 1.0
|
|
||||||
|
|
||||||
cmd2 = Twist()
|
|
||||||
cmd2.linear.x = 2.0
|
|
||||||
|
|
||||||
node._on_cmd_vel(cmd1)
|
|
||||||
assert node.target_vel.linear.x == 1.0
|
|
||||||
|
|
||||||
node._on_cmd_vel(cmd2)
|
|
||||||
assert node.target_vel.linear.x == 2.0
|
|
||||||
|
|
||||||
|
|
||||||
class TestTimerCallback:
|
|
||||||
"""Test suite for timer callback and rate limiting."""
|
|
||||||
|
|
||||||
def test_timer_callback_no_target(self, node):
|
|
||||||
"""Test timer callback when no target velocity received yet."""
|
|
||||||
# Should publish current velocity (zero)
|
|
||||||
node._timer_callback()
|
|
||||||
assert node.current_vel.linear.x == 0.0
|
|
||||||
assert node.current_vel.angular.z == 0.0
|
|
||||||
|
|
||||||
def test_timer_callback_linear_acceleration(self, node):
|
|
||||||
"""Test linear acceleration limiting."""
|
|
||||||
# Target: 1.0 m/s
|
|
||||||
target = Twist()
|
|
||||||
target.linear.x = 1.0
|
|
||||||
node._on_cmd_vel(target)
|
|
||||||
|
|
||||||
# First callback: current should move towards target, limited
|
|
||||||
node._timer_callback()
|
|
||||||
# Max acceleration: 0.5 m/s², period: 0.02s = 0.01 m/s change
|
|
||||||
assert node.current_vel.linear.x == pytest.approx(0.01)
|
|
||||||
|
|
||||||
# Second callback: should continue accelerating
|
|
||||||
node._timer_callback()
|
|
||||||
assert node.current_vel.linear.x == pytest.approx(0.02)
|
|
||||||
|
|
||||||
def test_timer_callback_linear_deceleration(self, node):
|
|
||||||
"""Test linear deceleration limiting."""
|
|
||||||
# Start with velocity
|
|
||||||
node.current_vel.linear.x = 1.0
|
|
||||||
|
|
||||||
# Target: 0 m/s (deceleration)
|
|
||||||
target = Twist()
|
|
||||||
target.linear.x = 0.0
|
|
||||||
node._on_cmd_vel(target)
|
|
||||||
|
|
||||||
# Callback should decelerate limited by rate limit
|
|
||||||
node._timer_callback()
|
|
||||||
# Max deceleration: -0.01 m/s, so 1.0 - 0.01 = 0.99
|
|
||||||
assert node.current_vel.linear.x == pytest.approx(0.99)
|
|
||||||
|
|
||||||
def test_timer_callback_angular_acceleration(self, node):
|
|
||||||
"""Test angular acceleration limiting."""
|
|
||||||
target = Twist()
|
|
||||||
target.angular.z = 1.0
|
|
||||||
node._on_cmd_vel(target)
|
|
||||||
|
|
||||||
node._timer_callback()
|
|
||||||
# Max angular acceleration: 1.0 rad/s², period: 0.02s = 0.02 rad/s
|
|
||||||
assert node.current_vel.angular.z == pytest.approx(0.02)
|
|
||||||
|
|
||||||
def test_timer_callback_all_components(self, node):
|
|
||||||
"""Test rate limiting across all velocity components."""
|
|
||||||
target = Twist()
|
|
||||||
target.linear.x = 1.0
|
|
||||||
target.linear.y = 0.5
|
|
||||||
target.linear.z = 0.2
|
|
||||||
target.angular.x = 0.1
|
|
||||||
target.angular.y = 0.2
|
|
||||||
target.angular.z = 1.0
|
|
||||||
|
|
||||||
node._on_cmd_vel(target)
|
|
||||||
node._timer_callback()
|
|
||||||
|
|
||||||
# All linear components should be limited by linear_accel_limit
|
|
||||||
assert node.current_vel.linear.x == pytest.approx(0.01)
|
|
||||||
assert node.current_vel.linear.y == pytest.approx(0.005)
|
|
||||||
assert node.current_vel.linear.z == pytest.approx(0.002)
|
|
||||||
|
|
||||||
# All angular components should be limited by angular_accel_limit
|
|
||||||
assert node.current_vel.angular.x == pytest.approx(0.002)
|
|
||||||
assert node.current_vel.angular.y == pytest.approx(0.004)
|
|
||||||
assert node.current_vel.angular.z == pytest.approx(0.02)
|
|
||||||
|
|
||||||
def test_timer_reaches_target(self, node):
|
|
||||||
"""Test that velocity eventually reaches target after multiple callbacks."""
|
|
||||||
target = Twist()
|
|
||||||
target.linear.x = 0.1
|
|
||||||
node._on_cmd_vel(target)
|
|
||||||
|
|
||||||
# Max change per callback: 0.01
|
|
||||||
# Target: 0.1, so needs 10 callbacks to reach it
|
|
||||||
for _ in range(5):
|
|
||||||
node._timer_callback()
|
|
||||||
|
|
||||||
# After 5 callbacks: 0.05
|
|
||||||
assert node.current_vel.linear.x == pytest.approx(0.05)
|
|
||||||
|
|
||||||
# Continue for 5 more callbacks
|
|
||||||
for _ in range(5):
|
|
||||||
node._timer_callback()
|
|
||||||
|
|
||||||
# Should be at target now or very close
|
|
||||||
assert node.current_vel.linear.x == pytest.approx(0.1)
|
|
||||||
|
|
||||||
def test_timer_stops_at_target(self, node):
|
|
||||||
"""Test that velocity stops changing once target is reached."""
|
|
||||||
target = Twist()
|
|
||||||
target.linear.x = 0.01
|
|
||||||
node._on_cmd_vel(target)
|
|
||||||
|
|
||||||
node._timer_callback()
|
|
||||||
assert node.current_vel.linear.x == pytest.approx(0.01)
|
|
||||||
|
|
||||||
# Next callback should stay at target
|
|
||||||
node._timer_callback()
|
|
||||||
assert node.current_vel.linear.x == pytest.approx(0.01)
|
|
||||||
|
|
||||||
|
|
||||||
class TestRateLimit:
|
|
||||||
"""Test suite for rate limiting behavior."""
|
|
||||||
|
|
||||||
def test_linear_vs_angular_different_limits(self, node):
|
|
||||||
"""Test that linear and angular limits are independent."""
|
|
||||||
target = Twist()
|
|
||||||
target.linear.x = 1.0
|
|
||||||
target.angular.z = 1.0
|
|
||||||
|
|
||||||
node._on_cmd_vel(target)
|
|
||||||
node._timer_callback()
|
|
||||||
|
|
||||||
# Linear limited by 0.5 m/s²: 0.5 * 0.02 = 0.01
|
|
||||||
assert node.current_vel.linear.x == pytest.approx(0.01)
|
|
||||||
|
|
||||||
# Angular limited by 1.0 rad/s²: 1.0 * 0.02 = 0.02
|
|
||||||
assert node.current_vel.angular.z == pytest.approx(0.02)
|
|
||||||
|
|
||||||
def test_emergency_stop(self, node):
|
|
||||||
"""Test emergency deceleration from high speed."""
|
|
||||||
# Start at high speed
|
|
||||||
node.current_vel.linear.x = 2.0
|
|
||||||
|
|
||||||
# Sudden stop command
|
|
||||||
target = Twist()
|
|
||||||
target.linear.x = 0.0
|
|
||||||
node._on_cmd_vel(target)
|
|
||||||
|
|
||||||
# Should decelerate gradually, not instantly
|
|
||||||
node._timer_callback()
|
|
||||||
assert node.current_vel.linear.x == pytest.approx(1.99)
|
|
||||||
assert node.current_vel.linear.x > 0.0 # Not zero yet
|
|
||||||
|
|
||||||
def test_alternating_direction(self, node):
|
|
||||||
"""Test rapid direction changes are smoothed."""
|
|
||||||
# Accelerate forward
|
|
||||||
target1 = Twist()
|
|
||||||
target1.linear.x = 0.5
|
|
||||||
node._on_cmd_vel(target1)
|
|
||||||
|
|
||||||
for _ in range(10):
|
|
||||||
node._timer_callback()
|
|
||||||
|
|
||||||
vel_forward = node.current_vel.linear.x
|
|
||||||
|
|
||||||
# Suddenly reverse
|
|
||||||
target2 = Twist()
|
|
||||||
target2.linear.x = -0.5
|
|
||||||
node._on_cmd_vel(target2)
|
|
||||||
|
|
||||||
node._timer_callback()
|
|
||||||
vel_after_reverse = node.current_vel.linear.x
|
|
||||||
|
|
||||||
# Should still be positive but decreasing
|
|
||||||
assert vel_after_reverse < vel_forward
|
|
||||||
assert vel_after_reverse > -0.5
|
|
||||||
|
|
||||||
|
|
||||||
class TestScenarios:
|
|
||||||
"""Integration-style tests for realistic scenarios."""
|
|
||||||
|
|
||||||
def test_scenario_gradual_acceleration(self, node):
|
|
||||||
"""Scenario: Smooth acceleration from stop to cruising speed."""
|
|
||||||
target = Twist()
|
|
||||||
target.linear.x = 0.5 # Cruise at 0.5 m/s
|
|
||||||
|
|
||||||
node._on_cmd_vel(target)
|
|
||||||
|
|
||||||
velocities = []
|
|
||||||
for _ in range(100):
|
|
||||||
node._timer_callback()
|
|
||||||
velocities.append(node.current_vel.linear.x)
|
|
||||||
|
|
||||||
# Velocities should be monotonically increasing
|
|
||||||
for i in range(1, len(velocities)):
|
|
||||||
assert velocities[i] >= velocities[i - 1]
|
|
||||||
|
|
||||||
# Should reach target
|
|
||||||
assert velocities[-1] == pytest.approx(0.5)
|
|
||||||
|
|
||||||
def test_scenario_smooth_stop(self, node):
|
|
||||||
"""Scenario: Smooth deceleration to stop."""
|
|
||||||
# Start at cruising speed
|
|
||||||
node.current_vel.linear.x = 0.5
|
|
||||||
|
|
||||||
# Command to stop
|
|
||||||
target = Twist()
|
|
||||||
target.linear.x = 0.0
|
|
||||||
node._on_cmd_vel(target)
|
|
||||||
|
|
||||||
velocities = []
|
|
||||||
for _ in range(100):
|
|
||||||
node._timer_callback()
|
|
||||||
velocities.append(node.current_vel.linear.x)
|
|
||||||
|
|
||||||
# Velocities should be monotonically decreasing
|
|
||||||
for i in range(1, len(velocities)):
|
|
||||||
assert velocities[i] <= velocities[i - 1]
|
|
||||||
|
|
||||||
# Should reach zero
|
|
||||||
assert velocities[-1] == pytest.approx(0.0)
|
|
||||||
|
|
||||||
def test_scenario_turn_while_moving(self, node):
|
|
||||||
"""Scenario: Robot turns while moving forward."""
|
|
||||||
node.current_vel.linear.x = 0.5
|
|
||||||
node.current_vel.linear.y = 0.0
|
|
||||||
|
|
||||||
target = Twist()
|
|
||||||
target.linear.x = 0.5
|
|
||||||
target.linear.y = 0.2
|
|
||||||
target.angular.z = 0.5
|
|
||||||
|
|
||||||
node._on_cmd_vel(target)
|
|
||||||
|
|
||||||
# Multiple iterations to see smooth change
|
|
||||||
for _ in range(50):
|
|
||||||
node._timer_callback()
|
|
||||||
|
|
||||||
# Forward velocity unchanged (same target)
|
|
||||||
assert node.current_vel.linear.x == pytest.approx(0.5)
|
|
||||||
|
|
||||||
# Lateral velocity increased toward target
|
|
||||||
assert node.current_vel.linear.y > 0.0
|
|
||||||
assert node.current_vel.linear.y <= 0.2
|
|
||||||
|
|
||||||
# Angular velocity increased
|
|
||||||
assert node.current_vel.angular.z > 0.0
|
|
||||||
assert node.current_vel.angular.z <= 0.5
|
|
||||||
|
|
||||||
def test_scenario_obstacle_avoidance(self, node):
|
|
||||||
"""Scenario: Quick lateral movement due to obstacle."""
|
|
||||||
target = Twist()
|
|
||||||
target.linear.x = 0.5
|
|
||||||
target.linear.y = 0.3 # Quick dodge sideways
|
|
||||||
node._on_cmd_vel(target)
|
|
||||||
|
|
||||||
# Y velocity should increase gradually, not jump to 0.3
|
|
||||||
accelerations = []
|
|
||||||
for _ in range(10):
|
|
||||||
node._timer_callback()
|
|
||||||
accelerations.append(node.current_vel.linear.y)
|
|
||||||
|
|
||||||
# Each step should be limited
|
|
||||||
for i in range(1, len(accelerations)):
|
|
||||||
delta = accelerations[i] - accelerations[i - 1]
|
|
||||||
max_delta = 0.5 * 0.02 # linear_accel_limit * period
|
|
||||||
assert abs(delta) <= max_delta + 1e-6 # Small tolerance
|
|
||||||
|
|
||||||
def test_scenario_continuous_motion(self, node):
|
|
||||||
"""Scenario: Continuous velocity updates (tracking)."""
|
|
||||||
for i in range(100):
|
|
||||||
# Gradually increasing target
|
|
||||||
target = Twist()
|
|
||||||
target.linear.x = i * 0.001
|
|
||||||
|
|
||||||
node._on_cmd_vel(target)
|
|
||||||
node._timer_callback()
|
|
||||||
|
|
||||||
# Current velocity should lag behind target
|
|
||||||
assert node.current_vel.linear.x < 0.1
|
|
||||||
|
|
||||||
def test_scenario_oscillating_command(self, node):
|
|
||||||
"""Scenario: Target velocity oscillates."""
|
|
||||||
oscillations = []
|
|
||||||
|
|
||||||
for i in range(200):
|
|
||||||
# Sinusoidal target
|
|
||||||
target = Twist()
|
|
||||||
target.linear.x = 0.5 * (1 + math.sin(i * 0.05))
|
|
||||||
|
|
||||||
node._on_cmd_vel(target)
|
|
||||||
node._timer_callback()
|
|
||||||
oscillations.append(node.current_vel.linear.x)
|
|
||||||
|
|
||||||
# Velocity should lag behind target but not diverge
|
|
||||||
assert all(0.0 <= v <= 1.0 for v in oscillations)
|
|
||||||
Loading…
x
Reference in New Issue
Block a user