feat(controls): Autonomous/RC mode switch with 500ms blend ramp (Issue #104) #114
4
jetson/ros2_ws/src/saltybot_mode_switch/.gitignore
vendored
Normal file
4
jetson/ros2_ws/src/saltybot_mode_switch/.gitignore
vendored
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
__pycache__/
|
||||||
|
*.pyc
|
||||||
|
*.egg-info/
|
||||||
|
.pytest_cache/
|
||||||
@ -0,0 +1,58 @@
|
|||||||
|
# mode_switch_params.yaml — saltybot_mode_switch
|
||||||
|
#
|
||||||
|
# Launch with:
|
||||||
|
# ros2 launch saltybot_mode_switch mode_switch.launch.py
|
||||||
|
#
|
||||||
|
# Topic wiring:
|
||||||
|
# /rc/joy → mode_switch_node (CRSF channels)
|
||||||
|
# /saltybot/balance_state → mode_switch_node (STM32 state)
|
||||||
|
# /slam_toolbox/pose_with_covariance_stamped → mode_switch_node (SLAM fix)
|
||||||
|
# /saltybot/control_mode ← mode_switch_node (JSON mode + alpha)
|
||||||
|
# /saltybot/led_pattern ← mode_switch_node (LED name)
|
||||||
|
#
|
||||||
|
# /cmd_vel_auto → cmd_vel_mux_node (from Nav2/follower)
|
||||||
|
# /saltybot/control_mode → cmd_vel_mux_node
|
||||||
|
# /cmd_vel ← cmd_vel_mux_node (to bridge)
|
||||||
|
#
|
||||||
|
# IMPORTANT: remap Nav2 and person_follower to publish /cmd_vel_auto:
|
||||||
|
# ros2 launch saltybot_bringup nav2.launch.py cmd_vel_topic:=/cmd_vel_auto
|
||||||
|
|
||||||
|
# ── CRSF channel mapping ──────────────────────────────────────────────────────
|
||||||
|
# Joy axes index for the AUX2 / CH6 mode-toggle switch.
|
||||||
|
# Typical CRSF→Joy mapping: CH1=axes[0] … CH6=axes[5].
|
||||||
|
ch6_axis: 5
|
||||||
|
|
||||||
|
# Space-separated Joy axes indices for the four control sticks (roll/pitch/throttle/yaw).
|
||||||
|
# These are monitored for the 10% override threshold.
|
||||||
|
stick_axes: "0 1 2 3"
|
||||||
|
|
||||||
|
# ── Override / return-to-auto ──────────────────────────────────────────────────
|
||||||
|
# Stick magnitude (0–1) that triggers instant RC override while in AUTO.
|
||||||
|
# 10% (0.10) per spec — change only if stick noise causes false triggers.
|
||||||
|
override_threshold: 0.10
|
||||||
|
|
||||||
|
# Seconds sticks must remain neutral before AUTO re-engages after an override.
|
||||||
|
return_delay_s: 2.0
|
||||||
|
|
||||||
|
# ── Ramp ──────────────────────────────────────────────────────────────────────
|
||||||
|
# RC↔AUTO blend ramp duration (seconds). cmd_vel_mux scales by blend_alpha
|
||||||
|
# so autonomous commands are eased in/out over this window.
|
||||||
|
ramp_duration_s: 0.5
|
||||||
|
|
||||||
|
# ── RC link health ────────────────────────────────────────────────────────────
|
||||||
|
# Seconds of Joy silence before RC link is declared lost → force RC mode.
|
||||||
|
rc_link_timeout_s: 0.5
|
||||||
|
|
||||||
|
# ── SLAM fix ──────────────────────────────────────────────────────────────────
|
||||||
|
# ROS2 topic providing the SLAM localisation pose. Any message received within
|
||||||
|
# slam_fix_timeout_s counts as a valid fix.
|
||||||
|
slam_fix_topic: "/slam_toolbox/pose_with_covariance_stamped"
|
||||||
|
|
||||||
|
# Seconds after the last SLAM pose before fix is considered lost.
|
||||||
|
slam_fix_timeout_s: 3.0
|
||||||
|
|
||||||
|
# ── Control loop ──────────────────────────────────────────────────────────────
|
||||||
|
control_rate: 20.0 # Hz
|
||||||
|
|
||||||
|
# ── cmd_vel_mux ───────────────────────────────────────────────────────────────
|
||||||
|
cmd_vel_timeout_s: 0.5 # deadman: zero output if /cmd_vel_auto goes silent
|
||||||
@ -0,0 +1,59 @@
|
|||||||
|
"""mode_switch.launch.py — launch mode_switch_node + cmd_vel_mux_node."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
pkg_share = get_package_share_directory("saltybot_mode_switch")
|
||||||
|
default_cfg = os.path.join(pkg_share, "config", "mode_switch_params.yaml")
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"params_file",
|
||||||
|
default_value=default_cfg,
|
||||||
|
description="Path to mode_switch_params.yaml",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"ramp_duration_s",
|
||||||
|
default_value="0.5",
|
||||||
|
description="RC↔AUTO blend ramp duration (seconds)",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"return_delay_s",
|
||||||
|
default_value="2.0",
|
||||||
|
description="Sticks-neutral duration before auto re-engage (seconds)",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"slam_fix_topic",
|
||||||
|
default_value="/slam_toolbox/pose_with_covariance_stamped",
|
||||||
|
description="SLAM fix pose topic",
|
||||||
|
),
|
||||||
|
|
||||||
|
Node(
|
||||||
|
package="saltybot_mode_switch",
|
||||||
|
executable="mode_switch_node",
|
||||||
|
name="mode_switch",
|
||||||
|
output="screen",
|
||||||
|
parameters=[
|
||||||
|
LaunchConfiguration("params_file"),
|
||||||
|
{
|
||||||
|
"ramp_duration_s": LaunchConfiguration("ramp_duration_s"),
|
||||||
|
"return_delay_s": LaunchConfiguration("return_delay_s"),
|
||||||
|
"slam_fix_topic": LaunchConfiguration("slam_fix_topic"),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
|
||||||
|
Node(
|
||||||
|
package="saltybot_mode_switch",
|
||||||
|
executable="cmd_vel_mux_node",
|
||||||
|
name="cmd_vel_mux",
|
||||||
|
output="screen",
|
||||||
|
parameters=[LaunchConfiguration("params_file")],
|
||||||
|
),
|
||||||
|
])
|
||||||
31
jetson/ros2_ws/src/saltybot_mode_switch/package.xml
Normal file
31
jetson/ros2_ws/src/saltybot_mode_switch/package.xml
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_mode_switch</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
Autonomous/RC mode switch for saltybot (Issue #104).
|
||||||
|
Monitors CRSF CH6 toggle, RC stick override, SLAM fix status, and RC link
|
||||||
|
health to manage RC↔AUTO transitions with a 500 ms blend ramp.
|
||||||
|
Publishes /saltybot/control_mode (JSON) and /saltybot/led_pattern.
|
||||||
|
Includes cmd_vel_mux_node that scales autonomous cmd_vel by blend_alpha.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,133 @@
|
|||||||
|
"""
|
||||||
|
cmd_vel_mux_node.py — Blend-alpha cmd_vel multiplexer (Issue #104).
|
||||||
|
|
||||||
|
Sits between the Nav2 / person-follower stack and the cmd_vel_bridge_node,
|
||||||
|
scaling the autonomous cmd_vel output by the blend_alpha published by
|
||||||
|
mode_switch_node so that the RC → AUTO handoff is smooth over 500 ms.
|
||||||
|
|
||||||
|
Topic graph
|
||||||
|
-----------
|
||||||
|
Nav2 / follower → /cmd_vel_auto ──┐
|
||||||
|
├── cmd_vel_mux → /cmd_vel → bridge
|
||||||
|
mode_switch → /saltybot/control_mode (blend_alpha)
|
||||||
|
|
||||||
|
In RC mode (blend_alpha ≈ 0) the node publishes Twist(0,0) so the bridge
|
||||||
|
receives zeros — this is harmless because the bridge's mode gate already
|
||||||
|
prevents autonomous commands when the STM32 is in RC_MANUAL.
|
||||||
|
|
||||||
|
The bridge's existing ESC ramp handles hardware-level smoothing;
|
||||||
|
the blend_alpha here provides the higher-level cmd_vel policy ramp.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
cmd_vel_timeout_s Deadman: zero output if /cmd_vel_auto goes silent (0.5 s)
|
||||||
|
control_rate Output publish rate (Hz) (20.0)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
|
||||||
|
|
||||||
|
from geometry_msgs.msg import Twist
|
||||||
|
from std_msgs.msg import String
|
||||||
|
|
||||||
|
from saltybot_mode_switch.mode_logic import blend_twist
|
||||||
|
|
||||||
|
_BIG = 1e9
|
||||||
|
|
||||||
|
|
||||||
|
class CmdVelMuxNode(Node):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("cmd_vel_mux")
|
||||||
|
|
||||||
|
self.declare_parameter("cmd_vel_timeout_s", 0.5)
|
||||||
|
self.declare_parameter("control_rate", 20.0)
|
||||||
|
|
||||||
|
self._timeout = self.get_parameter("cmd_vel_timeout_s").value
|
||||||
|
self._rate = self.get_parameter("control_rate").value
|
||||||
|
|
||||||
|
reliable = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.RELIABLE,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=10,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── State ─────────────────────────────────────────────────────────────
|
||||||
|
self._auto_linear_x: float = 0.0
|
||||||
|
self._auto_angular_z: float = 0.0
|
||||||
|
self._blend_alpha: float = 0.0
|
||||||
|
self._last_auto_t: float = 0.0
|
||||||
|
|
||||||
|
# ── Subscriptions ─────────────────────────────────────────────────────
|
||||||
|
self.create_subscription(
|
||||||
|
Twist, "/cmd_vel_auto", self._auto_cb, reliable)
|
||||||
|
self.create_subscription(
|
||||||
|
String, "/saltybot/control_mode", self._mode_cb, reliable)
|
||||||
|
|
||||||
|
# ── Publisher ─────────────────────────────────────────────────────────
|
||||||
|
self._pub = self.create_publisher(Twist, "/cmd_vel", reliable)
|
||||||
|
|
||||||
|
# ── Timer ─────────────────────────────────────────────────────────────
|
||||||
|
self._timer = self.create_timer(1.0 / self._rate, self._publish_cb)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"CmdVelMuxNode ready rate={self._rate:.0f}Hz "
|
||||||
|
f"timeout={self._timeout:.1f}s"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _auto_cb(self, msg: Twist) -> None:
|
||||||
|
self._auto_linear_x = msg.linear.x
|
||||||
|
self._auto_angular_z = msg.angular.z
|
||||||
|
self._last_auto_t = time.monotonic()
|
||||||
|
|
||||||
|
def _mode_cb(self, msg: String) -> None:
|
||||||
|
try:
|
||||||
|
data = json.loads(msg.data)
|
||||||
|
self._blend_alpha = float(data.get("blend_alpha", 0.0))
|
||||||
|
except (json.JSONDecodeError, TypeError, ValueError):
|
||||||
|
self._blend_alpha = 0.0
|
||||||
|
|
||||||
|
# ── 20 Hz publish ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _publish_cb(self) -> None:
|
||||||
|
now = time.monotonic()
|
||||||
|
|
||||||
|
# Deadman: zero if autonomous source went silent
|
||||||
|
auto_age = (now - self._last_auto_t) if self._last_auto_t > 0.0 else _BIG
|
||||||
|
if auto_age > self._timeout:
|
||||||
|
linear_x, angular_z = 0.0, 0.0
|
||||||
|
else:
|
||||||
|
linear_x, angular_z = blend_twist(
|
||||||
|
self._auto_linear_x,
|
||||||
|
self._auto_angular_z,
|
||||||
|
self._blend_alpha,
|
||||||
|
)
|
||||||
|
|
||||||
|
twist = Twist()
|
||||||
|
twist.linear.x = linear_x
|
||||||
|
twist.angular.z = angular_z
|
||||||
|
self._pub.publish(twist)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = CmdVelMuxNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -0,0 +1,241 @@
|
|||||||
|
"""
|
||||||
|
mode_logic.py — Pure RC/Autonomous mode-switch logic for saltybot.
|
||||||
|
|
||||||
|
All functions are stateless or operate on plain Python types so the full
|
||||||
|
state machine can be exercised in unit tests without a ROS2 runtime.
|
||||||
|
|
||||||
|
Mode vocabulary
|
||||||
|
---------------
|
||||||
|
"RC" — STM32 executing RC pilot commands; Jetson cmd_vel blocked.
|
||||||
|
"RAMP_TO_AUTO" — Transitioning RC→AUTO; blend_alpha 0.0→1.0 over ramp_s.
|
||||||
|
"AUTO" — STM32 executing Jetson cmd_vel; RC sticks idle.
|
||||||
|
"RAMP_TO_RC" — Transitioning AUTO→RC; blend_alpha 1.0→0.0 over ramp_s.
|
||||||
|
|
||||||
|
Blend alpha
|
||||||
|
-----------
|
||||||
|
The node publishes blend_alpha ∈ [0, 1] in /saltybot/control_mode.
|
||||||
|
The cmd_vel_mux_node scales autonomous cmd_vel by alpha before publishing
|
||||||
|
to /cmd_vel (the bridge input). alpha=0 ⟹ no autonomous motion.
|
||||||
|
|
||||||
|
RC override
|
||||||
|
-----------
|
||||||
|
Any stick magnitude > override_threshold while in AUTO instantly forces RC
|
||||||
|
(alpha → 0 immediately; no ramp). _override_active is set so the
|
||||||
|
return-to-auto path requires sticks to be neutral for return_delay seconds.
|
||||||
|
|
||||||
|
CH6 / toggle
|
||||||
|
------------
|
||||||
|
ch6_auto=True means the pilot has requested AUTO (switch up/high).
|
||||||
|
ch6_auto=False means the pilot has requested RC (switch down/low).
|
||||||
|
Pure CH6 transitions (no stick activity) always ramp gracefully.
|
||||||
|
|
||||||
|
Safety interlocks
|
||||||
|
-----------------
|
||||||
|
• No AUTO without slam_ok (valid SLAM fix received recently).
|
||||||
|
• Instant drop to RC when rc_link_ok goes False (RC link lost).
|
||||||
|
These checks are applied at every control tick.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
|
||||||
|
# ─── Constants ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
MODES = ("RC", "RAMP_TO_AUTO", "AUTO", "RAMP_TO_RC")
|
||||||
|
|
||||||
|
LED_PATTERNS = {
|
||||||
|
"rc_normal": "solid_yellow",
|
||||||
|
"rc_no_slam": "solid_yellow", # no slam needed in RC
|
||||||
|
"auto_ok": "solid_green",
|
||||||
|
"auto_no_slam": "blink_orange_fast", # shouldn't happen (interlock), but defensive
|
||||||
|
"ramp_to_auto": "blink_green_slow",
|
||||||
|
"ramp_to_rc": "blink_yellow_slow",
|
||||||
|
"no_rc_link": "blink_red_fast",
|
||||||
|
}
|
||||||
|
|
||||||
|
# ─── Stick / CH6 helpers ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def ch6_wants_auto(axes: list, ch6_idx: int, threshold: float = 0.5) -> bool:
|
||||||
|
"""
|
||||||
|
Return True when CH6 (aux switch) is in the AUTO position.
|
||||||
|
|
||||||
|
Typical CRSF→Joy mapping: AUX2 = axes[5].
|
||||||
|
Switch up (high) = +1.0 → AUTO; switch down (low) = -1.0 → RC.
|
||||||
|
threshold default 0.5 gives hysteresis against mid-travel noise.
|
||||||
|
"""
|
||||||
|
if ch6_idx < 0 or ch6_idx >= len(axes):
|
||||||
|
return False
|
||||||
|
return float(axes[ch6_idx]) > threshold
|
||||||
|
|
||||||
|
|
||||||
|
def sticks_active(
|
||||||
|
axes: list,
|
||||||
|
stick_indices: tuple,
|
||||||
|
threshold: float = 0.10,
|
||||||
|
) -> bool:
|
||||||
|
"""
|
||||||
|
Return True when any monitored stick axis exceeds threshold magnitude.
|
||||||
|
|
||||||
|
stick_indices: tuple of Joy axes indices for the four sticks
|
||||||
|
(roll, pitch, throttle, yaw). Default CRSF mapping:
|
||||||
|
(0, 1, 2, 3).
|
||||||
|
threshold: 10 % of full deflection (issue spec).
|
||||||
|
"""
|
||||||
|
return any(
|
||||||
|
abs(float(axes[i])) > threshold
|
||||||
|
for i in stick_indices
|
||||||
|
if i < len(axes)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Alpha / ramp helpers ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def ramp_alpha(elapsed_s: float, ramp_s: float) -> float:
|
||||||
|
"""
|
||||||
|
Compute forward blend alpha [0, 1] linearly over ramp_s seconds.
|
||||||
|
|
||||||
|
elapsed_s : seconds since ramp start.
|
||||||
|
ramp_s : total ramp duration (seconds).
|
||||||
|
"""
|
||||||
|
if ramp_s <= 0.0:
|
||||||
|
return 1.0
|
||||||
|
return min(1.0, elapsed_s / ramp_s)
|
||||||
|
|
||||||
|
|
||||||
|
def alpha_for_mode(mode: str, elapsed_s: float, ramp_s: float) -> float:
|
||||||
|
"""
|
||||||
|
Return the blend alpha appropriate for `mode`.
|
||||||
|
|
||||||
|
RC → 0.0
|
||||||
|
RAMP_TO_AUTO → 0.0 … 1.0 (linear with elapsed)
|
||||||
|
AUTO → 1.0
|
||||||
|
RAMP_TO_RC → 1.0 … 0.0 (linear with elapsed)
|
||||||
|
"""
|
||||||
|
if mode == "RC":
|
||||||
|
return 0.0
|
||||||
|
if mode == "AUTO":
|
||||||
|
return 1.0
|
||||||
|
if mode == "RAMP_TO_AUTO":
|
||||||
|
return ramp_alpha(elapsed_s, ramp_s)
|
||||||
|
if mode == "RAMP_TO_RC":
|
||||||
|
return 1.0 - ramp_alpha(elapsed_s, ramp_s)
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ─── LED pattern ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def led_pattern(mode: str, slam_ok: bool, rc_link_ok: bool) -> str:
|
||||||
|
"""Return the LED pattern name for the current operating state."""
|
||||||
|
if not rc_link_ok:
|
||||||
|
return LED_PATTERNS["no_rc_link"]
|
||||||
|
if mode in ("RC", "RAMP_TO_RC"):
|
||||||
|
return LED_PATTERNS["rc_normal"]
|
||||||
|
if mode == "RAMP_TO_AUTO":
|
||||||
|
return LED_PATTERNS["ramp_to_auto"]
|
||||||
|
if mode == "AUTO":
|
||||||
|
return LED_PATTERNS["auto_ok"] if slam_ok else LED_PATTERNS["auto_no_slam"]
|
||||||
|
return LED_PATTERNS["no_rc_link"]
|
||||||
|
|
||||||
|
|
||||||
|
# ─── State machine step ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def step_state(
|
||||||
|
current_mode: str,
|
||||||
|
ramp_elapsed_s: float,
|
||||||
|
ramp_s: float,
|
||||||
|
ch6_auto: bool,
|
||||||
|
sticks_now_active: bool,
|
||||||
|
sticks_neutral_elapsed_s: float,
|
||||||
|
return_delay_s: float,
|
||||||
|
slam_ok: bool,
|
||||||
|
rc_link_ok: bool,
|
||||||
|
override_active: bool,
|
||||||
|
) -> tuple:
|
||||||
|
"""
|
||||||
|
Compute (next_mode, next_override_active, instant_to_rc).
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
current_mode : current mode string
|
||||||
|
ramp_elapsed_s : seconds since last ramp start
|
||||||
|
ramp_s : configured ramp duration
|
||||||
|
ch6_auto : True = pilot switch requests AUTO
|
||||||
|
sticks_now_active : True = at least one stick > override_threshold
|
||||||
|
sticks_neutral_elapsed_s: seconds since all sticks went neutral (0 if still active)
|
||||||
|
return_delay_s : seconds of stick-neutral required to re-engage AUTO
|
||||||
|
slam_ok : True = SLAM fix is valid
|
||||||
|
rc_link_ok : True = CRSF link alive
|
||||||
|
override_active : True = currently in stick-override RC
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
next_mode : new mode string
|
||||||
|
next_override_active: updated override flag
|
||||||
|
instant_to_rc : True when dropping to RC without ramp (safety / stick override)
|
||||||
|
"""
|
||||||
|
instant_to_rc = False
|
||||||
|
|
||||||
|
# ── Safety: RC link lost → instant RC ────────────────────────────────────
|
||||||
|
if not rc_link_ok:
|
||||||
|
return "RC", False, True
|
||||||
|
|
||||||
|
# ── Compute override transitions ──────────────────────────────────────────
|
||||||
|
next_override = override_active
|
||||||
|
if sticks_now_active and current_mode in ("AUTO", "RAMP_TO_AUTO", "RAMP_TO_RC"):
|
||||||
|
# Stick input while AUTO or any transition → instant RC override
|
||||||
|
next_override = True
|
||||||
|
return "RC", True, True
|
||||||
|
|
||||||
|
if not sticks_now_active and sticks_neutral_elapsed_s >= return_delay_s:
|
||||||
|
# Sticks have been neutral long enough — clear override
|
||||||
|
next_override = False
|
||||||
|
|
||||||
|
# ── Ramp completion ───────────────────────────────────────────────────────
|
||||||
|
ramp_done = ramp_elapsed_s >= ramp_s - 1e-9 # epsilon guards float accumulation
|
||||||
|
|
||||||
|
# ── Transitions per state ─────────────────────────────────────────────────
|
||||||
|
if current_mode == "RC":
|
||||||
|
if ch6_auto and slam_ok and not next_override:
|
||||||
|
return "RAMP_TO_AUTO", next_override, False
|
||||||
|
return "RC", next_override, False
|
||||||
|
|
||||||
|
if current_mode == "RAMP_TO_AUTO":
|
||||||
|
if not ch6_auto:
|
||||||
|
return "RAMP_TO_RC", next_override, False
|
||||||
|
if not slam_ok:
|
||||||
|
return "RAMP_TO_RC", next_override, False
|
||||||
|
if ramp_done:
|
||||||
|
return "AUTO", next_override, False
|
||||||
|
return "RAMP_TO_AUTO", next_override, False
|
||||||
|
|
||||||
|
if current_mode == "AUTO":
|
||||||
|
if not ch6_auto:
|
||||||
|
return "RAMP_TO_RC", next_override, False
|
||||||
|
if not slam_ok:
|
||||||
|
return "RAMP_TO_RC", next_override, False
|
||||||
|
return "AUTO", next_override, False
|
||||||
|
|
||||||
|
if current_mode == "RAMP_TO_RC":
|
||||||
|
if ch6_auto and slam_ok and not next_override:
|
||||||
|
return "RAMP_TO_AUTO", next_override, False
|
||||||
|
if ramp_done:
|
||||||
|
return "RC", next_override, False
|
||||||
|
return "RAMP_TO_RC", next_override, False
|
||||||
|
|
||||||
|
return "RC", next_override, True # unknown state → safe fallback
|
||||||
|
|
||||||
|
|
||||||
|
# ─── cmd_vel blending ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def blend_twist(
|
||||||
|
auto_linear_x: float,
|
||||||
|
auto_angular_z: float,
|
||||||
|
alpha: float,
|
||||||
|
) -> tuple:
|
||||||
|
"""
|
||||||
|
Scale autonomous cmd_vel by blend alpha.
|
||||||
|
|
||||||
|
Returns (linear_x, angular_z) for the blended output.
|
||||||
|
alpha=0 → zeros; alpha=1 → full autonomous.
|
||||||
|
"""
|
||||||
|
a = max(0.0, min(1.0, alpha))
|
||||||
|
return auto_linear_x * a, auto_angular_z * a
|
||||||
@ -0,0 +1,307 @@
|
|||||||
|
"""
|
||||||
|
mode_switch_node.py — Autonomous/RC mode switch for saltybot (Issue #104).
|
||||||
|
|
||||||
|
Inputs
|
||||||
|
------
|
||||||
|
/rc/joy (sensor_msgs/Joy)
|
||||||
|
CRSF channel axes:
|
||||||
|
axes[ch6_axis] AUX2 — mode toggle (+1=AUTO, -1=RC)
|
||||||
|
axes[stick_axes...] Roll/Pitch/Throttle/Yaw — override detection
|
||||||
|
|
||||||
|
/saltybot/balance_state (std_msgs/String JSON)
|
||||||
|
Parsed for RC link health (field "rc_link") and STM32 mode.
|
||||||
|
|
||||||
|
<slam_fix_topic> (geometry_msgs/PoseWithCovarianceStamped)
|
||||||
|
Any message received within slam_fix_timeout_s → SLAM fix valid.
|
||||||
|
|
||||||
|
Outputs
|
||||||
|
-------
|
||||||
|
/saltybot/control_mode (std_msgs/String JSON)
|
||||||
|
{
|
||||||
|
"mode": "RC" | "RAMP_TO_AUTO" | "AUTO" | "RAMP_TO_RC",
|
||||||
|
"blend_alpha": 0.0 … 1.0,
|
||||||
|
"slam_ok": bool,
|
||||||
|
"rc_link_ok": bool,
|
||||||
|
"override_active": bool
|
||||||
|
}
|
||||||
|
|
||||||
|
/saltybot/led_pattern (std_msgs/String)
|
||||||
|
LED pattern name consumed by the LED controller node.
|
||||||
|
|
||||||
|
State machine
|
||||||
|
-------------
|
||||||
|
See mode_logic.py for the full transition table.
|
||||||
|
|
||||||
|
RC — pilot in control; blend_alpha = 0.0
|
||||||
|
RAMP_TO_AUTO — transitioning; blend_alpha 0→1 over ramp_duration_s (500 ms)
|
||||||
|
AUTO — Jetson in control; blend_alpha = 1.0
|
||||||
|
RAMP_TO_RC — transitioning; blend_alpha 1→0 over ramp_duration_s (500 ms)
|
||||||
|
|
||||||
|
Safety interlocks
|
||||||
|
-----------------
|
||||||
|
• Any stick > override_threshold while AUTO/RAMP_TO_AUTO → instant RC.
|
||||||
|
• rc_link lost (Joy silent > rc_link_timeout_s) → instant RC.
|
||||||
|
• AUTO not entered without valid SLAM fix.
|
||||||
|
• SLAM fix lost while AUTO → RAMP_TO_RC.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
ch6_axis Joy axes index for the mode-toggle switch (default 5)
|
||||||
|
stick_axes Space-separated Joy indices for 4 sticks (default "0 1 2 3")
|
||||||
|
override_threshold Stick magnitude for instant RC override (default 0.10)
|
||||||
|
return_delay_s Sticks-neutral duration before re-engage (default 2.0 s)
|
||||||
|
ramp_duration_s RC↔AUTO blend ramp duration (default 0.5 s)
|
||||||
|
rc_link_timeout_s Joy silence threshold → RC link lost (default 0.5 s)
|
||||||
|
slam_fix_topic Pose topic for SLAM fix detection (default /slam_toolbox/pose_with_covariance_stamped)
|
||||||
|
slam_fix_timeout_s Pose silence threshold → SLAM fix lost (default 3.0 s)
|
||||||
|
control_rate State machine tick rate (Hz) (default 20.0)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy, DurabilityPolicy
|
||||||
|
|
||||||
|
from geometry_msgs.msg import PoseWithCovarianceStamped
|
||||||
|
from sensor_msgs.msg import Joy
|
||||||
|
from std_msgs.msg import String
|
||||||
|
|
||||||
|
from saltybot_mode_switch.mode_logic import (
|
||||||
|
alpha_for_mode,
|
||||||
|
ch6_wants_auto,
|
||||||
|
led_pattern,
|
||||||
|
step_state,
|
||||||
|
sticks_active,
|
||||||
|
)
|
||||||
|
|
||||||
|
_BIG = 1e9 # sentinel: "never received"
|
||||||
|
|
||||||
|
|
||||||
|
class ModeSwitchNode(Node):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("mode_switch")
|
||||||
|
|
||||||
|
# ── Parameters ────────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter("ch6_axis", 5)
|
||||||
|
self.declare_parameter("stick_axes", "0 1 2 3")
|
||||||
|
self.declare_parameter("override_threshold", 0.10)
|
||||||
|
self.declare_parameter("return_delay_s", 2.0)
|
||||||
|
self.declare_parameter("ramp_duration_s", 0.5)
|
||||||
|
self.declare_parameter("rc_link_timeout_s", 0.5)
|
||||||
|
self.declare_parameter("slam_fix_topic",
|
||||||
|
"/slam_toolbox/pose_with_covariance_stamped")
|
||||||
|
self.declare_parameter("slam_fix_timeout_s", 3.0)
|
||||||
|
self.declare_parameter("control_rate", 20.0)
|
||||||
|
|
||||||
|
self._reload_params()
|
||||||
|
|
||||||
|
# ── State ─────────────────────────────────────────────────────────────
|
||||||
|
self._mode: str = "RC"
|
||||||
|
self._override_active: bool = False
|
||||||
|
self._ramp_start: float = 0.0 # monotonic time ramp began
|
||||||
|
self._sticks_neutral_since: float | None = None
|
||||||
|
self._last_joy_t: float = 0.0 # monotonic; 0 = never
|
||||||
|
self._last_slam_t: float = 0.0
|
||||||
|
self._joy_axes: list = []
|
||||||
|
self._stm32_mode: int = 0 # from balance_state JSON
|
||||||
|
|
||||||
|
# ── QoS ───────────────────────────────────────────────────────────────
|
||||||
|
best_effort = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=5,
|
||||||
|
)
|
||||||
|
reliable = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.RELIABLE,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=10,
|
||||||
|
)
|
||||||
|
# SLAM toolbox uses transient_local for its pose topic
|
||||||
|
transient = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.RELIABLE,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
durability=DurabilityPolicy.TRANSIENT_LOCAL,
|
||||||
|
depth=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Subscriptions ─────────────────────────────────────────────────────
|
||||||
|
self.create_subscription(Joy, "/rc/joy", self._joy_cb, best_effort)
|
||||||
|
self.create_subscription(
|
||||||
|
String, "/saltybot/balance_state", self._balance_cb, reliable)
|
||||||
|
|
||||||
|
slam_topic = self.get_parameter("slam_fix_topic").value
|
||||||
|
# Try transient_local first (SLAM Toolbox); fall back gracefully if not
|
||||||
|
try:
|
||||||
|
self.create_subscription(
|
||||||
|
PoseWithCovarianceStamped, slam_topic,
|
||||||
|
self._slam_cb, transient)
|
||||||
|
except Exception:
|
||||||
|
self.create_subscription(
|
||||||
|
PoseWithCovarianceStamped, slam_topic,
|
||||||
|
self._slam_cb, reliable)
|
||||||
|
|
||||||
|
# ── Publishers ────────────────────────────────────────────────────────
|
||||||
|
self._mode_pub = self.create_publisher(
|
||||||
|
String, "/saltybot/control_mode", reliable)
|
||||||
|
self._led_pub = self.create_publisher(
|
||||||
|
String, "/saltybot/led_pattern", reliable)
|
||||||
|
|
||||||
|
# ── Control timer ─────────────────────────────────────────────────────
|
||||||
|
self._timer = self.create_timer(
|
||||||
|
1.0 / self._p["control_rate"], self._tick)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"ModeSwitchNode ready "
|
||||||
|
f"ramp={self._p['ramp_duration_s']:.1f}s "
|
||||||
|
f"override_thresh={self._p['override_threshold']:.0%} "
|
||||||
|
f"return_delay={self._p['return_delay_s']:.1f}s "
|
||||||
|
f"slam_topic={slam_topic}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Parameter reload ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _reload_params(self):
|
||||||
|
raw_stick_axes = self.get_parameter("stick_axes").value
|
||||||
|
self._p = {
|
||||||
|
"ch6_axis": int(self.get_parameter("ch6_axis").value),
|
||||||
|
"stick_indices": tuple(int(x) for x in raw_stick_axes.split()),
|
||||||
|
"override_threshold": float(self.get_parameter("override_threshold").value),
|
||||||
|
"return_delay_s": float(self.get_parameter("return_delay_s").value),
|
||||||
|
"ramp_duration_s": float(self.get_parameter("ramp_duration_s").value),
|
||||||
|
"rc_link_timeout_s": float(self.get_parameter("rc_link_timeout_s").value),
|
||||||
|
"slam_fix_timeout_s": float(self.get_parameter("slam_fix_timeout_s").value),
|
||||||
|
"control_rate": float(self.get_parameter("control_rate").value),
|
||||||
|
}
|
||||||
|
|
||||||
|
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _joy_cb(self, msg: Joy) -> None:
|
||||||
|
self._joy_axes = list(msg.axes)
|
||||||
|
self._last_joy_t = time.monotonic()
|
||||||
|
|
||||||
|
def _balance_cb(self, msg: String) -> None:
|
||||||
|
try:
|
||||||
|
data = json.loads(msg.data)
|
||||||
|
# "mode" is a label string; map back to int for reference
|
||||||
|
mode_label = data.get("mode", "RC_MANUAL")
|
||||||
|
self._stm32_mode = {"RC_MANUAL": 0, "RC_ASSISTED": 1,
|
||||||
|
"AUTONOMOUS": 2}.get(mode_label, 0)
|
||||||
|
except (json.JSONDecodeError, TypeError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _slam_cb(self, msg: PoseWithCovarianceStamped) -> None:
|
||||||
|
self._last_slam_t = time.monotonic()
|
||||||
|
|
||||||
|
# ── 20 Hz control tick ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _tick(self) -> None:
|
||||||
|
self._reload_params()
|
||||||
|
now = time.monotonic()
|
||||||
|
p = self._p
|
||||||
|
|
||||||
|
# ── Derive inputs ─────────────────────────────────────────────────────
|
||||||
|
joy_age = (now - self._last_joy_t) if self._last_joy_t > 0.0 else _BIG
|
||||||
|
slam_age = (now - self._last_slam_t) if self._last_slam_t > 0.0 else _BIG
|
||||||
|
rc_link_ok = joy_age <= p["rc_link_timeout_s"]
|
||||||
|
slam_ok = slam_age <= p["slam_fix_timeout_s"]
|
||||||
|
|
||||||
|
axes = self._joy_axes
|
||||||
|
ch6 = ch6_wants_auto(axes, p["ch6_axis"])
|
||||||
|
active = sticks_active(axes, p["stick_indices"], p["override_threshold"])
|
||||||
|
|
||||||
|
# Track neutral timer
|
||||||
|
if active:
|
||||||
|
self._sticks_neutral_since = None
|
||||||
|
elif self._sticks_neutral_since is None:
|
||||||
|
self._sticks_neutral_since = now
|
||||||
|
|
||||||
|
neutral_elapsed = (
|
||||||
|
(now - self._sticks_neutral_since)
|
||||||
|
if self._sticks_neutral_since is not None else 0.0
|
||||||
|
)
|
||||||
|
|
||||||
|
ramp_elapsed = now - self._ramp_start
|
||||||
|
|
||||||
|
# ── State machine step ────────────────────────────────────────────────
|
||||||
|
prev_mode = self._mode
|
||||||
|
next_mode, next_override, instant = step_state(
|
||||||
|
current_mode=self._mode,
|
||||||
|
ramp_elapsed_s=ramp_elapsed,
|
||||||
|
ramp_s=p["ramp_duration_s"],
|
||||||
|
ch6_auto=ch6,
|
||||||
|
sticks_now_active=active,
|
||||||
|
sticks_neutral_elapsed_s=neutral_elapsed,
|
||||||
|
return_delay_s=p["return_delay_s"],
|
||||||
|
slam_ok=slam_ok,
|
||||||
|
rc_link_ok=rc_link_ok,
|
||||||
|
override_active=self._override_active,
|
||||||
|
)
|
||||||
|
|
||||||
|
if next_mode != prev_mode:
|
||||||
|
# Log transitions; reset ramp timer on ramp entry
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Mode: {prev_mode} → {next_mode}"
|
||||||
|
f" slam_ok={slam_ok} rc_link={rc_link_ok}"
|
||||||
|
f" override={next_override}"
|
||||||
|
)
|
||||||
|
if next_mode in ("RAMP_TO_AUTO", "RAMP_TO_RC"):
|
||||||
|
self._ramp_start = now
|
||||||
|
ramp_elapsed = 0.0
|
||||||
|
|
||||||
|
self._mode = next_mode
|
||||||
|
self._override_active = next_override
|
||||||
|
|
||||||
|
# ── Compute blend alpha ───────────────────────────────────────────────
|
||||||
|
current_ramp_elapsed = now - self._ramp_start
|
||||||
|
alpha = alpha_for_mode(next_mode, current_ramp_elapsed, p["ramp_duration_s"])
|
||||||
|
if instant:
|
||||||
|
alpha = 0.0
|
||||||
|
|
||||||
|
# ── Log steady warnings ───────────────────────────────────────────────
|
||||||
|
if next_mode == "AUTO" and not slam_ok:
|
||||||
|
self.get_logger().warn(
|
||||||
|
"In AUTO but SLAM fix lost — will exit AUTO",
|
||||||
|
throttle_duration_sec=2.0)
|
||||||
|
if not rc_link_ok:
|
||||||
|
self.get_logger().warn(
|
||||||
|
"RC link lost — forced RC mode",
|
||||||
|
throttle_duration_sec=1.0)
|
||||||
|
|
||||||
|
# ── Publish /saltybot/control_mode ────────────────────────────────────
|
||||||
|
payload = {
|
||||||
|
"mode": next_mode,
|
||||||
|
"blend_alpha": round(alpha, 4),
|
||||||
|
"slam_ok": slam_ok,
|
||||||
|
"rc_link_ok": rc_link_ok,
|
||||||
|
"override_active": next_override,
|
||||||
|
}
|
||||||
|
mode_msg = String()
|
||||||
|
mode_msg.data = json.dumps(payload)
|
||||||
|
self._mode_pub.publish(mode_msg)
|
||||||
|
|
||||||
|
# ── Publish /saltybot/led_pattern ─────────────────────────────────────
|
||||||
|
pattern = led_pattern(next_mode, slam_ok, rc_link_ok)
|
||||||
|
led_msg = String()
|
||||||
|
led_msg.data = pattern
|
||||||
|
self._led_pub.publish(led_msg)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = ModeSwitchNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
5
jetson/ros2_ws/src/saltybot_mode_switch/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_mode_switch/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_mode_switch
|
||||||
|
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_mode_switch
|
||||||
33
jetson/ros2_ws/src/saltybot_mode_switch/setup.py
Normal file
33
jetson/ros2_ws/src/saltybot_mode_switch/setup.py
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
from setuptools import setup, find_packages
|
||||||
|
import os
|
||||||
|
from glob import glob
|
||||||
|
|
||||||
|
package_name = "saltybot_mode_switch"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=find_packages(exclude=["test"]),
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages",
|
||||||
|
[f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(os.path.join("share", package_name, "config"),
|
||||||
|
glob("config/*.yaml")),
|
||||||
|
(os.path.join("share", package_name, "launch"),
|
||||||
|
glob("launch/*.py")),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-controls",
|
||||||
|
maintainer_email="sl-controls@saltylab.local",
|
||||||
|
description="Autonomous/RC mode switch with 500 ms blend ramp (Issue #104)",
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
f"mode_switch_node = {package_name}.mode_switch_node:main",
|
||||||
|
f"cmd_vel_mux_node = {package_name}.cmd_vel_mux_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
481
jetson/ros2_ws/src/saltybot_mode_switch/test/test_mode_logic.py
Normal file
481
jetson/ros2_ws/src/saltybot_mode_switch/test/test_mode_logic.py
Normal file
@ -0,0 +1,481 @@
|
|||||||
|
"""
|
||||||
|
test_mode_logic.py — Unit tests for saltybot_mode_switch pure logic.
|
||||||
|
|
||||||
|
Covers:
|
||||||
|
- ch6_wants_auto / sticks_active helpers
|
||||||
|
- ramp_alpha / alpha_for_mode
|
||||||
|
- led_pattern
|
||||||
|
- step_state state machine (all transitions)
|
||||||
|
- blend_twist
|
||||||
|
|
||||||
|
No ROS2 runtime required.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import math
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from saltybot_mode_switch.mode_logic import (
|
||||||
|
alpha_for_mode,
|
||||||
|
blend_twist,
|
||||||
|
ch6_wants_auto,
|
||||||
|
led_pattern,
|
||||||
|
ramp_alpha,
|
||||||
|
step_state,
|
||||||
|
sticks_active,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# Helpers
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _step(
|
||||||
|
mode="RC",
|
||||||
|
ramp_elapsed=0.0,
|
||||||
|
ramp_s=0.5,
|
||||||
|
ch6_auto=False,
|
||||||
|
sticks_now_active=False,
|
||||||
|
neutral_elapsed=0.0,
|
||||||
|
return_delay=2.0,
|
||||||
|
slam_ok=True,
|
||||||
|
rc_link_ok=True,
|
||||||
|
override_active=False,
|
||||||
|
):
|
||||||
|
return step_state(
|
||||||
|
current_mode=mode,
|
||||||
|
ramp_elapsed_s=ramp_elapsed,
|
||||||
|
ramp_s=ramp_s,
|
||||||
|
ch6_auto=ch6_auto,
|
||||||
|
sticks_now_active=sticks_now_active,
|
||||||
|
sticks_neutral_elapsed_s=neutral_elapsed,
|
||||||
|
return_delay_s=return_delay,
|
||||||
|
slam_ok=slam_ok,
|
||||||
|
rc_link_ok=rc_link_ok,
|
||||||
|
override_active=override_active,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# ch6_wants_auto
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestCh6WantsAuto:
|
||||||
|
|
||||||
|
def test_high_value_returns_true(self):
|
||||||
|
assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 1.0], ch6_idx=5) is True
|
||||||
|
|
||||||
|
def test_low_value_returns_false(self):
|
||||||
|
assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, -1.0], ch6_idx=5) is False
|
||||||
|
|
||||||
|
def test_mid_value_below_threshold(self):
|
||||||
|
assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 0.4], ch6_idx=5) is False
|
||||||
|
|
||||||
|
def test_exactly_at_threshold(self):
|
||||||
|
# 0.5 > 0.5 is False; require strictly above
|
||||||
|
assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 0.5], ch6_idx=5) is False
|
||||||
|
|
||||||
|
def test_just_above_threshold(self):
|
||||||
|
assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 0.51], ch6_idx=5) is True
|
||||||
|
|
||||||
|
def test_out_of_range_index(self):
|
||||||
|
assert ch6_wants_auto([1.0], ch6_idx=10) is False
|
||||||
|
|
||||||
|
def test_empty_axes(self):
|
||||||
|
assert ch6_wants_auto([], ch6_idx=0) is False
|
||||||
|
|
||||||
|
def test_custom_threshold(self):
|
||||||
|
assert ch6_wants_auto([0.0, 0.3], ch6_idx=1, threshold=0.2) is True
|
||||||
|
assert ch6_wants_auto([0.0, 0.3], ch6_idx=1, threshold=0.5) is False
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# sticks_active
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestSticksActive:
|
||||||
|
|
||||||
|
def test_all_zero_not_active(self):
|
||||||
|
assert sticks_active([0.0, 0.0, 0.0, 0.0], (0, 1, 2, 3)) is False
|
||||||
|
|
||||||
|
def test_one_stick_above_threshold(self):
|
||||||
|
assert sticks_active([0.0, 0.15, 0.0, 0.0], (0, 1, 2, 3)) is True
|
||||||
|
|
||||||
|
def test_all_exactly_at_threshold_not_active(self):
|
||||||
|
axes = [0.10, 0.10, 0.10, 0.10]
|
||||||
|
assert sticks_active(axes, (0, 1, 2, 3), threshold=0.10) is False
|
||||||
|
|
||||||
|
def test_just_above_threshold_active(self):
|
||||||
|
assert sticks_active([0.0, 0.0, 0.0, 0.11], (0, 1, 2, 3)) is True
|
||||||
|
|
||||||
|
def test_negative_deflection_detected(self):
|
||||||
|
assert sticks_active([-0.5, 0.0, 0.0, 0.0], (0,)) is True
|
||||||
|
|
||||||
|
def test_out_of_range_index_skipped(self):
|
||||||
|
# index 5 is out of range for a 4-element list — should not crash
|
||||||
|
assert sticks_active([0.0, 0.0, 0.0, 0.0], (0, 1, 5)) is False
|
||||||
|
|
||||||
|
def test_only_monitored_axes_checked(self):
|
||||||
|
# axes[4] = 0.9 but only (0,1,2,3) are monitored
|
||||||
|
assert sticks_active([0.0, 0.0, 0.0, 0.0, 0.9], (0, 1, 2, 3)) is False
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# ramp_alpha / alpha_for_mode
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestRampAlpha:
|
||||||
|
|
||||||
|
def test_zero_elapsed_returns_zero(self):
|
||||||
|
assert ramp_alpha(0.0, 0.5) == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_full_elapsed_returns_one(self):
|
||||||
|
assert ramp_alpha(0.5, 0.5) == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_beyond_duration_clamped_to_one(self):
|
||||||
|
assert ramp_alpha(1.0, 0.5) == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_halfway(self):
|
||||||
|
assert ramp_alpha(0.25, 0.5) == pytest.approx(0.5)
|
||||||
|
|
||||||
|
def test_zero_duration_returns_one(self):
|
||||||
|
assert ramp_alpha(0.0, 0.0) == pytest.approx(1.0)
|
||||||
|
|
||||||
|
|
||||||
|
class TestAlphaForMode:
|
||||||
|
|
||||||
|
def test_rc_returns_zero(self):
|
||||||
|
assert alpha_for_mode("RC", 0.3, 0.5) == 0.0
|
||||||
|
|
||||||
|
def test_auto_returns_one(self):
|
||||||
|
assert alpha_for_mode("AUTO", 0.3, 0.5) == 1.0
|
||||||
|
|
||||||
|
def test_ramp_to_auto_mid(self):
|
||||||
|
assert alpha_for_mode("RAMP_TO_AUTO", 0.25, 0.5) == pytest.approx(0.5)
|
||||||
|
|
||||||
|
def test_ramp_to_rc_mid(self):
|
||||||
|
assert alpha_for_mode("RAMP_TO_RC", 0.25, 0.5) == pytest.approx(0.5)
|
||||||
|
|
||||||
|
def test_ramp_to_auto_start(self):
|
||||||
|
assert alpha_for_mode("RAMP_TO_AUTO", 0.0, 0.5) == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_ramp_to_rc_start(self):
|
||||||
|
assert alpha_for_mode("RAMP_TO_RC", 0.0, 0.5) == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_ramp_to_auto_complete(self):
|
||||||
|
assert alpha_for_mode("RAMP_TO_AUTO", 0.5, 0.5) == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_ramp_to_rc_complete(self):
|
||||||
|
assert alpha_for_mode("RAMP_TO_RC", 0.5, 0.5) == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_unknown_mode_returns_zero(self):
|
||||||
|
assert alpha_for_mode("UNKNOWN", 0.3, 0.5) == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# led_pattern
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestLedPattern:
|
||||||
|
|
||||||
|
def test_no_rc_link_overrides_all(self):
|
||||||
|
for mode in ("RC", "RAMP_TO_AUTO", "AUTO", "RAMP_TO_RC"):
|
||||||
|
assert led_pattern(mode, slam_ok=True, rc_link_ok=False) == "blink_red_fast"
|
||||||
|
|
||||||
|
def test_rc_mode_yellow(self):
|
||||||
|
assert led_pattern("RC", slam_ok=True, rc_link_ok=True) == "solid_yellow"
|
||||||
|
|
||||||
|
def test_ramp_to_rc_yellow(self):
|
||||||
|
assert led_pattern("RAMP_TO_RC", slam_ok=True, rc_link_ok=True) == "solid_yellow"
|
||||||
|
|
||||||
|
def test_auto_ok_green(self):
|
||||||
|
assert led_pattern("AUTO", slam_ok=True, rc_link_ok=True) == "solid_green"
|
||||||
|
|
||||||
|
def test_auto_no_slam_orange(self):
|
||||||
|
assert led_pattern("AUTO", slam_ok=False, rc_link_ok=True) == "blink_orange_fast"
|
||||||
|
|
||||||
|
def test_ramp_to_auto_blink_green(self):
|
||||||
|
assert led_pattern("RAMP_TO_AUTO", slam_ok=True, rc_link_ok=True) == "blink_green_slow"
|
||||||
|
|
||||||
|
def test_rc_slam_state_irrelevant(self):
|
||||||
|
p1 = led_pattern("RC", slam_ok=True, rc_link_ok=True)
|
||||||
|
p2 = led_pattern("RC", slam_ok=False, rc_link_ok=True)
|
||||||
|
assert p1 == p2 # SLAM doesn't affect RC LED
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# step_state — RC transitions
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestStepStateFromRC:
|
||||||
|
|
||||||
|
def test_rc_stays_rc_by_default(self):
|
||||||
|
mode, override, instant = _step(mode="RC")
|
||||||
|
assert mode == "RC"
|
||||||
|
assert not instant
|
||||||
|
|
||||||
|
def test_rc_to_ramp_when_ch6_auto(self):
|
||||||
|
mode, override, instant = _step(mode="RC", ch6_auto=True)
|
||||||
|
assert mode == "RAMP_TO_AUTO"
|
||||||
|
assert not instant
|
||||||
|
|
||||||
|
def test_rc_stays_rc_when_ch6_auto_but_no_slam(self):
|
||||||
|
mode, _, _ = _step(mode="RC", ch6_auto=True, slam_ok=False)
|
||||||
|
assert mode == "RC"
|
||||||
|
|
||||||
|
def test_rc_stays_rc_when_ch6_auto_but_no_rc_link(self):
|
||||||
|
mode, _, instant = _step(mode="RC", ch6_auto=True, rc_link_ok=False)
|
||||||
|
assert mode == "RC"
|
||||||
|
assert instant # RC link lost → instant
|
||||||
|
|
||||||
|
def test_rc_stays_rc_when_override_active_and_sticks_not_neutral(self):
|
||||||
|
mode, _, _ = _step(
|
||||||
|
mode="RC", ch6_auto=True, slam_ok=True,
|
||||||
|
override_active=True, neutral_elapsed=1.0, return_delay=2.0)
|
||||||
|
assert mode == "RC"
|
||||||
|
|
||||||
|
def test_rc_to_ramp_after_override_clears(self):
|
||||||
|
# override clears after sticks neutral >= return_delay
|
||||||
|
mode, override, _ = _step(
|
||||||
|
mode="RC", ch6_auto=True, slam_ok=True,
|
||||||
|
override_active=True, neutral_elapsed=2.5, return_delay=2.0)
|
||||||
|
assert mode == "RAMP_TO_AUTO"
|
||||||
|
assert not override
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# step_state — RAMP_TO_AUTO transitions
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestStepStateFromRampToAuto:
|
||||||
|
|
||||||
|
def test_continues_ramping(self):
|
||||||
|
mode, _, _ = _step(mode="RAMP_TO_AUTO", ramp_elapsed=0.2, ramp_s=0.5,
|
||||||
|
ch6_auto=True)
|
||||||
|
assert mode == "RAMP_TO_AUTO"
|
||||||
|
|
||||||
|
def test_completes_to_auto(self):
|
||||||
|
mode, _, _ = _step(mode="RAMP_TO_AUTO", ramp_elapsed=0.5, ramp_s=0.5,
|
||||||
|
ch6_auto=True)
|
||||||
|
assert mode == "AUTO"
|
||||||
|
|
||||||
|
def test_stick_override_instant_rc(self):
|
||||||
|
mode, override, instant = _step(
|
||||||
|
mode="RAMP_TO_AUTO", ch6_auto=True,
|
||||||
|
sticks_now_active=True)
|
||||||
|
assert mode == "RC"
|
||||||
|
assert override is True
|
||||||
|
assert instant is True
|
||||||
|
|
||||||
|
def test_ch6_low_aborts_to_ramp_rc(self):
|
||||||
|
mode, _, _ = _step(mode="RAMP_TO_AUTO", ch6_auto=False, ramp_elapsed=0.1)
|
||||||
|
assert mode == "RAMP_TO_RC"
|
||||||
|
|
||||||
|
def test_slam_lost_aborts_to_ramp_rc(self):
|
||||||
|
mode, _, _ = _step(mode="RAMP_TO_AUTO", ch6_auto=True, slam_ok=False)
|
||||||
|
assert mode == "RAMP_TO_RC"
|
||||||
|
|
||||||
|
def test_rc_link_lost_instant_rc(self):
|
||||||
|
mode, _, instant = _step(mode="RAMP_TO_AUTO", ch6_auto=True, rc_link_ok=False)
|
||||||
|
assert mode == "RC"
|
||||||
|
assert instant is True
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# step_state — AUTO transitions
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestStepStateFromAuto:
|
||||||
|
|
||||||
|
def test_auto_stays_auto(self):
|
||||||
|
mode, _, _ = _step(mode="AUTO", ch6_auto=True)
|
||||||
|
assert mode == "AUTO"
|
||||||
|
|
||||||
|
def test_stick_override_instant_rc(self):
|
||||||
|
mode, override, instant = _step(
|
||||||
|
mode="AUTO", ch6_auto=True, sticks_now_active=True)
|
||||||
|
assert mode == "RC"
|
||||||
|
assert override is True
|
||||||
|
assert instant is True
|
||||||
|
|
||||||
|
def test_ch6_low_ramp_to_rc(self):
|
||||||
|
mode, _, instant = _step(mode="AUTO", ch6_auto=False)
|
||||||
|
assert mode == "RAMP_TO_RC"
|
||||||
|
assert not instant
|
||||||
|
|
||||||
|
def test_slam_lost_ramp_to_rc(self):
|
||||||
|
mode, _, instant = _step(mode="AUTO", ch6_auto=True, slam_ok=False)
|
||||||
|
assert mode == "RAMP_TO_RC"
|
||||||
|
assert not instant
|
||||||
|
|
||||||
|
def test_rc_link_lost_instant_rc(self):
|
||||||
|
mode, _, instant = _step(mode="AUTO", ch6_auto=True, rc_link_ok=False)
|
||||||
|
assert mode == "RC"
|
||||||
|
assert instant is True
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# step_state — RAMP_TO_RC transitions
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestStepStateFromRampToRC:
|
||||||
|
|
||||||
|
def test_continues_ramping(self):
|
||||||
|
mode, _, _ = _step(mode="RAMP_TO_RC", ramp_elapsed=0.2, ramp_s=0.5)
|
||||||
|
assert mode == "RAMP_TO_RC"
|
||||||
|
|
||||||
|
def test_completes_to_rc(self):
|
||||||
|
mode, _, _ = _step(mode="RAMP_TO_RC", ramp_elapsed=0.5, ramp_s=0.5)
|
||||||
|
assert mode == "RC"
|
||||||
|
|
||||||
|
def test_stick_override_instant_rc(self):
|
||||||
|
mode, override, instant = _step(
|
||||||
|
mode="RAMP_TO_RC", sticks_now_active=True, ch6_auto=True)
|
||||||
|
assert mode == "RC"
|
||||||
|
assert instant is True
|
||||||
|
|
||||||
|
def test_ch6_still_auto_no_override_cancels_back(self):
|
||||||
|
# CH6 still says AUTO, no override → cancel to RAMP_TO_AUTO
|
||||||
|
mode, _, _ = _step(mode="RAMP_TO_RC", ch6_auto=True, slam_ok=True,
|
||||||
|
override_active=False, ramp_elapsed=0.1)
|
||||||
|
assert mode == "RAMP_TO_AUTO"
|
||||||
|
|
||||||
|
def test_rc_link_lost_instant_rc(self):
|
||||||
|
_, _, instant = _step(mode="RAMP_TO_RC", rc_link_ok=False)
|
||||||
|
assert instant is True
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# blend_twist
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestBlendTwist:
|
||||||
|
|
||||||
|
def test_alpha_zero_zeros_output(self):
|
||||||
|
lx, az = blend_twist(1.0, 0.5, 0.0)
|
||||||
|
assert lx == pytest.approx(0.0)
|
||||||
|
assert az == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_alpha_one_passes_through(self):
|
||||||
|
lx, az = blend_twist(0.4, -0.7, 1.0)
|
||||||
|
assert lx == pytest.approx(0.4)
|
||||||
|
assert az == pytest.approx(-0.7)
|
||||||
|
|
||||||
|
def test_alpha_half_scales(self):
|
||||||
|
lx, az = blend_twist(1.0, 1.0, 0.5)
|
||||||
|
assert lx == pytest.approx(0.5)
|
||||||
|
assert az == pytest.approx(0.5)
|
||||||
|
|
||||||
|
def test_alpha_above_one_clamped(self):
|
||||||
|
lx, az = blend_twist(1.0, 1.0, 1.5)
|
||||||
|
assert lx == pytest.approx(1.0)
|
||||||
|
assert az == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_alpha_below_zero_clamped(self):
|
||||||
|
lx, az = blend_twist(1.0, 1.0, -0.5)
|
||||||
|
assert lx == pytest.approx(0.0)
|
||||||
|
assert az == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_negative_cmd_vel_preserved(self):
|
||||||
|
lx, az = blend_twist(-0.5, -1.0, 0.5)
|
||||||
|
assert lx == pytest.approx(-0.25)
|
||||||
|
assert az == pytest.approx(-0.5)
|
||||||
|
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
# Integration scenarios
|
||||||
|
# ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestIntegrationScenarios:
|
||||||
|
|
||||||
|
def test_full_rc_to_auto_ramp_cycle(self):
|
||||||
|
"""Simulate RC→RAMP_TO_AUTO→AUTO with 0.5s ramp at 20Hz."""
|
||||||
|
mode = "RC"
|
||||||
|
override = False
|
||||||
|
ramp_s = 0.5
|
||||||
|
dt = 1.0 / 20.0
|
||||||
|
t_ramp = 0.0
|
||||||
|
|
||||||
|
# CH6 flipped to AUTO, SLAM ok
|
||||||
|
mode, override, instant = step_state(
|
||||||
|
mode, 0.0, ramp_s, True, False, 5.0, 2.0, True, True, override)
|
||||||
|
assert mode == "RAMP_TO_AUTO"
|
||||||
|
t_ramp = 0.0
|
||||||
|
|
||||||
|
# Simulate 10 ticks (0.5s) → should complete
|
||||||
|
for tick in range(10):
|
||||||
|
t_ramp += dt
|
||||||
|
mode, override, instant = step_state(
|
||||||
|
mode, t_ramp, ramp_s, True, False, 5.0, 2.0, True, True, override)
|
||||||
|
|
||||||
|
assert mode == "AUTO"
|
||||||
|
assert alpha_for_mode("AUTO", t_ramp, ramp_s) == 1.0
|
||||||
|
|
||||||
|
def test_stick_override_then_return_to_auto(self):
|
||||||
|
"""Stick override while AUTO → RC → neutral 2s → back to AUTO."""
|
||||||
|
# Start in AUTO
|
||||||
|
mode, override, instant = _step(
|
||||||
|
mode="AUTO", ch6_auto=True, sticks_now_active=True)
|
||||||
|
assert mode == "RC"
|
||||||
|
assert override is True
|
||||||
|
assert instant is True
|
||||||
|
|
||||||
|
# Sticks neutral but < 2s — stays RC
|
||||||
|
mode, override, _ = _step(
|
||||||
|
mode="RC", ch6_auto=True, slam_ok=True,
|
||||||
|
override_active=True, neutral_elapsed=1.0, return_delay=2.0)
|
||||||
|
assert mode == "RC"
|
||||||
|
|
||||||
|
# Sticks neutral >= 2s — override clears, enters ramp
|
||||||
|
mode, override, _ = _step(
|
||||||
|
mode="RC", ch6_auto=True, slam_ok=True,
|
||||||
|
override_active=True, neutral_elapsed=2.0, return_delay=2.0)
|
||||||
|
assert mode == "RAMP_TO_AUTO"
|
||||||
|
assert override is False
|
||||||
|
|
||||||
|
def test_slam_lost_during_auto_graceful_exit(self):
|
||||||
|
"""SLAM fix lost during AUTO → graceful RAMP_TO_RC (not instant)."""
|
||||||
|
mode, _, instant = _step(mode="AUTO", ch6_auto=True, slam_ok=False)
|
||||||
|
assert mode == "RAMP_TO_RC"
|
||||||
|
assert not instant
|
||||||
|
|
||||||
|
def test_rc_link_lost_always_instant(self):
|
||||||
|
"""RC link lost from any state → instant RC."""
|
||||||
|
for start_mode in ("AUTO", "RAMP_TO_AUTO", "RAMP_TO_RC"):
|
||||||
|
mode, _, instant = _step(mode=start_mode, ch6_auto=True, rc_link_ok=False)
|
||||||
|
assert mode == "RC", f"from {start_mode}"
|
||||||
|
assert instant is True, f"from {start_mode}"
|
||||||
|
|
||||||
|
def test_blend_alpha_sequence_through_ramp(self):
|
||||||
|
"""Blend alpha is monotonically increasing during RAMP_TO_AUTO."""
|
||||||
|
ramp_s = 0.5
|
||||||
|
alphas = [
|
||||||
|
alpha_for_mode("RAMP_TO_AUTO", t * 0.05, ramp_s)
|
||||||
|
for t in range(11) # 0.0 to 0.5 s in 0.05 s steps
|
||||||
|
]
|
||||||
|
for a, b in zip(alphas, alphas[1:]):
|
||||||
|
assert b >= a
|
||||||
|
|
||||||
|
def test_ramp_to_rc_alpha_decreasing(self):
|
||||||
|
"""Blend alpha is monotonically decreasing during RAMP_TO_RC."""
|
||||||
|
ramp_s = 0.5
|
||||||
|
alphas = [
|
||||||
|
alpha_for_mode("RAMP_TO_RC", t * 0.05, ramp_s)
|
||||||
|
for t in range(11)
|
||||||
|
]
|
||||||
|
for a, b in zip(alphas, alphas[1:]):
|
||||||
|
assert b <= a
|
||||||
|
|
||||||
|
def test_no_auto_without_slam(self):
|
||||||
|
"""From RC, ch6=AUTO but slam=False → cannot enter AUTO path."""
|
||||||
|
mode, _, _ = _step(mode="RC", ch6_auto=True, slam_ok=False)
|
||||||
|
assert mode == "RC"
|
||||||
|
|
||||||
|
def test_no_auto_without_rc_link(self):
|
||||||
|
"""From RC, ch6=AUTO but rc_link=False → stays RC (instant)."""
|
||||||
|
mode, _, instant = _step(mode="RC", ch6_auto=True, rc_link_ok=False)
|
||||||
|
assert mode == "RC"
|
||||||
|
assert instant is True
|
||||||
Loading…
x
Reference in New Issue
Block a user