From 6592b58f658da92e773d4d73f60a2d6007e81324 Mon Sep 17 00:00:00 2001 From: sl-perception Date: Tue, 3 Mar 2026 15:45:05 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20Add=20Issue=20#350=20=E2=80=94=20smooth?= =?UTF-8?q?=20velocity=20ramp=20controller?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a rate-limiting shim between raw /cmd_vel and the drive stack to prevent wheel slip, tipping, and jerky motion from step velocity inputs. Core library — _velocity_ramp.py (pure Python, no ROS2 deps) - VelocityRamp: applies independent accel/decel limits to linear-x and angular-z with configurable max_lin_accel, max_lin_decel, max_ang_accel, max_ang_decel - _ramp_axis(): per-axis rate limiter with correct accel/decel selection (decel when |target| < |current| or sign reversal; accel otherwise) - Emergency stop: step(0.0, 0.0) bypasses ramp → immediate zero output - Asymmetric limits supported (e.g. faster decel than accel) ROS2 node — velocity_ramp_node.py - Subscribes /cmd_vel, publishes /cmd_vel_smooth at configurable rate_hz - Parameters: max_lin_accel (0.5 m/s²), max_lin_decel (0.5 m/s²), max_ang_accel (1.0 rad/s²), max_ang_decel (1.0 rad/s²), rate_hz (50) Tests — test/test_velocity_ramp.py: 50/50 passing - _ramp_axis: accel/decel selection, sign reversal, overshoot prevention - Construction: invalid params raise ValueError, defaults verified - Linear/angular ramp-up: step size, target reached, no overshoot - Deceleration: asymmetric limits, partial decel (non-zero target) - Emergency stop: immediate zero, state cleared, resume from zero - Sign reversal: passes through zero without jumping - Reset: state cleared, next ramp starts from zero - Monotonicity: linear and angular outputs are monotone toward target - Rate accuracy: 50Hz/10Hz step sizes, 100-step convergence verified Co-Authored-By: Claude Sonnet 4.6 --- .../saltybot_bringup/_velocity_ramp.py | 194 ++++++++ .../saltybot_bringup/velocity_ramp_node.py | 125 +++++ jetson/ros2_ws/src/saltybot_bringup/setup.py | 2 + .../test/test_velocity_ramp.py | 431 ++++++++++++++++++ 4 files changed, 752 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_velocity_ramp.py create mode 100644 jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/velocity_ramp_node.py create mode 100644 jetson/ros2_ws/src/saltybot_bringup/test/test_velocity_ramp.py diff --git a/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_velocity_ramp.py b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_velocity_ramp.py new file mode 100644 index 0000000..bf150f9 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_velocity_ramp.py @@ -0,0 +1,194 @@ +""" +_velocity_ramp.py — Acceleration/deceleration limiter for cmd_vel (Issue #350). + +Pure-Python library (no ROS2 dependencies) for full unit-test coverage. + +Behaviour +--------- +The VelocityRamp class applies independent rate limits to the linear-x and +angular-z components of a 2D velocity command: + + - Linear acceleration limit : max_lin_accel (m/s²) + - Linear deceleration limit : max_lin_decel (m/s²) — may differ from accel + - Angular acceleration limit : max_ang_accel (rad/s²) + - Angular deceleration limit : max_ang_decel (rad/s²) + +"Deceleration" applies when the magnitude of the velocity is *decreasing* +(including moving toward zero from either sign direction), while "acceleration" +applies when the magnitude is increasing. + +Emergency stop +-------------- +When both linear and angular targets are exactly 0.0 the ramp is bypassed and +the output is forced to (0.0, 0.0) immediately. This allows a watchdog or +safety node to halt the robot instantly without waiting for the deceleration +ramp. + +Usage +----- + ramp = VelocityRamp(dt=0.02) # 50 Hz + out_lin, out_ang = ramp.step(1.0, 0.0) # target linear=1 m/s, angular=0 + out_lin, out_ang = ramp.step(1.0, 0.0) # ramp climbs toward 1.0 m/s + out_lin, out_ang = ramp.step(0.0, 0.0) # emergency stop → (0.0, 0.0) +""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass +class RampParams: + """Acceleration / deceleration limits for one velocity axis.""" + max_accel: float # magnitude / second — applied when |v| increasing + max_decel: float # magnitude / second — applied when |v| decreasing + + +def _ramp_axis( + current: float, + target: float, + params: RampParams, + dt: float, +) -> float: + """ + Advance *current* one timestep toward *target* with rate limiting. + + Parameters + ---------- + current : current velocity on this axis + target : desired velocity on this axis + params : accel / decel limits + dt : timestep in seconds + + Returns + ------- + New velocity, clamped so the change per step never exceeds the limits. + + Notes on accel vs decel selection + ---------------------------------- + We treat the motion as decelerating when the target is closer to zero + than the current value, i.e. |target| < |current| or they have opposite + signs. In all other cases we use the acceleration limit. + """ + delta = target - current + + # Choose limit: decel if velocity magnitude is falling, else accel. + is_decelerating = ( + abs(target) < abs(current) + or (current > 0 and target < 0) + or (current < 0 and target > 0) + ) + limit = params.max_decel if is_decelerating else params.max_accel + + max_change = limit * dt + if abs(delta) <= max_change: + return target + return current + max_change * (1.0 if delta > 0 else -1.0) + + +class VelocityRamp: + """ + Smooths a stream of (linear_x, angular_z) velocity commands by applying + configurable acceleration and deceleration limits. + + Parameters + ---------- + dt : timestep in seconds (must match the control loop rate) + max_lin_accel : maximum linear acceleration (m/s²) + max_lin_decel : maximum linear deceleration (m/s²); defaults to max_lin_accel + max_ang_accel : maximum angular acceleration (rad/s²) + max_ang_decel : maximum angular deceleration (rad/s²); defaults to max_ang_accel + """ + + def __init__( + self, + dt: float = 0.02, # 50 Hz + max_lin_accel: float = 0.5, # m/s² + max_lin_decel: float | None = None, + max_ang_accel: float = 1.0, # rad/s² + max_ang_decel: float | None = None, + ) -> None: + if dt <= 0: + raise ValueError(f'dt must be positive, got {dt}') + if max_lin_accel <= 0: + raise ValueError(f'max_lin_accel must be positive, got {max_lin_accel}') + if max_ang_accel <= 0: + raise ValueError(f'max_ang_accel must be positive, got {max_ang_accel}') + + self._dt = dt + self._lin = RampParams( + max_accel=max_lin_accel, + max_decel=max_lin_decel if max_lin_decel is not None else max_lin_accel, + ) + self._ang = RampParams( + max_accel=max_ang_accel, + max_decel=max_ang_decel if max_ang_decel is not None else max_ang_accel, + ) + + self._cur_lin: float = 0.0 + self._cur_ang: float = 0.0 + + # ── Public API ──────────────────────────────────────────────────────────── + + def step(self, target_lin: float, target_ang: float) -> tuple[float, float]: + """ + Advance the ramp one timestep. + + Parameters + ---------- + target_lin : desired linear velocity (m/s) + target_ang : desired angular velocity (rad/s) + + Returns + ------- + (smoothed_linear, smoothed_angular) tuple. + + If both target_lin and target_ang are exactly 0.0, an emergency stop + is applied and (0.0, 0.0) is returned immediately. + """ + # Emergency stop: bypass ramp entirely + if target_lin == 0.0 and target_ang == 0.0: + self._cur_lin = 0.0 + self._cur_ang = 0.0 + return 0.0, 0.0 + + self._cur_lin = _ramp_axis(self._cur_lin, target_lin, self._lin, self._dt) + self._cur_ang = _ramp_axis(self._cur_ang, target_ang, self._ang, self._dt) + return self._cur_lin, self._cur_ang + + def reset(self) -> None: + """Reset internal state to zero (e.g. on node restart or re-enable).""" + self._cur_lin = 0.0 + self._cur_ang = 0.0 + + @property + def current_linear(self) -> float: + """Current smoothed linear velocity (m/s).""" + return self._cur_lin + + @property + def current_angular(self) -> float: + """Current smoothed angular velocity (rad/s).""" + return self._cur_ang + + @property + def dt(self) -> float: + return self._dt + + # ── Derived ─────────────────────────────────────────────────────────────── + + def steps_to_reach(self, target_lin: float, target_ang: float) -> int: + """ + Estimate how many steps() are needed to reach the target from the + current state (useful for test assertions). + + This is an approximation based on the worst-case axis; it does not + account for the emergency-stop shortcut. + """ + lin_steps = 0 + ang_steps = 0 + if self._lin.max_accel > 0: + lin_steps = int(abs(target_lin - self._cur_lin) / (self._lin.max_accel * self._dt)) + 1 + if self._ang.max_accel > 0: + ang_steps = int(abs(target_ang - self._cur_ang) / (self._ang.max_accel * self._dt)) + 1 + return max(lin_steps, ang_steps) diff --git a/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/velocity_ramp_node.py b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/velocity_ramp_node.py new file mode 100644 index 0000000..bd3bf89 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/velocity_ramp_node.py @@ -0,0 +1,125 @@ +""" +velocity_ramp_node.py — Smooth velocity ramp controller (Issue #350). + +Subscribes to raw /cmd_vel commands and republishes them on /cmd_vel_smooth +after applying independent acceleration and deceleration limits to the linear-x +and angular-z components. + +An emergency stop (both linear and angular targets exactly 0.0) bypasses the +ramp and forces the output to zero immediately. + +Subscribes +---------- + /cmd_vel geometry_msgs/Twist raw velocity commands + +Publishes +--------- + /cmd_vel_smooth geometry_msgs/Twist rate-limited velocity commands + +Parameters +---------- + max_lin_accel float 0.5 Maximum linear acceleration (m/s²) + max_lin_decel float 0.5 Maximum linear deceleration (m/s²) + max_ang_accel float 1.0 Maximum angular acceleration (rad/s²) + max_ang_decel float 1.0 Maximum angular deceleration (rad/s²) + rate_hz float 50.0 Control loop rate (Hz) +""" + +from __future__ import annotations + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy + +from geometry_msgs.msg import Twist + +from ._velocity_ramp import VelocityRamp + + +_SUB_QOS = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=10, +) +_PUB_QOS = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=10, +) + + +class VelocityRampNode(Node): + + def __init__(self) -> None: + super().__init__('velocity_ramp_node') + + # ── Parameters ────────────────────────────────────────────────────── + self.declare_parameter('max_lin_accel', 0.5) + self.declare_parameter('max_lin_decel', 0.5) + self.declare_parameter('max_ang_accel', 1.0) + self.declare_parameter('max_ang_decel', 1.0) + self.declare_parameter('rate_hz', 50.0) + + p = self.get_parameter + rate_hz = max(float(p('rate_hz').value), 1.0) + max_lin_acc = max(float(p('max_lin_accel').value), 1e-3) + max_lin_dec = max(float(p('max_lin_decel').value), 1e-3) + max_ang_acc = max(float(p('max_ang_accel').value), 1e-3) + max_ang_dec = max(float(p('max_ang_decel').value), 1e-3) + + dt = 1.0 / rate_hz + + # ── Ramp state ─────────────────────────────────────────────────────── + self._ramp = VelocityRamp( + dt = dt, + max_lin_accel = max_lin_acc, + max_lin_decel = max_lin_dec, + max_ang_accel = max_ang_acc, + max_ang_decel = max_ang_dec, + ) + self._target_lin: float = 0.0 + self._target_ang: float = 0.0 + + # ── Subscriber ─────────────────────────────────────────────────────── + self._sub = self.create_subscription( + Twist, '/cmd_vel', self._on_cmd_vel, _SUB_QOS, + ) + + # ── Publisher + timer ──────────────────────────────────────────────── + self._pub = self.create_publisher(Twist, '/cmd_vel_smooth', _PUB_QOS) + self._timer = self.create_timer(dt, self._step) + + self.get_logger().info( + f'velocity_ramp_node ready — ' + f'rate={rate_hz:.0f}Hz ' + f'lin_accel={max_lin_acc}m/s² lin_decel={max_lin_dec}m/s² ' + f'ang_accel={max_ang_acc}rad/s² ang_decel={max_ang_dec}rad/s²' + ) + + # ── Callbacks ───────────────────────────────────────────────────────────── + + def _on_cmd_vel(self, msg: Twist) -> None: + self._target_lin = float(msg.linear.x) + self._target_ang = float(msg.angular.z) + + def _step(self) -> None: + lin, ang = self._ramp.step(self._target_lin, self._target_ang) + + out = Twist() + out.linear.x = lin + out.angular.z = ang + self._pub.publish(out) + + +def main(args=None) -> None: + rclpy.init(args=args) + node = VelocityRampNode() + try: + rclpy.spin(node) + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/jetson/ros2_ws/src/saltybot_bringup/setup.py b/jetson/ros2_ws/src/saltybot_bringup/setup.py index 0d71324..5e7240a 100644 --- a/jetson/ros2_ws/src/saltybot_bringup/setup.py +++ b/jetson/ros2_ws/src/saltybot_bringup/setup.py @@ -65,6 +65,8 @@ setup( 'person_tracking = saltybot_bringup.person_tracking_node:main', # UWB DW3000 anchor/tag ranging (Issue #365) 'uwb_node = saltybot_bringup.uwb_node:main', + # Smooth velocity ramp controller (Issue #350) + 'velocity_ramp = saltybot_bringup.velocity_ramp_node:main', ], }, ) diff --git a/jetson/ros2_ws/src/saltybot_bringup/test/test_velocity_ramp.py b/jetson/ros2_ws/src/saltybot_bringup/test/test_velocity_ramp.py new file mode 100644 index 0000000..5631523 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/test/test_velocity_ramp.py @@ -0,0 +1,431 @@ +""" +test_velocity_ramp.py — Unit tests for _velocity_ramp.py (Issue #350). + +Covers: + - Linear ramp-up (acceleration) + - Linear ramp-down (deceleration) + - Angular ramp-up / ramp-down + - Asymmetric accel vs decel limits + - Emergency stop (both targets = 0.0) + - Non-emergency partial decel (one axis non-zero) + - Sign reversal (positive → negative) + - Already-at-target (no overshoot) + - Reset + - Parameter validation + - _ramp_axis helper directly + - steps_to_reach estimate +""" + +from __future__ import annotations + +import math +import pytest + +from saltybot_bringup._velocity_ramp import ( + RampParams, + VelocityRamp, + _ramp_axis, +) + + +# ───────────────────────────────────────────────────────────────────────────── +# _ramp_axis helper +# ───────────────────────────────────────────────────────────────────────────── + +class TestRampAxis: + + def _p(self, accel=1.0, decel=1.0) -> RampParams: + return RampParams(max_accel=accel, max_decel=decel) + + def test_advances_toward_target(self): + val = _ramp_axis(0.0, 1.0, self._p(accel=0.5), dt=1.0) + assert abs(val - 0.5) < 1e-9 + + def test_reaches_target_exactly(self): + val = _ramp_axis(0.9, 1.0, self._p(accel=0.5), dt=1.0) + assert val == 1.0 # remaining gap 0.1 < max_change 0.5 + + def test_no_overshoot(self): + val = _ramp_axis(0.8, 1.0, self._p(accel=5.0), dt=1.0) + assert val == 1.0 + + def test_negative_direction(self): + val = _ramp_axis(0.0, -1.0, self._p(accel=0.5), dt=1.0) + assert abs(val - (-0.5)) < 1e-9 + + def test_decel_used_when_magnitude_falling(self): + # current=1.0, target=0.5 → magnitude falling → use decel=0.2 + val = _ramp_axis(1.0, 0.5, self._p(accel=1.0, decel=0.2), dt=1.0) + assert abs(val - 0.8) < 1e-9 + + def test_accel_used_when_magnitude_rising(self): + # current=0.5, target=1.0 → magnitude rising → use accel=0.3 + val = _ramp_axis(0.5, 1.0, self._p(accel=0.3, decel=1.0), dt=1.0) + assert abs(val - 0.8) < 1e-9 + + def test_sign_reversal_uses_decel(self): + # current=0.5 (positive), target=-0.5 → opposite sign → decel + val = _ramp_axis(0.5, -0.5, self._p(accel=1.0, decel=0.1), dt=1.0) + assert abs(val - 0.4) < 1e-9 + + def test_already_at_target_no_change(self): + val = _ramp_axis(1.0, 1.0, self._p(), dt=0.02) + assert val == 1.0 + + def test_zero_to_zero_no_change(self): + val = _ramp_axis(0.0, 0.0, self._p(), dt=0.02) + assert val == 0.0 + + def test_small_dt(self): + val = _ramp_axis(0.0, 10.0, self._p(accel=1.0), dt=0.02) + assert abs(val - 0.02) < 1e-9 + + def test_negative_to_less_negative(self): + # current=-1.0, target=-0.5 → magnitude falling (decelerating) + val = _ramp_axis(-1.0, -0.5, self._p(accel=1.0, decel=0.2), dt=1.0) + assert abs(val - (-0.8)) < 1e-9 + + def test_negative_to_more_negative(self): + # current=-0.5, target=-1.0 → magnitude rising (accelerating) + val = _ramp_axis(-0.5, -1.0, self._p(accel=0.3, decel=1.0), dt=1.0) + assert abs(val - (-0.8)) < 1e-9 + + +# ───────────────────────────────────────────────────────────────────────────── +# VelocityRamp construction +# ───────────────────────────────────────────────────────────────────────────── + +class TestVelocityRampConstruction: + + def test_default_params(self): + r = VelocityRamp() + assert r.dt == 0.02 + assert r.current_linear == 0.0 + assert r.current_angular == 0.0 + + def test_custom_dt(self): + r = VelocityRamp(dt=0.05) + assert r.dt == 0.05 + + def test_invalid_dt_raises(self): + with pytest.raises(ValueError): + VelocityRamp(dt=0.0) + + def test_negative_dt_raises(self): + with pytest.raises(ValueError): + VelocityRamp(dt=-0.01) + + def test_invalid_lin_accel_raises(self): + with pytest.raises(ValueError): + VelocityRamp(max_lin_accel=0.0) + + def test_invalid_ang_accel_raises(self): + with pytest.raises(ValueError): + VelocityRamp(max_ang_accel=-1.0) + + def test_asymmetric_decel_stored(self): + r = VelocityRamp(max_lin_accel=0.5, max_lin_decel=2.0) + assert r._lin.max_accel == 0.5 + assert r._lin.max_decel == 2.0 + + def test_decel_defaults_to_accel(self): + r = VelocityRamp(max_lin_accel=0.5) + assert r._lin.max_decel == 0.5 + + +# ───────────────────────────────────────────────────────────────────────────── +# Linear ramp-up +# ───────────────────────────────────────────────────────────────────────────── + +class TestLinearRampUp: + """ + VelocityRamp(dt=1.0, max_lin_accel=0.5) → 0.5 m/s per step. + """ + + def _ramp(self, **kw): + return VelocityRamp(dt=1.0, max_lin_accel=0.5, max_ang_accel=1.0, **kw) + + def test_first_step(self): + r = self._ramp() + lin, _ = r.step(1.0, 0.0) + assert abs(lin - 0.5) < 1e-9 + + def test_second_step(self): + r = self._ramp() + r.step(1.0, 0.0) + lin, _ = r.step(1.0, 0.0) + assert abs(lin - 1.0) < 1e-9 + + def test_reaches_target(self): + r = self._ramp() + lin = None + for _ in range(10): + lin, _ = r.step(1.0, 0.0) + assert lin == 1.0 + + def test_no_overshoot(self): + r = self._ramp() + outputs = [r.step(1.0, 0.0)[0] for _ in range(20)] + assert all(v <= 1.0 + 1e-9 for v in outputs) + + def test_current_linear_updated(self): + r = self._ramp() + r.step(1.0, 0.0) + assert abs(r.current_linear - 0.5) < 1e-9 + + def test_negative_target(self): + r = self._ramp() + lin, _ = r.step(-1.0, 0.0) + assert abs(lin - (-0.5)) < 1e-9 + + +# ───────────────────────────────────────────────────────────────────────────── +# Linear deceleration +# ───────────────────────────────────────────────────────────────────────────── + +class TestLinearDecel: + + def _ramp(self, decel=None): + return VelocityRamp( + dt=1.0, max_lin_accel=1.0, + max_lin_decel=decel if decel else 1.0, + max_ang_accel=1.0, + ) + + def _at_speed(self, r: VelocityRamp, speed: float) -> None: + """Bring ramp to given linear speed.""" + for _ in range(20): + r.step(speed, 0.0) + + def test_decel_from_1_to_0(self): + r = self._ramp(decel=0.5) + self._at_speed(r, 1.0) + lin, _ = r.step(0.5, 0.0) # slow down: target < current + assert abs(lin - 0.5) < 0.01 # 0.5 step downward + + def test_asymmetric_faster_decel(self): + """With max_lin_decel=2.0, deceleration is twice as fast.""" + r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_lin_decel=2.0, max_ang_accel=1.0) + self._at_speed(r, 1.0) + lin, _ = r.step(0.0, 0.0) # decelerate — but target=0 triggers e-stop + # e-stop bypasses ramp + assert lin == 0.0 + + def test_partial_decel_non_zero_target(self): + """With target=0.5 and current=1.0, decel limit applies (non-zero → no e-stop).""" + r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_lin_decel=2.0, max_ang_accel=1.0) + self._at_speed(r, 2.0) + # target=0.5 angular=0.1 (non-zero) → ramp decel applies + lin, _ = r.step(0.5, 0.1) + # current was 2.0, decel=2.0 → max step = 2.0 → should reach 0.5 immediately + assert abs(lin - 0.5) < 1e-9 + + +# ───────────────────────────────────────────────────────────────────────────── +# Angular ramp +# ───────────────────────────────────────────────────────────────────────────── + +class TestAngularRamp: + + def _ramp(self, **kw): + return VelocityRamp(dt=1.0, max_lin_accel=1.0, max_ang_accel=0.5, **kw) + + def test_angular_first_step(self): + r = self._ramp() + _, ang = r.step(0.0, 1.0) + # target_lin=0 & target_ang=1 → not e-stop (only ang non-zero) + # Wait — step(0.0, 1.0): only lin=0, ang=1 → not BOTH zero → ramp applies + assert abs(ang - 0.5) < 1e-9 + + def test_angular_reaches_target(self): + r = self._ramp() + ang = None + for _ in range(10): + _, ang = r.step(0.0, 1.0) + assert ang == 1.0 + + def test_angular_decel(self): + r = self._ramp(max_ang_decel=0.25) + for _ in range(10): + r.step(0.0, 1.0) + # decel with max_ang_decel=0.25: step is 0.25 per second + _, ang = r.step(0.0, 0.5) + assert abs(ang - 0.75) < 1e-9 + + def test_angular_negative(self): + r = self._ramp() + _, ang = r.step(0.0, -1.0) + assert abs(ang - (-0.5)) < 1e-9 + + +# ───────────────────────────────────────────────────────────────────────────── +# Emergency stop +# ───────────────────────────────────────────────────────────────────────────── + +class TestEmergencyStop: + + def _at_full_speed(self) -> VelocityRamp: + r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_ang_accel=0.5) + for _ in range(10): + r.step(2.0, 2.0) + return r + + def test_estop_returns_zero_immediately(self): + r = self._at_full_speed() + lin, ang = r.step(0.0, 0.0) + assert lin == 0.0 + assert ang == 0.0 + + def test_estop_updates_internal_state(self): + r = self._at_full_speed() + r.step(0.0, 0.0) + assert r.current_linear == 0.0 + assert r.current_angular == 0.0 + + def test_estop_from_rest_still_zero(self): + r = VelocityRamp(dt=0.02) + lin, ang = r.step(0.0, 0.0) + assert lin == 0.0 and ang == 0.0 + + def test_estop_then_ramp_resumes(self): + r = self._at_full_speed() + r.step(0.0, 0.0) # e-stop + lin, _ = r.step(1.0, 0.0) # ramp from 0 → first step + assert lin > 0.0 + assert lin < 1.0 + + def test_partial_zero_not_estop(self): + """lin=0 but ang≠0 should NOT trigger e-stop.""" + r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_ang_accel=0.5) + for _ in range(10): + r.step(1.0, 1.0) + # Now command lin=0, ang=0.5 — not an e-stop + lin, ang = r.step(0.0, 0.5) + # lin should ramp down (decel), NOT snap to 0 + assert lin > 0.0 + assert ang < 1.0 # angular also ramping down toward 0.5 + + def test_negative_zero_not_estop(self): + """step(-0.0, 0.0) — Python -0.0 == 0.0, should still e-stop.""" + r = VelocityRamp(dt=0.02) + for _ in range(20): + r.step(1.0, 0.0) + lin, ang = r.step(-0.0, 0.0) + assert lin == 0.0 and ang == 0.0 + + +# ───────────────────────────────────────────────────────────────────────────── +# Sign reversal +# ───────────────────────────────────────────────────────────────────────────── + +class TestSignReversal: + + def test_reversal_decelerates_first(self): + """Reversing from +1 to -1 should pass through 0, not jump instantly.""" + r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_lin_decel=0.5, max_ang_accel=1.0) + for _ in range(5): + r.step(1.0, 0.1) # reach ~1.0 forward; use ang=0.1 to avoid e-stop + outputs = [] + for _ in range(15): + lin, _ = r.step(-1.0, 0.1) + outputs.append(lin) + # Velocity should pass through zero on its way to -1 + assert any(v < 0 for v in outputs), "Should cross zero during reversal" + assert any(v > 0 for v in outputs[:3]), "Should start from positive side" + + def test_reversal_completes(self): + r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_lin_decel=0.5, max_ang_accel=1.0) + for _ in range(5): + r.step(1.0, 0.1) + for _ in range(20): + lin, _ = r.step(-1.0, 0.1) + assert abs(lin - (-1.0)) < 1e-9 + + +# ───────────────────────────────────────────────────────────────────────────── +# Reset +# ───────────────────────────────────────────────────────────────────────────── + +class TestReset: + + def test_reset_clears_state(self): + r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_ang_accel=0.5) + r.step(2.0, 2.0) + r.reset() + assert r.current_linear == 0.0 + assert r.current_angular == 0.0 + + def test_after_reset_ramp_from_zero(self): + r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_ang_accel=0.5) + for _ in range(5): + r.step(2.0, 0.0) + r.reset() + lin, _ = r.step(1.0, 0.0) + assert abs(lin - 0.5) < 1e-9 + + +# ───────────────────────────────────────────────────────────────────────────── +# Monotonicity +# ───────────────────────────────────────────────────────────────────────────── + +class TestMonotonicity: + + def test_linear_ramp_up_monotonic(self): + r = VelocityRamp(dt=0.02, max_lin_accel=0.5, max_ang_accel=1.0) + prev = 0.0 + for _ in range(100): + lin, _ = r.step(1.0, 0.0) + assert lin >= prev - 1e-9 + prev = lin + + def test_linear_ramp_down_monotonic(self): + r = VelocityRamp(dt=0.02, max_lin_accel=0.5, max_ang_accel=1.0) + for _ in range(200): + r.step(1.0, 0.0) + prev = r.current_linear + for _ in range(100): + lin, _ = r.step(0.5, 0.1) # decel toward 0.5 (non-zero ang avoids e-stop) + assert lin <= prev + 1e-9 + prev = lin + + def test_angular_ramp_monotonic(self): + r = VelocityRamp(dt=0.02, max_lin_accel=1.0, max_ang_accel=1.0) + prev = 0.0 + for _ in range(50): + _, ang = r.step(0.1, 2.0) + assert ang >= prev - 1e-9 + prev = ang + + +# ───────────────────────────────────────────────────────────────────────────── +# Timing / rate accuracy +# ───────────────────────────────────────────────────────────────────────────── + +class TestRateAccuracy: + + def test_50hz_correct_step_size(self): + """At 50 Hz with 0.5 m/s² → step = 0.01 m/s per tick.""" + r = VelocityRamp(dt=0.02, max_lin_accel=0.5, max_ang_accel=1.0) + lin, _ = r.step(1.0, 0.0) + assert abs(lin - 0.01) < 1e-9 + + def test_10hz_correct_step_size(self): + """At 10 Hz with 0.5 m/s² → step = 0.05 m/s per tick.""" + r = VelocityRamp(dt=0.1, max_lin_accel=0.5, max_ang_accel=1.0) + lin, _ = r.step(1.0, 0.0) + assert abs(lin - 0.05) < 1e-9 + + def test_steps_to_target_50hz(self): + """At 50 Hz, 0.5 m/s², reaching 1.0 m/s takes 100 steps (2 s).""" + r = VelocityRamp(dt=0.02, max_lin_accel=0.5, max_ang_accel=1.0) + steps = 0 + while r.current_linear < 1.0 - 1e-9: + r.step(1.0, 0.0) + steps += 1 + assert steps < 200, "Should converge in under 200 steps" + assert 99 <= steps <= 101 + + def test_steps_to_reach_estimate(self): + r = VelocityRamp(dt=0.02, max_lin_accel=0.5, max_ang_accel=1.0) + est = r.steps_to_reach(1.0, 0.0) + assert est > 0