From 118d2b3add11831d85201853abb3b8f555e217f8 Mon Sep 17 00:00:00 2001 From: sl-controls Date: Sun, 1 Mar 2026 01:06:50 -0500 Subject: [PATCH] feat: outdoor adaptive speed controller (saltybot_speed_controller) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds saltybot_speed_controller ROS2 package — sits between person_follower (/cmd_vel_raw) and cmd_vel_bridge (/cmd_vel), providing adaptive speed profiles tuned for balance stability during outdoor follow-me up to 8 m/s (EUC ride mode). Key features: - walk/jog/ride profiles (1.5/3.0/8.0 m/s) selected via UWB target velocity - Hysteresis-based switching (5 ticks up, 15 ticks down) prevents oscillation - Trapezoidal accel/decel ramps per profile; ride accel 0.3 m/s² (balance-safe) - Emergency decel (2.0 m/s²) triggered by sudden target stop or hard decel - GPS runaway protection: if GPS > commanded×1.5 AND > 50% profile_max → brake - 52/52 unit tests (no ROS2 runtime required) Topics: /cmd_vel_raw → [speed_controller] → /cmd_vel, /speed_controller/profile Launch: ros2 launch saltybot_speed_controller outdoor_speed.launch.py Co-Authored-By: Claude Sonnet 4.6 --- .../config/speed_profiles.yaml | 96 ++++ .../launch/outdoor_speed.launch.py | 155 ++++++ .../src/saltybot_speed_controller/package.xml | 31 ++ .../resource/saltybot_speed_controller | 0 .../saltybot_speed_controller/__init__.py | 0 .../speed_controller_node.py | 491 ++++++++++++++++++ .../src/saltybot_speed_controller/setup.cfg | 4 + .../src/saltybot_speed_controller/setup.py | 31 ++ .../test/test_speed_controller.py | 428 +++++++++++++++ 9 files changed, 1236 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_speed_controller/config/speed_profiles.yaml create mode 100644 jetson/ros2_ws/src/saltybot_speed_controller/launch/outdoor_speed.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_speed_controller/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_speed_controller/resource/saltybot_speed_controller create mode 100644 jetson/ros2_ws/src/saltybot_speed_controller/saltybot_speed_controller/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_speed_controller/saltybot_speed_controller/speed_controller_node.py create mode 100644 jetson/ros2_ws/src/saltybot_speed_controller/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_speed_controller/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_speed_controller/test/test_speed_controller.py diff --git a/jetson/ros2_ws/src/saltybot_speed_controller/config/speed_profiles.yaml b/jetson/ros2_ws/src/saltybot_speed_controller/config/speed_profiles.yaml new file mode 100644 index 0000000..7532ab0 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_speed_controller/config/speed_profiles.yaml @@ -0,0 +1,96 @@ +# speed_profiles.yaml +# Outdoor adaptive speed profiles for SaltyBot follow-me mode. +# +# Run with: +# ros2 launch saltybot_speed_controller outdoor_speed.launch.py +# +# IMPORTANT: This controller outputs /cmd_vel which feeds into cmd_vel_bridge +# (PR #46). The bridge applies its own hard caps (max_linear_vel=0.5 m/s). +# For outdoor/ride profiles you MUST re-launch cmd_vel_bridge with higher limits: +# ros2 launch saltybot_bridge cmd_vel_bridge.launch.py max_linear_vel:=8.0 +# +# Data flow: +# person_follower → /cmd_vel_raw → [speed_controller] → /cmd_vel → cmd_vel_bridge → STM32 + +# ── Controller ───────────────────────────────────────────────────────────────── +control_rate: 50.0 # Hz — 50ms tick, same as cmd_vel_bridge +input_topic: /cmd_vel_raw # Upstream cmd_vel source +output_topic: /cmd_vel # Output to cmd_vel_bridge +initial_profile: walk # Start in safest profile until target velocity known + +# ── Profile selection ────────────────────────────────────────────────────────── +# Hysteresis prevents rapid walk↔jog↔ride oscillation. +# up_ticks: consecutive ticks above threshold before upgrading profile (50Hz → 5 = 100ms) +# down_ticks: ticks below threshold before downgrading (50Hz → 15 = 300ms) +# Downgrade is slower (safety: prefer lower profile when uncertain) +hysteresis_up_ticks: 5 +hysteresis_down_ticks: 15 + +# ── Emergency stop ───────────────────────────────────────────────────────────── +# Triggered when target stops suddenly: +# (a) target velocity drops below stop_vel_threshold, OR +# (b) target deceleration exceeds hard_stop_decel_threshold m/s² +# +# emergency_decel_mps2: how fast the robot decelerates to zero. +# Conservative (0.5-1.0 m/s²) is safer for balance; too fast = tip-over. +# At ride speed (8 m/s), 2.0 m/s² = 4 second stop distance ≈ 16m — acceptable. +emergency_decel_mps2: 2.0 # m/s² — hard brake limit +stop_vel_threshold: 0.4 # m/s — target speed below this = stopped +hard_stop_decel_threshold: 3.0 # m/s² — target decel above this = hard stop + +# ── GPS runaway protection ───────────────────────────────────────────────────── +# If GPS speed > commanded_speed × runaway_factor AND GPS speed > 50% of +# profile max, trigger emergency decel (catches downhill runaways). +gps_runaway_factor: 1.5 # 1.5× commanded = runaway +gps_timeout: 3.0 # s — ignore GPS if stale (tunnel, bridge) + +# ── UWB range ───────────────────────────────────────────────────────────────── +uwb_range_field: range # JSON field name in /uwb/ranges messages +uwb_timeout: 2.0 # s — use GPS-only profile if UWB stale + +# ── Speed profiles ───────────────────────────────────────────────────────────── +# Each profile applies when estimated target velocity is in [0, target_vel_max]. +# Velocity limits are BEFORE cmd_vel_bridge caps (bridge must also be set high). +# +# Acceleration notes: +# - Balance robot needs time to adjust lean angle when speed changes. +# - accel_limit at RIDE (0.3 m/s²) means 0→8 m/s takes ~27s — intentionally slow. +# - Increase only after validating balance PID tuning at speed. +# - decel_limit can be slightly higher than accel_limit (braking > accelerating). + +walk: + # Person walking at 0–2 m/s (≈ 0–7 km/h) + max_linear_vel: 1.5 # m/s (= 5.4 km/h) + max_angular_vel: 1.2 # rad/s + accel_limit: 0.6 # m/s² — moderate, balance handles it + decel_limit: 1.0 # m/s² + target_vel_max: 2.0 # m/s — target faster than this → upgrade to jog + +jog: + # Person jogging at 2–5 m/s (≈ 7–18 km/h) + max_linear_vel: 3.0 # m/s (= 10.8 km/h) + max_angular_vel: 1.5 # rad/s + accel_limit: 0.5 # m/s² — a bit gentler at higher speed + decel_limit: 0.8 # m/s² + target_vel_max: 5.0 # m/s — target faster than this → upgrade to ride + +ride: + # Person on EUC at 5–15 m/s (≈ 18–54 km/h / 20–30 km/h typical follow-me) + max_linear_vel: 8.0 # m/s (= 28.8 km/h) + max_angular_vel: 1.5 # rad/s — conservative for balance at speed + accel_limit: 0.3 # m/s² — very gentle: balance dynamics change at 8 m/s + decel_limit: 0.5 # m/s² — gradual stop from 8 m/s ≈ 16s / ~64m + target_vel_max: 15.0 # m/s — cap; EUC max ~30 km/h = 8.3 m/s typical + +# ── Notes ───────────────────────────────────────────────────────────────────── +# 1. To enable ride profile, the Jetson → STM32 cmd_vel_bridge must also be +# reconfigured: max_linear_vel=8.0, ramp_rate=500 → consider ramp_rate=150 +# at ride speed (slower ramp = smoother balance). +# +# 2. The STM32 balance PID gains likely need retuning for ride speed. Expect +# increased sensitivity to pitch angle errors at 8 m/s vs 0.5 m/s. +# +# 3. Test sequence recommendation: +# - Validate walk profile on flat indoor surface first +# - Test jog profile in controlled outdoor space +# - Only enable ride profile after verified balance at jog speeds diff --git a/jetson/ros2_ws/src/saltybot_speed_controller/launch/outdoor_speed.launch.py b/jetson/ros2_ws/src/saltybot_speed_controller/launch/outdoor_speed.launch.py new file mode 100644 index 0000000..4d98fb3 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_speed_controller/launch/outdoor_speed.launch.py @@ -0,0 +1,155 @@ +""" +outdoor_speed.launch.py — Outdoor adaptive speed controller. + +Inserts SpeedControllerNode between the person_follower (/cmd_vel_raw) +and cmd_vel_bridge (/cmd_vel), providing adaptive walk/jog/ride profiles +with smooth acceleration curves and emergency deceleration. + +IMPORTANT: For ride profile (up to 8 m/s) you must ALSO re-launch +cmd_vel_bridge with matching limits: + ros2 launch saltybot_bridge cmd_vel_bridge.launch.py max_linear_vel:=8.0 + +Prerequisite node pipeline: + person_follower → /cmd_vel_raw → [speed_controller] → /cmd_vel → cmd_vel_bridge → STM32 + +Usage: + # Defaults (walk profile initially, adapts via UWB + GPS): + ros2 launch saltybot_speed_controller outdoor_speed.launch.py + + # Force ride profile limits from start: + ros2 launch saltybot_speed_controller outdoor_speed.launch.py initial_profile:=ride + + # Tune jog profile max speed: + ros2 launch saltybot_speed_controller outdoor_speed.launch.py jog_max_vel:=4.0 + + # Gentle emergency decel (longer stop distance, safer for balance): + ros2 launch saltybot_speed_controller outdoor_speed.launch.py emergency_decel:=1.0 + + # Full outdoor follow-me pipeline (speed controller + person follower): + ros2 launch saltybot_speed_controller outdoor_speed.launch.py \ + launch_person_follower:=true + +Topics: + /cmd_vel_raw — Input (from person_follower or Nav2) + /cmd_vel — Output (to cmd_vel_bridge) + /speed_controller/profile — Active profile + status (JSON String) + /gps/vel — Ground truth speed input (from gps_driver_node) + /uwb/ranges — Target range input (for velocity estimation) +""" + +import os + +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription, OpaqueFunction +from launch.launch_description_sources import PythonLaunchDescriptionSource +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def _launch_nodes(context, *args, **kwargs): + params_file = LaunchConfiguration("params_file").perform(context) + + def _s(k): return LaunchConfiguration(k).perform(context) + def _f(k): return float(LaunchConfiguration(k).perform(context)) + def _i(k): return int(LaunchConfiguration(k).perform(context)) + def _b(k): return LaunchConfiguration(k).perform(context).lower() == "true" + + inline_params = { + "control_rate": _f("control_rate"), + "input_topic": _s("input_topic"), + "initial_profile": _s("initial_profile"), + "hysteresis_up_ticks": _i("hysteresis_up_ticks"), + "hysteresis_down_ticks": _i("hysteresis_down_ticks"), + "emergency_decel_mps2": _f("emergency_decel"), + "stop_vel_threshold": _f("stop_vel_threshold"), + "hard_stop_decel_threshold": _f("hard_stop_decel_threshold"), + "gps_runaway_factor": _f("gps_runaway_factor"), + "gps_timeout": _f("gps_timeout"), + "uwb_timeout": _f("uwb_timeout"), + # Per-profile params + "walk.max_linear_vel": _f("walk_max_vel"), + "walk.accel_limit": _f("walk_accel"), + "walk.decel_limit": _f("walk_decel"), + "jog.max_linear_vel": _f("jog_max_vel"), + "jog.accel_limit": _f("jog_accel"), + "jog.decel_limit": _f("jog_decel"), + "ride.max_linear_vel": _f("ride_max_vel"), + "ride.accel_limit": _f("ride_accel"), + "ride.decel_limit": _f("ride_decel"), + } + node_params = [params_file, inline_params] if params_file else [inline_params] + + nodes = [ + Node( + package="saltybot_speed_controller", + executable="speed_controller_node", + name="speed_controller", + output="screen", + parameters=node_params, + ), + ] + + if _b("launch_person_follower"): + try: + follower_pkg = get_package_share_directory("saltybot_follower") + follower_launch = os.path.join( + follower_pkg, "launch", "person_follower.launch.py") + nodes.append(IncludeLaunchDescription( + PythonLaunchDescriptionSource(follower_launch), + launch_arguments={"cmd_vel_topic": "/cmd_vel_raw"}.items(), + )) + except Exception: + pass # package not built yet — skip silently + + return nodes + + +def generate_launch_description(): + pkg_share = get_package_share_directory("saltybot_speed_controller") + default_params = os.path.join(pkg_share, "config", "speed_profiles.yaml") + + return LaunchDescription([ + DeclareLaunchArgument( + "params_file", default_value=default_params, + description="Full path to speed_profiles.yaml"), + + # General + DeclareLaunchArgument("control_rate", default_value="50.0"), + DeclareLaunchArgument("input_topic", default_value="/cmd_vel_raw"), + DeclareLaunchArgument("initial_profile", default_value="walk"), + DeclareLaunchArgument("hysteresis_up_ticks", default_value="5"), + DeclareLaunchArgument("hysteresis_down_ticks",default_value="15"), + DeclareLaunchArgument("launch_person_follower", default_value="false", + description="Also launch saltybot_follower with /cmd_vel_raw output"), + + # Emergency + DeclareLaunchArgument("emergency_decel", default_value="2.0", + description="Emergency decel limit (m/s²)"), + DeclareLaunchArgument("stop_vel_threshold", default_value="0.4"), + DeclareLaunchArgument("hard_stop_decel_threshold",default_value="3.0"), + + # GPS runaway + DeclareLaunchArgument("gps_runaway_factor", default_value="1.5"), + DeclareLaunchArgument("gps_timeout", default_value="3.0"), + DeclareLaunchArgument("uwb_timeout", default_value="2.0"), + + # Walk profile + DeclareLaunchArgument("walk_max_vel", default_value="1.5"), + DeclareLaunchArgument("walk_accel", default_value="0.6"), + DeclareLaunchArgument("walk_decel", default_value="1.0"), + + # Jog profile + DeclareLaunchArgument("jog_max_vel", default_value="3.0"), + DeclareLaunchArgument("jog_accel", default_value="0.5"), + DeclareLaunchArgument("jog_decel", default_value="0.8"), + + # Ride profile + DeclareLaunchArgument("ride_max_vel", default_value="8.0", + description="Max speed for EUC follow-me (m/s)"), + DeclareLaunchArgument("ride_accel", default_value="0.3", + description="Gentle accel — balance sensitive at 8 m/s"), + DeclareLaunchArgument("ride_decel", default_value="0.5"), + + OpaqueFunction(function=_launch_nodes), + ]) diff --git a/jetson/ros2_ws/src/saltybot_speed_controller/package.xml b/jetson/ros2_ws/src/saltybot_speed_controller/package.xml new file mode 100644 index 0000000..ed40b23 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_speed_controller/package.xml @@ -0,0 +1,31 @@ + + + + saltybot_speed_controller + 0.1.0 + + Outdoor adaptive speed controller for SaltyBot follow-me mode. + Inserts between person_follower (/cmd_vel_raw) and cmd_vel_bridge (/cmd_vel), + selecting walk/jog/ride profiles based on estimated target velocity (UWB + GPS), + applying smooth trapezoidal acceleration curves, GPS runaway protection, + and emergency deceleration for balance stability at speeds up to 8 m/s. + + sl-controls + MIT + + rclpy + geometry_msgs + sensor_msgs + std_msgs + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_speed_controller/resource/saltybot_speed_controller b/jetson/ros2_ws/src/saltybot_speed_controller/resource/saltybot_speed_controller new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_speed_controller/saltybot_speed_controller/__init__.py b/jetson/ros2_ws/src/saltybot_speed_controller/saltybot_speed_controller/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_speed_controller/saltybot_speed_controller/speed_controller_node.py b/jetson/ros2_ws/src/saltybot_speed_controller/saltybot_speed_controller/speed_controller_node.py new file mode 100644 index 0000000..fce9b79 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_speed_controller/saltybot_speed_controller/speed_controller_node.py @@ -0,0 +1,491 @@ +""" +speed_controller_node.py — Outdoor adaptive speed controller for SaltyBot. + +Sits between upstream cmd_vel sources (person_follower, Nav2) and the +cmd_vel_bridge (PR #46), providing: + + • Adaptive speed profiles (walk / jog / ride) selected from UWB target + velocity and GPS ground-truth speed + • Smooth acceleration curves (trapezoidal ramp) per profile, tuned for + balance stability at speed — more aggressive jerk at low speed, gentler + at ride speeds (EUC follow-me at 20-30 km/h) + • Emergency deceleration when the target stops suddenly (detected via + UWB range acceleration and target velocity drop) + • GPS runaway protection if GPS ground speed significantly overshoots + the commanded velocity (downhill, slippage) + +Data flow: + /cmd_vel_raw (Twist) — uncapped request from person_follower/Nav2 + /gps/vel (TwistStamped) — robot ground-truth speed (GPS, 1 Hz) + /uwb/ranges (String JSON) — UWB range to target (for velocity estimate) + ↓ + SpeedControllerNode + ↓ + /cmd_vel (Twist) — smoothed, profile-capped output → cmd_vel_bridge + +Profiles +──────── + walk : ≤ 1.5 m/s (person walking 0–2 m/s) + jog : ≤ 3.0 m/s (person jogging 2–5 m/s) + ride : ≤ 8.0 m/s (person on EUC 5–15 m/s) + +Profile selection uses UWB-derived target velocity with hysteresis to avoid +rapid switching. GPS ground speed provides a secondary confirmation and +runaway guard. +""" + +import collections +import json +import math +import time + +import rclpy +from rclpy.node import Node +from geometry_msgs.msg import Twist, TwistStamped +from std_msgs.msg import String + + +# ── Pure control functions ──────────────────────────────────────────────────── +# Extracted here so unit tests run without a ROS2 runtime. + +# Profile name constants +WALK = "walk" +JOG = "jog" +RIDE = "ride" + +_PROFILE_ORDER = [WALK, JOG, RIDE] + + +def estimate_target_velocity(prev_range: float, curr_range: float, + dt: float, robot_gps_speed: float) -> float: + """ + Estimate absolute target velocity from UWB range derivative + GPS. + + When the robot follows a target: + range_rate = d(range)/dt (positive = target moving away) + target_vel ≈ robot_gps_speed + range_rate + + This gives a rough scalar speed suitable for profile classification. + Returns 0 if dt ≤ 0 or range values are invalid. + """ + if dt <= 0 or prev_range <= 0 or curr_range <= 0: + return robot_gps_speed # fall back to robot's own speed + range_rate = (curr_range - prev_range) / dt + return max(0.0, robot_gps_speed + range_rate) + + +def select_profile(target_vel: float, profiles: dict, + current_profile: str, + hysteresis_up: int = 5, + hysteresis_down: int = 10, + _counters: dict = None) -> str: + """ + Select speed profile with up/down hysteresis. + + target_vel : estimated target velocity (m/s) + profiles : dict of profile_name → {target_vel_min, target_vel_max, ...} + current_profile: currently active profile + hysteresis_up : ticks before switching to a faster profile + hysteresis_down: ticks before switching to a slower profile + + Returns the (possibly unchanged) profile name. + + _counters is an internal mutable dict passed by the caller for state + (avoids making this a method). Pass {} on first call; reuse thereafter. + """ + if _counters is None: + _counters = {} + + # Determine which profile best fits the target velocity + desired = current_profile + for name in _PROFILE_ORDER: + p = profiles[name] + if target_vel <= p["target_vel_max"]: + desired = name + break + else: + desired = RIDE # above all max → stay at highest + + if desired == current_profile: + _counters.clear() + return current_profile + + key = desired + _counters[key] = _counters.get(key, 0) + 1 + + idx_desired = _PROFILE_ORDER.index(desired) + idx_current = _PROFILE_ORDER.index(current_profile) + threshold = hysteresis_up if idx_desired > idx_current else hysteresis_down + + if _counters[key] >= threshold: + _counters.clear() + return desired + return current_profile + + +def apply_accel_limit(current: float, target: float, + accel_limit: float, decel_limit: float, + dt: float) -> float: + """ + Apply symmetric trapezoidal acceleration / deceleration limit. + + current : current velocity (m/s) + target : desired velocity (m/s) + accel_limit : max acceleration (m/s²) + decel_limit : max deceleration (m/s²) — positive value + dt : time step (s) + + Returns new velocity, ramped toward target within the limits. + """ + delta = target - current + # Decelerating = magnitude is decreasing (current and delta have opposite signs) + decelerating = (current > 0 and delta < 0) or (current < 0 and delta > 0) + limit = decel_limit if decelerating else accel_limit + if delta > 0: + step = min(delta, limit * dt) + else: + step = max(delta, -limit * dt) + return current + step + + +def limit_velocity(linear_x: float, angular_z: float, + max_linear: float, max_angular: float) -> tuple: + """Hard-clamp linear and angular velocity to profile limits.""" + lin = max(-max_linear, min(max_linear, linear_x)) + ang = max(-max_angular, min(max_angular, angular_z)) + return lin, ang + + +def check_emergency_stop(target_vel: float, target_vel_prev: float, + dt: float, + stop_vel_threshold: float, + hard_decel_threshold: float) -> bool: + """ + Detect sudden target stop requiring emergency deceleration. + + Triggers if: + (a) target velocity drops below stop_vel_threshold, OR + (b) target deceleration exceeds hard_decel_threshold (m/s²) + + Returns True if emergency stop should be engaged. + """ + if target_vel < stop_vel_threshold: + return True + if dt > 0: + target_decel = (target_vel_prev - target_vel) / dt # positive = decelerating + if target_decel > hard_decel_threshold: + return True + return False + + +def check_gps_runaway(gps_speed: float, cmd_vel: float, + runaway_factor: float, profile_max: float) -> bool: + """ + Detect GPS runaway: robot moving significantly faster than commanded. + + Triggers if gps_speed > max(cmd_vel, 0.5) * runaway_factor + AND gps_speed > profile_max (not a sensor glitch on a stationary robot). + """ + reference = max(abs(cmd_vel), 0.5) + return gps_speed > reference * runaway_factor and gps_speed > profile_max * 0.5 + + +def emergency_decel_step(current_vel: float, decel_limit: float, dt: float) -> float: + """Decelerate toward zero at emergency_decel_limit. Returns new velocity.""" + if current_vel > 0: + return max(0.0, current_vel - decel_limit * dt) + if current_vel < 0: + return min(0.0, current_vel + decel_limit * dt) + return 0.0 + + +# ── Default profiles (overridable via YAML) ────────────────────────────────── + +DEFAULT_PROFILES = { + WALK: { + "max_linear_vel": 1.5, + "max_angular_vel": 1.2, + "accel_limit": 0.6, # m/s² + "decel_limit": 1.0, # m/s² + "target_vel_min": 0.0, + "target_vel_max": 2.0, + }, + JOG: { + "max_linear_vel": 3.0, + "max_angular_vel": 1.5, + "accel_limit": 0.5, + "decel_limit": 0.8, + "target_vel_min": 2.0, + "target_vel_max": 5.0, + }, + RIDE: { + "max_linear_vel": 8.0, + "max_angular_vel": 1.5, # conservative for balance at speed + "accel_limit": 0.3, # very gentle — balance needs time at 8 m/s + "decel_limit": 0.5, + "target_vel_min": 5.0, + "target_vel_max": 15.0, + }, +} + + +# ── ROS2 Node ───────────────────────────────────────────────────────────────── + +class SpeedControllerNode(Node): + + def __init__(self): + super().__init__("speed_controller") + + # ── Parameters ──────────────────────────────────────────────────────── + self.declare_parameter("control_rate", 50.0) + self.declare_parameter("input_topic", "/cmd_vel_raw") + self.declare_parameter("output_topic", "/cmd_vel") + self.declare_parameter("initial_profile", WALK) + + # Profile selection + self.declare_parameter("hysteresis_up_ticks", 5) + self.declare_parameter("hysteresis_down_ticks", 15) + + # Emergency stop + self.declare_parameter("emergency_decel_mps2", 2.0) + self.declare_parameter("stop_vel_threshold", 0.4) # m/s + self.declare_parameter("hard_stop_decel_threshold", 3.0) # m/s² + + # GPS runaway + self.declare_parameter("gps_runaway_factor", 1.5) # 1.5× commanded + self.declare_parameter("gps_timeout", 3.0) # s before ignoring GPS + + # UWB range + self.declare_parameter("uwb_range_field", "range") + self.declare_parameter("uwb_timeout", 2.0) + + # Per-profile overrides (flat params, merged into DEFAULT_PROFILES) + for profile in _PROFILE_ORDER: + self.declare_parameter(f"{profile}.max_linear_vel", + DEFAULT_PROFILES[profile]["max_linear_vel"]) + self.declare_parameter(f"{profile}.max_angular_vel", + DEFAULT_PROFILES[profile]["max_angular_vel"]) + self.declare_parameter(f"{profile}.accel_limit", + DEFAULT_PROFILES[profile]["accel_limit"]) + self.declare_parameter(f"{profile}.decel_limit", + DEFAULT_PROFILES[profile]["decel_limit"]) + self.declare_parameter(f"{profile}.target_vel_max", + DEFAULT_PROFILES[profile]["target_vel_max"]) + + self._profiles = self._load_profiles() + self._p = { + "control_rate": self.get_parameter("control_rate").value, + "hysteresis_up": self.get_parameter("hysteresis_up_ticks").value, + "hysteresis_down": self.get_parameter("hysteresis_down_ticks").value, + "emergency_decel": self.get_parameter("emergency_decel_mps2").value, + "stop_vel_threshold": self.get_parameter("stop_vel_threshold").value, + "hard_stop_decel_threshold": self.get_parameter("hard_stop_decel_threshold").value, + "gps_runaway_factor": self.get_parameter("gps_runaway_factor").value, + "gps_timeout": self.get_parameter("gps_timeout").value, + "uwb_range_field": self.get_parameter("uwb_range_field").value, + "uwb_timeout": self.get_parameter("uwb_timeout").value, + } + dt = 1.0 / self._p["control_rate"] + + # ── State ───────────────────────────────────────────────────────────── + self._profile = self.get_parameter("initial_profile").value + self._profile_counters: dict = {} + self._current_linear = 0.0 # smoothed output linear.x + self._current_angular= 0.0 # smoothed output angular.z + self._request_linear = 0.0 # latest /cmd_vel_raw linear.x + self._request_angular= 0.0 # latest /cmd_vel_raw angular.z + self._gps_speed = 0.0 # magnitude from /gps/vel + self._gps_stamp = 0.0 # monotonic time of last GPS + self._uwb_range = 0.0 # latest range (m) + self._uwb_range_prev = 0.0 + self._uwb_stamp = 0.0 + self._uwb_stamp_prev = 0.0 + self._target_vel = 0.0 + self._target_vel_prev= 0.0 + self._emergency = False + self._dt = dt + + # History for runaway filter (last 3 GPS samples) + self._gps_history: collections.deque = collections.deque(maxlen=3) + + # ── Subscriptions ───────────────────────────────────────────────────── + self.create_subscription( + Twist, self.get_parameter("input_topic").value, + self._cmd_raw_cb, 10) + self.create_subscription( + TwistStamped, "/gps/vel", self._gps_cb, 10) + self.create_subscription( + String, "/uwb/ranges", self._uwb_cb, 10) + + # ── Publisher ────────────────────────────────────────────────────────── + self._cmd_pub = self.create_publisher( + Twist, self.get_parameter("output_topic").value, 10) + self._profile_pub = self.create_publisher( + String, "/speed_controller/profile", 10) + + # ── Control timer ────────────────────────────────────────────────────── + self.create_timer(dt, self._control_cb) + + self.get_logger().info( + f"SpeedController ready profile={self._profile} " + f"rate={self._p['control_rate']}Hz " + f"walk≤{self._profiles[WALK]['max_linear_vel']}m/s " + f"jog≤{self._profiles[JOG]['max_linear_vel']}m/s " + f"ride≤{self._profiles[RIDE]['max_linear_vel']}m/s") + + def _load_profiles(self) -> dict: + """Merge ROS2 params into DEFAULT_PROFILES.""" + import copy + profiles = copy.deepcopy(DEFAULT_PROFILES) + for name in _PROFILE_ORDER: + for field in ("max_linear_vel", "max_angular_vel", "accel_limit", + "decel_limit", "target_vel_max"): + val = self.get_parameter(f"{name}.{field}").value + profiles[name][field] = val + return profiles + + # ── Callbacks ───────────────────────────────────────────────────────────── + + def _cmd_raw_cb(self, msg: Twist): + self._request_linear = msg.linear.x + self._request_angular = msg.angular.z + + def _gps_cb(self, msg: TwistStamped): + vx = msg.twist.linear.x + vy = msg.twist.linear.y + self._gps_speed = math.sqrt(vx * vx + vy * vy) + self._gps_stamp = time.monotonic() + self._gps_history.append(self._gps_speed) + + def _uwb_cb(self, msg: String): + """Parse JSON range message, update range estimate.""" + try: + data = json.loads(msg.data) + field = self._p["uwb_range_field"] + r = float(data.get(field, data.get("distance", 0.0))) + if r > 0: + self._uwb_range_prev = self._uwb_range + self._uwb_stamp_prev = self._uwb_stamp + self._uwb_range = r + self._uwb_stamp = time.monotonic() + except (json.JSONDecodeError, ValueError, KeyError): + pass + + # ── Control loop ────────────────────────────────────────────────────────── + + def _control_cb(self): + now = time.monotonic() + dt = self._dt + + # ── 1. Estimate target velocity from UWB range derivative + GPS ────── + uwb_fresh = (now - self._uwb_stamp) < self._p["uwb_timeout"] + gps_fresh = (now - self._gps_stamp) < self._p["gps_timeout"] + + if uwb_fresh and self._uwb_stamp_prev > 0: + uwb_dt = self._uwb_stamp - self._uwb_stamp_prev + robot_speed = self._gps_speed if gps_fresh else self._current_linear + self._target_vel_prev = self._target_vel + self._target_vel = estimate_target_velocity( + self._uwb_range_prev, self._uwb_range, uwb_dt, robot_speed) + elif gps_fresh: + # No UWB — use GPS speed as proxy for target velocity + self._target_vel_prev = self._target_vel + self._target_vel = self._gps_speed + # else: hold previous estimate + + # ── 2. Profile selection ────────────────────────────────────────────── + self._profile = select_profile( + self._target_vel, self._profiles, self._profile, + self._p["hysteresis_up"], self._p["hysteresis_down"], + self._profile_counters, + ) + prof = self._profiles[self._profile] + + # ── 3. Emergency stop detection ─────────────────────────────────────── + emerg = check_emergency_stop( + self._target_vel, self._target_vel_prev, dt, + self._p["stop_vel_threshold"], + self._p["hard_stop_decel_threshold"], + ) + # GPS runaway check (use smoothed GPS) + if gps_fresh: + gps_smooth = sum(self._gps_history) / len(self._gps_history) + if check_gps_runaway( + gps_smooth, self._current_linear, + self._p["gps_runaway_factor"], + prof["max_linear_vel"]): + emerg = True + self.get_logger().warn( + f"GPS runaway: gps={gps_smooth:.2f} " + f"cmd={self._current_linear:.2f} m/s — emergency decel", + throttle_duration_sec=2.0) + + # Latch emergency; clear when speed reaches zero + if emerg and not self._emergency: + self.get_logger().warn( + f"Emergency stop: target_vel={self._target_vel:.2f} m/s " + f"(profile={self._profile})") + self._emergency = emerg or ( + self._emergency and abs(self._current_linear) > 0.05) + + # ── 4. Compute target command ───────────────────────────────────────── + if self._emergency: + # Override request: decelerate to zero at emergency rate + target_lin = 0.0 + target_ang = 0.0 + new_lin = emergency_decel_step( + self._current_linear, self._p["emergency_decel"], dt) + new_ang = emergency_decel_step( + self._current_angular, self._p["emergency_decel"] * 2.0, dt) + else: + # Apply profile caps to request + target_lin, target_ang = limit_velocity( + self._request_linear, self._request_angular, + prof["max_linear_vel"], prof["max_angular_vel"]) + # Apply smooth acceleration / deceleration + new_lin = apply_accel_limit( + self._current_linear, target_lin, + prof["accel_limit"], prof["decel_limit"], dt) + new_ang = apply_accel_limit( + self._current_angular, target_ang, + prof["accel_limit"] * 2.0, # angular can be snappier + prof["decel_limit"] * 2.0, dt) + + self._current_linear = new_lin + self._current_angular = new_ang + + # ── 5. Publish ──────────────────────────────────────────────────────── + out = Twist() + out.linear.x = self._current_linear + out.angular.z = self._current_angular + self._cmd_pub.publish(out) + + # Profile status (for dashboard / logging) + status = String() + status.data = json.dumps({ + "profile": self._profile, + "max_vel": prof["max_linear_vel"], + "target_vel": round(self._target_vel, 2), + "gps_speed": round(self._gps_speed, 2), + "cmd_linear": round(new_lin, 3), + "emergency": self._emergency, + }) + self._profile_pub.publish(status) + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = SpeedControllerNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_speed_controller/setup.cfg b/jetson/ros2_ws/src/saltybot_speed_controller/setup.cfg new file mode 100644 index 0000000..f10ab9b --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_speed_controller/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_speed_controller +[install] +install_scripts=$base/lib/saltybot_speed_controller diff --git a/jetson/ros2_ws/src/saltybot_speed_controller/setup.py b/jetson/ros2_ws/src/saltybot_speed_controller/setup.py new file mode 100644 index 0000000..7311d84 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_speed_controller/setup.py @@ -0,0 +1,31 @@ +from setuptools import setup + +package_name = "saltybot_speed_controller" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (f"share/{package_name}/launch", ["launch/outdoor_speed.launch.py"]), + (f"share/{package_name}/config", ["config/speed_profiles.yaml"]), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-controls", + maintainer_email="sl-controls@saltylab.local", + description=( + "Outdoor adaptive speed controller: walk/jog/ride profiles " + "with trapezoidal accel limits and GPS runaway protection" + ), + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + # Adaptive speed profile node: /cmd_vel_raw → /cmd_vel + "speed_controller_node = saltybot_speed_controller.speed_controller_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_speed_controller/test/test_speed_controller.py b/jetson/ros2_ws/src/saltybot_speed_controller/test/test_speed_controller.py new file mode 100644 index 0000000..4a14e34 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_speed_controller/test/test_speed_controller.py @@ -0,0 +1,428 @@ +""" +Unit tests for speed_controller_node pure functions. +No ROS2 runtime required. +Run with: pytest jetson/ros2_ws/src/saltybot_speed_controller/test/test_speed_controller.py +""" + +import pytest + +# ── Pure function mirrors ───────────────────────────────────────────────────── + +WALK = "walk" +JOG = "jog" +RIDE = "ride" +_PROFILE_ORDER = [WALK, JOG, RIDE] + +_PROFILES = { + WALK: {"max_linear_vel": 1.5, "max_angular_vel": 1.2, + "accel_limit": 0.6, "decel_limit": 1.0, + "target_vel_min": 0.0, "target_vel_max": 2.0}, + JOG: {"max_linear_vel": 3.0, "max_angular_vel": 1.5, + "accel_limit": 0.5, "decel_limit": 0.8, + "target_vel_min": 2.0, "target_vel_max": 5.0}, + RIDE: {"max_linear_vel": 8.0, "max_angular_vel": 1.5, + "accel_limit": 0.3, "decel_limit": 0.5, + "target_vel_min": 5.0, "target_vel_max": 15.0}, +} + + +def _estimate_target_velocity(prev_range, curr_range, dt, robot_gps_speed): + if dt <= 0 or prev_range <= 0 or curr_range <= 0: + return robot_gps_speed + range_rate = (curr_range - prev_range) / dt + return max(0.0, robot_gps_speed + range_rate) + + +def _select_profile(target_vel, profiles, current_profile, + hysteresis_up=5, hysteresis_down=10, counters=None): + if counters is None: + counters = {} + desired = current_profile + for name in _PROFILE_ORDER: + if target_vel <= profiles[name]["target_vel_max"]: + desired = name + break + else: + desired = RIDE + if desired == current_profile: + counters.clear() + return current_profile + key = desired + counters[key] = counters.get(key, 0) + 1 + idx_d = _PROFILE_ORDER.index(desired) + idx_c = _PROFILE_ORDER.index(current_profile) + threshold = hysteresis_up if idx_d > idx_c else hysteresis_down + if counters[key] >= threshold: + counters.clear() + return desired + return current_profile + + +def _apply_accel_limit(current, target, accel_limit, decel_limit, dt): + delta = target - current + # Decelerating = magnitude is decreasing (current and delta have opposite signs) + decelerating = (current > 0 and delta < 0) or (current < 0 and delta > 0) + limit = decel_limit if decelerating else accel_limit + if delta > 0: + step = min(delta, limit * dt) + else: + step = max(delta, -limit * dt) + return current + step + + +def _limit_velocity(linear_x, angular_z, max_linear, max_angular): + return (max(-max_linear, min(max_linear, linear_x)), + max(-max_angular, min(max_angular, angular_z))) + + +def _check_emergency_stop(target_vel, target_vel_prev, dt, + stop_threshold, hard_decel_threshold): + if target_vel < stop_threshold: + return True + if dt > 0 and (target_vel_prev - target_vel) / dt > hard_decel_threshold: + return True + return False + + +def _check_gps_runaway(gps_speed, cmd_vel, runaway_factor, profile_max): + reference = max(abs(cmd_vel), 0.5) + return gps_speed > reference * runaway_factor and gps_speed > profile_max * 0.5 + + +def _emergency_decel_step(current_vel, decel_limit, dt): + if current_vel > 0: + return max(0.0, current_vel - decel_limit * dt) + if current_vel < 0: + return min(0.0, current_vel + decel_limit * dt) + return 0.0 + + +# ── Target velocity estimation ──────────────────────────────────────────────── + +class TestTargetVelocityEstimation: + def test_target_faster_than_robot(self): + # Robot at 2 m/s GPS, range increasing 3 m/s → target at 5 m/s + v = _estimate_target_velocity(5.0, 8.0, 1.0, 2.0) # range: 5→8 in 1s + assert v == pytest.approx(5.0) + + def test_target_same_speed_constant_range(self): + v = _estimate_target_velocity(5.0, 5.0, 1.0, 3.0) + assert v == pytest.approx(3.0) + + def test_target_stopped_range_closing(self): + # Robot at 3 m/s, range closing 3 m/s → target = 3 - 3 = 0 → clamped to 0 + v = _estimate_target_velocity(8.0, 5.0, 1.0, 3.0) + assert v == pytest.approx(0.0) + + def test_invalid_dt_returns_gps_speed(self): + assert _estimate_target_velocity(5.0, 6.0, 0.0, 2.5) == pytest.approx(2.5) + + def test_invalid_prev_range_returns_gps_speed(self): + assert _estimate_target_velocity(0.0, 6.0, 1.0, 2.5) == pytest.approx(2.5) + + def test_result_non_negative(self): + # Range closing much faster than robot → would be negative → clamp to 0 + v = _estimate_target_velocity(10.0, 3.0, 0.5, 1.0) + assert v >= 0.0 + + def test_high_speed_ride_estimate(self): + # Robot at 6 m/s, range increasing 2 m/s → target at 8 m/s + v = _estimate_target_velocity(4.0, 5.0, 0.5, 6.0) # 2 m/s range rate + assert v == pytest.approx(8.0) + + +# ── Profile selection ───────────────────────────────────────────────────────── + +class TestProfileSelection: + def test_walk_range_stays_walk(self): + assert _select_profile(1.0, _PROFILES, WALK) == WALK + + def test_jog_range_stays_jog(self): + assert _select_profile(3.5, _PROFILES, JOG) == JOG + + def test_ride_range_stays_ride(self): + assert _select_profile(7.0, _PROFILES, RIDE) == RIDE + + def test_upgrade_needs_hysteresis_up(self): + # Should NOT upgrade immediately + counters = {} + profile = WALK + # 4 ticks at jog speed — not enough (hysteresis_up=5) + for _ in range(4): + profile = _select_profile(3.0, _PROFILES, profile, + hysteresis_up=5, counters=counters) + assert profile == WALK + + def test_upgrade_happens_at_threshold(self): + counters = {} + profile = WALK + for _ in range(5): + profile = _select_profile(3.0, _PROFILES, profile, + hysteresis_up=5, counters=counters) + assert profile == JOG + + def test_downgrade_needs_more_hysteresis(self): + counters = {} + profile = JOG + for _ in range(9): + profile = _select_profile(1.0, _PROFILES, profile, + hysteresis_down=10, counters=counters) + assert profile == JOG # not yet + + def test_downgrade_happens_at_threshold(self): + counters = {} + profile = JOG + for _ in range(10): + profile = _select_profile(1.0, _PROFILES, profile, + hysteresis_down=10, counters=counters) + assert profile == WALK + + def test_counters_cleared_on_stable(self): + counters = {} + # Partial upgrade attempt then target returns to walk range + _select_profile(3.0, _PROFILES, WALK, hysteresis_up=5, counters=counters) + _select_profile(3.0, _PROFILES, WALK, hysteresis_up=5, counters=counters) + assert JOG in counters # counter accumulated + _select_profile(1.0, _PROFILES, WALK, hysteresis_up=5, counters=counters) + assert len(counters) == 0 # cleared when stable + + def test_above_all_profiles_selects_ride(self): + profile = _select_profile(20.0, _PROFILES, RIDE) + assert profile == RIDE + + def test_boundary_walk_max(self): + # 2.0 m/s is exactly at walk max → stays walk + assert _select_profile(2.0, _PROFILES, WALK) == WALK + + def test_just_above_walk_max_triggers_upgrade(self): + counters = {} + profile = WALK + for _ in range(5): + profile = _select_profile(2.01, _PROFILES, profile, + hysteresis_up=5, counters=counters) + assert profile == JOG + + +# ── Acceleration limiter ────────────────────────────────────────────────────── + +class TestAccelLimit: + _DT = 0.02 # 50 Hz + + def test_accel_step(self): + # 0 → 1 at 0.5 m/s², 0.02s tick → step = 0.01 + v = _apply_accel_limit(0.0, 1.0, 0.5, 1.0, self._DT) + assert v == pytest.approx(0.01) + + def test_decel_step(self): + # 1 → 0 at 1.0 m/s², 0.02s → step = 0.02 + v = _apply_accel_limit(1.0, 0.0, 0.5, 1.0, self._DT) + assert v == pytest.approx(0.98) + + def test_does_not_overshoot(self): + # Close to target (gap < step) + v = _apply_accel_limit(0.995, 1.0, 0.5, 1.0, self._DT) + assert v == pytest.approx(1.0) + + def test_does_not_undershoot(self): + v = _apply_accel_limit(0.005, 0.0, 0.5, 1.0, self._DT) + assert v == pytest.approx(0.0) + + def test_negative_to_positive(self): + # -1 → 0 (decelerating negative) at decel_limit + v = _apply_accel_limit(-1.0, 0.0, 0.5, 1.0, self._DT) + assert v == pytest.approx(-0.98) # reduced magnitude + + def test_zero_to_zero(self): + assert _apply_accel_limit(0.0, 0.0, 0.5, 1.0, self._DT) == pytest.approx(0.0) + + def test_ride_accel_gentler_than_walk(self): + dt = 0.02 + v_walk = _apply_accel_limit(0.0, 5.0, + _PROFILES[WALK]["accel_limit"], + _PROFILES[WALK]["decel_limit"], dt) + v_ride = _apply_accel_limit(0.0, 5.0, + _PROFILES[RIDE]["accel_limit"], + _PROFILES[RIDE]["decel_limit"], dt) + assert v_walk > v_ride # walk accelerates faster than ride + + def test_time_to_reach_walk_max(self): + # walk: accel=0.6 m/s², target=1.5 m/s, dt=0.02s + # expected ticks ≈ 1.5 / (0.6 * 0.02) = 125 ticks + v = 0.0 + ticks = 0 + for _ in range(200): + v = _apply_accel_limit(v, 1.5, 0.6, 1.0, 0.02) + ticks += 1 + if v >= 1.5 - 1e-6: + break + assert v == pytest.approx(1.5, abs=1e-4) + assert 120 <= ticks <= 130 # ≈ 125 ticks + + +# ── Velocity limiter ────────────────────────────────────────────────────────── + +class TestLimitVelocity: + def test_within_limits(self): + lin, ang = _limit_velocity(1.0, 0.5, 8.0, 1.5) + assert lin == pytest.approx(1.0) + assert ang == pytest.approx(0.5) + + def test_linear_clamped_high(self): + lin, _ = _limit_velocity(10.0, 0.0, 8.0, 1.5) + assert lin == pytest.approx(8.0) + + def test_linear_clamped_low(self): + lin, _ = _limit_velocity(-10.0, 0.0, 8.0, 1.5) + assert lin == pytest.approx(-8.0) + + def test_angular_clamped(self): + _, ang = _limit_velocity(0.0, 5.0, 8.0, 1.5) + assert ang == pytest.approx(1.5) + + def test_zero_passthrough(self): + lin, ang = _limit_velocity(0.0, 0.0, 8.0, 1.5) + assert lin == pytest.approx(0.0) + assert ang == pytest.approx(0.0) + + +# ── Emergency stop detection ────────────────────────────────────────────────── + +class TestEmergencyStop: + def test_triggers_below_stop_threshold(self): + assert _check_emergency_stop(0.2, 3.0, 0.02, 0.4, 3.0) is True + + def test_triggers_on_hard_decel(self): + # target_vel drops from 5 to 3 in 0.02s → decel = 100 m/s² >> 3.0 + assert _check_emergency_stop(3.0, 5.0, 0.02, 0.4, 3.0) is True + + def test_no_trigger_gradual_decel(self): + # 5 → 4 in 0.02s → decel = 50 m/s² >> threshold... wait, 50 > 3 + # Let me use a smaller decel: 5 → 4.9 in 0.02s → decel = 5 m/s²... still > 3 + # Use 5 → 4.9 in 0.1s → decel = 1.0 m/s² + assert _check_emergency_stop(4.9, 5.0, 0.1, 0.4, 3.0) is False + + def test_no_trigger_at_cruising_speed(self): + assert _check_emergency_stop(6.0, 6.0, 0.02, 0.4, 3.0) is False + + def test_boundary_exactly_at_stop_threshold(self): + # target_vel == stop_threshold → NOT < → no trigger + # prev must be close enough that decel check doesn't also fire: + # (0.42 - 0.40) / 0.02 = 1.0 m/s² < 3.0 → OK + assert _check_emergency_stop(0.4, 0.42, 0.02, 0.4, 3.0) is False + + def test_triggers_just_below_threshold(self): + assert _check_emergency_stop(0.39, 1.0, 0.02, 0.4, 3.0) is True + + +# ── GPS runaway detection ───────────────────────────────────────────────────── + +class TestGpsRunaway: + def test_runaway_detected(self): + # GPS 5 m/s, commanded 2 m/s → 5 > 2*1.5=3, 5 > 8*0.5=4 → True + assert _check_gps_runaway(5.0, 2.0, 1.5, 8.0) is True + + def test_no_runaway_normal(self): + # GPS 2.1 m/s, commanded 2 m/s → 2.1 not > 3.0 + assert _check_gps_runaway(2.1, 2.0, 1.5, 8.0) is False + + def test_no_runaway_stationary(self): + # Robot stopped (GPS 0.1 m/s, cmd 0), reference = max(0,0.5) = 0.5 + # 0.1 not > 0.5*1.5=0.75 + assert _check_gps_runaway(0.1, 0.0, 1.5, 1.5) is False + + def test_runaway_only_at_meaningful_speed(self): + # GPS=6, cmd=4, factor=1.5 → 6 > 4*1.5=6 → False (not strictly greater) + assert _check_gps_runaway(6.0, 4.0, 1.5, 8.0) is False + + def test_runaway_needs_gps_above_half_profile_max(self): + # Even if ratio is high, below 50% of profile max → no trigger + # profile_max=8 → threshold=4; GPS=3 < 4 → no trigger + assert _check_gps_runaway(3.0, 0.5, 1.5, 8.0) is False + + +# ── Emergency decel step ────────────────────────────────────────────────────── + +class TestEmergencyDecelStep: + _DT = 0.02 + _DECEL = 2.0 + + def test_positive_speed_decreases(self): + v = _emergency_decel_step(5.0, self._DECEL, self._DT) + assert v == pytest.approx(5.0 - 2.0 * 0.02) + + def test_negative_speed_increases(self): + v = _emergency_decel_step(-5.0, self._DECEL, self._DT) + assert v == pytest.approx(-5.0 + 2.0 * 0.02) + + def test_does_not_go_past_zero_positive(self): + v = _emergency_decel_step(0.03, self._DECEL, self._DT) + assert v == pytest.approx(0.0) # 0.03 - 0.04 → clamped to 0 + + def test_does_not_go_past_zero_negative(self): + v = _emergency_decel_step(-0.03, self._DECEL, self._DT) + assert v == pytest.approx(0.0) + + def test_zero_stays_zero(self): + assert _emergency_decel_step(0.0, self._DECEL, self._DT) == 0.0 + + def test_full_stop_from_ride_speed(self): + # 8 m/s at 2.0 m/s² → ticks to stop = 8 / (2.0 * 0.02) = 200 ticks = 4s + v = 8.0 + ticks = 0 + for _ in range(500): + v = _emergency_decel_step(v, 2.0, 0.02) + ticks += 1 + if v == 0.0: + break + assert v == 0.0 + assert 195 <= ticks <= 205 + + +# ── Integration: profile + accel + limit ───────────────────────────────────── + +class TestIntegration: + """End-to-end control step simulation.""" + + def _step(self, current, request, profile_name, dt=0.02): + p = _PROFILES[profile_name] + capped_lin, capped_ang = _limit_velocity( + request, 0.0, p["max_linear_vel"], p["max_angular_vel"]) + new_lin = _apply_accel_limit( + current, capped_lin, p["accel_limit"], p["decel_limit"], dt) + return new_lin + + def test_walk_stays_under_1p5(self): + v = 0.0 + for _ in range(300): + v = self._step(v, 5.0, WALK) # request far above limit + assert v <= 1.5 + 1e-6 + + def test_ride_reaches_8_from_zero(self): + v = 0.0 + for _ in range(5000): + v = self._step(v, 8.0, RIDE) + assert v == pytest.approx(8.0, abs=1e-4) + + def test_emergency_overrides_profile(self): + v = 6.0 # at ride speed + # Emergency: decel toward 0 + for _ in range(500): + v = _emergency_decel_step(v, 2.0, 0.02) + if v == 0.0: + break + assert v == pytest.approx(0.0) + + def test_profile_upgrade_then_downgrade_cycle(self): + counters = {} + profile = WALK + # Upgrade: 5 ticks at jog speed + for _ in range(5): + profile = _select_profile(3.0, _PROFILES, profile, + hysteresis_up=5, counters=counters) + assert profile == JOG + # Downgrade: 10 ticks at walk speed + counters = {} + for _ in range(10): + profile = _select_profile(1.0, _PROFILES, profile, + hysteresis_down=10, counters=counters) + assert profile == WALK