Merge pull request 'feat: outdoor adaptive speed controller — walk/jog/ride profiles up to 8 m/s' (#76) from sl-controls/outdoor-speed into main
This commit is contained in:
commit
4be93669a1
@ -0,0 +1,96 @@
|
|||||||
|
# speed_profiles.yaml
|
||||||
|
# Outdoor adaptive speed profiles for SaltyBot follow-me mode.
|
||||||
|
#
|
||||||
|
# Run with:
|
||||||
|
# ros2 launch saltybot_speed_controller outdoor_speed.launch.py
|
||||||
|
#
|
||||||
|
# IMPORTANT: This controller outputs /cmd_vel which feeds into cmd_vel_bridge
|
||||||
|
# (PR #46). The bridge applies its own hard caps (max_linear_vel=0.5 m/s).
|
||||||
|
# For outdoor/ride profiles you MUST re-launch cmd_vel_bridge with higher limits:
|
||||||
|
# ros2 launch saltybot_bridge cmd_vel_bridge.launch.py max_linear_vel:=8.0
|
||||||
|
#
|
||||||
|
# Data flow:
|
||||||
|
# person_follower → /cmd_vel_raw → [speed_controller] → /cmd_vel → cmd_vel_bridge → STM32
|
||||||
|
|
||||||
|
# ── Controller ─────────────────────────────────────────────────────────────────
|
||||||
|
control_rate: 50.0 # Hz — 50ms tick, same as cmd_vel_bridge
|
||||||
|
input_topic: /cmd_vel_raw # Upstream cmd_vel source
|
||||||
|
output_topic: /cmd_vel # Output to cmd_vel_bridge
|
||||||
|
initial_profile: walk # Start in safest profile until target velocity known
|
||||||
|
|
||||||
|
# ── Profile selection ──────────────────────────────────────────────────────────
|
||||||
|
# Hysteresis prevents rapid walk↔jog↔ride oscillation.
|
||||||
|
# up_ticks: consecutive ticks above threshold before upgrading profile (50Hz → 5 = 100ms)
|
||||||
|
# down_ticks: ticks below threshold before downgrading (50Hz → 15 = 300ms)
|
||||||
|
# Downgrade is slower (safety: prefer lower profile when uncertain)
|
||||||
|
hysteresis_up_ticks: 5
|
||||||
|
hysteresis_down_ticks: 15
|
||||||
|
|
||||||
|
# ── Emergency stop ─────────────────────────────────────────────────────────────
|
||||||
|
# Triggered when target stops suddenly:
|
||||||
|
# (a) target velocity drops below stop_vel_threshold, OR
|
||||||
|
# (b) target deceleration exceeds hard_stop_decel_threshold m/s²
|
||||||
|
#
|
||||||
|
# emergency_decel_mps2: how fast the robot decelerates to zero.
|
||||||
|
# Conservative (0.5-1.0 m/s²) is safer for balance; too fast = tip-over.
|
||||||
|
# At ride speed (8 m/s), 2.0 m/s² = 4 second stop distance ≈ 16m — acceptable.
|
||||||
|
emergency_decel_mps2: 2.0 # m/s² — hard brake limit
|
||||||
|
stop_vel_threshold: 0.4 # m/s — target speed below this = stopped
|
||||||
|
hard_stop_decel_threshold: 3.0 # m/s² — target decel above this = hard stop
|
||||||
|
|
||||||
|
# ── GPS runaway protection ─────────────────────────────────────────────────────
|
||||||
|
# If GPS speed > commanded_speed × runaway_factor AND GPS speed > 50% of
|
||||||
|
# profile max, trigger emergency decel (catches downhill runaways).
|
||||||
|
gps_runaway_factor: 1.5 # 1.5× commanded = runaway
|
||||||
|
gps_timeout: 3.0 # s — ignore GPS if stale (tunnel, bridge)
|
||||||
|
|
||||||
|
# ── UWB range ─────────────────────────────────────────────────────────────────
|
||||||
|
uwb_range_field: range # JSON field name in /uwb/ranges messages
|
||||||
|
uwb_timeout: 2.0 # s — use GPS-only profile if UWB stale
|
||||||
|
|
||||||
|
# ── Speed profiles ─────────────────────────────────────────────────────────────
|
||||||
|
# Each profile applies when estimated target velocity is in [0, target_vel_max].
|
||||||
|
# Velocity limits are BEFORE cmd_vel_bridge caps (bridge must also be set high).
|
||||||
|
#
|
||||||
|
# Acceleration notes:
|
||||||
|
# - Balance robot needs time to adjust lean angle when speed changes.
|
||||||
|
# - accel_limit at RIDE (0.3 m/s²) means 0→8 m/s takes ~27s — intentionally slow.
|
||||||
|
# - Increase only after validating balance PID tuning at speed.
|
||||||
|
# - decel_limit can be slightly higher than accel_limit (braking > accelerating).
|
||||||
|
|
||||||
|
walk:
|
||||||
|
# Person walking at 0–2 m/s (≈ 0–7 km/h)
|
||||||
|
max_linear_vel: 1.5 # m/s (= 5.4 km/h)
|
||||||
|
max_angular_vel: 1.2 # rad/s
|
||||||
|
accel_limit: 0.6 # m/s² — moderate, balance handles it
|
||||||
|
decel_limit: 1.0 # m/s²
|
||||||
|
target_vel_max: 2.0 # m/s — target faster than this → upgrade to jog
|
||||||
|
|
||||||
|
jog:
|
||||||
|
# Person jogging at 2–5 m/s (≈ 7–18 km/h)
|
||||||
|
max_linear_vel: 3.0 # m/s (= 10.8 km/h)
|
||||||
|
max_angular_vel: 1.5 # rad/s
|
||||||
|
accel_limit: 0.5 # m/s² — a bit gentler at higher speed
|
||||||
|
decel_limit: 0.8 # m/s²
|
||||||
|
target_vel_max: 5.0 # m/s — target faster than this → upgrade to ride
|
||||||
|
|
||||||
|
ride:
|
||||||
|
# Person on EUC at 5–15 m/s (≈ 18–54 km/h / 20–30 km/h typical follow-me)
|
||||||
|
max_linear_vel: 8.0 # m/s (= 28.8 km/h)
|
||||||
|
max_angular_vel: 1.5 # rad/s — conservative for balance at speed
|
||||||
|
accel_limit: 0.3 # m/s² — very gentle: balance dynamics change at 8 m/s
|
||||||
|
decel_limit: 0.5 # m/s² — gradual stop from 8 m/s ≈ 16s / ~64m
|
||||||
|
target_vel_max: 15.0 # m/s — cap; EUC max ~30 km/h = 8.3 m/s typical
|
||||||
|
|
||||||
|
# ── Notes ─────────────────────────────────────────────────────────────────────
|
||||||
|
# 1. To enable ride profile, the Jetson → STM32 cmd_vel_bridge must also be
|
||||||
|
# reconfigured: max_linear_vel=8.0, ramp_rate=500 → consider ramp_rate=150
|
||||||
|
# at ride speed (slower ramp = smoother balance).
|
||||||
|
#
|
||||||
|
# 2. The STM32 balance PID gains likely need retuning for ride speed. Expect
|
||||||
|
# increased sensitivity to pitch angle errors at 8 m/s vs 0.5 m/s.
|
||||||
|
#
|
||||||
|
# 3. Test sequence recommendation:
|
||||||
|
# - Validate walk profile on flat indoor surface first
|
||||||
|
# - Test jog profile in controlled outdoor space
|
||||||
|
# - Only enable ride profile after verified balance at jog speeds
|
||||||
@ -0,0 +1,155 @@
|
|||||||
|
"""
|
||||||
|
outdoor_speed.launch.py — Outdoor adaptive speed controller.
|
||||||
|
|
||||||
|
Inserts SpeedControllerNode between the person_follower (/cmd_vel_raw)
|
||||||
|
and cmd_vel_bridge (/cmd_vel), providing adaptive walk/jog/ride profiles
|
||||||
|
with smooth acceleration curves and emergency deceleration.
|
||||||
|
|
||||||
|
IMPORTANT: For ride profile (up to 8 m/s) you must ALSO re-launch
|
||||||
|
cmd_vel_bridge with matching limits:
|
||||||
|
ros2 launch saltybot_bridge cmd_vel_bridge.launch.py max_linear_vel:=8.0
|
||||||
|
|
||||||
|
Prerequisite node pipeline:
|
||||||
|
person_follower → /cmd_vel_raw → [speed_controller] → /cmd_vel → cmd_vel_bridge → STM32
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
# Defaults (walk profile initially, adapts via UWB + GPS):
|
||||||
|
ros2 launch saltybot_speed_controller outdoor_speed.launch.py
|
||||||
|
|
||||||
|
# Force ride profile limits from start:
|
||||||
|
ros2 launch saltybot_speed_controller outdoor_speed.launch.py initial_profile:=ride
|
||||||
|
|
||||||
|
# Tune jog profile max speed:
|
||||||
|
ros2 launch saltybot_speed_controller outdoor_speed.launch.py jog_max_vel:=4.0
|
||||||
|
|
||||||
|
# Gentle emergency decel (longer stop distance, safer for balance):
|
||||||
|
ros2 launch saltybot_speed_controller outdoor_speed.launch.py emergency_decel:=1.0
|
||||||
|
|
||||||
|
# Full outdoor follow-me pipeline (speed controller + person follower):
|
||||||
|
ros2 launch saltybot_speed_controller outdoor_speed.launch.py \
|
||||||
|
launch_person_follower:=true
|
||||||
|
|
||||||
|
Topics:
|
||||||
|
/cmd_vel_raw — Input (from person_follower or Nav2)
|
||||||
|
/cmd_vel — Output (to cmd_vel_bridge)
|
||||||
|
/speed_controller/profile — Active profile + status (JSON String)
|
||||||
|
/gps/vel — Ground truth speed input (from gps_driver_node)
|
||||||
|
/uwb/ranges — Target range input (for velocity estimation)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription, OpaqueFunction
|
||||||
|
from launch.launch_description_sources import PythonLaunchDescriptionSource
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def _launch_nodes(context, *args, **kwargs):
|
||||||
|
params_file = LaunchConfiguration("params_file").perform(context)
|
||||||
|
|
||||||
|
def _s(k): return LaunchConfiguration(k).perform(context)
|
||||||
|
def _f(k): return float(LaunchConfiguration(k).perform(context))
|
||||||
|
def _i(k): return int(LaunchConfiguration(k).perform(context))
|
||||||
|
def _b(k): return LaunchConfiguration(k).perform(context).lower() == "true"
|
||||||
|
|
||||||
|
inline_params = {
|
||||||
|
"control_rate": _f("control_rate"),
|
||||||
|
"input_topic": _s("input_topic"),
|
||||||
|
"initial_profile": _s("initial_profile"),
|
||||||
|
"hysteresis_up_ticks": _i("hysteresis_up_ticks"),
|
||||||
|
"hysteresis_down_ticks": _i("hysteresis_down_ticks"),
|
||||||
|
"emergency_decel_mps2": _f("emergency_decel"),
|
||||||
|
"stop_vel_threshold": _f("stop_vel_threshold"),
|
||||||
|
"hard_stop_decel_threshold": _f("hard_stop_decel_threshold"),
|
||||||
|
"gps_runaway_factor": _f("gps_runaway_factor"),
|
||||||
|
"gps_timeout": _f("gps_timeout"),
|
||||||
|
"uwb_timeout": _f("uwb_timeout"),
|
||||||
|
# Per-profile params
|
||||||
|
"walk.max_linear_vel": _f("walk_max_vel"),
|
||||||
|
"walk.accel_limit": _f("walk_accel"),
|
||||||
|
"walk.decel_limit": _f("walk_decel"),
|
||||||
|
"jog.max_linear_vel": _f("jog_max_vel"),
|
||||||
|
"jog.accel_limit": _f("jog_accel"),
|
||||||
|
"jog.decel_limit": _f("jog_decel"),
|
||||||
|
"ride.max_linear_vel": _f("ride_max_vel"),
|
||||||
|
"ride.accel_limit": _f("ride_accel"),
|
||||||
|
"ride.decel_limit": _f("ride_decel"),
|
||||||
|
}
|
||||||
|
node_params = [params_file, inline_params] if params_file else [inline_params]
|
||||||
|
|
||||||
|
nodes = [
|
||||||
|
Node(
|
||||||
|
package="saltybot_speed_controller",
|
||||||
|
executable="speed_controller_node",
|
||||||
|
name="speed_controller",
|
||||||
|
output="screen",
|
||||||
|
parameters=node_params,
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
if _b("launch_person_follower"):
|
||||||
|
try:
|
||||||
|
follower_pkg = get_package_share_directory("saltybot_follower")
|
||||||
|
follower_launch = os.path.join(
|
||||||
|
follower_pkg, "launch", "person_follower.launch.py")
|
||||||
|
nodes.append(IncludeLaunchDescription(
|
||||||
|
PythonLaunchDescriptionSource(follower_launch),
|
||||||
|
launch_arguments={"cmd_vel_topic": "/cmd_vel_raw"}.items(),
|
||||||
|
))
|
||||||
|
except Exception:
|
||||||
|
pass # package not built yet — skip silently
|
||||||
|
|
||||||
|
return nodes
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
pkg_share = get_package_share_directory("saltybot_speed_controller")
|
||||||
|
default_params = os.path.join(pkg_share, "config", "speed_profiles.yaml")
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"params_file", default_value=default_params,
|
||||||
|
description="Full path to speed_profiles.yaml"),
|
||||||
|
|
||||||
|
# General
|
||||||
|
DeclareLaunchArgument("control_rate", default_value="50.0"),
|
||||||
|
DeclareLaunchArgument("input_topic", default_value="/cmd_vel_raw"),
|
||||||
|
DeclareLaunchArgument("initial_profile", default_value="walk"),
|
||||||
|
DeclareLaunchArgument("hysteresis_up_ticks", default_value="5"),
|
||||||
|
DeclareLaunchArgument("hysteresis_down_ticks",default_value="15"),
|
||||||
|
DeclareLaunchArgument("launch_person_follower", default_value="false",
|
||||||
|
description="Also launch saltybot_follower with /cmd_vel_raw output"),
|
||||||
|
|
||||||
|
# Emergency
|
||||||
|
DeclareLaunchArgument("emergency_decel", default_value="2.0",
|
||||||
|
description="Emergency decel limit (m/s²)"),
|
||||||
|
DeclareLaunchArgument("stop_vel_threshold", default_value="0.4"),
|
||||||
|
DeclareLaunchArgument("hard_stop_decel_threshold",default_value="3.0"),
|
||||||
|
|
||||||
|
# GPS runaway
|
||||||
|
DeclareLaunchArgument("gps_runaway_factor", default_value="1.5"),
|
||||||
|
DeclareLaunchArgument("gps_timeout", default_value="3.0"),
|
||||||
|
DeclareLaunchArgument("uwb_timeout", default_value="2.0"),
|
||||||
|
|
||||||
|
# Walk profile
|
||||||
|
DeclareLaunchArgument("walk_max_vel", default_value="1.5"),
|
||||||
|
DeclareLaunchArgument("walk_accel", default_value="0.6"),
|
||||||
|
DeclareLaunchArgument("walk_decel", default_value="1.0"),
|
||||||
|
|
||||||
|
# Jog profile
|
||||||
|
DeclareLaunchArgument("jog_max_vel", default_value="3.0"),
|
||||||
|
DeclareLaunchArgument("jog_accel", default_value="0.5"),
|
||||||
|
DeclareLaunchArgument("jog_decel", default_value="0.8"),
|
||||||
|
|
||||||
|
# Ride profile
|
||||||
|
DeclareLaunchArgument("ride_max_vel", default_value="8.0",
|
||||||
|
description="Max speed for EUC follow-me (m/s)"),
|
||||||
|
DeclareLaunchArgument("ride_accel", default_value="0.3",
|
||||||
|
description="Gentle accel — balance sensitive at 8 m/s"),
|
||||||
|
DeclareLaunchArgument("ride_decel", default_value="0.5"),
|
||||||
|
|
||||||
|
OpaqueFunction(function=_launch_nodes),
|
||||||
|
])
|
||||||
31
jetson/ros2_ws/src/saltybot_speed_controller/package.xml
Normal file
31
jetson/ros2_ws/src/saltybot_speed_controller/package.xml
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_speed_controller</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
Outdoor adaptive speed controller for SaltyBot follow-me mode.
|
||||||
|
Inserts between person_follower (/cmd_vel_raw) and cmd_vel_bridge (/cmd_vel),
|
||||||
|
selecting walk/jog/ride profiles based on estimated target velocity (UWB + GPS),
|
||||||
|
applying smooth trapezoidal acceleration curves, GPS runaway protection,
|
||||||
|
and emergency deceleration for balance stability at speeds up to 8 m/s.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,491 @@
|
|||||||
|
"""
|
||||||
|
speed_controller_node.py — Outdoor adaptive speed controller for SaltyBot.
|
||||||
|
|
||||||
|
Sits between upstream cmd_vel sources (person_follower, Nav2) and the
|
||||||
|
cmd_vel_bridge (PR #46), providing:
|
||||||
|
|
||||||
|
• Adaptive speed profiles (walk / jog / ride) selected from UWB target
|
||||||
|
velocity and GPS ground-truth speed
|
||||||
|
• Smooth acceleration curves (trapezoidal ramp) per profile, tuned for
|
||||||
|
balance stability at speed — more aggressive jerk at low speed, gentler
|
||||||
|
at ride speeds (EUC follow-me at 20-30 km/h)
|
||||||
|
• Emergency deceleration when the target stops suddenly (detected via
|
||||||
|
UWB range acceleration and target velocity drop)
|
||||||
|
• GPS runaway protection if GPS ground speed significantly overshoots
|
||||||
|
the commanded velocity (downhill, slippage)
|
||||||
|
|
||||||
|
Data flow:
|
||||||
|
/cmd_vel_raw (Twist) — uncapped request from person_follower/Nav2
|
||||||
|
/gps/vel (TwistStamped) — robot ground-truth speed (GPS, 1 Hz)
|
||||||
|
/uwb/ranges (String JSON) — UWB range to target (for velocity estimate)
|
||||||
|
↓
|
||||||
|
SpeedControllerNode
|
||||||
|
↓
|
||||||
|
/cmd_vel (Twist) — smoothed, profile-capped output → cmd_vel_bridge
|
||||||
|
|
||||||
|
Profiles
|
||||||
|
────────
|
||||||
|
walk : ≤ 1.5 m/s (person walking 0–2 m/s)
|
||||||
|
jog : ≤ 3.0 m/s (person jogging 2–5 m/s)
|
||||||
|
ride : ≤ 8.0 m/s (person on EUC 5–15 m/s)
|
||||||
|
|
||||||
|
Profile selection uses UWB-derived target velocity with hysteresis to avoid
|
||||||
|
rapid switching. GPS ground speed provides a secondary confirmation and
|
||||||
|
runaway guard.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import collections
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import time
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from geometry_msgs.msg import Twist, TwistStamped
|
||||||
|
from std_msgs.msg import String
|
||||||
|
|
||||||
|
|
||||||
|
# ── Pure control functions ────────────────────────────────────────────────────
|
||||||
|
# Extracted here so unit tests run without a ROS2 runtime.
|
||||||
|
|
||||||
|
# Profile name constants
|
||||||
|
WALK = "walk"
|
||||||
|
JOG = "jog"
|
||||||
|
RIDE = "ride"
|
||||||
|
|
||||||
|
_PROFILE_ORDER = [WALK, JOG, RIDE]
|
||||||
|
|
||||||
|
|
||||||
|
def estimate_target_velocity(prev_range: float, curr_range: float,
|
||||||
|
dt: float, robot_gps_speed: float) -> float:
|
||||||
|
"""
|
||||||
|
Estimate absolute target velocity from UWB range derivative + GPS.
|
||||||
|
|
||||||
|
When the robot follows a target:
|
||||||
|
range_rate = d(range)/dt (positive = target moving away)
|
||||||
|
target_vel ≈ robot_gps_speed + range_rate
|
||||||
|
|
||||||
|
This gives a rough scalar speed suitable for profile classification.
|
||||||
|
Returns 0 if dt ≤ 0 or range values are invalid.
|
||||||
|
"""
|
||||||
|
if dt <= 0 or prev_range <= 0 or curr_range <= 0:
|
||||||
|
return robot_gps_speed # fall back to robot's own speed
|
||||||
|
range_rate = (curr_range - prev_range) / dt
|
||||||
|
return max(0.0, robot_gps_speed + range_rate)
|
||||||
|
|
||||||
|
|
||||||
|
def select_profile(target_vel: float, profiles: dict,
|
||||||
|
current_profile: str,
|
||||||
|
hysteresis_up: int = 5,
|
||||||
|
hysteresis_down: int = 10,
|
||||||
|
_counters: dict = None) -> str:
|
||||||
|
"""
|
||||||
|
Select speed profile with up/down hysteresis.
|
||||||
|
|
||||||
|
target_vel : estimated target velocity (m/s)
|
||||||
|
profiles : dict of profile_name → {target_vel_min, target_vel_max, ...}
|
||||||
|
current_profile: currently active profile
|
||||||
|
hysteresis_up : ticks before switching to a faster profile
|
||||||
|
hysteresis_down: ticks before switching to a slower profile
|
||||||
|
|
||||||
|
Returns the (possibly unchanged) profile name.
|
||||||
|
|
||||||
|
_counters is an internal mutable dict passed by the caller for state
|
||||||
|
(avoids making this a method). Pass {} on first call; reuse thereafter.
|
||||||
|
"""
|
||||||
|
if _counters is None:
|
||||||
|
_counters = {}
|
||||||
|
|
||||||
|
# Determine which profile best fits the target velocity
|
||||||
|
desired = current_profile
|
||||||
|
for name in _PROFILE_ORDER:
|
||||||
|
p = profiles[name]
|
||||||
|
if target_vel <= p["target_vel_max"]:
|
||||||
|
desired = name
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
desired = RIDE # above all max → stay at highest
|
||||||
|
|
||||||
|
if desired == current_profile:
|
||||||
|
_counters.clear()
|
||||||
|
return current_profile
|
||||||
|
|
||||||
|
key = desired
|
||||||
|
_counters[key] = _counters.get(key, 0) + 1
|
||||||
|
|
||||||
|
idx_desired = _PROFILE_ORDER.index(desired)
|
||||||
|
idx_current = _PROFILE_ORDER.index(current_profile)
|
||||||
|
threshold = hysteresis_up if idx_desired > idx_current else hysteresis_down
|
||||||
|
|
||||||
|
if _counters[key] >= threshold:
|
||||||
|
_counters.clear()
|
||||||
|
return desired
|
||||||
|
return current_profile
|
||||||
|
|
||||||
|
|
||||||
|
def apply_accel_limit(current: float, target: float,
|
||||||
|
accel_limit: float, decel_limit: float,
|
||||||
|
dt: float) -> float:
|
||||||
|
"""
|
||||||
|
Apply symmetric trapezoidal acceleration / deceleration limit.
|
||||||
|
|
||||||
|
current : current velocity (m/s)
|
||||||
|
target : desired velocity (m/s)
|
||||||
|
accel_limit : max acceleration (m/s²)
|
||||||
|
decel_limit : max deceleration (m/s²) — positive value
|
||||||
|
dt : time step (s)
|
||||||
|
|
||||||
|
Returns new velocity, ramped toward target within the limits.
|
||||||
|
"""
|
||||||
|
delta = target - current
|
||||||
|
# Decelerating = magnitude is decreasing (current and delta have opposite signs)
|
||||||
|
decelerating = (current > 0 and delta < 0) or (current < 0 and delta > 0)
|
||||||
|
limit = decel_limit if decelerating else accel_limit
|
||||||
|
if delta > 0:
|
||||||
|
step = min(delta, limit * dt)
|
||||||
|
else:
|
||||||
|
step = max(delta, -limit * dt)
|
||||||
|
return current + step
|
||||||
|
|
||||||
|
|
||||||
|
def limit_velocity(linear_x: float, angular_z: float,
|
||||||
|
max_linear: float, max_angular: float) -> tuple:
|
||||||
|
"""Hard-clamp linear and angular velocity to profile limits."""
|
||||||
|
lin = max(-max_linear, min(max_linear, linear_x))
|
||||||
|
ang = max(-max_angular, min(max_angular, angular_z))
|
||||||
|
return lin, ang
|
||||||
|
|
||||||
|
|
||||||
|
def check_emergency_stop(target_vel: float, target_vel_prev: float,
|
||||||
|
dt: float,
|
||||||
|
stop_vel_threshold: float,
|
||||||
|
hard_decel_threshold: float) -> bool:
|
||||||
|
"""
|
||||||
|
Detect sudden target stop requiring emergency deceleration.
|
||||||
|
|
||||||
|
Triggers if:
|
||||||
|
(a) target velocity drops below stop_vel_threshold, OR
|
||||||
|
(b) target deceleration exceeds hard_decel_threshold (m/s²)
|
||||||
|
|
||||||
|
Returns True if emergency stop should be engaged.
|
||||||
|
"""
|
||||||
|
if target_vel < stop_vel_threshold:
|
||||||
|
return True
|
||||||
|
if dt > 0:
|
||||||
|
target_decel = (target_vel_prev - target_vel) / dt # positive = decelerating
|
||||||
|
if target_decel > hard_decel_threshold:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def check_gps_runaway(gps_speed: float, cmd_vel: float,
|
||||||
|
runaway_factor: float, profile_max: float) -> bool:
|
||||||
|
"""
|
||||||
|
Detect GPS runaway: robot moving significantly faster than commanded.
|
||||||
|
|
||||||
|
Triggers if gps_speed > max(cmd_vel, 0.5) * runaway_factor
|
||||||
|
AND gps_speed > profile_max (not a sensor glitch on a stationary robot).
|
||||||
|
"""
|
||||||
|
reference = max(abs(cmd_vel), 0.5)
|
||||||
|
return gps_speed > reference * runaway_factor and gps_speed > profile_max * 0.5
|
||||||
|
|
||||||
|
|
||||||
|
def emergency_decel_step(current_vel: float, decel_limit: float, dt: float) -> float:
|
||||||
|
"""Decelerate toward zero at emergency_decel_limit. Returns new velocity."""
|
||||||
|
if current_vel > 0:
|
||||||
|
return max(0.0, current_vel - decel_limit * dt)
|
||||||
|
if current_vel < 0:
|
||||||
|
return min(0.0, current_vel + decel_limit * dt)
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ── Default profiles (overridable via YAML) ──────────────────────────────────
|
||||||
|
|
||||||
|
DEFAULT_PROFILES = {
|
||||||
|
WALK: {
|
||||||
|
"max_linear_vel": 1.5,
|
||||||
|
"max_angular_vel": 1.2,
|
||||||
|
"accel_limit": 0.6, # m/s²
|
||||||
|
"decel_limit": 1.0, # m/s²
|
||||||
|
"target_vel_min": 0.0,
|
||||||
|
"target_vel_max": 2.0,
|
||||||
|
},
|
||||||
|
JOG: {
|
||||||
|
"max_linear_vel": 3.0,
|
||||||
|
"max_angular_vel": 1.5,
|
||||||
|
"accel_limit": 0.5,
|
||||||
|
"decel_limit": 0.8,
|
||||||
|
"target_vel_min": 2.0,
|
||||||
|
"target_vel_max": 5.0,
|
||||||
|
},
|
||||||
|
RIDE: {
|
||||||
|
"max_linear_vel": 8.0,
|
||||||
|
"max_angular_vel": 1.5, # conservative for balance at speed
|
||||||
|
"accel_limit": 0.3, # very gentle — balance needs time at 8 m/s
|
||||||
|
"decel_limit": 0.5,
|
||||||
|
"target_vel_min": 5.0,
|
||||||
|
"target_vel_max": 15.0,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class SpeedControllerNode(Node):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("speed_controller")
|
||||||
|
|
||||||
|
# ── Parameters ────────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter("control_rate", 50.0)
|
||||||
|
self.declare_parameter("input_topic", "/cmd_vel_raw")
|
||||||
|
self.declare_parameter("output_topic", "/cmd_vel")
|
||||||
|
self.declare_parameter("initial_profile", WALK)
|
||||||
|
|
||||||
|
# Profile selection
|
||||||
|
self.declare_parameter("hysteresis_up_ticks", 5)
|
||||||
|
self.declare_parameter("hysteresis_down_ticks", 15)
|
||||||
|
|
||||||
|
# Emergency stop
|
||||||
|
self.declare_parameter("emergency_decel_mps2", 2.0)
|
||||||
|
self.declare_parameter("stop_vel_threshold", 0.4) # m/s
|
||||||
|
self.declare_parameter("hard_stop_decel_threshold", 3.0) # m/s²
|
||||||
|
|
||||||
|
# GPS runaway
|
||||||
|
self.declare_parameter("gps_runaway_factor", 1.5) # 1.5× commanded
|
||||||
|
self.declare_parameter("gps_timeout", 3.0) # s before ignoring GPS
|
||||||
|
|
||||||
|
# UWB range
|
||||||
|
self.declare_parameter("uwb_range_field", "range")
|
||||||
|
self.declare_parameter("uwb_timeout", 2.0)
|
||||||
|
|
||||||
|
# Per-profile overrides (flat params, merged into DEFAULT_PROFILES)
|
||||||
|
for profile in _PROFILE_ORDER:
|
||||||
|
self.declare_parameter(f"{profile}.max_linear_vel",
|
||||||
|
DEFAULT_PROFILES[profile]["max_linear_vel"])
|
||||||
|
self.declare_parameter(f"{profile}.max_angular_vel",
|
||||||
|
DEFAULT_PROFILES[profile]["max_angular_vel"])
|
||||||
|
self.declare_parameter(f"{profile}.accel_limit",
|
||||||
|
DEFAULT_PROFILES[profile]["accel_limit"])
|
||||||
|
self.declare_parameter(f"{profile}.decel_limit",
|
||||||
|
DEFAULT_PROFILES[profile]["decel_limit"])
|
||||||
|
self.declare_parameter(f"{profile}.target_vel_max",
|
||||||
|
DEFAULT_PROFILES[profile]["target_vel_max"])
|
||||||
|
|
||||||
|
self._profiles = self._load_profiles()
|
||||||
|
self._p = {
|
||||||
|
"control_rate": self.get_parameter("control_rate").value,
|
||||||
|
"hysteresis_up": self.get_parameter("hysteresis_up_ticks").value,
|
||||||
|
"hysteresis_down": self.get_parameter("hysteresis_down_ticks").value,
|
||||||
|
"emergency_decel": self.get_parameter("emergency_decel_mps2").value,
|
||||||
|
"stop_vel_threshold": self.get_parameter("stop_vel_threshold").value,
|
||||||
|
"hard_stop_decel_threshold": self.get_parameter("hard_stop_decel_threshold").value,
|
||||||
|
"gps_runaway_factor": self.get_parameter("gps_runaway_factor").value,
|
||||||
|
"gps_timeout": self.get_parameter("gps_timeout").value,
|
||||||
|
"uwb_range_field": self.get_parameter("uwb_range_field").value,
|
||||||
|
"uwb_timeout": self.get_parameter("uwb_timeout").value,
|
||||||
|
}
|
||||||
|
dt = 1.0 / self._p["control_rate"]
|
||||||
|
|
||||||
|
# ── State ─────────────────────────────────────────────────────────────
|
||||||
|
self._profile = self.get_parameter("initial_profile").value
|
||||||
|
self._profile_counters: dict = {}
|
||||||
|
self._current_linear = 0.0 # smoothed output linear.x
|
||||||
|
self._current_angular= 0.0 # smoothed output angular.z
|
||||||
|
self._request_linear = 0.0 # latest /cmd_vel_raw linear.x
|
||||||
|
self._request_angular= 0.0 # latest /cmd_vel_raw angular.z
|
||||||
|
self._gps_speed = 0.0 # magnitude from /gps/vel
|
||||||
|
self._gps_stamp = 0.0 # monotonic time of last GPS
|
||||||
|
self._uwb_range = 0.0 # latest range (m)
|
||||||
|
self._uwb_range_prev = 0.0
|
||||||
|
self._uwb_stamp = 0.0
|
||||||
|
self._uwb_stamp_prev = 0.0
|
||||||
|
self._target_vel = 0.0
|
||||||
|
self._target_vel_prev= 0.0
|
||||||
|
self._emergency = False
|
||||||
|
self._dt = dt
|
||||||
|
|
||||||
|
# History for runaway filter (last 3 GPS samples)
|
||||||
|
self._gps_history: collections.deque = collections.deque(maxlen=3)
|
||||||
|
|
||||||
|
# ── Subscriptions ─────────────────────────────────────────────────────
|
||||||
|
self.create_subscription(
|
||||||
|
Twist, self.get_parameter("input_topic").value,
|
||||||
|
self._cmd_raw_cb, 10)
|
||||||
|
self.create_subscription(
|
||||||
|
TwistStamped, "/gps/vel", self._gps_cb, 10)
|
||||||
|
self.create_subscription(
|
||||||
|
String, "/uwb/ranges", self._uwb_cb, 10)
|
||||||
|
|
||||||
|
# ── Publisher ──────────────────────────────────────────────────────────
|
||||||
|
self._cmd_pub = self.create_publisher(
|
||||||
|
Twist, self.get_parameter("output_topic").value, 10)
|
||||||
|
self._profile_pub = self.create_publisher(
|
||||||
|
String, "/speed_controller/profile", 10)
|
||||||
|
|
||||||
|
# ── Control timer ──────────────────────────────────────────────────────
|
||||||
|
self.create_timer(dt, self._control_cb)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"SpeedController ready profile={self._profile} "
|
||||||
|
f"rate={self._p['control_rate']}Hz "
|
||||||
|
f"walk≤{self._profiles[WALK]['max_linear_vel']}m/s "
|
||||||
|
f"jog≤{self._profiles[JOG]['max_linear_vel']}m/s "
|
||||||
|
f"ride≤{self._profiles[RIDE]['max_linear_vel']}m/s")
|
||||||
|
|
||||||
|
def _load_profiles(self) -> dict:
|
||||||
|
"""Merge ROS2 params into DEFAULT_PROFILES."""
|
||||||
|
import copy
|
||||||
|
profiles = copy.deepcopy(DEFAULT_PROFILES)
|
||||||
|
for name in _PROFILE_ORDER:
|
||||||
|
for field in ("max_linear_vel", "max_angular_vel", "accel_limit",
|
||||||
|
"decel_limit", "target_vel_max"):
|
||||||
|
val = self.get_parameter(f"{name}.{field}").value
|
||||||
|
profiles[name][field] = val
|
||||||
|
return profiles
|
||||||
|
|
||||||
|
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _cmd_raw_cb(self, msg: Twist):
|
||||||
|
self._request_linear = msg.linear.x
|
||||||
|
self._request_angular = msg.angular.z
|
||||||
|
|
||||||
|
def _gps_cb(self, msg: TwistStamped):
|
||||||
|
vx = msg.twist.linear.x
|
||||||
|
vy = msg.twist.linear.y
|
||||||
|
self._gps_speed = math.sqrt(vx * vx + vy * vy)
|
||||||
|
self._gps_stamp = time.monotonic()
|
||||||
|
self._gps_history.append(self._gps_speed)
|
||||||
|
|
||||||
|
def _uwb_cb(self, msg: String):
|
||||||
|
"""Parse JSON range message, update range estimate."""
|
||||||
|
try:
|
||||||
|
data = json.loads(msg.data)
|
||||||
|
field = self._p["uwb_range_field"]
|
||||||
|
r = float(data.get(field, data.get("distance", 0.0)))
|
||||||
|
if r > 0:
|
||||||
|
self._uwb_range_prev = self._uwb_range
|
||||||
|
self._uwb_stamp_prev = self._uwb_stamp
|
||||||
|
self._uwb_range = r
|
||||||
|
self._uwb_stamp = time.monotonic()
|
||||||
|
except (json.JSONDecodeError, ValueError, KeyError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# ── Control loop ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _control_cb(self):
|
||||||
|
now = time.monotonic()
|
||||||
|
dt = self._dt
|
||||||
|
|
||||||
|
# ── 1. Estimate target velocity from UWB range derivative + GPS ──────
|
||||||
|
uwb_fresh = (now - self._uwb_stamp) < self._p["uwb_timeout"]
|
||||||
|
gps_fresh = (now - self._gps_stamp) < self._p["gps_timeout"]
|
||||||
|
|
||||||
|
if uwb_fresh and self._uwb_stamp_prev > 0:
|
||||||
|
uwb_dt = self._uwb_stamp - self._uwb_stamp_prev
|
||||||
|
robot_speed = self._gps_speed if gps_fresh else self._current_linear
|
||||||
|
self._target_vel_prev = self._target_vel
|
||||||
|
self._target_vel = estimate_target_velocity(
|
||||||
|
self._uwb_range_prev, self._uwb_range, uwb_dt, robot_speed)
|
||||||
|
elif gps_fresh:
|
||||||
|
# No UWB — use GPS speed as proxy for target velocity
|
||||||
|
self._target_vel_prev = self._target_vel
|
||||||
|
self._target_vel = self._gps_speed
|
||||||
|
# else: hold previous estimate
|
||||||
|
|
||||||
|
# ── 2. Profile selection ──────────────────────────────────────────────
|
||||||
|
self._profile = select_profile(
|
||||||
|
self._target_vel, self._profiles, self._profile,
|
||||||
|
self._p["hysteresis_up"], self._p["hysteresis_down"],
|
||||||
|
self._profile_counters,
|
||||||
|
)
|
||||||
|
prof = self._profiles[self._profile]
|
||||||
|
|
||||||
|
# ── 3. Emergency stop detection ───────────────────────────────────────
|
||||||
|
emerg = check_emergency_stop(
|
||||||
|
self._target_vel, self._target_vel_prev, dt,
|
||||||
|
self._p["stop_vel_threshold"],
|
||||||
|
self._p["hard_stop_decel_threshold"],
|
||||||
|
)
|
||||||
|
# GPS runaway check (use smoothed GPS)
|
||||||
|
if gps_fresh:
|
||||||
|
gps_smooth = sum(self._gps_history) / len(self._gps_history)
|
||||||
|
if check_gps_runaway(
|
||||||
|
gps_smooth, self._current_linear,
|
||||||
|
self._p["gps_runaway_factor"],
|
||||||
|
prof["max_linear_vel"]):
|
||||||
|
emerg = True
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"GPS runaway: gps={gps_smooth:.2f} "
|
||||||
|
f"cmd={self._current_linear:.2f} m/s — emergency decel",
|
||||||
|
throttle_duration_sec=2.0)
|
||||||
|
|
||||||
|
# Latch emergency; clear when speed reaches zero
|
||||||
|
if emerg and not self._emergency:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"Emergency stop: target_vel={self._target_vel:.2f} m/s "
|
||||||
|
f"(profile={self._profile})")
|
||||||
|
self._emergency = emerg or (
|
||||||
|
self._emergency and abs(self._current_linear) > 0.05)
|
||||||
|
|
||||||
|
# ── 4. Compute target command ─────────────────────────────────────────
|
||||||
|
if self._emergency:
|
||||||
|
# Override request: decelerate to zero at emergency rate
|
||||||
|
target_lin = 0.0
|
||||||
|
target_ang = 0.0
|
||||||
|
new_lin = emergency_decel_step(
|
||||||
|
self._current_linear, self._p["emergency_decel"], dt)
|
||||||
|
new_ang = emergency_decel_step(
|
||||||
|
self._current_angular, self._p["emergency_decel"] * 2.0, dt)
|
||||||
|
else:
|
||||||
|
# Apply profile caps to request
|
||||||
|
target_lin, target_ang = limit_velocity(
|
||||||
|
self._request_linear, self._request_angular,
|
||||||
|
prof["max_linear_vel"], prof["max_angular_vel"])
|
||||||
|
# Apply smooth acceleration / deceleration
|
||||||
|
new_lin = apply_accel_limit(
|
||||||
|
self._current_linear, target_lin,
|
||||||
|
prof["accel_limit"], prof["decel_limit"], dt)
|
||||||
|
new_ang = apply_accel_limit(
|
||||||
|
self._current_angular, target_ang,
|
||||||
|
prof["accel_limit"] * 2.0, # angular can be snappier
|
||||||
|
prof["decel_limit"] * 2.0, dt)
|
||||||
|
|
||||||
|
self._current_linear = new_lin
|
||||||
|
self._current_angular = new_ang
|
||||||
|
|
||||||
|
# ── 5. Publish ────────────────────────────────────────────────────────
|
||||||
|
out = Twist()
|
||||||
|
out.linear.x = self._current_linear
|
||||||
|
out.angular.z = self._current_angular
|
||||||
|
self._cmd_pub.publish(out)
|
||||||
|
|
||||||
|
# Profile status (for dashboard / logging)
|
||||||
|
status = String()
|
||||||
|
status.data = json.dumps({
|
||||||
|
"profile": self._profile,
|
||||||
|
"max_vel": prof["max_linear_vel"],
|
||||||
|
"target_vel": round(self._target_vel, 2),
|
||||||
|
"gps_speed": round(self._gps_speed, 2),
|
||||||
|
"cmd_linear": round(new_lin, 3),
|
||||||
|
"emergency": self._emergency,
|
||||||
|
})
|
||||||
|
self._profile_pub.publish(status)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = SpeedControllerNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
4
jetson/ros2_ws/src/saltybot_speed_controller/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_speed_controller/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_speed_controller
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_speed_controller
|
||||||
31
jetson/ros2_ws/src/saltybot_speed_controller/setup.py
Normal file
31
jetson/ros2_ws/src/saltybot_speed_controller/setup.py
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
package_name = "saltybot_speed_controller"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(f"share/{package_name}/launch", ["launch/outdoor_speed.launch.py"]),
|
||||||
|
(f"share/{package_name}/config", ["config/speed_profiles.yaml"]),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-controls",
|
||||||
|
maintainer_email="sl-controls@saltylab.local",
|
||||||
|
description=(
|
||||||
|
"Outdoor adaptive speed controller: walk/jog/ride profiles "
|
||||||
|
"with trapezoidal accel limits and GPS runaway protection"
|
||||||
|
),
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
# Adaptive speed profile node: /cmd_vel_raw → /cmd_vel
|
||||||
|
"speed_controller_node = saltybot_speed_controller.speed_controller_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
@ -0,0 +1,428 @@
|
|||||||
|
"""
|
||||||
|
Unit tests for speed_controller_node pure functions.
|
||||||
|
No ROS2 runtime required.
|
||||||
|
Run with: pytest jetson/ros2_ws/src/saltybot_speed_controller/test/test_speed_controller.py
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# ── Pure function mirrors ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
WALK = "walk"
|
||||||
|
JOG = "jog"
|
||||||
|
RIDE = "ride"
|
||||||
|
_PROFILE_ORDER = [WALK, JOG, RIDE]
|
||||||
|
|
||||||
|
_PROFILES = {
|
||||||
|
WALK: {"max_linear_vel": 1.5, "max_angular_vel": 1.2,
|
||||||
|
"accel_limit": 0.6, "decel_limit": 1.0,
|
||||||
|
"target_vel_min": 0.0, "target_vel_max": 2.0},
|
||||||
|
JOG: {"max_linear_vel": 3.0, "max_angular_vel": 1.5,
|
||||||
|
"accel_limit": 0.5, "decel_limit": 0.8,
|
||||||
|
"target_vel_min": 2.0, "target_vel_max": 5.0},
|
||||||
|
RIDE: {"max_linear_vel": 8.0, "max_angular_vel": 1.5,
|
||||||
|
"accel_limit": 0.3, "decel_limit": 0.5,
|
||||||
|
"target_vel_min": 5.0, "target_vel_max": 15.0},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _estimate_target_velocity(prev_range, curr_range, dt, robot_gps_speed):
|
||||||
|
if dt <= 0 or prev_range <= 0 or curr_range <= 0:
|
||||||
|
return robot_gps_speed
|
||||||
|
range_rate = (curr_range - prev_range) / dt
|
||||||
|
return max(0.0, robot_gps_speed + range_rate)
|
||||||
|
|
||||||
|
|
||||||
|
def _select_profile(target_vel, profiles, current_profile,
|
||||||
|
hysteresis_up=5, hysteresis_down=10, counters=None):
|
||||||
|
if counters is None:
|
||||||
|
counters = {}
|
||||||
|
desired = current_profile
|
||||||
|
for name in _PROFILE_ORDER:
|
||||||
|
if target_vel <= profiles[name]["target_vel_max"]:
|
||||||
|
desired = name
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
desired = RIDE
|
||||||
|
if desired == current_profile:
|
||||||
|
counters.clear()
|
||||||
|
return current_profile
|
||||||
|
key = desired
|
||||||
|
counters[key] = counters.get(key, 0) + 1
|
||||||
|
idx_d = _PROFILE_ORDER.index(desired)
|
||||||
|
idx_c = _PROFILE_ORDER.index(current_profile)
|
||||||
|
threshold = hysteresis_up if idx_d > idx_c else hysteresis_down
|
||||||
|
if counters[key] >= threshold:
|
||||||
|
counters.clear()
|
||||||
|
return desired
|
||||||
|
return current_profile
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_accel_limit(current, target, accel_limit, decel_limit, dt):
|
||||||
|
delta = target - current
|
||||||
|
# Decelerating = magnitude is decreasing (current and delta have opposite signs)
|
||||||
|
decelerating = (current > 0 and delta < 0) or (current < 0 and delta > 0)
|
||||||
|
limit = decel_limit if decelerating else accel_limit
|
||||||
|
if delta > 0:
|
||||||
|
step = min(delta, limit * dt)
|
||||||
|
else:
|
||||||
|
step = max(delta, -limit * dt)
|
||||||
|
return current + step
|
||||||
|
|
||||||
|
|
||||||
|
def _limit_velocity(linear_x, angular_z, max_linear, max_angular):
|
||||||
|
return (max(-max_linear, min(max_linear, linear_x)),
|
||||||
|
max(-max_angular, min(max_angular, angular_z)))
|
||||||
|
|
||||||
|
|
||||||
|
def _check_emergency_stop(target_vel, target_vel_prev, dt,
|
||||||
|
stop_threshold, hard_decel_threshold):
|
||||||
|
if target_vel < stop_threshold:
|
||||||
|
return True
|
||||||
|
if dt > 0 and (target_vel_prev - target_vel) / dt > hard_decel_threshold:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _check_gps_runaway(gps_speed, cmd_vel, runaway_factor, profile_max):
|
||||||
|
reference = max(abs(cmd_vel), 0.5)
|
||||||
|
return gps_speed > reference * runaway_factor and gps_speed > profile_max * 0.5
|
||||||
|
|
||||||
|
|
||||||
|
def _emergency_decel_step(current_vel, decel_limit, dt):
|
||||||
|
if current_vel > 0:
|
||||||
|
return max(0.0, current_vel - decel_limit * dt)
|
||||||
|
if current_vel < 0:
|
||||||
|
return min(0.0, current_vel + decel_limit * dt)
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ── Target velocity estimation ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestTargetVelocityEstimation:
|
||||||
|
def test_target_faster_than_robot(self):
|
||||||
|
# Robot at 2 m/s GPS, range increasing 3 m/s → target at 5 m/s
|
||||||
|
v = _estimate_target_velocity(5.0, 8.0, 1.0, 2.0) # range: 5→8 in 1s
|
||||||
|
assert v == pytest.approx(5.0)
|
||||||
|
|
||||||
|
def test_target_same_speed_constant_range(self):
|
||||||
|
v = _estimate_target_velocity(5.0, 5.0, 1.0, 3.0)
|
||||||
|
assert v == pytest.approx(3.0)
|
||||||
|
|
||||||
|
def test_target_stopped_range_closing(self):
|
||||||
|
# Robot at 3 m/s, range closing 3 m/s → target = 3 - 3 = 0 → clamped to 0
|
||||||
|
v = _estimate_target_velocity(8.0, 5.0, 1.0, 3.0)
|
||||||
|
assert v == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_invalid_dt_returns_gps_speed(self):
|
||||||
|
assert _estimate_target_velocity(5.0, 6.0, 0.0, 2.5) == pytest.approx(2.5)
|
||||||
|
|
||||||
|
def test_invalid_prev_range_returns_gps_speed(self):
|
||||||
|
assert _estimate_target_velocity(0.0, 6.0, 1.0, 2.5) == pytest.approx(2.5)
|
||||||
|
|
||||||
|
def test_result_non_negative(self):
|
||||||
|
# Range closing much faster than robot → would be negative → clamp to 0
|
||||||
|
v = _estimate_target_velocity(10.0, 3.0, 0.5, 1.0)
|
||||||
|
assert v >= 0.0
|
||||||
|
|
||||||
|
def test_high_speed_ride_estimate(self):
|
||||||
|
# Robot at 6 m/s, range increasing 2 m/s → target at 8 m/s
|
||||||
|
v = _estimate_target_velocity(4.0, 5.0, 0.5, 6.0) # 2 m/s range rate
|
||||||
|
assert v == pytest.approx(8.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Profile selection ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestProfileSelection:
|
||||||
|
def test_walk_range_stays_walk(self):
|
||||||
|
assert _select_profile(1.0, _PROFILES, WALK) == WALK
|
||||||
|
|
||||||
|
def test_jog_range_stays_jog(self):
|
||||||
|
assert _select_profile(3.5, _PROFILES, JOG) == JOG
|
||||||
|
|
||||||
|
def test_ride_range_stays_ride(self):
|
||||||
|
assert _select_profile(7.0, _PROFILES, RIDE) == RIDE
|
||||||
|
|
||||||
|
def test_upgrade_needs_hysteresis_up(self):
|
||||||
|
# Should NOT upgrade immediately
|
||||||
|
counters = {}
|
||||||
|
profile = WALK
|
||||||
|
# 4 ticks at jog speed — not enough (hysteresis_up=5)
|
||||||
|
for _ in range(4):
|
||||||
|
profile = _select_profile(3.0, _PROFILES, profile,
|
||||||
|
hysteresis_up=5, counters=counters)
|
||||||
|
assert profile == WALK
|
||||||
|
|
||||||
|
def test_upgrade_happens_at_threshold(self):
|
||||||
|
counters = {}
|
||||||
|
profile = WALK
|
||||||
|
for _ in range(5):
|
||||||
|
profile = _select_profile(3.0, _PROFILES, profile,
|
||||||
|
hysteresis_up=5, counters=counters)
|
||||||
|
assert profile == JOG
|
||||||
|
|
||||||
|
def test_downgrade_needs_more_hysteresis(self):
|
||||||
|
counters = {}
|
||||||
|
profile = JOG
|
||||||
|
for _ in range(9):
|
||||||
|
profile = _select_profile(1.0, _PROFILES, profile,
|
||||||
|
hysteresis_down=10, counters=counters)
|
||||||
|
assert profile == JOG # not yet
|
||||||
|
|
||||||
|
def test_downgrade_happens_at_threshold(self):
|
||||||
|
counters = {}
|
||||||
|
profile = JOG
|
||||||
|
for _ in range(10):
|
||||||
|
profile = _select_profile(1.0, _PROFILES, profile,
|
||||||
|
hysteresis_down=10, counters=counters)
|
||||||
|
assert profile == WALK
|
||||||
|
|
||||||
|
def test_counters_cleared_on_stable(self):
|
||||||
|
counters = {}
|
||||||
|
# Partial upgrade attempt then target returns to walk range
|
||||||
|
_select_profile(3.0, _PROFILES, WALK, hysteresis_up=5, counters=counters)
|
||||||
|
_select_profile(3.0, _PROFILES, WALK, hysteresis_up=5, counters=counters)
|
||||||
|
assert JOG in counters # counter accumulated
|
||||||
|
_select_profile(1.0, _PROFILES, WALK, hysteresis_up=5, counters=counters)
|
||||||
|
assert len(counters) == 0 # cleared when stable
|
||||||
|
|
||||||
|
def test_above_all_profiles_selects_ride(self):
|
||||||
|
profile = _select_profile(20.0, _PROFILES, RIDE)
|
||||||
|
assert profile == RIDE
|
||||||
|
|
||||||
|
def test_boundary_walk_max(self):
|
||||||
|
# 2.0 m/s is exactly at walk max → stays walk
|
||||||
|
assert _select_profile(2.0, _PROFILES, WALK) == WALK
|
||||||
|
|
||||||
|
def test_just_above_walk_max_triggers_upgrade(self):
|
||||||
|
counters = {}
|
||||||
|
profile = WALK
|
||||||
|
for _ in range(5):
|
||||||
|
profile = _select_profile(2.01, _PROFILES, profile,
|
||||||
|
hysteresis_up=5, counters=counters)
|
||||||
|
assert profile == JOG
|
||||||
|
|
||||||
|
|
||||||
|
# ── Acceleration limiter ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestAccelLimit:
|
||||||
|
_DT = 0.02 # 50 Hz
|
||||||
|
|
||||||
|
def test_accel_step(self):
|
||||||
|
# 0 → 1 at 0.5 m/s², 0.02s tick → step = 0.01
|
||||||
|
v = _apply_accel_limit(0.0, 1.0, 0.5, 1.0, self._DT)
|
||||||
|
assert v == pytest.approx(0.01)
|
||||||
|
|
||||||
|
def test_decel_step(self):
|
||||||
|
# 1 → 0 at 1.0 m/s², 0.02s → step = 0.02
|
||||||
|
v = _apply_accel_limit(1.0, 0.0, 0.5, 1.0, self._DT)
|
||||||
|
assert v == pytest.approx(0.98)
|
||||||
|
|
||||||
|
def test_does_not_overshoot(self):
|
||||||
|
# Close to target (gap < step)
|
||||||
|
v = _apply_accel_limit(0.995, 1.0, 0.5, 1.0, self._DT)
|
||||||
|
assert v == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_does_not_undershoot(self):
|
||||||
|
v = _apply_accel_limit(0.005, 0.0, 0.5, 1.0, self._DT)
|
||||||
|
assert v == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_negative_to_positive(self):
|
||||||
|
# -1 → 0 (decelerating negative) at decel_limit
|
||||||
|
v = _apply_accel_limit(-1.0, 0.0, 0.5, 1.0, self._DT)
|
||||||
|
assert v == pytest.approx(-0.98) # reduced magnitude
|
||||||
|
|
||||||
|
def test_zero_to_zero(self):
|
||||||
|
assert _apply_accel_limit(0.0, 0.0, 0.5, 1.0, self._DT) == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_ride_accel_gentler_than_walk(self):
|
||||||
|
dt = 0.02
|
||||||
|
v_walk = _apply_accel_limit(0.0, 5.0,
|
||||||
|
_PROFILES[WALK]["accel_limit"],
|
||||||
|
_PROFILES[WALK]["decel_limit"], dt)
|
||||||
|
v_ride = _apply_accel_limit(0.0, 5.0,
|
||||||
|
_PROFILES[RIDE]["accel_limit"],
|
||||||
|
_PROFILES[RIDE]["decel_limit"], dt)
|
||||||
|
assert v_walk > v_ride # walk accelerates faster than ride
|
||||||
|
|
||||||
|
def test_time_to_reach_walk_max(self):
|
||||||
|
# walk: accel=0.6 m/s², target=1.5 m/s, dt=0.02s
|
||||||
|
# expected ticks ≈ 1.5 / (0.6 * 0.02) = 125 ticks
|
||||||
|
v = 0.0
|
||||||
|
ticks = 0
|
||||||
|
for _ in range(200):
|
||||||
|
v = _apply_accel_limit(v, 1.5, 0.6, 1.0, 0.02)
|
||||||
|
ticks += 1
|
||||||
|
if v >= 1.5 - 1e-6:
|
||||||
|
break
|
||||||
|
assert v == pytest.approx(1.5, abs=1e-4)
|
||||||
|
assert 120 <= ticks <= 130 # ≈ 125 ticks
|
||||||
|
|
||||||
|
|
||||||
|
# ── Velocity limiter ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestLimitVelocity:
|
||||||
|
def test_within_limits(self):
|
||||||
|
lin, ang = _limit_velocity(1.0, 0.5, 8.0, 1.5)
|
||||||
|
assert lin == pytest.approx(1.0)
|
||||||
|
assert ang == pytest.approx(0.5)
|
||||||
|
|
||||||
|
def test_linear_clamped_high(self):
|
||||||
|
lin, _ = _limit_velocity(10.0, 0.0, 8.0, 1.5)
|
||||||
|
assert lin == pytest.approx(8.0)
|
||||||
|
|
||||||
|
def test_linear_clamped_low(self):
|
||||||
|
lin, _ = _limit_velocity(-10.0, 0.0, 8.0, 1.5)
|
||||||
|
assert lin == pytest.approx(-8.0)
|
||||||
|
|
||||||
|
def test_angular_clamped(self):
|
||||||
|
_, ang = _limit_velocity(0.0, 5.0, 8.0, 1.5)
|
||||||
|
assert ang == pytest.approx(1.5)
|
||||||
|
|
||||||
|
def test_zero_passthrough(self):
|
||||||
|
lin, ang = _limit_velocity(0.0, 0.0, 8.0, 1.5)
|
||||||
|
assert lin == pytest.approx(0.0)
|
||||||
|
assert ang == pytest.approx(0.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Emergency stop detection ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestEmergencyStop:
|
||||||
|
def test_triggers_below_stop_threshold(self):
|
||||||
|
assert _check_emergency_stop(0.2, 3.0, 0.02, 0.4, 3.0) is True
|
||||||
|
|
||||||
|
def test_triggers_on_hard_decel(self):
|
||||||
|
# target_vel drops from 5 to 3 in 0.02s → decel = 100 m/s² >> 3.0
|
||||||
|
assert _check_emergency_stop(3.0, 5.0, 0.02, 0.4, 3.0) is True
|
||||||
|
|
||||||
|
def test_no_trigger_gradual_decel(self):
|
||||||
|
# 5 → 4 in 0.02s → decel = 50 m/s² >> threshold... wait, 50 > 3
|
||||||
|
# Let me use a smaller decel: 5 → 4.9 in 0.02s → decel = 5 m/s²... still > 3
|
||||||
|
# Use 5 → 4.9 in 0.1s → decel = 1.0 m/s²
|
||||||
|
assert _check_emergency_stop(4.9, 5.0, 0.1, 0.4, 3.0) is False
|
||||||
|
|
||||||
|
def test_no_trigger_at_cruising_speed(self):
|
||||||
|
assert _check_emergency_stop(6.0, 6.0, 0.02, 0.4, 3.0) is False
|
||||||
|
|
||||||
|
def test_boundary_exactly_at_stop_threshold(self):
|
||||||
|
# target_vel == stop_threshold → NOT < → no trigger
|
||||||
|
# prev must be close enough that decel check doesn't also fire:
|
||||||
|
# (0.42 - 0.40) / 0.02 = 1.0 m/s² < 3.0 → OK
|
||||||
|
assert _check_emergency_stop(0.4, 0.42, 0.02, 0.4, 3.0) is False
|
||||||
|
|
||||||
|
def test_triggers_just_below_threshold(self):
|
||||||
|
assert _check_emergency_stop(0.39, 1.0, 0.02, 0.4, 3.0) is True
|
||||||
|
|
||||||
|
|
||||||
|
# ── GPS runaway detection ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestGpsRunaway:
|
||||||
|
def test_runaway_detected(self):
|
||||||
|
# GPS 5 m/s, commanded 2 m/s → 5 > 2*1.5=3, 5 > 8*0.5=4 → True
|
||||||
|
assert _check_gps_runaway(5.0, 2.0, 1.5, 8.0) is True
|
||||||
|
|
||||||
|
def test_no_runaway_normal(self):
|
||||||
|
# GPS 2.1 m/s, commanded 2 m/s → 2.1 not > 3.0
|
||||||
|
assert _check_gps_runaway(2.1, 2.0, 1.5, 8.0) is False
|
||||||
|
|
||||||
|
def test_no_runaway_stationary(self):
|
||||||
|
# Robot stopped (GPS 0.1 m/s, cmd 0), reference = max(0,0.5) = 0.5
|
||||||
|
# 0.1 not > 0.5*1.5=0.75
|
||||||
|
assert _check_gps_runaway(0.1, 0.0, 1.5, 1.5) is False
|
||||||
|
|
||||||
|
def test_runaway_only_at_meaningful_speed(self):
|
||||||
|
# GPS=6, cmd=4, factor=1.5 → 6 > 4*1.5=6 → False (not strictly greater)
|
||||||
|
assert _check_gps_runaway(6.0, 4.0, 1.5, 8.0) is False
|
||||||
|
|
||||||
|
def test_runaway_needs_gps_above_half_profile_max(self):
|
||||||
|
# Even if ratio is high, below 50% of profile max → no trigger
|
||||||
|
# profile_max=8 → threshold=4; GPS=3 < 4 → no trigger
|
||||||
|
assert _check_gps_runaway(3.0, 0.5, 1.5, 8.0) is False
|
||||||
|
|
||||||
|
|
||||||
|
# ── Emergency decel step ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestEmergencyDecelStep:
|
||||||
|
_DT = 0.02
|
||||||
|
_DECEL = 2.0
|
||||||
|
|
||||||
|
def test_positive_speed_decreases(self):
|
||||||
|
v = _emergency_decel_step(5.0, self._DECEL, self._DT)
|
||||||
|
assert v == pytest.approx(5.0 - 2.0 * 0.02)
|
||||||
|
|
||||||
|
def test_negative_speed_increases(self):
|
||||||
|
v = _emergency_decel_step(-5.0, self._DECEL, self._DT)
|
||||||
|
assert v == pytest.approx(-5.0 + 2.0 * 0.02)
|
||||||
|
|
||||||
|
def test_does_not_go_past_zero_positive(self):
|
||||||
|
v = _emergency_decel_step(0.03, self._DECEL, self._DT)
|
||||||
|
assert v == pytest.approx(0.0) # 0.03 - 0.04 → clamped to 0
|
||||||
|
|
||||||
|
def test_does_not_go_past_zero_negative(self):
|
||||||
|
v = _emergency_decel_step(-0.03, self._DECEL, self._DT)
|
||||||
|
assert v == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_zero_stays_zero(self):
|
||||||
|
assert _emergency_decel_step(0.0, self._DECEL, self._DT) == 0.0
|
||||||
|
|
||||||
|
def test_full_stop_from_ride_speed(self):
|
||||||
|
# 8 m/s at 2.0 m/s² → ticks to stop = 8 / (2.0 * 0.02) = 200 ticks = 4s
|
||||||
|
v = 8.0
|
||||||
|
ticks = 0
|
||||||
|
for _ in range(500):
|
||||||
|
v = _emergency_decel_step(v, 2.0, 0.02)
|
||||||
|
ticks += 1
|
||||||
|
if v == 0.0:
|
||||||
|
break
|
||||||
|
assert v == 0.0
|
||||||
|
assert 195 <= ticks <= 205
|
||||||
|
|
||||||
|
|
||||||
|
# ── Integration: profile + accel + limit ─────────────────────────────────────
|
||||||
|
|
||||||
|
class TestIntegration:
|
||||||
|
"""End-to-end control step simulation."""
|
||||||
|
|
||||||
|
def _step(self, current, request, profile_name, dt=0.02):
|
||||||
|
p = _PROFILES[profile_name]
|
||||||
|
capped_lin, capped_ang = _limit_velocity(
|
||||||
|
request, 0.0, p["max_linear_vel"], p["max_angular_vel"])
|
||||||
|
new_lin = _apply_accel_limit(
|
||||||
|
current, capped_lin, p["accel_limit"], p["decel_limit"], dt)
|
||||||
|
return new_lin
|
||||||
|
|
||||||
|
def test_walk_stays_under_1p5(self):
|
||||||
|
v = 0.0
|
||||||
|
for _ in range(300):
|
||||||
|
v = self._step(v, 5.0, WALK) # request far above limit
|
||||||
|
assert v <= 1.5 + 1e-6
|
||||||
|
|
||||||
|
def test_ride_reaches_8_from_zero(self):
|
||||||
|
v = 0.0
|
||||||
|
for _ in range(5000):
|
||||||
|
v = self._step(v, 8.0, RIDE)
|
||||||
|
assert v == pytest.approx(8.0, abs=1e-4)
|
||||||
|
|
||||||
|
def test_emergency_overrides_profile(self):
|
||||||
|
v = 6.0 # at ride speed
|
||||||
|
# Emergency: decel toward 0
|
||||||
|
for _ in range(500):
|
||||||
|
v = _emergency_decel_step(v, 2.0, 0.02)
|
||||||
|
if v == 0.0:
|
||||||
|
break
|
||||||
|
assert v == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_profile_upgrade_then_downgrade_cycle(self):
|
||||||
|
counters = {}
|
||||||
|
profile = WALK
|
||||||
|
# Upgrade: 5 ticks at jog speed
|
||||||
|
for _ in range(5):
|
||||||
|
profile = _select_profile(3.0, _PROFILES, profile,
|
||||||
|
hysteresis_up=5, counters=counters)
|
||||||
|
assert profile == JOG
|
||||||
|
# Downgrade: 10 ticks at walk speed
|
||||||
|
counters = {}
|
||||||
|
for _ in range(10):
|
||||||
|
profile = _select_profile(1.0, _PROFILES, profile,
|
||||||
|
hysteresis_down=10, counters=counters)
|
||||||
|
assert profile == WALK
|
||||||
Loading…
x
Reference in New Issue
Block a user