feat: smooth velocity ramp controller (Issue #350) #372
@ -0,0 +1,194 @@
|
||||
"""
|
||||
_velocity_ramp.py — Acceleration/deceleration limiter for cmd_vel (Issue #350).
|
||||
|
||||
Pure-Python library (no ROS2 dependencies) for full unit-test coverage.
|
||||
|
||||
Behaviour
|
||||
---------
|
||||
The VelocityRamp class applies independent rate limits to the linear-x and
|
||||
angular-z components of a 2D velocity command:
|
||||
|
||||
- Linear acceleration limit : max_lin_accel (m/s²)
|
||||
- Linear deceleration limit : max_lin_decel (m/s²) — may differ from accel
|
||||
- Angular acceleration limit : max_ang_accel (rad/s²)
|
||||
- Angular deceleration limit : max_ang_decel (rad/s²)
|
||||
|
||||
"Deceleration" applies when the magnitude of the velocity is *decreasing*
|
||||
(including moving toward zero from either sign direction), while "acceleration"
|
||||
applies when the magnitude is increasing.
|
||||
|
||||
Emergency stop
|
||||
--------------
|
||||
When both linear and angular targets are exactly 0.0 the ramp is bypassed and
|
||||
the output is forced to (0.0, 0.0) immediately. This allows a watchdog or
|
||||
safety node to halt the robot instantly without waiting for the deceleration
|
||||
ramp.
|
||||
|
||||
Usage
|
||||
-----
|
||||
ramp = VelocityRamp(dt=0.02) # 50 Hz
|
||||
out_lin, out_ang = ramp.step(1.0, 0.0) # target linear=1 m/s, angular=0
|
||||
out_lin, out_ang = ramp.step(1.0, 0.0) # ramp climbs toward 1.0 m/s
|
||||
out_lin, out_ang = ramp.step(0.0, 0.0) # emergency stop → (0.0, 0.0)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class RampParams:
|
||||
"""Acceleration / deceleration limits for one velocity axis."""
|
||||
max_accel: float # magnitude / second — applied when |v| increasing
|
||||
max_decel: float # magnitude / second — applied when |v| decreasing
|
||||
|
||||
|
||||
def _ramp_axis(
|
||||
current: float,
|
||||
target: float,
|
||||
params: RampParams,
|
||||
dt: float,
|
||||
) -> float:
|
||||
"""
|
||||
Advance *current* one timestep toward *target* with rate limiting.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
current : current velocity on this axis
|
||||
target : desired velocity on this axis
|
||||
params : accel / decel limits
|
||||
dt : timestep in seconds
|
||||
|
||||
Returns
|
||||
-------
|
||||
New velocity, clamped so the change per step never exceeds the limits.
|
||||
|
||||
Notes on accel vs decel selection
|
||||
----------------------------------
|
||||
We treat the motion as decelerating when the target is closer to zero
|
||||
than the current value, i.e. |target| < |current| or they have opposite
|
||||
signs. In all other cases we use the acceleration limit.
|
||||
"""
|
||||
delta = target - current
|
||||
|
||||
# Choose limit: decel if velocity magnitude is falling, else accel.
|
||||
is_decelerating = (
|
||||
abs(target) < abs(current)
|
||||
or (current > 0 and target < 0)
|
||||
or (current < 0 and target > 0)
|
||||
)
|
||||
limit = params.max_decel if is_decelerating else params.max_accel
|
||||
|
||||
max_change = limit * dt
|
||||
if abs(delta) <= max_change:
|
||||
return target
|
||||
return current + max_change * (1.0 if delta > 0 else -1.0)
|
||||
|
||||
|
||||
class VelocityRamp:
|
||||
"""
|
||||
Smooths a stream of (linear_x, angular_z) velocity commands by applying
|
||||
configurable acceleration and deceleration limits.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
dt : timestep in seconds (must match the control loop rate)
|
||||
max_lin_accel : maximum linear acceleration (m/s²)
|
||||
max_lin_decel : maximum linear deceleration (m/s²); defaults to max_lin_accel
|
||||
max_ang_accel : maximum angular acceleration (rad/s²)
|
||||
max_ang_decel : maximum angular deceleration (rad/s²); defaults to max_ang_accel
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
dt: float = 0.02, # 50 Hz
|
||||
max_lin_accel: float = 0.5, # m/s²
|
||||
max_lin_decel: float | None = None,
|
||||
max_ang_accel: float = 1.0, # rad/s²
|
||||
max_ang_decel: float | None = None,
|
||||
) -> None:
|
||||
if dt <= 0:
|
||||
raise ValueError(f'dt must be positive, got {dt}')
|
||||
if max_lin_accel <= 0:
|
||||
raise ValueError(f'max_lin_accel must be positive, got {max_lin_accel}')
|
||||
if max_ang_accel <= 0:
|
||||
raise ValueError(f'max_ang_accel must be positive, got {max_ang_accel}')
|
||||
|
||||
self._dt = dt
|
||||
self._lin = RampParams(
|
||||
max_accel=max_lin_accel,
|
||||
max_decel=max_lin_decel if max_lin_decel is not None else max_lin_accel,
|
||||
)
|
||||
self._ang = RampParams(
|
||||
max_accel=max_ang_accel,
|
||||
max_decel=max_ang_decel if max_ang_decel is not None else max_ang_accel,
|
||||
)
|
||||
|
||||
self._cur_lin: float = 0.0
|
||||
self._cur_ang: float = 0.0
|
||||
|
||||
# ── Public API ────────────────────────────────────────────────────────────
|
||||
|
||||
def step(self, target_lin: float, target_ang: float) -> tuple[float, float]:
|
||||
"""
|
||||
Advance the ramp one timestep.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
target_lin : desired linear velocity (m/s)
|
||||
target_ang : desired angular velocity (rad/s)
|
||||
|
||||
Returns
|
||||
-------
|
||||
(smoothed_linear, smoothed_angular) tuple.
|
||||
|
||||
If both target_lin and target_ang are exactly 0.0, an emergency stop
|
||||
is applied and (0.0, 0.0) is returned immediately.
|
||||
"""
|
||||
# Emergency stop: bypass ramp entirely
|
||||
if target_lin == 0.0 and target_ang == 0.0:
|
||||
self._cur_lin = 0.0
|
||||
self._cur_ang = 0.0
|
||||
return 0.0, 0.0
|
||||
|
||||
self._cur_lin = _ramp_axis(self._cur_lin, target_lin, self._lin, self._dt)
|
||||
self._cur_ang = _ramp_axis(self._cur_ang, target_ang, self._ang, self._dt)
|
||||
return self._cur_lin, self._cur_ang
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset internal state to zero (e.g. on node restart or re-enable)."""
|
||||
self._cur_lin = 0.0
|
||||
self._cur_ang = 0.0
|
||||
|
||||
@property
|
||||
def current_linear(self) -> float:
|
||||
"""Current smoothed linear velocity (m/s)."""
|
||||
return self._cur_lin
|
||||
|
||||
@property
|
||||
def current_angular(self) -> float:
|
||||
"""Current smoothed angular velocity (rad/s)."""
|
||||
return self._cur_ang
|
||||
|
||||
@property
|
||||
def dt(self) -> float:
|
||||
return self._dt
|
||||
|
||||
# ── Derived ───────────────────────────────────────────────────────────────
|
||||
|
||||
def steps_to_reach(self, target_lin: float, target_ang: float) -> int:
|
||||
"""
|
||||
Estimate how many steps() are needed to reach the target from the
|
||||
current state (useful for test assertions).
|
||||
|
||||
This is an approximation based on the worst-case axis; it does not
|
||||
account for the emergency-stop shortcut.
|
||||
"""
|
||||
lin_steps = 0
|
||||
ang_steps = 0
|
||||
if self._lin.max_accel > 0:
|
||||
lin_steps = int(abs(target_lin - self._cur_lin) / (self._lin.max_accel * self._dt)) + 1
|
||||
if self._ang.max_accel > 0:
|
||||
ang_steps = int(abs(target_ang - self._cur_ang) / (self._ang.max_accel * self._dt)) + 1
|
||||
return max(lin_steps, ang_steps)
|
||||
@ -0,0 +1,125 @@
|
||||
"""
|
||||
velocity_ramp_node.py — Smooth velocity ramp controller (Issue #350).
|
||||
|
||||
Subscribes to raw /cmd_vel commands and republishes them on /cmd_vel_smooth
|
||||
after applying independent acceleration and deceleration limits to the linear-x
|
||||
and angular-z components.
|
||||
|
||||
An emergency stop (both linear and angular targets exactly 0.0) bypasses the
|
||||
ramp and forces the output to zero immediately.
|
||||
|
||||
Subscribes
|
||||
----------
|
||||
/cmd_vel geometry_msgs/Twist raw velocity commands
|
||||
|
||||
Publishes
|
||||
---------
|
||||
/cmd_vel_smooth geometry_msgs/Twist rate-limited velocity commands
|
||||
|
||||
Parameters
|
||||
----------
|
||||
max_lin_accel float 0.5 Maximum linear acceleration (m/s²)
|
||||
max_lin_decel float 0.5 Maximum linear deceleration (m/s²)
|
||||
max_ang_accel float 1.0 Maximum angular acceleration (rad/s²)
|
||||
max_ang_decel float 1.0 Maximum angular deceleration (rad/s²)
|
||||
rate_hz float 50.0 Control loop rate (Hz)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||
|
||||
from geometry_msgs.msg import Twist
|
||||
|
||||
from ._velocity_ramp import VelocityRamp
|
||||
|
||||
|
||||
_SUB_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=10,
|
||||
)
|
||||
_PUB_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=10,
|
||||
)
|
||||
|
||||
|
||||
class VelocityRampNode(Node):
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__('velocity_ramp_node')
|
||||
|
||||
# ── Parameters ──────────────────────────────────────────────────────
|
||||
self.declare_parameter('max_lin_accel', 0.5)
|
||||
self.declare_parameter('max_lin_decel', 0.5)
|
||||
self.declare_parameter('max_ang_accel', 1.0)
|
||||
self.declare_parameter('max_ang_decel', 1.0)
|
||||
self.declare_parameter('rate_hz', 50.0)
|
||||
|
||||
p = self.get_parameter
|
||||
rate_hz = max(float(p('rate_hz').value), 1.0)
|
||||
max_lin_acc = max(float(p('max_lin_accel').value), 1e-3)
|
||||
max_lin_dec = max(float(p('max_lin_decel').value), 1e-3)
|
||||
max_ang_acc = max(float(p('max_ang_accel').value), 1e-3)
|
||||
max_ang_dec = max(float(p('max_ang_decel').value), 1e-3)
|
||||
|
||||
dt = 1.0 / rate_hz
|
||||
|
||||
# ── Ramp state ───────────────────────────────────────────────────────
|
||||
self._ramp = VelocityRamp(
|
||||
dt = dt,
|
||||
max_lin_accel = max_lin_acc,
|
||||
max_lin_decel = max_lin_dec,
|
||||
max_ang_accel = max_ang_acc,
|
||||
max_ang_decel = max_ang_dec,
|
||||
)
|
||||
self._target_lin: float = 0.0
|
||||
self._target_ang: float = 0.0
|
||||
|
||||
# ── Subscriber ───────────────────────────────────────────────────────
|
||||
self._sub = self.create_subscription(
|
||||
Twist, '/cmd_vel', self._on_cmd_vel, _SUB_QOS,
|
||||
)
|
||||
|
||||
# ── Publisher + timer ────────────────────────────────────────────────
|
||||
self._pub = self.create_publisher(Twist, '/cmd_vel_smooth', _PUB_QOS)
|
||||
self._timer = self.create_timer(dt, self._step)
|
||||
|
||||
self.get_logger().info(
|
||||
f'velocity_ramp_node ready — '
|
||||
f'rate={rate_hz:.0f}Hz '
|
||||
f'lin_accel={max_lin_acc}m/s² lin_decel={max_lin_dec}m/s² '
|
||||
f'ang_accel={max_ang_acc}rad/s² ang_decel={max_ang_dec}rad/s²'
|
||||
)
|
||||
|
||||
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||
|
||||
def _on_cmd_vel(self, msg: Twist) -> None:
|
||||
self._target_lin = float(msg.linear.x)
|
||||
self._target_ang = float(msg.angular.z)
|
||||
|
||||
def _step(self) -> None:
|
||||
lin, ang = self._ramp.step(self._target_lin, self._target_ang)
|
||||
|
||||
out = Twist()
|
||||
out.linear.x = lin
|
||||
out.angular.z = ang
|
||||
self._pub.publish(out)
|
||||
|
||||
|
||||
def main(args=None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = VelocityRampNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@ -65,6 +65,8 @@ setup(
|
||||
'person_tracking = saltybot_bringup.person_tracking_node:main',
|
||||
# UWB DW3000 anchor/tag ranging (Issue #365)
|
||||
'uwb_node = saltybot_bringup.uwb_node:main',
|
||||
# Smooth velocity ramp controller (Issue #350)
|
||||
'velocity_ramp = saltybot_bringup.velocity_ramp_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
431
jetson/ros2_ws/src/saltybot_bringup/test/test_velocity_ramp.py
Normal file
431
jetson/ros2_ws/src/saltybot_bringup/test/test_velocity_ramp.py
Normal file
@ -0,0 +1,431 @@
|
||||
"""
|
||||
test_velocity_ramp.py — Unit tests for _velocity_ramp.py (Issue #350).
|
||||
|
||||
Covers:
|
||||
- Linear ramp-up (acceleration)
|
||||
- Linear ramp-down (deceleration)
|
||||
- Angular ramp-up / ramp-down
|
||||
- Asymmetric accel vs decel limits
|
||||
- Emergency stop (both targets = 0.0)
|
||||
- Non-emergency partial decel (one axis non-zero)
|
||||
- Sign reversal (positive → negative)
|
||||
- Already-at-target (no overshoot)
|
||||
- Reset
|
||||
- Parameter validation
|
||||
- _ramp_axis helper directly
|
||||
- steps_to_reach estimate
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import pytest
|
||||
|
||||
from saltybot_bringup._velocity_ramp import (
|
||||
RampParams,
|
||||
VelocityRamp,
|
||||
_ramp_axis,
|
||||
)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# _ramp_axis helper
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestRampAxis:
|
||||
|
||||
def _p(self, accel=1.0, decel=1.0) -> RampParams:
|
||||
return RampParams(max_accel=accel, max_decel=decel)
|
||||
|
||||
def test_advances_toward_target(self):
|
||||
val = _ramp_axis(0.0, 1.0, self._p(accel=0.5), dt=1.0)
|
||||
assert abs(val - 0.5) < 1e-9
|
||||
|
||||
def test_reaches_target_exactly(self):
|
||||
val = _ramp_axis(0.9, 1.0, self._p(accel=0.5), dt=1.0)
|
||||
assert val == 1.0 # remaining gap 0.1 < max_change 0.5
|
||||
|
||||
def test_no_overshoot(self):
|
||||
val = _ramp_axis(0.8, 1.0, self._p(accel=5.0), dt=1.0)
|
||||
assert val == 1.0
|
||||
|
||||
def test_negative_direction(self):
|
||||
val = _ramp_axis(0.0, -1.0, self._p(accel=0.5), dt=1.0)
|
||||
assert abs(val - (-0.5)) < 1e-9
|
||||
|
||||
def test_decel_used_when_magnitude_falling(self):
|
||||
# current=1.0, target=0.5 → magnitude falling → use decel=0.2
|
||||
val = _ramp_axis(1.0, 0.5, self._p(accel=1.0, decel=0.2), dt=1.0)
|
||||
assert abs(val - 0.8) < 1e-9
|
||||
|
||||
def test_accel_used_when_magnitude_rising(self):
|
||||
# current=0.5, target=1.0 → magnitude rising → use accel=0.3
|
||||
val = _ramp_axis(0.5, 1.0, self._p(accel=0.3, decel=1.0), dt=1.0)
|
||||
assert abs(val - 0.8) < 1e-9
|
||||
|
||||
def test_sign_reversal_uses_decel(self):
|
||||
# current=0.5 (positive), target=-0.5 → opposite sign → decel
|
||||
val = _ramp_axis(0.5, -0.5, self._p(accel=1.0, decel=0.1), dt=1.0)
|
||||
assert abs(val - 0.4) < 1e-9
|
||||
|
||||
def test_already_at_target_no_change(self):
|
||||
val = _ramp_axis(1.0, 1.0, self._p(), dt=0.02)
|
||||
assert val == 1.0
|
||||
|
||||
def test_zero_to_zero_no_change(self):
|
||||
val = _ramp_axis(0.0, 0.0, self._p(), dt=0.02)
|
||||
assert val == 0.0
|
||||
|
||||
def test_small_dt(self):
|
||||
val = _ramp_axis(0.0, 10.0, self._p(accel=1.0), dt=0.02)
|
||||
assert abs(val - 0.02) < 1e-9
|
||||
|
||||
def test_negative_to_less_negative(self):
|
||||
# current=-1.0, target=-0.5 → magnitude falling (decelerating)
|
||||
val = _ramp_axis(-1.0, -0.5, self._p(accel=1.0, decel=0.2), dt=1.0)
|
||||
assert abs(val - (-0.8)) < 1e-9
|
||||
|
||||
def test_negative_to_more_negative(self):
|
||||
# current=-0.5, target=-1.0 → magnitude rising (accelerating)
|
||||
val = _ramp_axis(-0.5, -1.0, self._p(accel=0.3, decel=1.0), dt=1.0)
|
||||
assert abs(val - (-0.8)) < 1e-9
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# VelocityRamp construction
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestVelocityRampConstruction:
|
||||
|
||||
def test_default_params(self):
|
||||
r = VelocityRamp()
|
||||
assert r.dt == 0.02
|
||||
assert r.current_linear == 0.0
|
||||
assert r.current_angular == 0.0
|
||||
|
||||
def test_custom_dt(self):
|
||||
r = VelocityRamp(dt=0.05)
|
||||
assert r.dt == 0.05
|
||||
|
||||
def test_invalid_dt_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
VelocityRamp(dt=0.0)
|
||||
|
||||
def test_negative_dt_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
VelocityRamp(dt=-0.01)
|
||||
|
||||
def test_invalid_lin_accel_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
VelocityRamp(max_lin_accel=0.0)
|
||||
|
||||
def test_invalid_ang_accel_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
VelocityRamp(max_ang_accel=-1.0)
|
||||
|
||||
def test_asymmetric_decel_stored(self):
|
||||
r = VelocityRamp(max_lin_accel=0.5, max_lin_decel=2.0)
|
||||
assert r._lin.max_accel == 0.5
|
||||
assert r._lin.max_decel == 2.0
|
||||
|
||||
def test_decel_defaults_to_accel(self):
|
||||
r = VelocityRamp(max_lin_accel=0.5)
|
||||
assert r._lin.max_decel == 0.5
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Linear ramp-up
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestLinearRampUp:
|
||||
"""
|
||||
VelocityRamp(dt=1.0, max_lin_accel=0.5) → 0.5 m/s per step.
|
||||
"""
|
||||
|
||||
def _ramp(self, **kw):
|
||||
return VelocityRamp(dt=1.0, max_lin_accel=0.5, max_ang_accel=1.0, **kw)
|
||||
|
||||
def test_first_step(self):
|
||||
r = self._ramp()
|
||||
lin, _ = r.step(1.0, 0.0)
|
||||
assert abs(lin - 0.5) < 1e-9
|
||||
|
||||
def test_second_step(self):
|
||||
r = self._ramp()
|
||||
r.step(1.0, 0.0)
|
||||
lin, _ = r.step(1.0, 0.0)
|
||||
assert abs(lin - 1.0) < 1e-9
|
||||
|
||||
def test_reaches_target(self):
|
||||
r = self._ramp()
|
||||
lin = None
|
||||
for _ in range(10):
|
||||
lin, _ = r.step(1.0, 0.0)
|
||||
assert lin == 1.0
|
||||
|
||||
def test_no_overshoot(self):
|
||||
r = self._ramp()
|
||||
outputs = [r.step(1.0, 0.0)[0] for _ in range(20)]
|
||||
assert all(v <= 1.0 + 1e-9 for v in outputs)
|
||||
|
||||
def test_current_linear_updated(self):
|
||||
r = self._ramp()
|
||||
r.step(1.0, 0.0)
|
||||
assert abs(r.current_linear - 0.5) < 1e-9
|
||||
|
||||
def test_negative_target(self):
|
||||
r = self._ramp()
|
||||
lin, _ = r.step(-1.0, 0.0)
|
||||
assert abs(lin - (-0.5)) < 1e-9
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Linear deceleration
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestLinearDecel:
|
||||
|
||||
def _ramp(self, decel=None):
|
||||
return VelocityRamp(
|
||||
dt=1.0, max_lin_accel=1.0,
|
||||
max_lin_decel=decel if decel else 1.0,
|
||||
max_ang_accel=1.0,
|
||||
)
|
||||
|
||||
def _at_speed(self, r: VelocityRamp, speed: float) -> None:
|
||||
"""Bring ramp to given linear speed."""
|
||||
for _ in range(20):
|
||||
r.step(speed, 0.0)
|
||||
|
||||
def test_decel_from_1_to_0(self):
|
||||
r = self._ramp(decel=0.5)
|
||||
self._at_speed(r, 1.0)
|
||||
lin, _ = r.step(0.5, 0.0) # slow down: target < current
|
||||
assert abs(lin - 0.5) < 0.01 # 0.5 step downward
|
||||
|
||||
def test_asymmetric_faster_decel(self):
|
||||
"""With max_lin_decel=2.0, deceleration is twice as fast."""
|
||||
r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_lin_decel=2.0, max_ang_accel=1.0)
|
||||
self._at_speed(r, 1.0)
|
||||
lin, _ = r.step(0.0, 0.0) # decelerate — but target=0 triggers e-stop
|
||||
# e-stop bypasses ramp
|
||||
assert lin == 0.0
|
||||
|
||||
def test_partial_decel_non_zero_target(self):
|
||||
"""With target=0.5 and current=1.0, decel limit applies (non-zero → no e-stop)."""
|
||||
r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_lin_decel=2.0, max_ang_accel=1.0)
|
||||
self._at_speed(r, 2.0)
|
||||
# target=0.5 angular=0.1 (non-zero) → ramp decel applies
|
||||
lin, _ = r.step(0.5, 0.1)
|
||||
# current was 2.0, decel=2.0 → max step = 2.0 → should reach 0.5 immediately
|
||||
assert abs(lin - 0.5) < 1e-9
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Angular ramp
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestAngularRamp:
|
||||
|
||||
def _ramp(self, **kw):
|
||||
return VelocityRamp(dt=1.0, max_lin_accel=1.0, max_ang_accel=0.5, **kw)
|
||||
|
||||
def test_angular_first_step(self):
|
||||
r = self._ramp()
|
||||
_, ang = r.step(0.0, 1.0)
|
||||
# target_lin=0 & target_ang=1 → not e-stop (only ang non-zero)
|
||||
# Wait — step(0.0, 1.0): only lin=0, ang=1 → not BOTH zero → ramp applies
|
||||
assert abs(ang - 0.5) < 1e-9
|
||||
|
||||
def test_angular_reaches_target(self):
|
||||
r = self._ramp()
|
||||
ang = None
|
||||
for _ in range(10):
|
||||
_, ang = r.step(0.0, 1.0)
|
||||
assert ang == 1.0
|
||||
|
||||
def test_angular_decel(self):
|
||||
r = self._ramp(max_ang_decel=0.25)
|
||||
for _ in range(10):
|
||||
r.step(0.0, 1.0)
|
||||
# decel with max_ang_decel=0.25: step is 0.25 per second
|
||||
_, ang = r.step(0.0, 0.5)
|
||||
assert abs(ang - 0.75) < 1e-9
|
||||
|
||||
def test_angular_negative(self):
|
||||
r = self._ramp()
|
||||
_, ang = r.step(0.0, -1.0)
|
||||
assert abs(ang - (-0.5)) < 1e-9
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Emergency stop
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestEmergencyStop:
|
||||
|
||||
def _at_full_speed(self) -> VelocityRamp:
|
||||
r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_ang_accel=0.5)
|
||||
for _ in range(10):
|
||||
r.step(2.0, 2.0)
|
||||
return r
|
||||
|
||||
def test_estop_returns_zero_immediately(self):
|
||||
r = self._at_full_speed()
|
||||
lin, ang = r.step(0.0, 0.0)
|
||||
assert lin == 0.0
|
||||
assert ang == 0.0
|
||||
|
||||
def test_estop_updates_internal_state(self):
|
||||
r = self._at_full_speed()
|
||||
r.step(0.0, 0.0)
|
||||
assert r.current_linear == 0.0
|
||||
assert r.current_angular == 0.0
|
||||
|
||||
def test_estop_from_rest_still_zero(self):
|
||||
r = VelocityRamp(dt=0.02)
|
||||
lin, ang = r.step(0.0, 0.0)
|
||||
assert lin == 0.0 and ang == 0.0
|
||||
|
||||
def test_estop_then_ramp_resumes(self):
|
||||
r = self._at_full_speed()
|
||||
r.step(0.0, 0.0) # e-stop
|
||||
lin, _ = r.step(1.0, 0.0) # ramp from 0 → first step
|
||||
assert lin > 0.0
|
||||
assert lin < 1.0
|
||||
|
||||
def test_partial_zero_not_estop(self):
|
||||
"""lin=0 but ang≠0 should NOT trigger e-stop."""
|
||||
r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_ang_accel=0.5)
|
||||
for _ in range(10):
|
||||
r.step(1.0, 1.0)
|
||||
# Now command lin=0, ang=0.5 — not an e-stop
|
||||
lin, ang = r.step(0.0, 0.5)
|
||||
# lin should ramp down (decel), NOT snap to 0
|
||||
assert lin > 0.0
|
||||
assert ang < 1.0 # angular also ramping down toward 0.5
|
||||
|
||||
def test_negative_zero_not_estop(self):
|
||||
"""step(-0.0, 0.0) — Python -0.0 == 0.0, should still e-stop."""
|
||||
r = VelocityRamp(dt=0.02)
|
||||
for _ in range(20):
|
||||
r.step(1.0, 0.0)
|
||||
lin, ang = r.step(-0.0, 0.0)
|
||||
assert lin == 0.0 and ang == 0.0
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Sign reversal
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestSignReversal:
|
||||
|
||||
def test_reversal_decelerates_first(self):
|
||||
"""Reversing from +1 to -1 should pass through 0, not jump instantly."""
|
||||
r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_lin_decel=0.5, max_ang_accel=1.0)
|
||||
for _ in range(5):
|
||||
r.step(1.0, 0.1) # reach ~1.0 forward; use ang=0.1 to avoid e-stop
|
||||
outputs = []
|
||||
for _ in range(15):
|
||||
lin, _ = r.step(-1.0, 0.1)
|
||||
outputs.append(lin)
|
||||
# Velocity should pass through zero on its way to -1
|
||||
assert any(v < 0 for v in outputs), "Should cross zero during reversal"
|
||||
assert any(v > 0 for v in outputs[:3]), "Should start from positive side"
|
||||
|
||||
def test_reversal_completes(self):
|
||||
r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_lin_decel=0.5, max_ang_accel=1.0)
|
||||
for _ in range(5):
|
||||
r.step(1.0, 0.1)
|
||||
for _ in range(20):
|
||||
lin, _ = r.step(-1.0, 0.1)
|
||||
assert abs(lin - (-1.0)) < 1e-9
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Reset
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestReset:
|
||||
|
||||
def test_reset_clears_state(self):
|
||||
r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_ang_accel=0.5)
|
||||
r.step(2.0, 2.0)
|
||||
r.reset()
|
||||
assert r.current_linear == 0.0
|
||||
assert r.current_angular == 0.0
|
||||
|
||||
def test_after_reset_ramp_from_zero(self):
|
||||
r = VelocityRamp(dt=1.0, max_lin_accel=0.5, max_ang_accel=0.5)
|
||||
for _ in range(5):
|
||||
r.step(2.0, 0.0)
|
||||
r.reset()
|
||||
lin, _ = r.step(1.0, 0.0)
|
||||
assert abs(lin - 0.5) < 1e-9
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Monotonicity
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestMonotonicity:
|
||||
|
||||
def test_linear_ramp_up_monotonic(self):
|
||||
r = VelocityRamp(dt=0.02, max_lin_accel=0.5, max_ang_accel=1.0)
|
||||
prev = 0.0
|
||||
for _ in range(100):
|
||||
lin, _ = r.step(1.0, 0.0)
|
||||
assert lin >= prev - 1e-9
|
||||
prev = lin
|
||||
|
||||
def test_linear_ramp_down_monotonic(self):
|
||||
r = VelocityRamp(dt=0.02, max_lin_accel=0.5, max_ang_accel=1.0)
|
||||
for _ in range(200):
|
||||
r.step(1.0, 0.0)
|
||||
prev = r.current_linear
|
||||
for _ in range(100):
|
||||
lin, _ = r.step(0.5, 0.1) # decel toward 0.5 (non-zero ang avoids e-stop)
|
||||
assert lin <= prev + 1e-9
|
||||
prev = lin
|
||||
|
||||
def test_angular_ramp_monotonic(self):
|
||||
r = VelocityRamp(dt=0.02, max_lin_accel=1.0, max_ang_accel=1.0)
|
||||
prev = 0.0
|
||||
for _ in range(50):
|
||||
_, ang = r.step(0.1, 2.0)
|
||||
assert ang >= prev - 1e-9
|
||||
prev = ang
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Timing / rate accuracy
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestRateAccuracy:
|
||||
|
||||
def test_50hz_correct_step_size(self):
|
||||
"""At 50 Hz with 0.5 m/s² → step = 0.01 m/s per tick."""
|
||||
r = VelocityRamp(dt=0.02, max_lin_accel=0.5, max_ang_accel=1.0)
|
||||
lin, _ = r.step(1.0, 0.0)
|
||||
assert abs(lin - 0.01) < 1e-9
|
||||
|
||||
def test_10hz_correct_step_size(self):
|
||||
"""At 10 Hz with 0.5 m/s² → step = 0.05 m/s per tick."""
|
||||
r = VelocityRamp(dt=0.1, max_lin_accel=0.5, max_ang_accel=1.0)
|
||||
lin, _ = r.step(1.0, 0.0)
|
||||
assert abs(lin - 0.05) < 1e-9
|
||||
|
||||
def test_steps_to_target_50hz(self):
|
||||
"""At 50 Hz, 0.5 m/s², reaching 1.0 m/s takes 100 steps (2 s)."""
|
||||
r = VelocityRamp(dt=0.02, max_lin_accel=0.5, max_ang_accel=1.0)
|
||||
steps = 0
|
||||
while r.current_linear < 1.0 - 1e-9:
|
||||
r.step(1.0, 0.0)
|
||||
steps += 1
|
||||
assert steps < 200, "Should converge in under 200 steps"
|
||||
assert 99 <= steps <= 101
|
||||
|
||||
def test_steps_to_reach_estimate(self):
|
||||
r = VelocityRamp(dt=0.02, max_lin_accel=0.5, max_ang_accel=1.0)
|
||||
est = r.steps_to_reach(1.0, 0.0)
|
||||
assert est > 0
|
||||
Loading…
x
Reference in New Issue
Block a user