feat: outdoor adaptive speed controller — walk/jog/ride profiles up to 8 m/s #76

Merged
seb merged 1 commits from sl-controls/outdoor-speed into main 2026-03-01 01:11:10 -05:00
9 changed files with 1236 additions and 0 deletions

View File

@ -0,0 +1,96 @@
# speed_profiles.yaml
# Outdoor adaptive speed profiles for SaltyBot follow-me mode.
#
# Run with:
# ros2 launch saltybot_speed_controller outdoor_speed.launch.py
#
# IMPORTANT: This controller outputs /cmd_vel which feeds into cmd_vel_bridge
# (PR #46). The bridge applies its own hard caps (max_linear_vel=0.5 m/s).
# For outdoor/ride profiles you MUST re-launch cmd_vel_bridge with higher limits:
# ros2 launch saltybot_bridge cmd_vel_bridge.launch.py max_linear_vel:=8.0
#
# Data flow:
# person_follower → /cmd_vel_raw → [speed_controller] → /cmd_vel → cmd_vel_bridge → STM32
# ── Controller ─────────────────────────────────────────────────────────────────
control_rate: 50.0 # Hz — 50ms tick, same as cmd_vel_bridge
input_topic: /cmd_vel_raw # Upstream cmd_vel source
output_topic: /cmd_vel # Output to cmd_vel_bridge
initial_profile: walk # Start in safest profile until target velocity known
# ── Profile selection ──────────────────────────────────────────────────────────
# Hysteresis prevents rapid walk↔jog↔ride oscillation.
# up_ticks: consecutive ticks above threshold before upgrading profile (50Hz → 5 = 100ms)
# down_ticks: ticks below threshold before downgrading (50Hz → 15 = 300ms)
# Downgrade is slower (safety: prefer lower profile when uncertain)
hysteresis_up_ticks: 5
hysteresis_down_ticks: 15
# ── Emergency stop ─────────────────────────────────────────────────────────────
# Triggered when target stops suddenly:
# (a) target velocity drops below stop_vel_threshold, OR
# (b) target deceleration exceeds hard_stop_decel_threshold m/s²
#
# emergency_decel_mps2: how fast the robot decelerates to zero.
# Conservative (0.5-1.0 m/s²) is safer for balance; too fast = tip-over.
# At ride speed (8 m/s), 2.0 m/s² = 4 second stop distance ≈ 16m — acceptable.
emergency_decel_mps2: 2.0 # m/s² — hard brake limit
stop_vel_threshold: 0.4 # m/s — target speed below this = stopped
hard_stop_decel_threshold: 3.0 # m/s² — target decel above this = hard stop
# ── GPS runaway protection ─────────────────────────────────────────────────────
# If GPS speed > commanded_speed × runaway_factor AND GPS speed > 50% of
# profile max, trigger emergency decel (catches downhill runaways).
gps_runaway_factor: 1.5 # 1.5× commanded = runaway
gps_timeout: 3.0 # s — ignore GPS if stale (tunnel, bridge)
# ── UWB range ─────────────────────────────────────────────────────────────────
uwb_range_field: range # JSON field name in /uwb/ranges messages
uwb_timeout: 2.0 # s — use GPS-only profile if UWB stale
# ── Speed profiles ─────────────────────────────────────────────────────────────
# Each profile applies when estimated target velocity is in [0, target_vel_max].
# Velocity limits are BEFORE cmd_vel_bridge caps (bridge must also be set high).
#
# Acceleration notes:
# - Balance robot needs time to adjust lean angle when speed changes.
# - accel_limit at RIDE (0.3 m/s²) means 0→8 m/s takes ~27s — intentionally slow.
# - Increase only after validating balance PID tuning at speed.
# - decel_limit can be slightly higher than accel_limit (braking > accelerating).
walk:
# Person walking at 02 m/s (≈ 07 km/h)
max_linear_vel: 1.5 # m/s (= 5.4 km/h)
max_angular_vel: 1.2 # rad/s
accel_limit: 0.6 # m/s² — moderate, balance handles it
decel_limit: 1.0 # m/s²
target_vel_max: 2.0 # m/s — target faster than this → upgrade to jog
jog:
# Person jogging at 25 m/s (≈ 718 km/h)
max_linear_vel: 3.0 # m/s (= 10.8 km/h)
max_angular_vel: 1.5 # rad/s
accel_limit: 0.5 # m/s² — a bit gentler at higher speed
decel_limit: 0.8 # m/s²
target_vel_max: 5.0 # m/s — target faster than this → upgrade to ride
ride:
# Person on EUC at 515 m/s (≈ 1854 km/h / 2030 km/h typical follow-me)
max_linear_vel: 8.0 # m/s (= 28.8 km/h)
max_angular_vel: 1.5 # rad/s — conservative for balance at speed
accel_limit: 0.3 # m/s² — very gentle: balance dynamics change at 8 m/s
decel_limit: 0.5 # m/s² — gradual stop from 8 m/s ≈ 16s / ~64m
target_vel_max: 15.0 # m/s — cap; EUC max ~30 km/h = 8.3 m/s typical
# ── Notes ─────────────────────────────────────────────────────────────────────
# 1. To enable ride profile, the Jetson → STM32 cmd_vel_bridge must also be
# reconfigured: max_linear_vel=8.0, ramp_rate=500 → consider ramp_rate=150
# at ride speed (slower ramp = smoother balance).
#
# 2. The STM32 balance PID gains likely need retuning for ride speed. Expect
# increased sensitivity to pitch angle errors at 8 m/s vs 0.5 m/s.
#
# 3. Test sequence recommendation:
# - Validate walk profile on flat indoor surface first
# - Test jog profile in controlled outdoor space
# - Only enable ride profile after verified balance at jog speeds

View File

