feat(controls): Autonomous/RC mode switch with 500ms blend ramp (Issue #104)
New package: saltybot_mode_switch
mode_logic.py (pure, no ROS2 dep — 72/72 tests pass):
State machine: RC → RAMP_TO_AUTO → AUTO → RAMP_TO_RC → RC
• CH6 (axes[5] > 0.5) requests AUTO; CH6 low → RAMP_TO_RC
• Stick >10% in AUTO/RAMP_TO_AUTO/RAMP_TO_RC → instant RC (no ramp)
• Sticks neutral ≥ 2 s after override → override cleared → RAMP_TO_AUTO
• RC link lost (Joy silent > 0.5 s) → instant RC from any state
• SLAM fix lost → RAMP_TO_RC (graceful exit from AUTO)
• No AUTO entry without slam_ok AND rc_link_ok
blend_alpha: 0.0 (RC) → linear ramp over 500 ms → 1.0 (AUTO)
led_pattern: solid_yellow(RC) | blink_green_slow(ramp) |
solid_green(AUTO) | blink_orange_fast(slam lost) |
blink_red_fast(RC link lost)
mode_switch_node.py (ROS2, 20 Hz):
Sub: /rc/joy (Joy), /saltybot/balance_state (String),
/slam_toolbox/pose_with_covariance_stamped (PoseWithCovarianceStamped)
Pub: /saltybot/control_mode (String JSON: mode+blend_alpha+slam_ok+rc_link_ok+override_active)
/saltybot/led_pattern (String)
cmd_vel_mux_node.py (ROS2, 20 Hz):
Sub: /cmd_vel_auto (Twist from Nav2/follower), /saltybot/control_mode
Pub: /cmd_vel (Twist to bridge, scaled by blend_alpha)
Note: remap Nav2/follower output to /cmd_vel_auto in launch.
Tests: 72/72 pass (no ROS2 runtime required).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
0f2ea7931b
commit
9733f5f097
4
jetson/ros2_ws/src/saltybot_mode_switch/.gitignore
vendored
Normal file
4
jetson/ros2_ws/src/saltybot_mode_switch/.gitignore
vendored
Normal file
@ -0,0 +1,4 @@
|
||||
__pycache__/
|
||||
*.pyc
|
||||
*.egg-info/
|
||||
.pytest_cache/
|
||||
@ -0,0 +1,58 @@
|
||||
# mode_switch_params.yaml — saltybot_mode_switch
|
||||
#
|
||||
# Launch with:
|
||||
# ros2 launch saltybot_mode_switch mode_switch.launch.py
|
||||
#
|
||||
# Topic wiring:
|
||||
# /rc/joy → mode_switch_node (CRSF channels)
|
||||
# /saltybot/balance_state → mode_switch_node (STM32 state)
|
||||
# /slam_toolbox/pose_with_covariance_stamped → mode_switch_node (SLAM fix)
|
||||
# /saltybot/control_mode ← mode_switch_node (JSON mode + alpha)
|
||||
# /saltybot/led_pattern ← mode_switch_node (LED name)
|
||||
#
|
||||
# /cmd_vel_auto → cmd_vel_mux_node (from Nav2/follower)
|
||||
# /saltybot/control_mode → cmd_vel_mux_node
|
||||
# /cmd_vel ← cmd_vel_mux_node (to bridge)
|
||||
#
|
||||
# IMPORTANT: remap Nav2 and person_follower to publish /cmd_vel_auto:
|
||||
# ros2 launch saltybot_bringup nav2.launch.py cmd_vel_topic:=/cmd_vel_auto
|
||||
|
||||
# ── CRSF channel mapping ──────────────────────────────────────────────────────
|
||||
# Joy axes index for the AUX2 / CH6 mode-toggle switch.
|
||||
# Typical CRSF→Joy mapping: CH1=axes[0] … CH6=axes[5].
|
||||
ch6_axis: 5
|
||||
|
||||
# Space-separated Joy axes indices for the four control sticks (roll/pitch/throttle/yaw).
|
||||
# These are monitored for the 10% override threshold.
|
||||
stick_axes: "0 1 2 3"
|
||||
|
||||
# ── Override / return-to-auto ──────────────────────────────────────────────────
|
||||
# Stick magnitude (0–1) that triggers instant RC override while in AUTO.
|
||||
# 10% (0.10) per spec — change only if stick noise causes false triggers.
|
||||
override_threshold: 0.10
|
||||
|
||||
# Seconds sticks must remain neutral before AUTO re-engages after an override.
|
||||
return_delay_s: 2.0
|
||||
|
||||
# ── Ramp ──────────────────────────────────────────────────────────────────────
|
||||
# RC↔AUTO blend ramp duration (seconds). cmd_vel_mux scales by blend_alpha
|
||||
# so autonomous commands are eased in/out over this window.
|
||||
ramp_duration_s: 0.5
|
||||
|
||||
# ── RC link health ────────────────────────────────────────────────────────────
|
||||
# Seconds of Joy silence before RC link is declared lost → force RC mode.
|
||||
rc_link_timeout_s: 0.5
|
||||
|
||||
# ── SLAM fix ──────────────────────────────────────────────────────────────────
|
||||
# ROS2 topic providing the SLAM localisation pose. Any message received within
|
||||
# slam_fix_timeout_s counts as a valid fix.
|
||||
slam_fix_topic: "/slam_toolbox/pose_with_covariance_stamped"
|
||||
|
||||
# Seconds after the last SLAM pose before fix is considered lost.
|
||||
slam_fix_timeout_s: 3.0
|
||||
|
||||
# ── Control loop ──────────────────────────────────────────────────────────────
|
||||
control_rate: 20.0 # Hz
|
||||
|
||||
# ── cmd_vel_mux ───────────────────────────────────────────────────────────────
|
||||
cmd_vel_timeout_s: 0.5 # deadman: zero output if /cmd_vel_auto goes silent
|
||||
@ -0,0 +1,59 @@
|
||||
"""mode_switch.launch.py — launch mode_switch_node + cmd_vel_mux_node."""
|
||||
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
pkg_share = get_package_share_directory("saltybot_mode_switch")
|
||||
default_cfg = os.path.join(pkg_share, "config", "mode_switch_params.yaml")
|
||||
|
||||
return LaunchDescription([
|
||||
DeclareLaunchArgument(
|
||||
"params_file",
|
||||
default_value=default_cfg,
|
||||
description="Path to mode_switch_params.yaml",
|
||||
),
|
||||
DeclareLaunchArgument(
|
||||
"ramp_duration_s",
|
||||
default_value="0.5",
|
||||
description="RC↔AUTO blend ramp duration (seconds)",
|
||||
),
|
||||
DeclareLaunchArgument(
|
||||
"return_delay_s",
|
||||
default_value="2.0",
|
||||
description="Sticks-neutral duration before auto re-engage (seconds)",
|
||||
),
|
||||
DeclareLaunchArgument(
|
||||
"slam_fix_topic",
|
||||
default_value="/slam_toolbox/pose_with_covariance_stamped",
|
||||
description="SLAM fix pose topic",
|
||||
),
|
||||
|
||||
Node(
|
||||
package="saltybot_mode_switch",
|
||||
executable="mode_switch_node",
|
||||
name="mode_switch",
|
||||
output="screen",
|
||||
parameters=[
|
||||
LaunchConfiguration("params_file"),
|
||||
{
|
||||
"ramp_duration_s": LaunchConfiguration("ramp_duration_s"),
|
||||
"return_delay_s": LaunchConfiguration("return_delay_s"),
|
||||
"slam_fix_topic": LaunchConfiguration("slam_fix_topic"),
|
||||
},
|
||||
],
|
||||
),
|
||||
|
||||
Node(
|
||||
package="saltybot_mode_switch",
|
||||
executable="cmd_vel_mux_node",
|
||||
name="cmd_vel_mux",
|
||||
output="screen",
|
||||
parameters=[LaunchConfiguration("params_file")],
|
||||
),
|
||||
])
|
||||
31
jetson/ros2_ws/src/saltybot_mode_switch/package.xml
Normal file
31
jetson/ros2_ws/src/saltybot_mode_switch/package.xml
Normal file
@ -0,0 +1,31 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_mode_switch</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
Autonomous/RC mode switch for saltybot (Issue #104).
|
||||
Monitors CRSF CH6 toggle, RC stick override, SLAM fix status, and RC link
|
||||
health to manage RC↔AUTO transitions with a 500 ms blend ramp.
|
||||
Publishes /saltybot/control_mode (JSON) and /saltybot/led_pattern.
|
||||
Includes cmd_vel_mux_node that scales autonomous cmd_vel by blend_alpha.
|
||||
</description>
|
||||
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
|
||||
<buildtool_depend>ament_python</buildtool_depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,133 @@
|
||||
"""
|
||||
cmd_vel_mux_node.py — Blend-alpha cmd_vel multiplexer (Issue #104).
|
||||
|
||||
Sits between the Nav2 / person-follower stack and the cmd_vel_bridge_node,
|
||||
scaling the autonomous cmd_vel output by the blend_alpha published by
|
||||
mode_switch_node so that the RC → AUTO handoff is smooth over 500 ms.
|
||||
|
||||
Topic graph
|
||||
-----------
|
||||
Nav2 / follower → /cmd_vel_auto ──┐
|
||||
├── cmd_vel_mux → /cmd_vel → bridge
|
||||
mode_switch → /saltybot/control_mode (blend_alpha)
|
||||
|
||||
In RC mode (blend_alpha ≈ 0) the node publishes Twist(0,0) so the bridge
|
||||
receives zeros — this is harmless because the bridge's mode gate already
|
||||
prevents autonomous commands when the STM32 is in RC_MANUAL.
|
||||
|
||||
The bridge's existing ESC ramp handles hardware-level smoothing;
|
||||
the blend_alpha here provides the higher-level cmd_vel policy ramp.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
cmd_vel_timeout_s Deadman: zero output if /cmd_vel_auto goes silent (0.5 s)
|
||||
control_rate Output publish rate (Hz) (20.0)
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
|
||||
|
||||
from geometry_msgs.msg import Twist
|
||||
from std_msgs.msg import String
|
||||
|
||||
from saltybot_mode_switch.mode_logic import blend_twist
|
||||
|
||||
_BIG = 1e9
|
||||
|
||||
|
||||
class CmdVelMuxNode(Node):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("cmd_vel_mux")
|
||||
|
||||
self.declare_parameter("cmd_vel_timeout_s", 0.5)
|
||||
self.declare_parameter("control_rate", 20.0)
|
||||
|
||||
self._timeout = self.get_parameter("cmd_vel_timeout_s").value
|
||||
self._rate = self.get_parameter("control_rate").value
|
||||
|
||||
reliable = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=10,
|
||||
)
|
||||
|
||||
# ── State ─────────────────────────────────────────────────────────────
|
||||
self._auto_linear_x: float = 0.0
|
||||
self._auto_angular_z: float = 0.0
|
||||
self._blend_alpha: float = 0.0
|
||||
self._last_auto_t: float = 0.0
|
||||
|
||||
# ── Subscriptions ─────────────────────────────────────────────────────
|
||||
self.create_subscription(
|
||||
Twist, "/cmd_vel_auto", self._auto_cb, reliable)
|
||||
self.create_subscription(
|
||||
String, "/saltybot/control_mode", self._mode_cb, reliable)
|
||||
|
||||
# ── Publisher ─────────────────────────────────────────────────────────
|
||||
self._pub = self.create_publisher(Twist, "/cmd_vel", reliable)
|
||||
|
||||
# ── Timer ─────────────────────────────────────────────────────────────
|
||||
self._timer = self.create_timer(1.0 / self._rate, self._publish_cb)
|
||||
|
||||
self.get_logger().info(
|
||||
f"CmdVelMuxNode ready rate={self._rate:.0f}Hz "
|
||||
f"timeout={self._timeout:.1f}s"
|
||||
)
|
||||
|
||||
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||
|
||||
def _auto_cb(self, msg: Twist) -> None:
|
||||
self._auto_linear_x = msg.linear.x
|
||||
self._auto_angular_z = msg.angular.z
|
||||
self._last_auto_t = time.monotonic()
|
||||
|
||||
def _mode_cb(self, msg: String) -> None:
|
||||
try:
|
||||
data = json.loads(msg.data)
|
||||
self._blend_alpha = float(data.get("blend_alpha", 0.0))
|
||||
except (json.JSONDecodeError, TypeError, ValueError):
|
||||
self._blend_alpha = 0.0
|
||||
|
||||
# ── 20 Hz publish ─────────────────────────────────────────────────────────
|
||||
|
||||
def _publish_cb(self) -> None:
|
||||
now = time.monotonic()
|
||||
|
||||
# Deadman: zero if autonomous source went silent
|
||||
auto_age = (now - self._last_auto_t) if self._last_auto_t > 0.0 else _BIG
|
||||
if auto_age > self._timeout:
|
||||
linear_x, angular_z = 0.0, 0.0
|
||||
else:
|
||||
linear_x, angular_z = blend_twist(
|
||||
self._auto_linear_x,
|
||||
self._auto_angular_z,
|
||||
self._blend_alpha,
|
||||
)
|
||||
|
||||
twist = Twist()
|
||||
twist.linear.x = linear_x
|
||||
twist.angular.z = angular_z
|
||||
self._pub.publish(twist)
|
||||
|
||||
|
||||
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = CmdVelMuxNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.try_shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@ -0,0 +1,241 @@
|
||||
"""
|
||||
mode_logic.py — Pure RC/Autonomous mode-switch logic for saltybot.
|
||||
|
||||
All functions are stateless or operate on plain Python types so the full
|
||||
state machine can be exercised in unit tests without a ROS2 runtime.
|
||||
|
||||
Mode vocabulary
|
||||
---------------
|
||||
"RC" — STM32 executing RC pilot commands; Jetson cmd_vel blocked.
|
||||
"RAMP_TO_AUTO" — Transitioning RC→AUTO; blend_alpha 0.0→1.0 over ramp_s.
|
||||
"AUTO" — STM32 executing Jetson cmd_vel; RC sticks idle.
|
||||
"RAMP_TO_RC" — Transitioning AUTO→RC; blend_alpha 1.0→0.0 over ramp_s.
|
||||
|
||||
Blend alpha
|
||||
-----------
|
||||
The node publishes blend_alpha ∈ [0, 1] in /saltybot/control_mode.
|
||||
The cmd_vel_mux_node scales autonomous cmd_vel by alpha before publishing
|
||||
to /cmd_vel (the bridge input). alpha=0 ⟹ no autonomous motion.
|
||||
|
||||
RC override
|
||||
-----------
|
||||
Any stick magnitude > override_threshold while in AUTO instantly forces RC
|
||||
(alpha → 0 immediately; no ramp). _override_active is set so the
|
||||
return-to-auto path requires sticks to be neutral for return_delay seconds.
|
||||
|
||||
CH6 / toggle
|
||||
------------
|
||||
ch6_auto=True means the pilot has requested AUTO (switch up/high).
|
||||
ch6_auto=False means the pilot has requested RC (switch down/low).
|
||||
Pure CH6 transitions (no stick activity) always ramp gracefully.
|
||||
|
||||
Safety interlocks
|
||||
-----------------
|
||||
• No AUTO without slam_ok (valid SLAM fix received recently).
|
||||
• Instant drop to RC when rc_link_ok goes False (RC link lost).
|
||||
These checks are applied at every control tick.
|
||||
"""
|
||||
|
||||
import math
|
||||
|
||||
# ─── Constants ──────────────────────────────────────────────────────────────
|
||||
|
||||
MODES = ("RC", "RAMP_TO_AUTO", "AUTO", "RAMP_TO_RC")
|
||||
|
||||
LED_PATTERNS = {
|
||||
"rc_normal": "solid_yellow",
|
||||
"rc_no_slam": "solid_yellow", # no slam needed in RC
|
||||
"auto_ok": "solid_green",
|
||||
"auto_no_slam": "blink_orange_fast", # shouldn't happen (interlock), but defensive
|
||||
"ramp_to_auto": "blink_green_slow",
|
||||
"ramp_to_rc": "blink_yellow_slow",
|
||||
"no_rc_link": "blink_red_fast",
|
||||
}
|
||||
|
||||
# ─── Stick / CH6 helpers ────────────────────────────────────────────────────
|
||||
|
||||
def ch6_wants_auto(axes: list, ch6_idx: int, threshold: float = 0.5) -> bool:
|
||||
"""
|
||||
Return True when CH6 (aux switch) is in the AUTO position.
|
||||
|
||||
Typical CRSF→Joy mapping: AUX2 = axes[5].
|
||||
Switch up (high) = +1.0 → AUTO; switch down (low) = -1.0 → RC.
|
||||
threshold default 0.5 gives hysteresis against mid-travel noise.
|
||||
"""
|
||||
if ch6_idx < 0 or ch6_idx >= len(axes):
|
||||
return False
|
||||
return float(axes[ch6_idx]) > threshold
|
||||
|
||||
|
||||
def sticks_active(
|
||||
axes: list,
|
||||
stick_indices: tuple,
|
||||
threshold: float = 0.10,
|
||||
) -> bool:
|
||||
"""
|
||||
Return True when any monitored stick axis exceeds threshold magnitude.
|
||||
|
||||
stick_indices: tuple of Joy axes indices for the four sticks
|
||||
(roll, pitch, throttle, yaw). Default CRSF mapping:
|
||||
(0, 1, 2, 3).
|
||||
threshold: 10 % of full deflection (issue spec).
|
||||
"""
|
||||
return any(
|
||||
abs(float(axes[i])) > threshold
|
||||
for i in stick_indices
|
||||
if i < len(axes)
|
||||
)
|
||||
|
||||
|
||||
# ─── Alpha / ramp helpers ───────────────────────────────────────────────────
|
||||
|
||||
def ramp_alpha(elapsed_s: float, ramp_s: float) -> float:
|
||||
"""
|
||||
Compute forward blend alpha [0, 1] linearly over ramp_s seconds.
|
||||
|
||||
elapsed_s : seconds since ramp start.
|
||||
ramp_s : total ramp duration (seconds).
|
||||
"""
|
||||
if ramp_s <= 0.0:
|
||||
return 1.0
|
||||
return min(1.0, elapsed_s / ramp_s)
|
||||
|
||||
|
||||
def alpha_for_mode(mode: str, elapsed_s: float, ramp_s: float) -> float:
|
||||
"""
|
||||
Return the blend alpha appropriate for `mode`.
|
||||
|
||||
RC → 0.0
|
||||
RAMP_TO_AUTO → 0.0 … 1.0 (linear with elapsed)
|
||||
AUTO → 1.0
|
||||
RAMP_TO_RC → 1.0 … 0.0 (linear with elapsed)
|
||||
"""
|
||||
if mode == "RC":
|
||||
return 0.0
|
||||
if mode == "AUTO":
|
||||
return 1.0
|
||||
if mode == "RAMP_TO_AUTO":
|
||||
return ramp_alpha(elapsed_s, ramp_s)
|
||||
if mode == "RAMP_TO_RC":
|
||||
return 1.0 - ramp_alpha(elapsed_s, ramp_s)
|
||||
return 0.0
|
||||
|
||||
|
||||
# ─── LED pattern ────────────────────────────────────────────────────────────
|
||||
|
||||
def led_pattern(mode: str, slam_ok: bool, rc_link_ok: bool) -> str:
|
||||
"""Return the LED pattern name for the current operating state."""
|
||||
if not rc_link_ok:
|
||||
return LED_PATTERNS["no_rc_link"]
|
||||
if mode in ("RC", "RAMP_TO_RC"):
|
||||
return LED_PATTERNS["rc_normal"]
|
||||
if mode == "RAMP_TO_AUTO":
|
||||
return LED_PATTERNS["ramp_to_auto"]
|
||||
if mode == "AUTO":
|
||||
return LED_PATTERNS["auto_ok"] if slam_ok else LED_PATTERNS["auto_no_slam"]
|
||||
return LED_PATTERNS["no_rc_link"]
|
||||
|
||||
|
||||
# ─── State machine step ──────────────────────────────────────────────────────
|
||||
|
||||
def step_state(
|
||||
current_mode: str,
|
||||
ramp_elapsed_s: float,
|
||||
ramp_s: float,
|
||||
ch6_auto: bool,
|
||||
sticks_now_active: bool,
|
||||
sticks_neutral_elapsed_s: float,
|
||||
return_delay_s: float,
|
||||
slam_ok: bool,
|
||||
rc_link_ok: bool,
|
||||
override_active: bool,
|
||||
) -> tuple:
|
||||
"""
|
||||
Compute (next_mode, next_override_active, instant_to_rc).
|
||||
|
||||
Parameters
|
||||
----------
|
||||
current_mode : current mode string
|
||||
ramp_elapsed_s : seconds since last ramp start
|
||||
ramp_s : configured ramp duration
|
||||
ch6_auto : True = pilot switch requests AUTO
|
||||
sticks_now_active : True = at least one stick > override_threshold
|
||||
sticks_neutral_elapsed_s: seconds since all sticks went neutral (0 if still active)
|
||||
return_delay_s : seconds of stick-neutral required to re-engage AUTO
|
||||
slam_ok : True = SLAM fix is valid
|
||||
rc_link_ok : True = CRSF link alive
|
||||
override_active : True = currently in stick-override RC
|
||||
|
||||
Returns
|
||||
-------
|
||||
next_mode : new mode string
|
||||
next_override_active: updated override flag
|
||||
instant_to_rc : True when dropping to RC without ramp (safety / stick override)
|
||||
"""
|
||||
instant_to_rc = False
|
||||
|
||||
# ── Safety: RC link lost → instant RC ────────────────────────────────────
|
||||
if not rc_link_ok:
|
||||
return "RC", False, True
|
||||
|
||||
# ── Compute override transitions ──────────────────────────────────────────
|
||||
next_override = override_active
|
||||
if sticks_now_active and current_mode in ("AUTO", "RAMP_TO_AUTO", "RAMP_TO_RC"):
|
||||
# Stick input while AUTO or any transition → instant RC override
|
||||
next_override = True
|
||||
return "RC", True, True
|
||||
|
||||
if not sticks_now_active and sticks_neutral_elapsed_s >= return_delay_s:
|
||||
# Sticks have been neutral long enough — clear override
|
||||
next_override = False
|
||||
|
||||
# ── Ramp completion ───────────────────────────────────────────────────────
|
||||
ramp_done = ramp_elapsed_s >= ramp_s - 1e-9 # epsilon guards float accumulation
|
||||
|
||||
# ── Transitions per state ─────────────────────────────────────────────────
|
||||
if current_mode == "RC":
|
||||
if ch6_auto and slam_ok and not next_override:
|
||||
return "RAMP_TO_AUTO", next_override, False
|
||||
return "RC", next_override, False
|
||||
|
||||
if current_mode == "RAMP_TO_AUTO":
|
||||
if not ch6_auto:
|
||||
return "RAMP_TO_RC", next_override, False
|
||||
if not slam_ok:
|
||||
return "RAMP_TO_RC", next_override, False
|
||||
if ramp_done:
|
||||
return "AUTO", next_override, False
|
||||
return "RAMP_TO_AUTO", next_override, False
|
||||
|
||||
if current_mode == "AUTO":
|
||||
if not ch6_auto:
|
||||
return "RAMP_TO_RC", next_override, False
|
||||
if not slam_ok:
|
||||
return "RAMP_TO_RC", next_override, False
|
||||
return "AUTO", next_override, False
|
||||
|
||||
if current_mode == "RAMP_TO_RC":
|
||||
if ch6_auto and slam_ok and not next_override:
|
||||
return "RAMP_TO_AUTO", next_override, False
|
||||
if ramp_done:
|
||||
return "RC", next_override, False
|
||||
return "RAMP_TO_RC", next_override, False
|
||||
|
||||
return "RC", next_override, True # unknown state → safe fallback
|
||||
|
||||
|
||||
# ─── cmd_vel blending ───────────────────────────────────────────────────────
|
||||
|
||||
def blend_twist(
|
||||
auto_linear_x: float,
|
||||
auto_angular_z: float,
|
||||
alpha: float,
|
||||
) -> tuple:
|
||||
"""
|
||||
Scale autonomous cmd_vel by blend alpha.
|
||||
|
||||
Returns (linear_x, angular_z) for the blended output.
|
||||
alpha=0 → zeros; alpha=1 → full autonomous.
|
||||
"""
|
||||
a = max(0.0, min(1.0, alpha))
|
||||
return auto_linear_x * a, auto_angular_z * a
|
||||
@ -0,0 +1,307 @@
|
||||
"""
|
||||
mode_switch_node.py — Autonomous/RC mode switch for saltybot (Issue #104).
|
||||
|
||||
Inputs
|
||||
------
|
||||
/rc/joy (sensor_msgs/Joy)
|
||||
CRSF channel axes:
|
||||
axes[ch6_axis] AUX2 — mode toggle (+1=AUTO, -1=RC)
|
||||
axes[stick_axes...] Roll/Pitch/Throttle/Yaw — override detection
|
||||
|
||||
/saltybot/balance_state (std_msgs/String JSON)
|
||||
Parsed for RC link health (field "rc_link") and STM32 mode.
|
||||
|
||||
<slam_fix_topic> (geometry_msgs/PoseWithCovarianceStamped)
|
||||
Any message received within slam_fix_timeout_s → SLAM fix valid.
|
||||
|
||||
Outputs
|
||||
-------
|
||||
/saltybot/control_mode (std_msgs/String JSON)
|
||||
{
|
||||
"mode": "RC" | "RAMP_TO_AUTO" | "AUTO" | "RAMP_TO_RC",
|
||||
"blend_alpha": 0.0 … 1.0,
|
||||
"slam_ok": bool,
|
||||
"rc_link_ok": bool,
|
||||
"override_active": bool
|
||||
}
|
||||
|
||||
/saltybot/led_pattern (std_msgs/String)
|
||||
LED pattern name consumed by the LED controller node.
|
||||
|
||||
State machine
|
||||
-------------
|
||||
See mode_logic.py for the full transition table.
|
||||
|
||||
RC — pilot in control; blend_alpha = 0.0
|
||||
RAMP_TO_AUTO — transitioning; blend_alpha 0→1 over ramp_duration_s (500 ms)
|
||||
AUTO — Jetson in control; blend_alpha = 1.0
|
||||
RAMP_TO_RC — transitioning; blend_alpha 1→0 over ramp_duration_s (500 ms)
|
||||
|
||||
Safety interlocks
|
||||
-----------------
|
||||
• Any stick > override_threshold while AUTO/RAMP_TO_AUTO → instant RC.
|
||||
• rc_link lost (Joy silent > rc_link_timeout_s) → instant RC.
|
||||
• AUTO not entered without valid SLAM fix.
|
||||
• SLAM fix lost while AUTO → RAMP_TO_RC.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ch6_axis Joy axes index for the mode-toggle switch (default 5)
|
||||
stick_axes Space-separated Joy indices for 4 sticks (default "0 1 2 3")
|
||||
override_threshold Stick magnitude for instant RC override (default 0.10)
|
||||
return_delay_s Sticks-neutral duration before re-engage (default 2.0 s)
|
||||
ramp_duration_s RC↔AUTO blend ramp duration (default 0.5 s)
|
||||
rc_link_timeout_s Joy silence threshold → RC link lost (default 0.5 s)
|
||||
slam_fix_topic Pose topic for SLAM fix detection (default /slam_toolbox/pose_with_covariance_stamped)
|
||||
slam_fix_timeout_s Pose silence threshold → SLAM fix lost (default 3.0 s)
|
||||
control_rate State machine tick rate (Hz) (default 20.0)
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy, DurabilityPolicy
|
||||
|
||||
from geometry_msgs.msg import PoseWithCovarianceStamped
|
||||
from sensor_msgs.msg import Joy
|
||||
from std_msgs.msg import String
|
||||
|
||||
from saltybot_mode_switch.mode_logic import (
|
||||
alpha_for_mode,
|
||||
ch6_wants_auto,
|
||||
led_pattern,
|
||||
step_state,
|
||||
sticks_active,
|
||||
)
|
||||
|
||||
_BIG = 1e9 # sentinel: "never received"
|
||||
|
||||
|
||||
class ModeSwitchNode(Node):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("mode_switch")
|
||||
|
||||
# ── Parameters ────────────────────────────────────────────────────────
|
||||
self.declare_parameter("ch6_axis", 5)
|
||||
self.declare_parameter("stick_axes", "0 1 2 3")
|
||||
self.declare_parameter("override_threshold", 0.10)
|
||||
self.declare_parameter("return_delay_s", 2.0)
|
||||
self.declare_parameter("ramp_duration_s", 0.5)
|
||||
self.declare_parameter("rc_link_timeout_s", 0.5)
|
||||
self.declare_parameter("slam_fix_topic",
|
||||
"/slam_toolbox/pose_with_covariance_stamped")
|
||||
self.declare_parameter("slam_fix_timeout_s", 3.0)
|
||||
self.declare_parameter("control_rate", 20.0)
|
||||
|
||||
self._reload_params()
|
||||
|
||||
# ── State ─────────────────────────────────────────────────────────────
|
||||
self._mode: str = "RC"
|
||||
self._override_active: bool = False
|
||||
self._ramp_start: float = 0.0 # monotonic time ramp began
|
||||
self._sticks_neutral_since: float | None = None
|
||||
self._last_joy_t: float = 0.0 # monotonic; 0 = never
|
||||
self._last_slam_t: float = 0.0
|
||||
self._joy_axes: list = []
|
||||
self._stm32_mode: int = 0 # from balance_state JSON
|
||||
|
||||
# ── QoS ───────────────────────────────────────────────────────────────
|
||||
best_effort = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=5,
|
||||
)
|
||||
reliable = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=10,
|
||||
)
|
||||
# SLAM toolbox uses transient_local for its pose topic
|
||||
transient = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
durability=DurabilityPolicy.TRANSIENT_LOCAL,
|
||||
depth=1,
|
||||
)
|
||||
|
||||
# ── Subscriptions ─────────────────────────────────────────────────────
|
||||
self.create_subscription(Joy, "/rc/joy", self._joy_cb, best_effort)
|
||||
self.create_subscription(
|
||||
String, "/saltybot/balance_state", self._balance_cb, reliable)
|
||||
|
||||
slam_topic = self.get_parameter("slam_fix_topic").value
|
||||
# Try transient_local first (SLAM Toolbox); fall back gracefully if not
|
||||
try:
|
||||
self.create_subscription(
|
||||
PoseWithCovarianceStamped, slam_topic,
|
||||
self._slam_cb, transient)
|
||||
except Exception:
|
||||
self.create_subscription(
|
||||
PoseWithCovarianceStamped, slam_topic,
|
||||
self._slam_cb, reliable)
|
||||
|
||||
# ── Publishers ────────────────────────────────────────────────────────
|
||||
self._mode_pub = self.create_publisher(
|
||||
String, "/saltybot/control_mode", reliable)
|
||||
self._led_pub = self.create_publisher(
|
||||
String, "/saltybot/led_pattern", reliable)
|
||||
|
||||
# ── Control timer ─────────────────────────────────────────────────────
|
||||
self._timer = self.create_timer(
|
||||
1.0 / self._p["control_rate"], self._tick)
|
||||
|
||||
self.get_logger().info(
|
||||
f"ModeSwitchNode ready "
|
||||
f"ramp={self._p['ramp_duration_s']:.1f}s "
|
||||
f"override_thresh={self._p['override_threshold']:.0%} "
|
||||
f"return_delay={self._p['return_delay_s']:.1f}s "
|
||||
f"slam_topic={slam_topic}"
|
||||
)
|
||||
|
||||
# ── Parameter reload ──────────────────────────────────────────────────────
|
||||
|
||||
def _reload_params(self):
|
||||
raw_stick_axes = self.get_parameter("stick_axes").value
|
||||
self._p = {
|
||||
"ch6_axis": int(self.get_parameter("ch6_axis").value),
|
||||
"stick_indices": tuple(int(x) for x in raw_stick_axes.split()),
|
||||
"override_threshold": float(self.get_parameter("override_threshold").value),
|
||||
"return_delay_s": float(self.get_parameter("return_delay_s").value),
|
||||
"ramp_duration_s": float(self.get_parameter("ramp_duration_s").value),
|
||||
"rc_link_timeout_s": float(self.get_parameter("rc_link_timeout_s").value),
|
||||
"slam_fix_timeout_s": float(self.get_parameter("slam_fix_timeout_s").value),
|
||||
"control_rate": float(self.get_parameter("control_rate").value),
|
||||
}
|
||||
|
||||
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||
|
||||
def _joy_cb(self, msg: Joy) -> None:
|
||||
self._joy_axes = list(msg.axes)
|
||||
self._last_joy_t = time.monotonic()
|
||||
|
||||
def _balance_cb(self, msg: String) -> None:
|
||||
try:
|
||||
data = json.loads(msg.data)
|
||||
# "mode" is a label string; map back to int for reference
|
||||
mode_label = data.get("mode", "RC_MANUAL")
|
||||
self._stm32_mode = {"RC_MANUAL": 0, "RC_ASSISTED": 1,
|
||||
"AUTONOMOUS": 2}.get(mode_label, 0)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
|
||||
def _slam_cb(self, msg: PoseWithCovarianceStamped) -> None:
|
||||
self._last_slam_t = time.monotonic()
|
||||
|
||||
# ── 20 Hz control tick ────────────────────────────────────────────────────
|
||||
|
||||
def _tick(self) -> None:
|
||||
self._reload_params()
|
||||
now = time.monotonic()
|
||||
p = self._p
|
||||
|
||||
# ── Derive inputs ─────────────────────────────────────────────────────
|
||||
joy_age = (now - self._last_joy_t) if self._last_joy_t > 0.0 else _BIG
|
||||
slam_age = (now - self._last_slam_t) if self._last_slam_t > 0.0 else _BIG
|
||||
rc_link_ok = joy_age <= p["rc_link_timeout_s"]
|
||||
slam_ok = slam_age <= p["slam_fix_timeout_s"]
|
||||
|
||||
axes = self._joy_axes
|
||||
ch6 = ch6_wants_auto(axes, p["ch6_axis"])
|
||||
active = sticks_active(axes, p["stick_indices"], p["override_threshold"])
|
||||
|
||||
# Track neutral timer
|
||||
if active:
|
||||
self._sticks_neutral_since = None
|
||||
elif self._sticks_neutral_since is None:
|
||||
self._sticks_neutral_since = now
|
||||
|
||||
neutral_elapsed = (
|
||||
(now - self._sticks_neutral_since)
|
||||
if self._sticks_neutral_since is not None else 0.0
|
||||
)
|
||||
|
||||
ramp_elapsed = now - self._ramp_start
|
||||
|
||||
# ── State machine step ────────────────────────────────────────────────
|
||||
prev_mode = self._mode
|
||||
next_mode, next_override, instant = step_state(
|
||||
current_mode=self._mode,
|
||||
ramp_elapsed_s=ramp_elapsed,
|
||||
ramp_s=p["ramp_duration_s"],
|
||||
ch6_auto=ch6,
|
||||
sticks_now_active=active,
|
||||
sticks_neutral_elapsed_s=neutral_elapsed,
|
||||
return_delay_s=p["return_delay_s"],
|
||||
slam_ok=slam_ok,
|
||||
rc_link_ok=rc_link_ok,
|
||||
override_active=self._override_active,
|
||||
)
|
||||
|
||||
if next_mode != prev_mode:
|
||||
# Log transitions; reset ramp timer on ramp entry
|
||||
self.get_logger().info(
|
||||
f"Mode: {prev_mode} → {next_mode}"
|
||||
f" slam_ok={slam_ok} rc_link={rc_link_ok}"
|
||||
f" override={next_override}"
|
||||
)
|
||||
if next_mode in ("RAMP_TO_AUTO", "RAMP_TO_RC"):
|
||||
self._ramp_start = now
|
||||
ramp_elapsed = 0.0
|
||||
|
||||
self._mode = next_mode
|
||||
self._override_active = next_override
|
||||
|
||||
# ── Compute blend alpha ───────────────────────────────────────────────
|
||||
current_ramp_elapsed = now - self._ramp_start
|
||||
alpha = alpha_for_mode(next_mode, current_ramp_elapsed, p["ramp_duration_s"])
|
||||
if instant:
|
||||
alpha = 0.0
|
||||
|
||||
# ── Log steady warnings ───────────────────────────────────────────────
|
||||
if next_mode == "AUTO" and not slam_ok:
|
||||
self.get_logger().warn(
|
||||
"In AUTO but SLAM fix lost — will exit AUTO",
|
||||
throttle_duration_sec=2.0)
|
||||
if not rc_link_ok:
|
||||
self.get_logger().warn(
|
||||
"RC link lost — forced RC mode",
|
||||
throttle_duration_sec=1.0)
|
||||
|
||||
# ── Publish /saltybot/control_mode ────────────────────────────────────
|
||||
payload = {
|
||||
"mode": next_mode,
|
||||
"blend_alpha": round(alpha, 4),
|
||||
"slam_ok": slam_ok,
|
||||
"rc_link_ok": rc_link_ok,
|
||||
"override_active": next_override,
|
||||
}
|
||||
mode_msg = String()
|
||||
mode_msg.data = json.dumps(payload)
|
||||
self._mode_pub.publish(mode_msg)
|
||||
|
||||
# ── Publish /saltybot/led_pattern ─────────────────────────────────────
|
||||
pattern = led_pattern(next_mode, slam_ok, rc_link_ok)
|
||||
led_msg = String()
|
||||
led_msg.data = pattern
|
||||
self._led_pub.publish(led_msg)
|
||||
|
||||
|
||||
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = ModeSwitchNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.try_shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
5
jetson/ros2_ws/src/saltybot_mode_switch/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_mode_switch/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_mode_switch
|
||||
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_mode_switch
|
||||
33
jetson/ros2_ws/src/saltybot_mode_switch/setup.py
Normal file
33
jetson/ros2_ws/src/saltybot_mode_switch/setup.py
Normal file
@ -0,0 +1,33 @@
|
||||
from setuptools import setup, find_packages
|
||||
import os
|
||||
from glob import glob
|
||||
|
||||
package_name = "saltybot_mode_switch"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version="0.1.0",
|
||||
packages=find_packages(exclude=["test"]),
|
||||
data_files=[
|
||||
("share/ament_index/resource_index/packages",
|
||||
[f"resource/{package_name}"]),
|
||||
(f"share/{package_name}", ["package.xml"]),
|
||||
(os.path.join("share", package_name, "config"),
|
||||
glob("config/*.yaml")),
|
||||
(os.path.join("share", package_name, "launch"),
|
||||
glob("launch/*.py")),
|
||||
],
|
||||
install_requires=["setuptools"],
|
||||
zip_safe=True,
|
||||
maintainer="sl-controls",
|
||||
maintainer_email="sl-controls@saltylab.local",
|
||||
description="Autonomous/RC mode switch with 500 ms blend ramp (Issue #104)",
|
||||
license="MIT",
|
||||
tests_require=["pytest"],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
f"mode_switch_node = {package_name}.mode_switch_node:main",
|
||||
f"cmd_vel_mux_node = {package_name}.cmd_vel_mux_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
481
jetson/ros2_ws/src/saltybot_mode_switch/test/test_mode_logic.py
Normal file
481
jetson/ros2_ws/src/saltybot_mode_switch/test/test_mode_logic.py
Normal file
@ -0,0 +1,481 @@
|
||||
"""
|
||||
test_mode_logic.py — Unit tests for saltybot_mode_switch pure logic.
|
||||
|
||||
Covers:
|
||||
- ch6_wants_auto / sticks_active helpers
|
||||
- ramp_alpha / alpha_for_mode
|
||||
- led_pattern
|
||||
- step_state state machine (all transitions)
|
||||
- blend_twist
|
||||
|
||||
No ROS2 runtime required.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import math
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
import pytest
|
||||
|
||||
from saltybot_mode_switch.mode_logic import (
|
||||
alpha_for_mode,
|
||||
blend_twist,
|
||||
ch6_wants_auto,
|
||||
led_pattern,
|
||||
ramp_alpha,
|
||||
step_state,
|
||||
sticks_active,
|
||||
)
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Helpers
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _step(
|
||||
mode="RC",
|
||||
ramp_elapsed=0.0,
|
||||
ramp_s=0.5,
|
||||
ch6_auto=False,
|
||||
sticks_now_active=False,
|
||||
neutral_elapsed=0.0,
|
||||
return_delay=2.0,
|
||||
slam_ok=True,
|
||||
rc_link_ok=True,
|
||||
override_active=False,
|
||||
):
|
||||
return step_state(
|
||||
current_mode=mode,
|
||||
ramp_elapsed_s=ramp_elapsed,
|
||||
ramp_s=ramp_s,
|
||||
ch6_auto=ch6_auto,
|
||||
sticks_now_active=sticks_now_active,
|
||||
sticks_neutral_elapsed_s=neutral_elapsed,
|
||||
return_delay_s=return_delay,
|
||||
slam_ok=slam_ok,
|
||||
rc_link_ok=rc_link_ok,
|
||||
override_active=override_active,
|
||||
)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# ch6_wants_auto
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestCh6WantsAuto:
|
||||
|
||||
def test_high_value_returns_true(self):
|
||||
assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 1.0], ch6_idx=5) is True
|
||||
|
||||
def test_low_value_returns_false(self):
|
||||
assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, -1.0], ch6_idx=5) is False
|
||||
|
||||
def test_mid_value_below_threshold(self):
|
||||
assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 0.4], ch6_idx=5) is False
|
||||
|
||||
def test_exactly_at_threshold(self):
|
||||
# 0.5 > 0.5 is False; require strictly above
|
||||
assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 0.5], ch6_idx=5) is False
|
||||
|
||||
def test_just_above_threshold(self):
|
||||
assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 0.51], ch6_idx=5) is True
|
||||
|
||||
def test_out_of_range_index(self):
|
||||
assert ch6_wants_auto([1.0], ch6_idx=10) is False
|
||||
|
||||
def test_empty_axes(self):
|
||||
assert ch6_wants_auto([], ch6_idx=0) is False
|
||||
|
||||
def test_custom_threshold(self):
|
||||
assert ch6_wants_auto([0.0, 0.3], ch6_idx=1, threshold=0.2) is True
|
||||
assert ch6_wants_auto([0.0, 0.3], ch6_idx=1, threshold=0.5) is False
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# sticks_active
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestSticksActive:
|
||||
|
||||
def test_all_zero_not_active(self):
|
||||
assert sticks_active([0.0, 0.0, 0.0, 0.0], (0, 1, 2, 3)) is False
|
||||
|
||||
def test_one_stick_above_threshold(self):
|
||||
assert sticks_active([0.0, 0.15, 0.0, 0.0], (0, 1, 2, 3)) is True
|
||||
|
||||
def test_all_exactly_at_threshold_not_active(self):
|
||||
axes = [0.10, 0.10, 0.10, 0.10]
|
||||
assert sticks_active(axes, (0, 1, 2, 3), threshold=0.10) is False
|
||||
|
||||
def test_just_above_threshold_active(self):
|
||||
assert sticks_active([0.0, 0.0, 0.0, 0.11], (0, 1, 2, 3)) is True
|
||||
|
||||
def test_negative_deflection_detected(self):
|
||||
assert sticks_active([-0.5, 0.0, 0.0, 0.0], (0,)) is True
|
||||
|
||||
def test_out_of_range_index_skipped(self):
|
||||
# index 5 is out of range for a 4-element list — should not crash
|
||||
assert sticks_active([0.0, 0.0, 0.0, 0.0], (0, 1, 5)) is False
|
||||
|
||||
def test_only_monitored_axes_checked(self):
|
||||
# axes[4] = 0.9 but only (0,1,2,3) are monitored
|
||||
assert sticks_active([0.0, 0.0, 0.0, 0.0, 0.9], (0, 1, 2, 3)) is False
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# ramp_alpha / alpha_for_mode
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestRampAlpha:
|
||||
|
||||
def test_zero_elapsed_returns_zero(self):
|
||||
assert ramp_alpha(0.0, 0.5) == pytest.approx(0.0)
|
||||
|
||||
def test_full_elapsed_returns_one(self):
|
||||
assert ramp_alpha(0.5, 0.5) == pytest.approx(1.0)
|
||||
|
||||
def test_beyond_duration_clamped_to_one(self):
|
||||
assert ramp_alpha(1.0, 0.5) == pytest.approx(1.0)
|
||||
|
||||
def test_halfway(self):
|
||||
assert ramp_alpha(0.25, 0.5) == pytest.approx(0.5)
|
||||
|
||||
def test_zero_duration_returns_one(self):
|
||||
assert ramp_alpha(0.0, 0.0) == pytest.approx(1.0)
|
||||
|
||||
|
||||
class TestAlphaForMode:
|
||||
|
||||
def test_rc_returns_zero(self):
|
||||
assert alpha_for_mode("RC", 0.3, 0.5) == 0.0
|
||||
|
||||
def test_auto_returns_one(self):
|
||||
assert alpha_for_mode("AUTO", 0.3, 0.5) == 1.0
|
||||
|
||||
def test_ramp_to_auto_mid(self):
|
||||
assert alpha_for_mode("RAMP_TO_AUTO", 0.25, 0.5) == pytest.approx(0.5)
|
||||
|
||||
def test_ramp_to_rc_mid(self):
|
||||
assert alpha_for_mode("RAMP_TO_RC", 0.25, 0.5) == pytest.approx(0.5)
|
||||
|
||||
def test_ramp_to_auto_start(self):
|
||||
assert alpha_for_mode("RAMP_TO_AUTO", 0.0, 0.5) == pytest.approx(0.0)
|
||||
|
||||
def test_ramp_to_rc_start(self):
|
||||
assert alpha_for_mode("RAMP_TO_RC", 0.0, 0.5) == pytest.approx(1.0)
|
||||
|
||||
def test_ramp_to_auto_complete(self):
|
||||
assert alpha_for_mode("RAMP_TO_AUTO", 0.5, 0.5) == pytest.approx(1.0)
|
||||
|
||||
def test_ramp_to_rc_complete(self):
|
||||
assert alpha_for_mode("RAMP_TO_RC", 0.5, 0.5) == pytest.approx(0.0)
|
||||
|
||||
def test_unknown_mode_returns_zero(self):
|
||||
assert alpha_for_mode("UNKNOWN", 0.3, 0.5) == 0.0
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# led_pattern
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestLedPattern:
|
||||
|
||||
def test_no_rc_link_overrides_all(self):
|
||||
for mode in ("RC", "RAMP_TO_AUTO", "AUTO", "RAMP_TO_RC"):
|
||||
assert led_pattern(mode, slam_ok=True, rc_link_ok=False) == "blink_red_fast"
|
||||
|
||||
def test_rc_mode_yellow(self):
|
||||
assert led_pattern("RC", slam_ok=True, rc_link_ok=True) == "solid_yellow"
|
||||
|
||||
def test_ramp_to_rc_yellow(self):
|
||||
assert led_pattern("RAMP_TO_RC", slam_ok=True, rc_link_ok=True) == "solid_yellow"
|
||||
|
||||
def test_auto_ok_green(self):
|
||||
assert led_pattern("AUTO", slam_ok=True, rc_link_ok=True) == "solid_green"
|
||||
|
||||
def test_auto_no_slam_orange(self):
|
||||
assert led_pattern("AUTO", slam_ok=False, rc_link_ok=True) == "blink_orange_fast"
|
||||
|
||||
def test_ramp_to_auto_blink_green(self):
|
||||
assert led_pattern("RAMP_TO_AUTO", slam_ok=True, rc_link_ok=True) == "blink_green_slow"
|
||||
|
||||
def test_rc_slam_state_irrelevant(self):
|
||||
p1 = led_pattern("RC", slam_ok=True, rc_link_ok=True)
|
||||
p2 = led_pattern("RC", slam_ok=False, rc_link_ok=True)
|
||||
assert p1 == p2 # SLAM doesn't affect RC LED
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# step_state — RC transitions
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestStepStateFromRC:
|
||||
|
||||
def test_rc_stays_rc_by_default(self):
|
||||
mode, override, instant = _step(mode="RC")
|
||||
assert mode == "RC"
|
||||
assert not instant
|
||||
|
||||
def test_rc_to_ramp_when_ch6_auto(self):
|
||||
mode, override, instant = _step(mode="RC", ch6_auto=True)
|
||||
assert mode == "RAMP_TO_AUTO"
|
||||
assert not instant
|
||||
|
||||
def test_rc_stays_rc_when_ch6_auto_but_no_slam(self):
|
||||
mode, _, _ = _step(mode="RC", ch6_auto=True, slam_ok=False)
|
||||
assert mode == "RC"
|
||||
|
||||
def test_rc_stays_rc_when_ch6_auto_but_no_rc_link(self):
|
||||
mode, _, instant = _step(mode="RC", ch6_auto=True, rc_link_ok=False)
|
||||
assert mode == "RC"
|
||||
assert instant # RC link lost → instant
|
||||
|
||||
def test_rc_stays_rc_when_override_active_and_sticks_not_neutral(self):
|
||||
mode, _, _ = _step(
|
||||
mode="RC", ch6_auto=True, slam_ok=True,
|
||||
override_active=True, neutral_elapsed=1.0, return_delay=2.0)
|
||||
assert mode == "RC"
|
||||
|
||||
def test_rc_to_ramp_after_override_clears(self):
|
||||
# override clears after sticks neutral >= return_delay
|
||||
mode, override, _ = _step(
|
||||
mode="RC", ch6_auto=True, slam_ok=True,
|
||||
override_active=True, neutral_elapsed=2.5, return_delay=2.0)
|
||||
assert mode == "RAMP_TO_AUTO"
|
||||
assert not override
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# step_state — RAMP_TO_AUTO transitions
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestStepStateFromRampToAuto:
|
||||
|
||||
def test_continues_ramping(self):
|
||||
mode, _, _ = _step(mode="RAMP_TO_AUTO", ramp_elapsed=0.2, ramp_s=0.5,
|
||||
ch6_auto=True)
|
||||
assert mode == "RAMP_TO_AUTO"
|
||||
|
||||
def test_completes_to_auto(self):
|
||||
mode, _, _ = _step(mode="RAMP_TO_AUTO", ramp_elapsed=0.5, ramp_s=0.5,
|
||||
ch6_auto=True)
|
||||
assert mode == "AUTO"
|
||||
|
||||
def test_stick_override_instant_rc(self):
|
||||
mode, override, instant = _step(
|
||||
mode="RAMP_TO_AUTO", ch6_auto=True,
|
||||
sticks_now_active=True)
|
||||
assert mode == "RC"
|
||||
assert override is True
|
||||
assert instant is True
|
||||
|
||||
def test_ch6_low_aborts_to_ramp_rc(self):
|
||||
mode, _, _ = _step(mode="RAMP_TO_AUTO", ch6_auto=False, ramp_elapsed=0.1)
|
||||
assert mode == "RAMP_TO_RC"
|
||||
|
||||
def test_slam_lost_aborts_to_ramp_rc(self):
|
||||
mode, _, _ = _step(mode="RAMP_TO_AUTO", ch6_auto=True, slam_ok=False)
|
||||
assert mode == "RAMP_TO_RC"
|
||||
|
||||
def test_rc_link_lost_instant_rc(self):
|
||||
mode, _, instant = _step(mode="RAMP_TO_AUTO", ch6_auto=True, rc_link_ok=False)
|
||||
assert mode == "RC"
|
||||
assert instant is True
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# step_state — AUTO transitions
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestStepStateFromAuto:
|
||||
|
||||
def test_auto_stays_auto(self):
|
||||
mode, _, _ = _step(mode="AUTO", ch6_auto=True)
|
||||
assert mode == "AUTO"
|
||||
|
||||
def test_stick_override_instant_rc(self):
|
||||
mode, override, instant = _step(
|
||||
mode="AUTO", ch6_auto=True, sticks_now_active=True)
|
||||
assert mode == "RC"
|
||||
assert override is True
|
||||
assert instant is True
|
||||
|
||||
def test_ch6_low_ramp_to_rc(self):
|
||||
mode, _, instant = _step(mode="AUTO", ch6_auto=False)
|
||||
assert mode == "RAMP_TO_RC"
|
||||
assert not instant
|
||||
|
||||
def test_slam_lost_ramp_to_rc(self):
|
||||
mode, _, instant = _step(mode="AUTO", ch6_auto=True, slam_ok=False)
|
||||
assert mode == "RAMP_TO_RC"
|
||||
assert not instant
|
||||
|
||||
def test_rc_link_lost_instant_rc(self):
|
||||
mode, _, instant = _step(mode="AUTO", ch6_auto=True, rc_link_ok=False)
|
||||
assert mode == "RC"
|
||||
assert instant is True
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# step_state — RAMP_TO_RC transitions
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestStepStateFromRampToRC:
|
||||
|
||||
def test_continues_ramping(self):
|
||||
mode, _, _ = _step(mode="RAMP_TO_RC", ramp_elapsed=0.2, ramp_s=0.5)
|
||||
assert mode == "RAMP_TO_RC"
|
||||
|
||||
def test_completes_to_rc(self):
|
||||
mode, _, _ = _step(mode="RAMP_TO_RC", ramp_elapsed=0.5, ramp_s=0.5)
|
||||
assert mode == "RC"
|
||||
|
||||
def test_stick_override_instant_rc(self):
|
||||
mode, override, instant = _step(
|
||||
mode="RAMP_TO_RC", sticks_now_active=True, ch6_auto=True)
|
||||
assert mode == "RC"
|
||||
assert instant is True
|
||||
|
||||
def test_ch6_still_auto_no_override_cancels_back(self):
|
||||
# CH6 still says AUTO, no override → cancel to RAMP_TO_AUTO
|
||||
mode, _, _ = _step(mode="RAMP_TO_RC", ch6_auto=True, slam_ok=True,
|
||||
override_active=False, ramp_elapsed=0.1)
|
||||
assert mode == "RAMP_TO_AUTO"
|
||||
|
||||
def test_rc_link_lost_instant_rc(self):
|
||||
_, _, instant = _step(mode="RAMP_TO_RC", rc_link_ok=False)
|
||||
assert instant is True
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# blend_twist
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestBlendTwist:
|
||||
|
||||
def test_alpha_zero_zeros_output(self):
|
||||
lx, az = blend_twist(1.0, 0.5, 0.0)
|
||||
assert lx == pytest.approx(0.0)
|
||||
assert az == pytest.approx(0.0)
|
||||
|
||||
def test_alpha_one_passes_through(self):
|
||||
lx, az = blend_twist(0.4, -0.7, 1.0)
|
||||
assert lx == pytest.approx(0.4)
|
||||
assert az == pytest.approx(-0.7)
|
||||
|
||||
def test_alpha_half_scales(self):
|
||||
lx, az = blend_twist(1.0, 1.0, 0.5)
|
||||
assert lx == pytest.approx(0.5)
|
||||
assert az == pytest.approx(0.5)
|
||||
|
||||
def test_alpha_above_one_clamped(self):
|
||||
lx, az = blend_twist(1.0, 1.0, 1.5)
|
||||
assert lx == pytest.approx(1.0)
|
||||
assert az == pytest.approx(1.0)
|
||||
|
||||
def test_alpha_below_zero_clamped(self):
|
||||
lx, az = blend_twist(1.0, 1.0, -0.5)
|
||||
assert lx == pytest.approx(0.0)
|
||||
assert az == pytest.approx(0.0)
|
||||
|
||||
def test_negative_cmd_vel_preserved(self):
|
||||
lx, az = blend_twist(-0.5, -1.0, 0.5)
|
||||
assert lx == pytest.approx(-0.25)
|
||||
assert az == pytest.approx(-0.5)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Integration scenarios
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestIntegrationScenarios:
|
||||
|
||||
def test_full_rc_to_auto_ramp_cycle(self):
|
||||
"""Simulate RC→RAMP_TO_AUTO→AUTO with 0.5s ramp at 20Hz."""
|
||||
mode = "RC"
|
||||
override = False
|
||||
ramp_s = 0.5
|
||||
dt = 1.0 / 20.0
|
||||
t_ramp = 0.0
|
||||
|
||||
# CH6 flipped to AUTO, SLAM ok
|
||||
mode, override, instant = step_state(
|
||||
mode, 0.0, ramp_s, True, False, 5.0, 2.0, True, True, override)
|
||||
assert mode == "RAMP_TO_AUTO"
|
||||
t_ramp = 0.0
|
||||
|
||||
# Simulate 10 ticks (0.5s) → should complete
|
||||
for tick in range(10):
|
||||
t_ramp += dt
|
||||
mode, override, instant = step_state(
|
||||
mode, t_ramp, ramp_s, True, False, 5.0, 2.0, True, True, override)
|
||||
|
||||
assert mode == "AUTO"
|
||||
assert alpha_for_mode("AUTO", t_ramp, ramp_s) == 1.0
|
||||
|
||||
def test_stick_override_then_return_to_auto(self):
|
||||
"""Stick override while AUTO → RC → neutral 2s → back to AUTO."""
|
||||
# Start in AUTO
|
||||
mode, override, instant = _step(
|
||||
mode="AUTO", ch6_auto=True, sticks_now_active=True)
|
||||
assert mode == "RC"
|
||||
assert override is True
|
||||
assert instant is True
|
||||
|
||||
# Sticks neutral but < 2s — stays RC
|
||||
mode, override, _ = _step(
|
||||
mode="RC", ch6_auto=True, slam_ok=True,
|
||||
override_active=True, neutral_elapsed=1.0, return_delay=2.0)
|
||||
assert mode == "RC"
|
||||
|
||||
# Sticks neutral >= 2s — override clears, enters ramp
|
||||
mode, override, _ = _step(
|
||||
mode="RC", ch6_auto=True, slam_ok=True,
|
||||
override_active=True, neutral_elapsed=2.0, return_delay=2.0)
|
||||
assert mode == "RAMP_TO_AUTO"
|
||||
assert override is False
|
||||
|
||||
def test_slam_lost_during_auto_graceful_exit(self):
|
||||
"""SLAM fix lost during AUTO → graceful RAMP_TO_RC (not instant)."""
|
||||
mode, _, instant = _step(mode="AUTO", ch6_auto=True, slam_ok=False)
|
||||
assert mode == "RAMP_TO_RC"
|
||||
assert not instant
|
||||
|
||||
def test_rc_link_lost_always_instant(self):
|
||||
"""RC link lost from any state → instant RC."""
|
||||
for start_mode in ("AUTO", "RAMP_TO_AUTO", "RAMP_TO_RC"):
|
||||
mode, _, instant = _step(mode=start_mode, ch6_auto=True, rc_link_ok=False)
|
||||
assert mode == "RC", f"from {start_mode}"
|
||||
assert instant is True, f"from {start_mode}"
|
||||
|
||||
def test_blend_alpha_sequence_through_ramp(self):
|
||||
"""Blend alpha is monotonically increasing during RAMP_TO_AUTO."""
|
||||
ramp_s = 0.5
|
||||
alphas = [
|
||||
alpha_for_mode("RAMP_TO_AUTO", t * 0.05, ramp_s)
|
||||
for t in range(11) # 0.0 to 0.5 s in 0.05 s steps
|
||||
]
|
||||
for a, b in zip(alphas, alphas[1:]):
|
||||
assert b >= a
|
||||
|
||||
def test_ramp_to_rc_alpha_decreasing(self):
|
||||
"""Blend alpha is monotonically decreasing during RAMP_TO_RC."""
|
||||
ramp_s = 0.5
|
||||
alphas = [
|
||||
alpha_for_mode("RAMP_TO_RC", t * 0.05, ramp_s)
|
||||
for t in range(11)
|
||||
]
|
||||
for a, b in zip(alphas, alphas[1:]):
|
||||
assert b <= a
|
||||
|
||||
def test_no_auto_without_slam(self):
|
||||
"""From RC, ch6=AUTO but slam=False → cannot enter AUTO path."""
|
||||
mode, _, _ = _step(mode="RC", ch6_auto=True, slam_ok=False)
|
||||
assert mode == "RC"
|
||||
|
||||
def test_no_auto_without_rc_link(self):
|
||||
"""From RC, ch6=AUTO but rc_link=False → stays RC (instant)."""
|
||||
mode, _, instant = _step(mode="RC", ch6_auto=True, rc_link_ok=False)
|
||||
assert mode == "RC"
|
||||
assert instant is True
|
||||
Loading…
x
Reference in New Issue
Block a user