diff --git a/jetson/ros2_ws/src/saltybot_accel_limiter/config/accel_limiter_config.yaml b/jetson/ros2_ws/src/saltybot_accel_limiter/config/accel_limiter_config.yaml new file mode 100644 index 0000000..310ee2c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_accel_limiter/config/accel_limiter_config.yaml @@ -0,0 +1,14 @@ +# Acceleration limiter configuration + +accel_limiter: + ros__parameters: + # Update frequency (Hz) + frequency: 50 # 50 Hz = 20ms cycle + + # Linear acceleration limit (m/s²) + # Prevents jerky forward/backward and lateral motion + linear_accel_limit: 0.5 + + # Angular acceleration limit (rad/s²) + # Prevents jerky rotation + angular_accel_limit: 1.0 diff --git a/jetson/ros2_ws/src/saltybot_accel_limiter/launch/accel_limiter.launch.py b/jetson/ros2_ws/src/saltybot_accel_limiter/launch/accel_limiter.launch.py new file mode 100644 index 0000000..74fb901 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_accel_limiter/launch/accel_limiter.launch.py @@ -0,0 +1,36 @@ +"""Launch file for accel_limiter_node.""" + +from launch import LaunchDescription +from launch_ros.actions import Node +from launch.substitutions import LaunchConfiguration +from launch.actions import DeclareLaunchArgument +import os +from ament_index_python.packages import get_package_share_directory + + +def generate_launch_description(): + """Generate launch description for acceleration limiter node.""" + # Package directory + pkg_dir = get_package_share_directory("saltybot_accel_limiter") + + # Parameters + config_file = os.path.join(pkg_dir, "config", "accel_limiter_config.yaml") + + # Declare launch arguments + return LaunchDescription( + [ + DeclareLaunchArgument( + "config_file", + default_value=config_file, + description="Path to configuration YAML file", + ), + # Acceleration limiter node + Node( + package="saltybot_accel_limiter", + executable="accel_limiter_node", + name="accel_limiter", + output="screen", + parameters=[LaunchConfiguration("config_file")], + ), + ] + ) diff --git a/jetson/ros2_ws/src/saltybot_accel_limiter/package.xml b/jetson/ros2_ws/src/saltybot_accel_limiter/package.xml new file mode 100644 index 0000000..67213fd --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_accel_limiter/package.xml @@ -0,0 +1,21 @@ + + + + saltybot_accel_limiter + 0.1.0 + Smooth acceleration and deceleration limiter for velocity commands. + Seb + Apache-2.0 + + ament_python + + rclpy + geometry_msgs + std_msgs + + pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_accel_limiter/resource/saltybot_accel_limiter b/jetson/ros2_ws/src/saltybot_accel_limiter/resource/saltybot_accel_limiter new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_accel_limiter/saltybot_accel_limiter/__init__.py b/jetson/ros2_ws/src/saltybot_accel_limiter/saltybot_accel_limiter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_accel_limiter/saltybot_accel_limiter/accel_limiter_node.py b/jetson/ros2_ws/src/saltybot_accel_limiter/saltybot_accel_limiter/accel_limiter_node.py new file mode 100644 index 0000000..501b14b --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_accel_limiter/saltybot_accel_limiter/accel_limiter_node.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 +"""Smooth acceleration limiter for velocity commands. + +Applies configurable acceleration and deceleration rate limits to prevent jerky motion. + +Subscribed topics: + /cmd_vel (geometry_msgs/Twist) - Target velocity command + +Published topics: + /cmd_vel_smooth (geometry_msgs/Twist) - Rate-limited velocity command + +Prevents sudden velocity changes by limiting rate of change to: + - linear: 0.5 m/s² (configurable) + - angular: 1.0 rad/s² (configurable) +""" + +from typing import Optional + +import rclpy +from rclpy.node import Node +from rclpy.timer import Timer +from geometry_msgs.msg import Twist + + +class AccelLimiterNode(Node): + """ROS2 node for smooth acceleration limiting.""" + + def __init__(self): + super().__init__("accel_limiter") + + # Parameters + self.declare_parameter("frequency", 50) # Hz + frequency = self.get_parameter("frequency").value + self.declare_parameter("linear_accel_limit", 0.5) # m/s² + self.declare_parameter("angular_accel_limit", 1.0) # rad/s² + + self.linear_accel_limit = self.get_parameter("linear_accel_limit").value + self.angular_accel_limit = self.get_parameter("angular_accel_limit").value + + self.period = 1.0 / frequency + + # Latest target velocity from subscription + self.target_vel: Optional[Twist] = None + + # Current smoothed velocity (published) + self.current_vel = Twist() + + # Subscription to target velocity + self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10) + + # Publisher for smoothed velocity + self.pub_smooth = self.create_publisher(Twist, "/cmd_vel_smooth", 10) + + # Timer for rate limiting at configured frequency + self.timer: Timer = self.create_timer(self.period, self._timer_callback) + + self.get_logger().info( + f"Acceleration limiter initialized at {frequency}Hz. " + f"Linear accel limit: {self.linear_accel_limit} m/s². " + f"Angular accel limit: {self.angular_accel_limit} rad/s²" + ) + + def _on_cmd_vel(self, msg: Twist) -> None: + """Update target velocity from cmd_vel subscription.""" + self.target_vel = msg + + def _timer_callback(self) -> None: + """Apply rate limiting and publish smoothed velocity.""" + if self.target_vel is None: + # No target yet, publish zero velocity + self.pub_smooth.publish(self.current_vel) + return + + # Create new velocity with rate-limited changes + limited_vel = Twist() + + # Rate limit linear components + limited_vel.linear.x = self._limit_component( + self.current_vel.linear.x, + self.target_vel.linear.x, + self.linear_accel_limit, + ) + limited_vel.linear.y = self._limit_component( + self.current_vel.linear.y, + self.target_vel.linear.y, + self.linear_accel_limit, + ) + limited_vel.linear.z = self._limit_component( + self.current_vel.linear.z, + self.target_vel.linear.z, + self.linear_accel_limit, + ) + + # Rate limit angular components + limited_vel.angular.x = self._limit_component( + self.current_vel.angular.x, + self.target_vel.angular.x, + self.angular_accel_limit, + ) + limited_vel.angular.y = self._limit_component( + self.current_vel.angular.y, + self.target_vel.angular.y, + self.angular_accel_limit, + ) + limited_vel.angular.z = self._limit_component( + self.current_vel.angular.z, + self.target_vel.angular.z, + self.angular_accel_limit, + ) + + # Update current velocity and publish + self.current_vel = limited_vel + self.pub_smooth.publish(self.current_vel) + + def _limit_component( + self, current: float, target: float, accel_limit: float + ) -> float: + """Apply acceleration limit to a single velocity component. + + Args: + current: Current velocity value (m/s or rad/s) + target: Target velocity value (m/s or rad/s) + accel_limit: Maximum acceleration (m/s² or rad/s²) + + Returns: + Rate-limited velocity value + """ + # Compute desired change + change = target - current + + # Maximum change allowed in this time period: accel_limit * dt + max_change = accel_limit * self.period + + # Clamp change to limits + if change > max_change: + return current + max_change + elif change < -max_change: + return current - max_change + else: + return target + + +def main(args=None): + rclpy.init(args=args) + node = AccelLimiterNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_accel_limiter/setup.cfg b/jetson/ros2_ws/src/saltybot_accel_limiter/setup.cfg new file mode 100644 index 0000000..905fb9b --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_accel_limiter/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script-dir=$base/lib/saltybot_accel_limiter +[install] +install-scripts=$base/lib/saltybot_accel_limiter diff --git a/jetson/ros2_ws/src/saltybot_accel_limiter/setup.py b/jetson/ros2_ws/src/saltybot_accel_limiter/setup.py new file mode 100644 index 0000000..39f6576 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_accel_limiter/setup.py @@ -0,0 +1,27 @@ +from setuptools import find_packages, setup + +package_name = "saltybot_accel_limiter" + +setup( + name=package_name, + version="0.1.0", + packages=find_packages(exclude=["test"]), + data_files=[ + ("share/ament_index/resource_index/packages", ["resource/" + package_name]), + ("share/" + package_name, ["package.xml"]), + ("share/" + package_name + "/launch", ["launch/accel_limiter.launch.py"]), + ("share/" + package_name + "/config", ["config/accel_limiter_config.yaml"]), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="Seb", + maintainer_email="seb@vayrette.com", + description="Smooth acceleration and deceleration rate limiting for velocity commands", + license="Apache-2.0", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + "accel_limiter_node = saltybot_accel_limiter.accel_limiter_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_accel_limiter/test/__init__.py b/jetson/ros2_ws/src/saltybot_accel_limiter/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_accel_limiter/test/test_accel_limiter.py b/jetson/ros2_ws/src/saltybot_accel_limiter/test/test_accel_limiter.py new file mode 100644 index 0000000..e20aeff --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_accel_limiter/test/test_accel_limiter.py @@ -0,0 +1,442 @@ +"""Unit tests for accel_limiter_node.""" + +import pytest +import math +from geometry_msgs.msg import Twist, Vector3 + +import rclpy + +# Import the node under test +from saltybot_accel_limiter.accel_limiter_node import AccelLimiterNode + + +@pytest.fixture +def rclpy_fixture(): + """Initialize and cleanup rclpy.""" + rclpy.init() + yield + rclpy.shutdown() + + +@pytest.fixture +def node(rclpy_fixture): + """Create an acceleration limiter node instance.""" + node = AccelLimiterNode() + yield node + node.destroy_node() + + +class TestNodeInitialization: + """Test suite for node initialization.""" + + def test_node_initialization(self, node): + """Test that node initializes with correct defaults.""" + assert node.linear_accel_limit == 0.5 + assert node.angular_accel_limit == 1.0 + assert node.target_vel is None + assert node.current_vel.linear.x == 0.0 + assert node.current_vel.linear.y == 0.0 + assert node.current_vel.linear.z == 0.0 + assert node.current_vel.angular.x == 0.0 + assert node.current_vel.angular.y == 0.0 + assert node.current_vel.angular.z == 0.0 + + def test_frequency_parameter(self, node): + """Test frequency parameter is set correctly.""" + frequency = node.get_parameter("frequency").value + assert frequency == 50 + + def test_linear_accel_parameter(self, node): + """Test linear acceleration limit parameter.""" + limit = node.get_parameter("linear_accel_limit").value + assert limit == 0.5 + + def test_angular_accel_parameter(self, node): + """Test angular acceleration limit parameter.""" + limit = node.get_parameter("angular_accel_limit").value + assert limit == 1.0 + + def test_period_calculation(self, node): + """Test that time period is correctly calculated from frequency.""" + # 50 Hz = 20ms = 0.02s + assert node.period == pytest.approx(0.02) + + +class TestLimitComponent: + """Test suite for _limit_component method.""" + + def test_no_change_needed(self, node): + """Test when target equals current (no acceleration needed).""" + result = node._limit_component(1.0, 1.0, 0.5) + assert result == pytest.approx(1.0) + + def test_zero_acceleration(self, node): + """Test when current is zero and target is zero.""" + result = node._limit_component(0.0, 0.0, 0.5) + assert result == pytest.approx(0.0) + + def test_small_change_within_limit(self, node): + """Test small change that is within acceleration limit.""" + # Max change in 20ms at 0.5 m/s²: 0.5 * 0.02 = 0.01 m/s + # Desired change: 0.005 m/s (less than limit) + result = node._limit_component(0.0, 0.005, 0.5) + assert result == pytest.approx(0.005) + + def test_acceleration_exceeds_limit(self, node): + """Test acceleration that exceeds the limit.""" + # Max change in 20ms at 0.5 m/s²: 0.5 * 0.02 = 0.01 m/s + # Desired change: 0.1 m/s (much greater than limit) + result = node._limit_component(0.0, 0.1, 0.5) + assert result == pytest.approx(0.01) + + def test_deceleration_exceeds_limit(self, node): + """Test deceleration that exceeds the limit.""" + # Max change in 20ms at 0.5 m/s²: -0.5 * 0.02 = -0.01 m/s + # Desired change: -0.1 m/s (much more negative than limit) + result = node._limit_component(0.1, 0.0, 0.5) + assert result == pytest.approx(0.09) + + def test_negative_to_positive_acceleration(self, node): + """Test acceleration from negative to positive velocity.""" + result = node._limit_component(-0.1, 0.1, 0.5) + # Max change: 0.01, so -0.1 + 0.01 = -0.09 + assert result == pytest.approx(-0.09) + + def test_positive_to_negative_deceleration(self, node): + """Test deceleration from positive to negative velocity.""" + result = node._limit_component(0.1, -0.1, 0.5) + # Max change: -0.01, so 0.1 - 0.01 = 0.09 + assert result == pytest.approx(0.09) + + def test_angular_acceleration_limit(self, node): + """Test with angular acceleration limit.""" + # Max change in 20ms at 1.0 rad/s²: 1.0 * 0.02 = 0.02 rad/s + # Desired change: 0.1 rad/s (exceeds limit) + result = node._limit_component(0.0, 0.1, 1.0) + assert result == pytest.approx(0.02) + + def test_large_acceleration_value(self, node): + """Test with large acceleration limit.""" + # Max change in 20ms at 10.0 m/s²: 10.0 * 0.02 = 0.2 m/s + # Desired change: 0.1 m/s (within limit) + result = node._limit_component(0.0, 0.1, 10.0) + assert result == pytest.approx(0.1) + + +class TestSubscriptionHandling: + """Test suite for cmd_vel subscription.""" + + def test_cmd_vel_subscription(self, node): + """Test that cmd_vel subscription updates target velocity.""" + cmd = Twist() + cmd.linear.x = 1.0 + cmd.linear.y = 0.5 + cmd.angular.z = 0.2 + + node._on_cmd_vel(cmd) + + assert node.target_vel is cmd + assert node.target_vel.linear.x == 1.0 + assert node.target_vel.linear.y == 0.5 + assert node.target_vel.angular.z == 0.2 + + def test_multiple_cmd_vel_updates(self, node): + """Test that subscription updates replace previous command.""" + cmd1 = Twist() + cmd1.linear.x = 1.0 + + cmd2 = Twist() + cmd2.linear.x = 2.0 + + node._on_cmd_vel(cmd1) + assert node.target_vel.linear.x == 1.0 + + node._on_cmd_vel(cmd2) + assert node.target_vel.linear.x == 2.0 + + +class TestTimerCallback: + """Test suite for timer callback and rate limiting.""" + + def test_timer_callback_no_target(self, node): + """Test timer callback when no target velocity received yet.""" + # Should publish current velocity (zero) + node._timer_callback() + assert node.current_vel.linear.x == 0.0 + assert node.current_vel.angular.z == 0.0 + + def test_timer_callback_linear_acceleration(self, node): + """Test linear acceleration limiting.""" + # Target: 1.0 m/s + target = Twist() + target.linear.x = 1.0 + node._on_cmd_vel(target) + + # First callback: current should move towards target, limited + node._timer_callback() + # Max acceleration: 0.5 m/s², period: 0.02s = 0.01 m/s change + assert node.current_vel.linear.x == pytest.approx(0.01) + + # Second callback: should continue accelerating + node._timer_callback() + assert node.current_vel.linear.x == pytest.approx(0.02) + + def test_timer_callback_linear_deceleration(self, node): + """Test linear deceleration limiting.""" + # Start with velocity + node.current_vel.linear.x = 1.0 + + # Target: 0 m/s (deceleration) + target = Twist() + target.linear.x = 0.0 + node._on_cmd_vel(target) + + # Callback should decelerate limited by rate limit + node._timer_callback() + # Max deceleration: -0.01 m/s, so 1.0 - 0.01 = 0.99 + assert node.current_vel.linear.x == pytest.approx(0.99) + + def test_timer_callback_angular_acceleration(self, node): + """Test angular acceleration limiting.""" + target = Twist() + target.angular.z = 1.0 + node._on_cmd_vel(target) + + node._timer_callback() + # Max angular acceleration: 1.0 rad/s², period: 0.02s = 0.02 rad/s + assert node.current_vel.angular.z == pytest.approx(0.02) + + def test_timer_callback_all_components(self, node): + """Test rate limiting across all velocity components.""" + target = Twist() + target.linear.x = 1.0 + target.linear.y = 0.5 + target.linear.z = 0.2 + target.angular.x = 0.1 + target.angular.y = 0.2 + target.angular.z = 1.0 + + node._on_cmd_vel(target) + node._timer_callback() + + # All linear components should be limited by linear_accel_limit + assert node.current_vel.linear.x == pytest.approx(0.01) + assert node.current_vel.linear.y == pytest.approx(0.005) + assert node.current_vel.linear.z == pytest.approx(0.002) + + # All angular components should be limited by angular_accel_limit + assert node.current_vel.angular.x == pytest.approx(0.002) + assert node.current_vel.angular.y == pytest.approx(0.004) + assert node.current_vel.angular.z == pytest.approx(0.02) + + def test_timer_reaches_target(self, node): + """Test that velocity eventually reaches target after multiple callbacks.""" + target = Twist() + target.linear.x = 0.1 + node._on_cmd_vel(target) + + # Max change per callback: 0.01 + # Target: 0.1, so needs 10 callbacks to reach it + for _ in range(5): + node._timer_callback() + + # After 5 callbacks: 0.05 + assert node.current_vel.linear.x == pytest.approx(0.05) + + # Continue for 5 more callbacks + for _ in range(5): + node._timer_callback() + + # Should be at target now or very close + assert node.current_vel.linear.x == pytest.approx(0.1) + + def test_timer_stops_at_target(self, node): + """Test that velocity stops changing once target is reached.""" + target = Twist() + target.linear.x = 0.01 + node._on_cmd_vel(target) + + node._timer_callback() + assert node.current_vel.linear.x == pytest.approx(0.01) + + # Next callback should stay at target + node._timer_callback() + assert node.current_vel.linear.x == pytest.approx(0.01) + + +class TestRateLimit: + """Test suite for rate limiting behavior.""" + + def test_linear_vs_angular_different_limits(self, node): + """Test that linear and angular limits are independent.""" + target = Twist() + target.linear.x = 1.0 + target.angular.z = 1.0 + + node._on_cmd_vel(target) + node._timer_callback() + + # Linear limited by 0.5 m/s²: 0.5 * 0.02 = 0.01 + assert node.current_vel.linear.x == pytest.approx(0.01) + + # Angular limited by 1.0 rad/s²: 1.0 * 0.02 = 0.02 + assert node.current_vel.angular.z == pytest.approx(0.02) + + def test_emergency_stop(self, node): + """Test emergency deceleration from high speed.""" + # Start at high speed + node.current_vel.linear.x = 2.0 + + # Sudden stop command + target = Twist() + target.linear.x = 0.0 + node._on_cmd_vel(target) + + # Should decelerate gradually, not instantly + node._timer_callback() + assert node.current_vel.linear.x == pytest.approx(1.99) + assert node.current_vel.linear.x > 0.0 # Not zero yet + + def test_alternating_direction(self, node): + """Test rapid direction changes are smoothed.""" + # Accelerate forward + target1 = Twist() + target1.linear.x = 0.5 + node._on_cmd_vel(target1) + + for _ in range(10): + node._timer_callback() + + vel_forward = node.current_vel.linear.x + + # Suddenly reverse + target2 = Twist() + target2.linear.x = -0.5 + node._on_cmd_vel(target2) + + node._timer_callback() + vel_after_reverse = node.current_vel.linear.x + + # Should still be positive but decreasing + assert vel_after_reverse < vel_forward + assert vel_after_reverse > -0.5 + + +class TestScenarios: + """Integration-style tests for realistic scenarios.""" + + def test_scenario_gradual_acceleration(self, node): + """Scenario: Smooth acceleration from stop to cruising speed.""" + target = Twist() + target.linear.x = 0.5 # Cruise at 0.5 m/s + + node._on_cmd_vel(target) + + velocities = [] + for _ in range(100): + node._timer_callback() + velocities.append(node.current_vel.linear.x) + + # Velocities should be monotonically increasing + for i in range(1, len(velocities)): + assert velocities[i] >= velocities[i - 1] + + # Should reach target + assert velocities[-1] == pytest.approx(0.5) + + def test_scenario_smooth_stop(self, node): + """Scenario: Smooth deceleration to stop.""" + # Start at cruising speed + node.current_vel.linear.x = 0.5 + + # Command to stop + target = Twist() + target.linear.x = 0.0 + node._on_cmd_vel(target) + + velocities = [] + for _ in range(100): + node._timer_callback() + velocities.append(node.current_vel.linear.x) + + # Velocities should be monotonically decreasing + for i in range(1, len(velocities)): + assert velocities[i] <= velocities[i - 1] + + # Should reach zero + assert velocities[-1] == pytest.approx(0.0) + + def test_scenario_turn_while_moving(self, node): + """Scenario: Robot turns while moving forward.""" + node.current_vel.linear.x = 0.5 + node.current_vel.linear.y = 0.0 + + target = Twist() + target.linear.x = 0.5 + target.linear.y = 0.2 + target.angular.z = 0.5 + + node._on_cmd_vel(target) + + # Multiple iterations to see smooth change + for _ in range(50): + node._timer_callback() + + # Forward velocity unchanged (same target) + assert node.current_vel.linear.x == pytest.approx(0.5) + + # Lateral velocity increased toward target + assert node.current_vel.linear.y > 0.0 + assert node.current_vel.linear.y <= 0.2 + + # Angular velocity increased + assert node.current_vel.angular.z > 0.0 + assert node.current_vel.angular.z <= 0.5 + + def test_scenario_obstacle_avoidance(self, node): + """Scenario: Quick lateral movement due to obstacle.""" + target = Twist() + target.linear.x = 0.5 + target.linear.y = 0.3 # Quick dodge sideways + node._on_cmd_vel(target) + + # Y velocity should increase gradually, not jump to 0.3 + accelerations = [] + for _ in range(10): + node._timer_callback() + accelerations.append(node.current_vel.linear.y) + + # Each step should be limited + for i in range(1, len(accelerations)): + delta = accelerations[i] - accelerations[i - 1] + max_delta = 0.5 * 0.02 # linear_accel_limit * period + assert abs(delta) <= max_delta + 1e-6 # Small tolerance + + def test_scenario_continuous_motion(self, node): + """Scenario: Continuous velocity updates (tracking).""" + for i in range(100): + # Gradually increasing target + target = Twist() + target.linear.x = i * 0.001 + + node._on_cmd_vel(target) + node._timer_callback() + + # Current velocity should lag behind target + assert node.current_vel.linear.x < 0.1 + + def test_scenario_oscillating_command(self, node): + """Scenario: Target velocity oscillates.""" + oscillations = [] + + for i in range(200): + # Sinusoidal target + target = Twist() + target.linear.x = 0.5 * (1 + math.sin(i * 0.05)) + + node._on_cmd_vel(target) + node._timer_callback() + oscillations.append(node.current_vel.linear.x) + + # Velocity should lag behind target but not diverge + assert all(0.0 <= v <= 1.0 for v in oscillations)