@ -0,0 +1,155 @@
"""
outdoor_speed.launch.py Outdoor adaptive speed controller.
Inserts SpeedControllerNode between the person_follower (/cmd_vel_raw)
and cmd_vel_bridge (/cmd_vel), providing adaptive walk/jog/ride profiles
with smooth acceleration curves and emergency deceleration.
IMPORTANT: For ride profile (up to 8 m/s) you must ALSO re-launch
cmd_vel_bridge with matching limits:
ros2 launch saltybot_bridge cmd_vel_bridge.launch.py max_linear_vel:=8.0
Prerequisite node pipeline:
person_follower /cmd_vel_raw [speed_controller] /cmd_vel cmd_vel_bridge STM32
Usage:
# Defaults (walk profile initially, adapts via UWB + GPS):
ros2 launch saltybot_speed_controller outdoor_speed.launch.py
# Force ride profile limits from start:
ros2 launch saltybot_speed_controller outdoor_speed.launch.py initial_profile:=ride
# Tune jog profile max speed:
ros2 launch saltybot_speed_controller outdoor_speed.launch.py jog_max_vel:=4.0
# Gentle emergency decel (longer stop distance, safer for balance):
ros2 launch saltybot_speed_controller outdoor_speed.launch.py emergency_decel:=1.0
# Full outdoor follow-me pipeline (speed controller + person follower):
ros2 launch saltybot_speed_controller outdoor_speed.launch.py \
launch_person_follower:=true
Topics:
/cmd_vel_raw Input (from person_follower or Nav2)
/cmd_vel Output (to cmd_vel_bridge)
/speed_controller/profile Active profile + status (JSON String)
/gps/vel Ground truth speed input (from gps_driver_node)
/uwb/ranges Target range input (for velocity estimation)
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, IncludeLaunchDescription, OpaqueFunction
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def _launch_nodes(context, *args, **kwargs):
params_file = LaunchConfiguration("params_file").perform(context)
def _s(k): return LaunchConfiguration(k).perform(context)
def _f(k): return float(LaunchConfiguration(k).perform(context))
def _i(k): return int(LaunchConfiguration(k).perform(context))
def _b(k): return LaunchConfiguration(k).perform(context).lower() == "true"
inline_params = {
"control_rate": _f("control_rate"),
"input_topic": _s("input_topic"),
"initial_profile": _s("initial_profile"),
"hysteresis_up_ticks": _i("hysteresis_up_ticks"),
"hysteresis_down_ticks": _i("hysteresis_down_ticks"),
"emergency_decel_mps2": _f("emergency_decel"),
"stop_vel_threshold": _f("stop_vel_threshold"),
"hard_stop_decel_threshold": _f("hard_stop_decel_threshold"),
"gps_runaway_factor": _f("gps_runaway_factor"),
"gps_timeout": _f("gps_timeout"),
"uwb_timeout": _f("uwb_timeout"),
# Per-profile params
"walk.max_linear_vel": _f("walk_max_vel"),
"walk.accel_limit": _f("walk_accel"),
"walk.decel_limit": _f("walk_decel"),
"jog.max_linear_vel": _f("jog_max_vel"),
"jog.accel_limit": _f("jog_accel"),
"jog.decel_limit": _f("jog_decel"),
"ride.max_linear_vel": _f("ride_max_vel"),
"ride.accel_limit": _f("ride_accel"),
"ride.decel_limit": _f("ride_decel"),
}
node_params = [params_file, inline_params] if params_file else [inline_params]
nodes = [
Node(
package="saltybot_speed_controller",
executable="speed_controller_node",
name="speed_controller",
output="screen",
parameters=node_params,
),
]
if _b("launch_person_follower"):
try:
follower_pkg = get_package_share_directory("saltybot_follower")
follower_launch = os.path.join(
follower_pkg, "launch", "person_follower.launch.py")
nodes.append(IncludeLaunchDescription(
PythonLaunchDescriptionSource(follower_launch),
launch_arguments={"cmd_vel_topic": "/cmd_vel_raw"}.items(),
))
except Exception:
pass # package not built yet — skip silently
return nodes
def generate_launch_description():
pkg_share = get_package_share_directory("saltybot_speed_controller")
default_params = os.path.join(pkg_share, "config", "speed_profiles.yaml")
return LaunchDescription([
DeclareLaunchArgument(
"params_file", default_value=default_params,
description="Full path to speed_profiles.yaml"),
# General
DeclareLaunchArgument("control_rate", default_value="50.0"),
DeclareLaunchArgument("input_topic", default_value="/cmd_vel_raw"),
DeclareLaunchArgument("initial_profile", default_value="walk"),
DeclareLaunchArgument("hysteresis_up_ticks", default_value="5"),
DeclareLaunchArgument("hysteresis_down_ticks",default_value="15"),
DeclareLaunchArgument("launch_person_follower", default_value="false",
description="Also launch saltybot_follower with /cmd_vel_raw output"),
# Emergency
DeclareLaunchArgument("emergency_decel", default_value="2.0",
description="Emergency decel limit (m/s²)"),
DeclareLaunchArgument("stop_vel_threshold", default_value="0.4"),
DeclareLaunchArgument("hard_stop_decel_threshold",default_value="3.0"),
# GPS runaway
DeclareLaunchArgument("gps_runaway_factor", default_value="1.5"),
DeclareLaunchArgument("gps_timeout", default_value="3.0"),
DeclareLaunchArgument("uwb_timeout", default_value="2.0"),
# Walk profile
DeclareLaunchArgument("walk_max_vel", default_value="1.5"),
DeclareLaunchArgument("walk_accel", default_value="0.6"),
DeclareLaunchArgument("walk_decel", default_value="1.0"),
# Jog profile
DeclareLaunchArgument("jog_max_vel", default_value="3.0"),
DeclareLaunchArgument("jog_accel", default_value="0.5"),
DeclareLaunchArgument("jog_decel", default_value="0.8"),
# Ride profile
DeclareLaunchArgument("ride_max_vel", default_value="8.0",
description="Max speed for EUC follow-me (m/s)"),
DeclareLaunchArgument("ride_accel", default_value="0.3",
description="Gentle accel — balance sensitive at 8 m/s"),
DeclareLaunchArgument("ride_decel", default_value="0.5"),
OpaqueFunction(function=_launch_nodes),
])

View File

@ -0,0 +1,31 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_speed_controller</name>
<version>0.1.0</version>
<description>
Outdoor adaptive speed controller for SaltyBot follow-me mode.
Inserts between person_follower (/cmd_vel_raw) and cmd_vel_bridge (/cmd_vel),
selecting walk/jog/ride profiles based on estimated target velocity (UWB + GPS),
applying smooth trapezoidal acceleration curves, GPS runaway protection,
and emergency deceleration for balance stability at speeds up to 8 m/s.
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,491 @@
"""
speed_controller_node.py Outdoor adaptive speed controller for SaltyBot.
Sits between upstream cmd_vel sources (person_follower, Nav2) and the
cmd_vel_bridge (PR #46), providing:
Adaptive speed profiles (walk / jog / ride) selected from UWB target
velocity and GPS ground-truth speed
Smooth acceleration curves (trapezoidal ramp) per profile, tuned for
balance stability at speed more aggressive jerk at low speed, gentler
at ride speeds (EUC follow-me at 20-30 km/h)
Emergency deceleration when the target stops suddenly (detected via
UWB range acceleration and target velocity drop)
GPS runaway protection if GPS ground speed significantly overshoots
the commanded velocity (downhill, slippage)
Data flow:
/cmd_vel_raw (Twist) uncapped request from person_follower/Nav2
/gps/vel (TwistStamped) robot ground-truth speed (GPS, 1 Hz)
/uwb/ranges (String JSON) UWB range to target (for velocity estimate)
SpeedControllerNode
/cmd_vel (Twist) smoothed, profile-capped output cmd_vel_bridge
Profiles
walk : 1.5 m/s (person walking 02 m/s)
jog : 3.0 m/s (person jogging 25 m/s)
ride : 8.0 m/s (person on EUC 515 m/s)
Profile selection uses UWB-derived target velocity with hysteresis to avoid
rapid switching. GPS ground speed provides a secondary confirmation and
runaway guard.
"""
import collections
import json
import math
import time
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, TwistStamped
from std_msgs.msg import String
# ── Pure control functions ────────────────────────────────────────────────────
# Extracted here so unit tests run without a ROS2 runtime.
# Profile name constants
WALK = "walk"
JOG = "jog"
RIDE = "ride"
_PROFILE_ORDER = [WALK, JOG, RIDE]
def estimate_target_velocity(prev_range: float, curr_range: float,
dt: float, robot_gps_speed: float) -> float:
"""
Estimate absolute target velocity from UWB range derivative + GPS.
When the robot follows a target:
range_rate = d(range)/dt (positive = target moving away)
target_vel robot_gps_speed + range_rate
This gives a rough scalar speed suitable for profile classification.
Returns 0 if dt 0 or range values are invalid.
"""
if dt <= 0 or prev_range <= 0 or curr_range <= 0:
return robot_gps_speed # fall back to robot's own speed
range_rate = (curr_range - prev_range) / dt
return max(0.0, robot_gps_speed + range_rate)
def select_profile(target_vel: float, profiles: dict,
current_profile: str,
hysteresis_up: int = 5,
hysteresis_down: int = 10,
_counters: dict = None) -> str:
"""
Select speed profile with up/down hysteresis.
target_vel : estimated target velocity (m/s)
profiles : dict of profile_name {target_vel_min, target_vel_max, ...}
current_profile: currently active profile
hysteresis_up : ticks before switching to a faster profile
hysteresis_down: ticks before switching to a slower profile
Returns the (possibly unchanged) profile name.
_counters is an internal mutable dict passed by the caller for state
(avoids making this a method). Pass {} on first call; reuse thereafter.
"""
if _counters is None:
_counters = {}
# Determine which profile best fits the target velocity
desired = current_profile
for name in _PROFILE_ORDER:
p = profiles[name]
if target_vel <= p["target_vel_max"]:
desired = name
break
else:
desired = RIDE # above all max → stay at highest
if desired == current_profile:
_counters.clear()
return current_profile
key = desired
_counters[key] = _counters.get(key, 0) + 1
idx_desired = _PROFILE_ORDER.index(desired)
idx_current = _PROFILE_ORDER.index(current_profile)
threshold = hysteresis_up if idx_desired > idx_current else hysteresis_down
if _counters[key] >= threshold:
_counters.clear()
return desired
return current_profile
def apply_accel_limit(current: float, target: float,
accel_limit: float, decel_limit: float,
dt: float) -> float:
"""
Apply symmetric trapezoidal acceleration / deceleration limit.
current : current velocity (m/s)
target : desired velocity (m/s)
accel_limit : max acceleration (m/)
decel_limit : max deceleration (m/) positive value
dt : time step (s)
Returns new velocity, ramped toward target within the limits.
"""
delta = target - current
# Decelerating = magnitude is decreasing (current and delta have opposite signs)
decelerating = (current > 0 and delta < 0) or (current < 0 and delta > 0)
limit = decel_limit if decelerating else accel_limit
if delta > 0:
step = min(delta, limit * dt)
else:
step = max(delta, -limit * dt)
return current + step
def limit_velocity(linear_x: float, angular_z: float,
max_linear: float, max_angular: float) -> tuple:
"""Hard-clamp linear and angular velocity to profile limits."""
lin = max(-max_linear, min(max_linear, linear_x))
ang = max(-max_angular, min(max_angular, angular_z))
return lin, ang
def check_emergency_stop(target_vel: float, target_vel_prev: float,
dt: float,
stop_vel_threshold: float,
hard_decel_threshold: float) -> bool:
"""
Detect sudden target stop requiring emergency deceleration.
Triggers if:
(a) target velocity drops below stop_vel_threshold, OR
(b) target deceleration exceeds hard_decel_threshold (m/)
Returns True if emergency stop should be engaged.
"""
if target_vel < stop_vel_threshold:
return True
if dt > 0:
target_decel = (target_vel_prev - target_vel) / dt # positive = decelerating
if target_decel > hard_decel_threshold:
return True
return False
def check_gps_runaway(gps_speed: float, cmd_vel: float,
runaway_factor: float, profile_max: float) -> bool:
"""
Detect GPS runaway: robot moving significantly faster than commanded.
Triggers if gps_speed > max(cmd_vel, 0.5) * runaway_factor
AND gps_speed > profile_max (not a sensor glitch on a stationary robot).
"""
reference = max(abs(cmd_vel), 0.5)
return gps_speed > reference * runaway_factor and gps_speed > profile_max * 0.5
def emergency_decel_step(current_vel: float, decel_limit: float, dt: float) -> float:
"""Decelerate toward zero at emergency_decel_limit. Returns new velocity."""
if current_vel > 0:
return max(0.0, current_vel - decel_limit * dt)
if current_vel < 0:
return min(0.0, current_vel + decel_limit * dt)
return 0.0
# ── Default profiles (overridable via YAML) ──────────────────────────────────
DEFAULT_PROFILES = {
WALK: {
"max_linear_vel": 1.5,
"max_angular_vel": 1.2,
"accel_limit": 0.6, # m/s²
"decel_limit": 1.0, # m/s²
"target_vel_min": 0.0,
"target_vel_max": 2.0,
},
JOG: {
"max_linear_vel": 3.0,
"max_angular_vel": 1.5,
"accel_limit": 0.5,
"decel_limit": 0.8,
"target_vel_min": 2.0,
"target_vel_max": 5.0,
},
RIDE: {
"max_linear_vel": 8.0,
"max_angular_vel": 1.5, # conservative for balance at speed
"accel_limit": 0.3, # very gentle — balance needs time at 8 m/s
"decel_limit": 0.5,
"target_vel_min": 5.0,
"target_vel_max": 15.0,
},
}
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
class SpeedControllerNode(Node):
def __init__(self):
super().__init__("speed_controller")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("control_rate", 50.0)
self.declare_parameter("input_topic", "/cmd_vel_raw")
self.declare_parameter("output_topic", "/cmd_vel")
self.declare_parameter("initial_profile", WALK)
# Profile selection
self.declare_parameter("hysteresis_up_ticks", 5)
self.declare_parameter("hysteresis_down_ticks", 15)
# Emergency stop
self.declare_parameter("emergency_decel_mps2", 2.0)
self.declare_parameter("stop_vel_threshold", 0.4) # m/s
self.declare_parameter("hard_stop_decel_threshold", 3.0) # m/s²
# GPS runaway
self.declare_parameter("gps_runaway_factor", 1.5) # 1.5× commanded
self.declare_parameter("gps_timeout", 3.0) # s before ignoring GPS
# UWB range
self.declare_parameter("uwb_range_field", "range")
self.declare_parameter("uwb_timeout", 2.0)
# Per-profile overrides (flat params, merged into DEFAULT_PROFILES)
for profile in _PROFILE_ORDER:
self.declare_parameter(f"{profile}.max_linear_vel",
DEFAULT_PROFILES[profile]["max_linear_vel"])
self.declare_parameter(f"{profile}.max_angular_vel",
DEFAULT_PROFILES[profile]["max_angular_vel"])
self.declare_parameter(f"{profile}.accel_limit",
DEFAULT_PROFILES[profile]["accel_limit"])
self.declare_parameter(f"{profile}.decel_limit",
DEFAULT_PROFILES[profile]["decel_limit"])
self.declare_parameter(f"{profile}.target_vel_max",
DEFAULT_PROFILES[profile]["target_vel_max"])
self._profiles = self._load_profiles()
self._p = {
"control_rate": self.get_parameter("control_rate").value,
"hysteresis_up": self.get_parameter("hysteresis_up_ticks").value,
"hysteresis_down": self.get_parameter("hysteresis_down_ticks").value,
"emergency_decel": self.get_parameter("emergency_decel_mps2").value,
"stop_vel_threshold": self.get_parameter("stop_vel_threshold").value,
"hard_stop_decel_threshold": self.get_parameter("hard_stop_decel_threshold").value,
"gps_runaway_factor": self.get_parameter("gps_runaway_factor").value,
"gps_timeout": self.get_parameter("gps_timeout").value,
"uwb_range_field": self.get_parameter("uwb_range_field").value,
"uwb_timeout": self.get_parameter("uwb_timeout").value,
}
dt = 1.0 / self._p["control_rate"]
# ── State ─────────────────────────────────────────────────────────────
self._profile = self.get_parameter("initial_profile").value
self._profile_counters: dict = {}
self._current_linear = 0.0 # smoothed output linear.x
self._current_angular= 0.0 # smoothed output angular.z
self._request_linear = 0.0 # latest /cmd_vel_raw linear.x
self._request_angular= 0.0 # latest /cmd_vel_raw angular.z
self._gps_speed = 0.0 # magnitude from /gps/vel
self._gps_stamp = 0.0 # monotonic time of last GPS
self._uwb_range = 0.0 # latest range (m)
self._uwb_range_prev = 0.0
self._uwb_stamp = 0.0
self._uwb_stamp_prev = 0.0
self._target_vel = 0.0
self._target_vel_prev= 0.0
self._emergency = False
self._dt = dt
# History for runaway filter (last 3 GPS samples)
self._gps_history: collections.deque = collections.deque(maxlen=3)
# ── Subscriptions ─────────────────────────────────────────────────────
self.create_subscription(
Twist, self.get_parameter("input_topic").value,
self._cmd_raw_cb, 10)
self.create_subscription(
TwistStamped, "/gps/vel", self._gps_cb, 10)
self.create_subscription(
String, "/uwb/ranges", self._uwb_cb, 10)
# ── Publisher ──────────────────────────────────────────────────────────
self._cmd_pub = self.create_publisher(
Twist, self.get_parameter("output_topic").value, 10)
self._profile_pub = self.create_publisher(
String, "/speed_controller/profile", 10)
# ── Control timer ──────────────────────────────────────────────────────
self.create_timer(dt, self._control_cb)
self.get_logger().info(
f"SpeedController ready profile={self._profile} "
f"rate={self._p['control_rate']}Hz "
f"walk≤{self._profiles[WALK]['max_linear_vel']}m/s "
f"jog≤{self._profiles[JOG]['max_linear_vel']}m/s "
f"ride≤{self._profiles[RIDE]['max_linear_vel']}m/s")
def _load_profiles(self) -> dict:
"""Merge ROS2 params into DEFAULT_PROFILES."""
import copy
profiles = copy.deepcopy(DEFAULT_PROFILES)
for name in _PROFILE_ORDER:
for field in ("max_linear_vel", "max_angular_vel", "accel_limit",
"decel_limit", "target_vel_max"):
val = self.get_parameter(f"{name}.{field}").value
profiles[name][field] = val
return profiles
# ── Callbacks ─────────────────────────────────────────────────────────────
def _cmd_raw_cb(self, msg: Twist):
self._request_linear = msg.linear.x
self._request_angular = msg.angular.z
def _gps_cb(self, msg: TwistStamped):
vx = msg.twist.linear.x
vy = msg.twist.linear.y
self._gps_speed = math.sqrt(vx * vx + vy * vy)
self._gps_stamp = time.monotonic()
self._gps_history.append(self._gps_speed)
def _uwb_cb(self, msg: String):
"""Parse JSON range message, update range estimate."""
try:
data = json.loads(msg.data)
field = self._p["uwb_range_field"]
r = float(data.get(field, data.get("distance", 0.0)))
if r > 0:
self._uwb_range_prev = self._uwb_range
self._uwb_stamp_prev = self._uwb_stamp
self._uwb_range = r
self._uwb_stamp = time.monotonic()
except (json.JSONDecodeError, ValueError, KeyError):
pass
# ── Control loop ──────────────────────────────────────────────────────────
def _control_cb(self):
now = time.monotonic()
dt = self._dt
# ── 1. Estimate target velocity from UWB range derivative + GPS ──────
uwb_fresh = (now - self._uwb_stamp) < self._p["uwb_timeout"]
gps_fresh = (now - self._gps_stamp) < self._p["gps_timeout"]
if uwb_fresh and self._uwb_stamp_prev > 0:
uwb_dt = self._uwb_stamp - self._uwb_stamp_prev
robot_speed = self._gps_speed if gps_fresh else self._current_linear
self._target_vel_prev = self._target_vel
self._target_vel = estimate_target_velocity(
self._uwb_range_prev, self._uwb_range, uwb_dt, robot_speed)
elif gps_fresh:
# No UWB — use GPS speed as proxy for target velocity
self._target_vel_prev = self._target_vel
self._target_vel = self._gps_speed
# else: hold previous estimate
# ── 2. Profile selection ──────────────────────────────────────────────
self._profile = select_profile(
self._target_vel, self._profiles, self._profile,
self._p["hysteresis_up"], self._p["hysteresis_down"],
self._profile_counters,
)
prof = self._profiles[self._profile]
# ── 3. Emergency stop detection ───────────────────────────────────────
emerg = check_emergency_stop(
self._target_vel, self._target_vel_prev, dt,
self._p["stop_vel_threshold"],
self._p["hard_stop_decel_threshold"],
)
# GPS runaway check (use smoothed GPS)
if gps_fresh:
gps_smooth = sum(self._gps_history) / len(self._gps_history)
if check_gps_runaway(
gps_smooth, self._current_linear,
self._p["gps_runaway_factor"],
prof["max_linear_vel"]):
emerg = True
self.get_logger().warn(
f"GPS runaway: gps={gps_smooth:.2f} "
f"cmd={self._current_linear:.2f} m/s — emergency decel",
throttle_duration_sec=2.0)
# Latch emergency; clear when speed reaches zero
if emerg and not self._emergency:
self.get_logger().warn(
f"Emergency stop: target_vel={self._target_vel:.2f} m/s "
f"(profile={self._profile})")
self._emergency = emerg or (
self._emergency and abs(self._current_linear) > 0.05)
# ── 4. Compute target command ─────────────────────────────────────────
if self._emergency:
# Override request: decelerate to zero at emergency rate
target_lin = 0.0
target_ang = 0.0
new_lin = emergency_decel_step(
self._current_linear, self._p["emergency_decel"], dt)
new_ang = emergency_decel_step(
self._current_angular, self._p["emergency_decel"] * 2.0, dt)
else:
# Apply profile caps to request
target_lin, target_ang = limit_velocity(
self._request_linear, self._request_angular,
prof["max_linear_vel"], prof["max_angular_vel"])
# Apply smooth acceleration / deceleration
new_lin = apply_accel_limit(
self._current_linear, target_lin,
prof["accel_limit"], prof["decel_limit"], dt)
new_ang = apply_accel_limit(
self._current_angular, target_ang,
prof["accel_limit"] * 2.0, # angular can be snappier
prof["decel_limit"] * 2.0, dt)
self._current_linear = new_lin
self._current_angular = new_ang
# ── 5. Publish ────────────────────────────────────────────────────────
out = Twist()
out.linear.x = self._current_linear
out.angular.z = self._current_angular
self._cmd_pub.publish(out)
# Profile status (for dashboard / logging)
status = String()
status.data = json.dumps({
"profile": self._profile,
"max_vel": prof["max_linear_vel"],
"target_vel": round(self._target_vel, 2),
"gps_speed": round(self._gps_speed, 2),
"cmd_linear": round(new_lin, 3),
"emergency": self._emergency,
})
self._profile_pub.publish(status)
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = SpeedControllerNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_speed_controller
[install]
install_scripts=$base/lib/saltybot_speed_controller

View File

@ -0,0 +1,31 @@
from setuptools import setup
package_name = "saltybot_speed_controller"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/outdoor_speed.launch.py"]),
(f"share/{package_name}/config", ["config/speed_profiles.yaml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description=(
"Outdoor adaptive speed controller: walk/jog/ride profiles "
"with trapezoidal accel limits and GPS runaway protection"
),
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
# Adaptive speed profile node: /cmd_vel_raw → /cmd_vel
"speed_controller_node = saltybot_speed_controller.speed_controller_node:main",
],
},
)

View File

@ -0,0 +1,428 @@
"""
Unit tests for speed_controller_node pure functions.
No ROS2 runtime required.
Run with: pytest jetson/ros2_ws/src/saltybot_speed_controller/test/test_speed_controller.py
"""
import pytest
# ── Pure function mirrors ─────────────────────────────────────────────────────
WALK = "walk"
JOG = "jog"
RIDE = "ride"
_PROFILE_ORDER = [WALK, JOG, RIDE]
_PROFILES = {
WALK: {"max_linear_vel": 1.5, "max_angular_vel": 1.2,
"accel_limit": 0.6, "decel_limit": 1.0,
"target_vel_min": 0.0, "target_vel_max": 2.0},
JOG: {"max_linear_vel": 3.0, "max_angular_vel": 1.5,
"accel_limit": 0.5, "decel_limit": 0.8,
"target_vel_min": 2.0, "target_vel_max": 5.0},
RIDE: {"max_linear_vel": 8.0, "max_angular_vel": 1.5,
"accel_limit": 0.3, "decel_limit": 0.5,
"target_vel_min": 5.0, "target_vel_max": 15.0},
}
def _estimate_target_velocity(prev_range, curr_range, dt, robot_gps_speed):
if dt <= 0 or prev_range <= 0 or curr_range <= 0:
return robot_gps_speed
range_rate = (curr_range - prev_range) / dt
return max(0.0, robot_gps_speed + range_rate)
def _select_profile(target_vel, profiles, current_profile,
hysteresis_up=5, hysteresis_down=10, counters=None):
if counters is None:
counters = {}
desired = current_profile
for name in _PROFILE_ORDER:
if target_vel <= profiles[name]["target_vel_max"]:
desired = name
break
else:
desired = RIDE
if desired == current_profile:
counters.clear()
return current_profile
key = desired
counters[key] = counters.get(key, 0) + 1
idx_d = _PROFILE_ORDER.index(desired)
idx_c = _PROFILE_ORDER.index(current_profile)
threshold = hysteresis_up if idx_d > idx_c else hysteresis_down
if counters[key] >= threshold:
counters.clear()
return desired
return current_profile
def _apply_accel_limit(current, target, accel_limit, decel_limit, dt):
delta = target - current
# Decelerating = magnitude is decreasing (current and delta have opposite signs)
decelerating = (current > 0 and delta < 0) or (current < 0 and delta > 0)
limit = decel_limit if decelerating else accel_limit
if delta > 0:
step = min(delta, limit * dt)
else:
step = max(delta, -limit * dt)
return current + step
def _limit_velocity(linear_x, angular_z, max_linear, max_angular):
return (max(-max_linear, min(max_linear, linear_x)),
max(-max_angular, min(max_angular, angular_z)))
def _check_emergency_stop(target_vel, target_vel_prev, dt,
stop_threshold, hard_decel_threshold):
if target_vel < stop_threshold:
return True
if dt > 0 and (target_vel_prev - target_vel) / dt > hard_decel_threshold:
return True
return False
def _check_gps_runaway(gps_speed, cmd_vel, runaway_factor, profile_max):
reference = max(abs(cmd_vel), 0.5)
return gps_speed > reference * runaway_factor and gps_speed > profile_max * 0.5
def _emergency_decel_step(current_vel, decel_limit, dt):
if current_vel > 0:
return max(0.0, current_vel - decel_limit * dt)
if current_vel < 0:
return min(0.0, current_vel + decel_limit * dt)
return 0.0
# ── Target velocity estimation ────────────────────────────────────────────────
class TestTargetVelocityEstimation:
def test_target_faster_than_robot(self):
# Robot at 2 m/s GPS, range increasing 3 m/s → target at 5 m/s
v = _estimate_target_velocity(5.0, 8.0, 1.0, 2.0) # range: 5→8 in 1s
assert v == pytest.approx(5.0)
def test_target_same_speed_constant_range(self):
v = _estimate_target_velocity(5.0, 5.0, 1.0, 3.0)
assert v == pytest.approx(3.0)
def test_target_stopped_range_closing(self):
# Robot at 3 m/s, range closing 3 m/s → target = 3 - 3 = 0 → clamped to 0
v = _estimate_target_velocity(8.0, 5.0, 1.0, 3.0)
assert v == pytest.approx(0.0)
def test_invalid_dt_returns_gps_speed(self):
assert _estimate_target_velocity(5.0, 6.0, 0.0, 2.5) == pytest.approx(2.5)
def test_invalid_prev_range_returns_gps_speed(self):
assert _estimate_target_velocity(0.0, 6.0, 1.0, 2.5) == pytest.approx(2.5)
def test_result_non_negative(self):
# Range closing much faster than robot → would be negative → clamp to 0
v = _estimate_target_velocity(10.0, 3.0, 0.5, 1.0)
assert v >= 0.0
def test_high_speed_ride_estimate(self):
# Robot at 6 m/s, range increasing 2 m/s → target at 8 m/s
v = _estimate_target_velocity(4.0, 5.0, 0.5, 6.0) # 2 m/s range rate
assert v == pytest.approx(8.0)
# ── Profile selection ─────────────────────────────────────────────────────────
class TestProfileSelection:
def test_walk_range_stays_walk(self):
assert _select_profile(1.0, _PROFILES, WALK) == WALK
def test_jog_range_stays_jog(self):
assert _select_profile(3.5, _PROFILES, JOG) == JOG
def test_ride_range_stays_ride(self):
assert _select_profile(7.0, _PROFILES, RIDE) == RIDE
def test_upgrade_needs_hysteresis_up(self):
# Should NOT upgrade immediately
counters = {}
profile = WALK
# 4 ticks at jog speed — not enough (hysteresis_up=5)
for _ in range(4):
profile = _select_profile(3.0, _PROFILES, profile,
hysteresis_up=5, counters=counters)
assert profile == WALK
def test_upgrade_happens_at_threshold(self):
counters = {}
profile = WALK
for _ in range(5):
profile = _select_profile(3.0, _PROFILES, profile,
hysteresis_up=5, counters=counters)
assert profile == JOG
def test_downgrade_needs_more_hysteresis(self):
counters = {}
profile = JOG
for _ in range(9):
profile = _select_profile(1.0, _PROFILES, profile,
hysteresis_down=10, counters=counters)
assert profile == JOG # not yet
def test_downgrade_happens_at_threshold(self):
counters = {}
profile = JOG
for _ in range(10):
profile = _select_profile(1.0, _PROFILES, profile,
hysteresis_down=10, counters=counters)
assert profile == WALK
def test_counters_cleared_on_stable(self):
counters = {}
# Partial upgrade attempt then target returns to walk range
_select_profile(3.0, _PROFILES, WALK, hysteresis_up=5, counters=counters)
_select_profile(3.0, _PROFILES, WALK, hysteresis_up=5, counters=counters)
assert JOG in counters # counter accumulated
_select_profile(1.0, _PROFILES, WALK, hysteresis_up=5, counters=counters)
assert len(counters) == 0 # cleared when stable
def test_above_all_profiles_selects_ride(self):
profile = _select_profile(20.0, _PROFILES, RIDE)
assert profile == RIDE
def test_boundary_walk_max(self):
# 2.0 m/s is exactly at walk max → stays walk
assert _select_profile(2.0, _PROFILES, WALK) == WALK
def test_just_above_walk_max_triggers_upgrade(self):
counters = {}
profile = WALK
for _ in range(5):
profile = _select_profile(2.01, _PROFILES, profile,
hysteresis_up=5, counters=counters)
assert profile == JOG
# ── Acceleration limiter ──────────────────────────────────────────────────────
class TestAccelLimit:
_DT = 0.02 # 50 Hz
def test_accel_step(self):
# 0 → 1 at 0.5 m/s², 0.02s tick → step = 0.01
v = _apply_accel_limit(0.0, 1.0, 0.5, 1.0, self._DT)
assert v == pytest.approx(0.01)
def test_decel_step(self):
# 1 → 0 at 1.0 m/s², 0.02s → step = 0.02
v = _apply_accel_limit(1.0, 0.0, 0.5, 1.0, self._DT)
assert v == pytest.approx(0.98)
def test_does_not_overshoot(self):
# Close to target (gap < step)
v = _apply_accel_limit(0.995, 1.0, 0.5, 1.0, self._DT)
assert v == pytest.approx(1.0)
def test_does_not_undershoot(self):
v = _apply_accel_limit(0.005, 0.0, 0.5, 1.0, self._DT)
assert v == pytest.approx(0.0)
def test_negative_to_positive(self):
# -1 → 0 (decelerating negative) at decel_limit
v = _apply_accel_limit(-1.0, 0.0, 0.5, 1.0, self._DT)
assert v == pytest.approx(-0.98) # reduced magnitude
def test_zero_to_zero(self):
assert _apply_accel_limit(0.0, 0.0, 0.5, 1.0, self._DT) == pytest.approx(0.0)
def test_ride_accel_gentler_than_walk(self):
dt = 0.02
v_walk = _apply_accel_limit(0.0, 5.0,
_PROFILES[WALK]["accel_limit"],
_PROFILES[WALK]["decel_limit"], dt)
v_ride = _apply_accel_limit(0.0, 5.0,
_PROFILES[RIDE]["accel_limit"],
_PROFILES[RIDE]["decel_limit"], dt)
assert v_walk > v_ride # walk accelerates faster than ride
def test_time_to_reach_walk_max(self):
# walk: accel=0.6 m/s², target=1.5 m/s, dt=0.02s
# expected ticks ≈ 1.5 / (0.6 * 0.02) = 125 ticks
v = 0.0
ticks = 0
for _ in range(200):
v = _apply_accel_limit(v, 1.5, 0.6, 1.0, 0.02)
ticks += 1
if v >= 1.5 - 1e-6:
break
assert v == pytest.approx(1.5, abs=1e-4)
assert 120 <= ticks <= 130 # ≈ 125 ticks
# ── Velocity limiter ──────────────────────────────────────────────────────────
class TestLimitVelocity:
def test_within_limits(self):
lin, ang = _limit_velocity(1.0, 0.5, 8.0, 1.5)
assert lin == pytest.approx(1.0)
assert ang == pytest.approx(0.5)
def test_linear_clamped_high(self):
lin, _ = _limit_velocity(10.0, 0.0, 8.0, 1.5)
assert lin == pytest.approx(8.0)
def test_linear_clamped_low(self):
lin, _ = _limit_velocity(-10.0, 0.0, 8.0, 1.5)
assert lin == pytest.approx(-8.0)
def test_angular_clamped(self):
_, ang = _limit_velocity(0.0, 5.0, 8.0, 1.5)
assert ang == pytest.approx(1.5)
def test_zero_passthrough(self):
lin, ang = _limit_velocity(0.0, 0.0, 8.0, 1.5)
assert lin == pytest.approx(0.0)
assert ang == pytest.approx(0.0)
# ── Emergency stop detection ──────────────────────────────────────────────────
class TestEmergencyStop:
def test_triggers_below_stop_threshold(self):
assert _check_emergency_stop(0.2, 3.0, 0.02, 0.4, 3.0) is True
def test_triggers_on_hard_decel(self):
# target_vel drops from 5 to 3 in 0.02s → decel = 100 m/s² >> 3.0
assert _check_emergency_stop(3.0, 5.0, 0.02, 0.4, 3.0) is True
def test_no_trigger_gradual_decel(self):
# 5 → 4 in 0.02s → decel = 50 m/s² >> threshold... wait, 50 > 3
# Let me use a smaller decel: 5 → 4.9 in 0.02s → decel = 5 m/s²... still > 3
# Use 5 → 4.9 in 0.1s → decel = 1.0 m/s²
assert _check_emergency_stop(4.9, 5.0, 0.1, 0.4, 3.0) is False
def test_no_trigger_at_cruising_speed(self):
assert _check_emergency_stop(6.0, 6.0, 0.02, 0.4, 3.0) is False
def test_boundary_exactly_at_stop_threshold(self):
# target_vel == stop_threshold → NOT < → no trigger
# prev must be close enough that decel check doesn't also fire:
# (0.42 - 0.40) / 0.02 = 1.0 m/s² < 3.0 → OK
assert _check_emergency_stop(0.4, 0.42, 0.02, 0.4, 3.0) is False
def test_triggers_just_below_threshold(self):
assert _check_emergency_stop(0.39, 1.0, 0.02, 0.4, 3.0) is True
# ── GPS runaway detection ─────────────────────────────────────────────────────
class TestGpsRunaway:
def test_runaway_detected(self):
# GPS 5 m/s, commanded 2 m/s → 5 > 2*1.5=3, 5 > 8*0.5=4 → True
assert _check_gps_runaway(5.0, 2.0, 1.5, 8.0) is True
def test_no_runaway_normal(self):
# GPS 2.1 m/s, commanded 2 m/s → 2.1 not > 3.0
assert _check_gps_runaway(2.1, 2.0, 1.5, 8.0) is False
def test_no_runaway_stationary(self):
# Robot stopped (GPS 0.1 m/s, cmd 0), reference = max(0,0.5) = 0.5
# 0.1 not > 0.5*1.5=0.75
assert _check_gps_runaway(0.1, 0.0, 1.5, 1.5) is False
def test_runaway_only_at_meaningful_speed(self):
# GPS=6, cmd=4, factor=1.5 → 6 > 4*1.5=6 → False (not strictly greater)
assert _check_gps_runaway(6.0, 4.0, 1.5, 8.0) is False
def test_runaway_needs_gps_above_half_profile_max(self):
# Even if ratio is high, below 50% of profile max → no trigger
# profile_max=8 → threshold=4; GPS=3 < 4 → no trigger
assert _check_gps_runaway(3.0, 0.5, 1.5, 8.0) is False
# ── Emergency decel step ──────────────────────────────────────────────────────
class TestEmergencyDecelStep:
_DT = 0.02
_DECEL = 2.0
def test_positive_speed_decreases(self):
v = _emergency_decel_step(5.0, self._DECEL, self._DT)
assert v == pytest.approx(5.0 - 2.0 * 0.02)
def test_negative_speed_increases(self):
v = _emergency_decel_step(-5.0, self._DECEL, self._DT)
assert v == pytest.approx(-5.0 + 2.0 * 0.02)
def test_does_not_go_past_zero_positive(self):
v = _emergency_decel_step(0.03, self._DECEL, self._DT)
assert v == pytest.approx(0.0) # 0.03 - 0.04 → clamped to 0
def test_does_not_go_past_zero_negative(self):
v = _emergency_decel_step(-0.03, self._DECEL, self._DT)
assert v == pytest.approx(0.0)
def test_zero_stays_zero(self):
assert _emergency_decel_step(0.0, self._DECEL, self._DT) == 0.0
def test_full_stop_from_ride_speed(self):
# 8 m/s at 2.0 m/s² → ticks to stop = 8 / (2.0 * 0.02) = 200 ticks = 4s
v = 8.0
ticks = 0
for _ in range(500):
v = _emergency_decel_step(v, 2.0, 0.02)
ticks += 1
if v == 0.0:
break
assert v == 0.0
assert 195 <= ticks <= 205
# ── Integration: profile + accel + limit ─────────────────────────────────────
class TestIntegration:
"""End-to-end control step simulation."""
def _step(self, current, request, profile_name, dt=0.02):
p = _PROFILES[profile_name]
capped_lin, capped_ang = _limit_velocity(
request, 0.0, p["max_linear_vel"], p["max_angular_vel"])
new_lin = _apply_accel_limit(
current, capped_lin, p["accel_limit"], p["decel_limit"], dt)
return new_lin
def test_walk_stays_under_1p5(self):
v = 0.0
for _ in range(300):
v = self._step(v, 5.0, WALK) # request far above limit
assert v <= 1.5 + 1e-6
def test_ride_reaches_8_from_zero(self):
v = 0.0
for _ in range(5000):
v = self._step(v, 8.0, RIDE)
assert v == pytest.approx(8.0, abs=1e-4)
def test_emergency_overrides_profile(self):
v = 6.0 # at ride speed
# Emergency: decel toward 0
for _ in range(500):
v = _emergency_decel_step(v, 2.0, 0.02)
if v == 0.0:
break
assert v == pytest.approx(0.0)
def test_profile_upgrade_then_downgrade_cycle(self):
counters = {}
profile = WALK
# Upgrade: 5 ticks at jog speed
for _ in range(5):
profile = _select_profile(3.0, _PROFILES, profile,
hysteresis_up=5, counters=counters)
assert profile == JOG
# Downgrade: 10 ticks at walk speed
counters = {}
for _ in range(10):
profile = _select_profile(1.0, _PROFILES, profile,
hysteresis_down=10, counters=counters)
assert profile == WALK