From 9733f5f097df0560b162e4704f711af586af9e6b Mon Sep 17 00:00:00 2001 From: sl-controls Date: Mon, 2 Mar 2026 08:38:49 -0500 Subject: [PATCH] feat(controls): Autonomous/RC mode switch with 500ms blend ramp (Issue #104) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New package: saltybot_mode_switch mode_logic.py (pure, no ROS2 dep — 72/72 tests pass): State machine: RC → RAMP_TO_AUTO → AUTO → RAMP_TO_RC → RC • CH6 (axes[5] > 0.5) requests AUTO; CH6 low → RAMP_TO_RC • Stick >10% in AUTO/RAMP_TO_AUTO/RAMP_TO_RC → instant RC (no ramp) • Sticks neutral ≥ 2 s after override → override cleared → RAMP_TO_AUTO • RC link lost (Joy silent > 0.5 s) → instant RC from any state • SLAM fix lost → RAMP_TO_RC (graceful exit from AUTO) • No AUTO entry without slam_ok AND rc_link_ok blend_alpha: 0.0 (RC) → linear ramp over 500 ms → 1.0 (AUTO) led_pattern: solid_yellow(RC) | blink_green_slow(ramp) | solid_green(AUTO) | blink_orange_fast(slam lost) | blink_red_fast(RC link lost) mode_switch_node.py (ROS2, 20 Hz): Sub: /rc/joy (Joy), /saltybot/balance_state (String), /slam_toolbox/pose_with_covariance_stamped (PoseWithCovarianceStamped) Pub: /saltybot/control_mode (String JSON: mode+blend_alpha+slam_ok+rc_link_ok+override_active) /saltybot/led_pattern (String) cmd_vel_mux_node.py (ROS2, 20 Hz): Sub: /cmd_vel_auto (Twist from Nav2/follower), /saltybot/control_mode Pub: /cmd_vel (Twist to bridge, scaled by blend_alpha) Note: remap Nav2/follower output to /cmd_vel_auto in launch. Tests: 72/72 pass (no ROS2 runtime required). Co-Authored-By: Claude Sonnet 4.6 --- .../src/saltybot_mode_switch/.gitignore | 4 + .../config/mode_switch_params.yaml | 58 +++ .../launch/mode_switch.launch.py | 59 +++ .../src/saltybot_mode_switch/package.xml | 31 ++ .../resource/saltybot_mode_switch | 0 .../saltybot_mode_switch/__init__.py | 0 .../saltybot_mode_switch/cmd_vel_mux_node.py | 133 +++++ .../saltybot_mode_switch/mode_logic.py | 241 +++++++++ .../saltybot_mode_switch/mode_switch_node.py | 307 +++++++++++ .../src/saltybot_mode_switch/setup.cfg | 5 + .../ros2_ws/src/saltybot_mode_switch/setup.py | 33 ++ .../test/test_mode_logic.py | 481 ++++++++++++++++++ 12 files changed, 1352 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_mode_switch/.gitignore create mode 100644 jetson/ros2_ws/src/saltybot_mode_switch/config/mode_switch_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_mode_switch/launch/mode_switch.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_mode_switch/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_mode_switch/resource/saltybot_mode_switch create mode 100644 jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/cmd_vel_mux_node.py create mode 100644 jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/mode_logic.py create mode 100644 jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/mode_switch_node.py create mode 100644 jetson/ros2_ws/src/saltybot_mode_switch/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_mode_switch/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_mode_switch/test/test_mode_logic.py diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/.gitignore b/jetson/ros2_ws/src/saltybot_mode_switch/.gitignore new file mode 100644 index 0000000..4e33d94 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_mode_switch/.gitignore @@ -0,0 +1,4 @@ +__pycache__/ +*.pyc +*.egg-info/ +.pytest_cache/ diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/config/mode_switch_params.yaml b/jetson/ros2_ws/src/saltybot_mode_switch/config/mode_switch_params.yaml new file mode 100644 index 0000000..7ac783d --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_mode_switch/config/mode_switch_params.yaml @@ -0,0 +1,58 @@ +# mode_switch_params.yaml — saltybot_mode_switch +# +# Launch with: +# ros2 launch saltybot_mode_switch mode_switch.launch.py +# +# Topic wiring: +# /rc/joy → mode_switch_node (CRSF channels) +# /saltybot/balance_state → mode_switch_node (STM32 state) +# /slam_toolbox/pose_with_covariance_stamped → mode_switch_node (SLAM fix) +# /saltybot/control_mode ← mode_switch_node (JSON mode + alpha) +# /saltybot/led_pattern ← mode_switch_node (LED name) +# +# /cmd_vel_auto → cmd_vel_mux_node (from Nav2/follower) +# /saltybot/control_mode → cmd_vel_mux_node +# /cmd_vel ← cmd_vel_mux_node (to bridge) +# +# IMPORTANT: remap Nav2 and person_follower to publish /cmd_vel_auto: +# ros2 launch saltybot_bringup nav2.launch.py cmd_vel_topic:=/cmd_vel_auto + +# ── CRSF channel mapping ────────────────────────────────────────────────────── +# Joy axes index for the AUX2 / CH6 mode-toggle switch. +# Typical CRSF→Joy mapping: CH1=axes[0] … CH6=axes[5]. +ch6_axis: 5 + +# Space-separated Joy axes indices for the four control sticks (roll/pitch/throttle/yaw). +# These are monitored for the 10% override threshold. +stick_axes: "0 1 2 3" + +# ── Override / return-to-auto ────────────────────────────────────────────────── +# Stick magnitude (0–1) that triggers instant RC override while in AUTO. +# 10% (0.10) per spec — change only if stick noise causes false triggers. +override_threshold: 0.10 + +# Seconds sticks must remain neutral before AUTO re-engages after an override. +return_delay_s: 2.0 + +# ── Ramp ────────────────────────────────────────────────────────────────────── +# RC↔AUTO blend ramp duration (seconds). cmd_vel_mux scales by blend_alpha +# so autonomous commands are eased in/out over this window. +ramp_duration_s: 0.5 + +# ── RC link health ──────────────────────────────────────────────────────────── +# Seconds of Joy silence before RC link is declared lost → force RC mode. +rc_link_timeout_s: 0.5 + +# ── SLAM fix ────────────────────────────────────────────────────────────────── +# ROS2 topic providing the SLAM localisation pose. Any message received within +# slam_fix_timeout_s counts as a valid fix. +slam_fix_topic: "/slam_toolbox/pose_with_covariance_stamped" + +# Seconds after the last SLAM pose before fix is considered lost. +slam_fix_timeout_s: 3.0 + +# ── Control loop ────────────────────────────────────────────────────────────── +control_rate: 20.0 # Hz + +# ── cmd_vel_mux ─────────────────────────────────────────────────────────────── +cmd_vel_timeout_s: 0.5 # deadman: zero output if /cmd_vel_auto goes silent diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/launch/mode_switch.launch.py b/jetson/ros2_ws/src/saltybot_mode_switch/launch/mode_switch.launch.py new file mode 100644 index 0000000..2a72e3c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_mode_switch/launch/mode_switch.launch.py @@ -0,0 +1,59 @@ +"""mode_switch.launch.py — launch mode_switch_node + cmd_vel_mux_node.""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + pkg_share = get_package_share_directory("saltybot_mode_switch") + default_cfg = os.path.join(pkg_share, "config", "mode_switch_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument( + "params_file", + default_value=default_cfg, + description="Path to mode_switch_params.yaml", + ), + DeclareLaunchArgument( + "ramp_duration_s", + default_value="0.5", + description="RC↔AUTO blend ramp duration (seconds)", + ), + DeclareLaunchArgument( + "return_delay_s", + default_value="2.0", + description="Sticks-neutral duration before auto re-engage (seconds)", + ), + DeclareLaunchArgument( + "slam_fix_topic", + default_value="/slam_toolbox/pose_with_covariance_stamped", + description="SLAM fix pose topic", + ), + + Node( + package="saltybot_mode_switch", + executable="mode_switch_node", + name="mode_switch", + output="screen", + parameters=[ + LaunchConfiguration("params_file"), + { + "ramp_duration_s": LaunchConfiguration("ramp_duration_s"), + "return_delay_s": LaunchConfiguration("return_delay_s"), + "slam_fix_topic": LaunchConfiguration("slam_fix_topic"), + }, + ], + ), + + Node( + package="saltybot_mode_switch", + executable="cmd_vel_mux_node", + name="cmd_vel_mux", + output="screen", + parameters=[LaunchConfiguration("params_file")], + ), + ]) diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/package.xml b/jetson/ros2_ws/src/saltybot_mode_switch/package.xml new file mode 100644 index 0000000..330c0ef --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_mode_switch/package.xml @@ -0,0 +1,31 @@ + + + + saltybot_mode_switch + 0.1.0 + + Autonomous/RC mode switch for saltybot (Issue #104). + Monitors CRSF CH6 toggle, RC stick override, SLAM fix status, and RC link + health to manage RC↔AUTO transitions with a 500 ms blend ramp. + Publishes /saltybot/control_mode (JSON) and /saltybot/led_pattern. + Includes cmd_vel_mux_node that scales autonomous cmd_vel by blend_alpha. + + sl-controls + MIT + + rclpy + geometry_msgs + sensor_msgs + std_msgs + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/resource/saltybot_mode_switch b/jetson/ros2_ws/src/saltybot_mode_switch/resource/saltybot_mode_switch new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/__init__.py b/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/cmd_vel_mux_node.py b/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/cmd_vel_mux_node.py new file mode 100644 index 0000000..3a05c02 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/cmd_vel_mux_node.py @@ -0,0 +1,133 @@ +""" +cmd_vel_mux_node.py — Blend-alpha cmd_vel multiplexer (Issue #104). + +Sits between the Nav2 / person-follower stack and the cmd_vel_bridge_node, +scaling the autonomous cmd_vel output by the blend_alpha published by +mode_switch_node so that the RC → AUTO handoff is smooth over 500 ms. + +Topic graph +----------- + Nav2 / follower → /cmd_vel_auto ──┐ + ├── cmd_vel_mux → /cmd_vel → bridge + mode_switch → /saltybot/control_mode (blend_alpha) + +In RC mode (blend_alpha ≈ 0) the node publishes Twist(0,0) so the bridge +receives zeros — this is harmless because the bridge's mode gate already +prevents autonomous commands when the STM32 is in RC_MANUAL. + +The bridge's existing ESC ramp handles hardware-level smoothing; +the blend_alpha here provides the higher-level cmd_vel policy ramp. + +Parameters +---------- + cmd_vel_timeout_s Deadman: zero output if /cmd_vel_auto goes silent (0.5 s) + control_rate Output publish rate (Hz) (20.0) +""" + +import json +import time + +import rclpy +from rclpy.node import Node +from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy + +from geometry_msgs.msg import Twist +from std_msgs.msg import String + +from saltybot_mode_switch.mode_logic import blend_twist + +_BIG = 1e9 + + +class CmdVelMuxNode(Node): + + def __init__(self): + super().__init__("cmd_vel_mux") + + self.declare_parameter("cmd_vel_timeout_s", 0.5) + self.declare_parameter("control_rate", 20.0) + + self._timeout = self.get_parameter("cmd_vel_timeout_s").value + self._rate = self.get_parameter("control_rate").value + + reliable = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=10, + ) + + # ── State ───────────────────────────────────────────────────────────── + self._auto_linear_x: float = 0.0 + self._auto_angular_z: float = 0.0 + self._blend_alpha: float = 0.0 + self._last_auto_t: float = 0.0 + + # ── Subscriptions ───────────────────────────────────────────────────── + self.create_subscription( + Twist, "/cmd_vel_auto", self._auto_cb, reliable) + self.create_subscription( + String, "/saltybot/control_mode", self._mode_cb, reliable) + + # ── Publisher ───────────────────────────────────────────────────────── + self._pub = self.create_publisher(Twist, "/cmd_vel", reliable) + + # ── Timer ───────────────────────────────────────────────────────────── + self._timer = self.create_timer(1.0 / self._rate, self._publish_cb) + + self.get_logger().info( + f"CmdVelMuxNode ready rate={self._rate:.0f}Hz " + f"timeout={self._timeout:.1f}s" + ) + + # ── Callbacks ───────────────────────────────────────────────────────────── + + def _auto_cb(self, msg: Twist) -> None: + self._auto_linear_x = msg.linear.x + self._auto_angular_z = msg.angular.z + self._last_auto_t = time.monotonic() + + def _mode_cb(self, msg: String) -> None: + try: + data = json.loads(msg.data) + self._blend_alpha = float(data.get("blend_alpha", 0.0)) + except (json.JSONDecodeError, TypeError, ValueError): + self._blend_alpha = 0.0 + + # ── 20 Hz publish ───────────────────────────────────────────────────────── + + def _publish_cb(self) -> None: + now = time.monotonic() + + # Deadman: zero if autonomous source went silent + auto_age = (now - self._last_auto_t) if self._last_auto_t > 0.0 else _BIG + if auto_age > self._timeout: + linear_x, angular_z = 0.0, 0.0 + else: + linear_x, angular_z = blend_twist( + self._auto_linear_x, + self._auto_angular_z, + self._blend_alpha, + ) + + twist = Twist() + twist.linear.x = linear_x + twist.angular.z = angular_z + self._pub.publish(twist) + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = CmdVelMuxNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/mode_logic.py b/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/mode_logic.py new file mode 100644 index 0000000..1a9b2d5 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/mode_logic.py @@ -0,0 +1,241 @@ +""" +mode_logic.py — Pure RC/Autonomous mode-switch logic for saltybot. + +All functions are stateless or operate on plain Python types so the full +state machine can be exercised in unit tests without a ROS2 runtime. + +Mode vocabulary +--------------- + "RC" — STM32 executing RC pilot commands; Jetson cmd_vel blocked. + "RAMP_TO_AUTO" — Transitioning RC→AUTO; blend_alpha 0.0→1.0 over ramp_s. + "AUTO" — STM32 executing Jetson cmd_vel; RC sticks idle. + "RAMP_TO_RC" — Transitioning AUTO→RC; blend_alpha 1.0→0.0 over ramp_s. + +Blend alpha +----------- + The node publishes blend_alpha ∈ [0, 1] in /saltybot/control_mode. + The cmd_vel_mux_node scales autonomous cmd_vel by alpha before publishing + to /cmd_vel (the bridge input). alpha=0 ⟹ no autonomous motion. + +RC override +----------- + Any stick magnitude > override_threshold while in AUTO instantly forces RC + (alpha → 0 immediately; no ramp). _override_active is set so the + return-to-auto path requires sticks to be neutral for return_delay seconds. + +CH6 / toggle +------------ + ch6_auto=True means the pilot has requested AUTO (switch up/high). + ch6_auto=False means the pilot has requested RC (switch down/low). + Pure CH6 transitions (no stick activity) always ramp gracefully. + +Safety interlocks +----------------- + • No AUTO without slam_ok (valid SLAM fix received recently). + • Instant drop to RC when rc_link_ok goes False (RC link lost). + These checks are applied at every control tick. +""" + +import math + +# ─── Constants ────────────────────────────────────────────────────────────── + +MODES = ("RC", "RAMP_TO_AUTO", "AUTO", "RAMP_TO_RC") + +LED_PATTERNS = { + "rc_normal": "solid_yellow", + "rc_no_slam": "solid_yellow", # no slam needed in RC + "auto_ok": "solid_green", + "auto_no_slam": "blink_orange_fast", # shouldn't happen (interlock), but defensive + "ramp_to_auto": "blink_green_slow", + "ramp_to_rc": "blink_yellow_slow", + "no_rc_link": "blink_red_fast", +} + +# ─── Stick / CH6 helpers ──────────────────────────────────────────────────── + +def ch6_wants_auto(axes: list, ch6_idx: int, threshold: float = 0.5) -> bool: + """ + Return True when CH6 (aux switch) is in the AUTO position. + + Typical CRSF→Joy mapping: AUX2 = axes[5]. + Switch up (high) = +1.0 → AUTO; switch down (low) = -1.0 → RC. + threshold default 0.5 gives hysteresis against mid-travel noise. + """ + if ch6_idx < 0 or ch6_idx >= len(axes): + return False + return float(axes[ch6_idx]) > threshold + + +def sticks_active( + axes: list, + stick_indices: tuple, + threshold: float = 0.10, +) -> bool: + """ + Return True when any monitored stick axis exceeds threshold magnitude. + + stick_indices: tuple of Joy axes indices for the four sticks + (roll, pitch, throttle, yaw). Default CRSF mapping: + (0, 1, 2, 3). + threshold: 10 % of full deflection (issue spec). + """ + return any( + abs(float(axes[i])) > threshold + for i in stick_indices + if i < len(axes) + ) + + +# ─── Alpha / ramp helpers ─────────────────────────────────────────────────── + +def ramp_alpha(elapsed_s: float, ramp_s: float) -> float: + """ + Compute forward blend alpha [0, 1] linearly over ramp_s seconds. + + elapsed_s : seconds since ramp start. + ramp_s : total ramp duration (seconds). + """ + if ramp_s <= 0.0: + return 1.0 + return min(1.0, elapsed_s / ramp_s) + + +def alpha_for_mode(mode: str, elapsed_s: float, ramp_s: float) -> float: + """ + Return the blend alpha appropriate for `mode`. + + RC → 0.0 + RAMP_TO_AUTO → 0.0 … 1.0 (linear with elapsed) + AUTO → 1.0 + RAMP_TO_RC → 1.0 … 0.0 (linear with elapsed) + """ + if mode == "RC": + return 0.0 + if mode == "AUTO": + return 1.0 + if mode == "RAMP_TO_AUTO": + return ramp_alpha(elapsed_s, ramp_s) + if mode == "RAMP_TO_RC": + return 1.0 - ramp_alpha(elapsed_s, ramp_s) + return 0.0 + + +# ─── LED pattern ──────────────────────────────────────────────────────────── + +def led_pattern(mode: str, slam_ok: bool, rc_link_ok: bool) -> str: + """Return the LED pattern name for the current operating state.""" + if not rc_link_ok: + return LED_PATTERNS["no_rc_link"] + if mode in ("RC", "RAMP_TO_RC"): + return LED_PATTERNS["rc_normal"] + if mode == "RAMP_TO_AUTO": + return LED_PATTERNS["ramp_to_auto"] + if mode == "AUTO": + return LED_PATTERNS["auto_ok"] if slam_ok else LED_PATTERNS["auto_no_slam"] + return LED_PATTERNS["no_rc_link"] + + +# ─── State machine step ────────────────────────────────────────────────────── + +def step_state( + current_mode: str, + ramp_elapsed_s: float, + ramp_s: float, + ch6_auto: bool, + sticks_now_active: bool, + sticks_neutral_elapsed_s: float, + return_delay_s: float, + slam_ok: bool, + rc_link_ok: bool, + override_active: bool, +) -> tuple: + """ + Compute (next_mode, next_override_active, instant_to_rc). + + Parameters + ---------- + current_mode : current mode string + ramp_elapsed_s : seconds since last ramp start + ramp_s : configured ramp duration + ch6_auto : True = pilot switch requests AUTO + sticks_now_active : True = at least one stick > override_threshold + sticks_neutral_elapsed_s: seconds since all sticks went neutral (0 if still active) + return_delay_s : seconds of stick-neutral required to re-engage AUTO + slam_ok : True = SLAM fix is valid + rc_link_ok : True = CRSF link alive + override_active : True = currently in stick-override RC + + Returns + ------- + next_mode : new mode string + next_override_active: updated override flag + instant_to_rc : True when dropping to RC without ramp (safety / stick override) + """ + instant_to_rc = False + + # ── Safety: RC link lost → instant RC ──────────────────────────────────── + if not rc_link_ok: + return "RC", False, True + + # ── Compute override transitions ────────────────────────────────────────── + next_override = override_active + if sticks_now_active and current_mode in ("AUTO", "RAMP_TO_AUTO", "RAMP_TO_RC"): + # Stick input while AUTO or any transition → instant RC override + next_override = True + return "RC", True, True + + if not sticks_now_active and sticks_neutral_elapsed_s >= return_delay_s: + # Sticks have been neutral long enough — clear override + next_override = False + + # ── Ramp completion ─────────────────────────────────────────────────────── + ramp_done = ramp_elapsed_s >= ramp_s - 1e-9 # epsilon guards float accumulation + + # ── Transitions per state ───────────────────────────────────────────────── + if current_mode == "RC": + if ch6_auto and slam_ok and not next_override: + return "RAMP_TO_AUTO", next_override, False + return "RC", next_override, False + + if current_mode == "RAMP_TO_AUTO": + if not ch6_auto: + return "RAMP_TO_RC", next_override, False + if not slam_ok: + return "RAMP_TO_RC", next_override, False + if ramp_done: + return "AUTO", next_override, False + return "RAMP_TO_AUTO", next_override, False + + if current_mode == "AUTO": + if not ch6_auto: + return "RAMP_TO_RC", next_override, False + if not slam_ok: + return "RAMP_TO_RC", next_override, False + return "AUTO", next_override, False + + if current_mode == "RAMP_TO_RC": + if ch6_auto and slam_ok and not next_override: + return "RAMP_TO_AUTO", next_override, False + if ramp_done: + return "RC", next_override, False + return "RAMP_TO_RC", next_override, False + + return "RC", next_override, True # unknown state → safe fallback + + +# ─── cmd_vel blending ─────────────────────────────────────────────────────── + +def blend_twist( + auto_linear_x: float, + auto_angular_z: float, + alpha: float, +) -> tuple: + """ + Scale autonomous cmd_vel by blend alpha. + + Returns (linear_x, angular_z) for the blended output. + alpha=0 → zeros; alpha=1 → full autonomous. + """ + a = max(0.0, min(1.0, alpha)) + return auto_linear_x * a, auto_angular_z * a diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/mode_switch_node.py b/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/mode_switch_node.py new file mode 100644 index 0000000..78ed0a1 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/mode_switch_node.py @@ -0,0 +1,307 @@ +""" +mode_switch_node.py — Autonomous/RC mode switch for saltybot (Issue #104). + +Inputs +------ + /rc/joy (sensor_msgs/Joy) + CRSF channel axes: + axes[ch6_axis] AUX2 — mode toggle (+1=AUTO, -1=RC) + axes[stick_axes...] Roll/Pitch/Throttle/Yaw — override detection + + /saltybot/balance_state (std_msgs/String JSON) + Parsed for RC link health (field "rc_link") and STM32 mode. + + (geometry_msgs/PoseWithCovarianceStamped) + Any message received within slam_fix_timeout_s → SLAM fix valid. + +Outputs +------- + /saltybot/control_mode (std_msgs/String JSON) + { + "mode": "RC" | "RAMP_TO_AUTO" | "AUTO" | "RAMP_TO_RC", + "blend_alpha": 0.0 … 1.0, + "slam_ok": bool, + "rc_link_ok": bool, + "override_active": bool + } + + /saltybot/led_pattern (std_msgs/String) + LED pattern name consumed by the LED controller node. + +State machine +------------- + See mode_logic.py for the full transition table. + + RC — pilot in control; blend_alpha = 0.0 + RAMP_TO_AUTO — transitioning; blend_alpha 0→1 over ramp_duration_s (500 ms) + AUTO — Jetson in control; blend_alpha = 1.0 + RAMP_TO_RC — transitioning; blend_alpha 1→0 over ramp_duration_s (500 ms) + +Safety interlocks +----------------- + • Any stick > override_threshold while AUTO/RAMP_TO_AUTO → instant RC. + • rc_link lost (Joy silent > rc_link_timeout_s) → instant RC. + • AUTO not entered without valid SLAM fix. + • SLAM fix lost while AUTO → RAMP_TO_RC. + +Parameters +---------- + ch6_axis Joy axes index for the mode-toggle switch (default 5) + stick_axes Space-separated Joy indices for 4 sticks (default "0 1 2 3") + override_threshold Stick magnitude for instant RC override (default 0.10) + return_delay_s Sticks-neutral duration before re-engage (default 2.0 s) + ramp_duration_s RC↔AUTO blend ramp duration (default 0.5 s) + rc_link_timeout_s Joy silence threshold → RC link lost (default 0.5 s) + slam_fix_topic Pose topic for SLAM fix detection (default /slam_toolbox/pose_with_covariance_stamped) + slam_fix_timeout_s Pose silence threshold → SLAM fix lost (default 3.0 s) + control_rate State machine tick rate (Hz) (default 20.0) +""" + +import json +import time + +import rclpy +from rclpy.node import Node +from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy, DurabilityPolicy + +from geometry_msgs.msg import PoseWithCovarianceStamped +from sensor_msgs.msg import Joy +from std_msgs.msg import String + +from saltybot_mode_switch.mode_logic import ( + alpha_for_mode, + ch6_wants_auto, + led_pattern, + step_state, + sticks_active, +) + +_BIG = 1e9 # sentinel: "never received" + + +class ModeSwitchNode(Node): + + def __init__(self): + super().__init__("mode_switch") + + # ── Parameters ──────────────────────────────────────────────────────── + self.declare_parameter("ch6_axis", 5) + self.declare_parameter("stick_axes", "0 1 2 3") + self.declare_parameter("override_threshold", 0.10) + self.declare_parameter("return_delay_s", 2.0) + self.declare_parameter("ramp_duration_s", 0.5) + self.declare_parameter("rc_link_timeout_s", 0.5) + self.declare_parameter("slam_fix_topic", + "/slam_toolbox/pose_with_covariance_stamped") + self.declare_parameter("slam_fix_timeout_s", 3.0) + self.declare_parameter("control_rate", 20.0) + + self._reload_params() + + # ── State ───────────────────────────────────────────────────────────── + self._mode: str = "RC" + self._override_active: bool = False + self._ramp_start: float = 0.0 # monotonic time ramp began + self._sticks_neutral_since: float | None = None + self._last_joy_t: float = 0.0 # monotonic; 0 = never + self._last_slam_t: float = 0.0 + self._joy_axes: list = [] + self._stm32_mode: int = 0 # from balance_state JSON + + # ── QoS ─────────────────────────────────────────────────────────────── + best_effort = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=5, + ) + reliable = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=10, + ) + # SLAM toolbox uses transient_local for its pose topic + transient = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + durability=DurabilityPolicy.TRANSIENT_LOCAL, + depth=1, + ) + + # ── Subscriptions ───────────────────────────────────────────────────── + self.create_subscription(Joy, "/rc/joy", self._joy_cb, best_effort) + self.create_subscription( + String, "/saltybot/balance_state", self._balance_cb, reliable) + + slam_topic = self.get_parameter("slam_fix_topic").value + # Try transient_local first (SLAM Toolbox); fall back gracefully if not + try: + self.create_subscription( + PoseWithCovarianceStamped, slam_topic, + self._slam_cb, transient) + except Exception: + self.create_subscription( + PoseWithCovarianceStamped, slam_topic, + self._slam_cb, reliable) + + # ── Publishers ──────────────────────────────────────────────────────── + self._mode_pub = self.create_publisher( + String, "/saltybot/control_mode", reliable) + self._led_pub = self.create_publisher( + String, "/saltybot/led_pattern", reliable) + + # ── Control timer ───────────────────────────────────────────────────── + self._timer = self.create_timer( + 1.0 / self._p["control_rate"], self._tick) + + self.get_logger().info( + f"ModeSwitchNode ready " + f"ramp={self._p['ramp_duration_s']:.1f}s " + f"override_thresh={self._p['override_threshold']:.0%} " + f"return_delay={self._p['return_delay_s']:.1f}s " + f"slam_topic={slam_topic}" + ) + + # ── Parameter reload ────────────────────────────────────────────────────── + + def _reload_params(self): + raw_stick_axes = self.get_parameter("stick_axes").value + self._p = { + "ch6_axis": int(self.get_parameter("ch6_axis").value), + "stick_indices": tuple(int(x) for x in raw_stick_axes.split()), + "override_threshold": float(self.get_parameter("override_threshold").value), + "return_delay_s": float(self.get_parameter("return_delay_s").value), + "ramp_duration_s": float(self.get_parameter("ramp_duration_s").value), + "rc_link_timeout_s": float(self.get_parameter("rc_link_timeout_s").value), + "slam_fix_timeout_s": float(self.get_parameter("slam_fix_timeout_s").value), + "control_rate": float(self.get_parameter("control_rate").value), + } + + # ── Callbacks ───────────────────────────────────────────────────────────── + + def _joy_cb(self, msg: Joy) -> None: + self._joy_axes = list(msg.axes) + self._last_joy_t = time.monotonic() + + def _balance_cb(self, msg: String) -> None: + try: + data = json.loads(msg.data) + # "mode" is a label string; map back to int for reference + mode_label = data.get("mode", "RC_MANUAL") + self._stm32_mode = {"RC_MANUAL": 0, "RC_ASSISTED": 1, + "AUTONOMOUS": 2}.get(mode_label, 0) + except (json.JSONDecodeError, TypeError): + pass + + def _slam_cb(self, msg: PoseWithCovarianceStamped) -> None: + self._last_slam_t = time.monotonic() + + # ── 20 Hz control tick ──────────────────────────────────────────────────── + + def _tick(self) -> None: + self._reload_params() + now = time.monotonic() + p = self._p + + # ── Derive inputs ───────────────────────────────────────────────────── + joy_age = (now - self._last_joy_t) if self._last_joy_t > 0.0 else _BIG + slam_age = (now - self._last_slam_t) if self._last_slam_t > 0.0 else _BIG + rc_link_ok = joy_age <= p["rc_link_timeout_s"] + slam_ok = slam_age <= p["slam_fix_timeout_s"] + + axes = self._joy_axes + ch6 = ch6_wants_auto(axes, p["ch6_axis"]) + active = sticks_active(axes, p["stick_indices"], p["override_threshold"]) + + # Track neutral timer + if active: + self._sticks_neutral_since = None + elif self._sticks_neutral_since is None: + self._sticks_neutral_since = now + + neutral_elapsed = ( + (now - self._sticks_neutral_since) + if self._sticks_neutral_since is not None else 0.0 + ) + + ramp_elapsed = now - self._ramp_start + + # ── State machine step ──────────────────────────────────────────────── + prev_mode = self._mode + next_mode, next_override, instant = step_state( + current_mode=self._mode, + ramp_elapsed_s=ramp_elapsed, + ramp_s=p["ramp_duration_s"], + ch6_auto=ch6, + sticks_now_active=active, + sticks_neutral_elapsed_s=neutral_elapsed, + return_delay_s=p["return_delay_s"], + slam_ok=slam_ok, + rc_link_ok=rc_link_ok, + override_active=self._override_active, + ) + + if next_mode != prev_mode: + # Log transitions; reset ramp timer on ramp entry + self.get_logger().info( + f"Mode: {prev_mode} → {next_mode}" + f" slam_ok={slam_ok} rc_link={rc_link_ok}" + f" override={next_override}" + ) + if next_mode in ("RAMP_TO_AUTO", "RAMP_TO_RC"): + self._ramp_start = now + ramp_elapsed = 0.0 + + self._mode = next_mode + self._override_active = next_override + + # ── Compute blend alpha ─────────────────────────────────────────────── + current_ramp_elapsed = now - self._ramp_start + alpha = alpha_for_mode(next_mode, current_ramp_elapsed, p["ramp_duration_s"]) + if instant: + alpha = 0.0 + + # ── Log steady warnings ─────────────────────────────────────────────── + if next_mode == "AUTO" and not slam_ok: + self.get_logger().warn( + "In AUTO but SLAM fix lost — will exit AUTO", + throttle_duration_sec=2.0) + if not rc_link_ok: + self.get_logger().warn( + "RC link lost — forced RC mode", + throttle_duration_sec=1.0) + + # ── Publish /saltybot/control_mode ──────────────────────────────────── + payload = { + "mode": next_mode, + "blend_alpha": round(alpha, 4), + "slam_ok": slam_ok, + "rc_link_ok": rc_link_ok, + "override_active": next_override, + } + mode_msg = String() + mode_msg.data = json.dumps(payload) + self._mode_pub.publish(mode_msg) + + # ── Publish /saltybot/led_pattern ───────────────────────────────────── + pattern = led_pattern(next_mode, slam_ok, rc_link_ok) + led_msg = String() + led_msg.data = pattern + self._led_pub.publish(led_msg) + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = ModeSwitchNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/setup.cfg b/jetson/ros2_ws/src/saltybot_mode_switch/setup.cfg new file mode 100644 index 0000000..27fd664 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_mode_switch/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/saltybot_mode_switch + +[install] +install_scripts=$base/lib/saltybot_mode_switch diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/setup.py b/jetson/ros2_ws/src/saltybot_mode_switch/setup.py new file mode 100644 index 0000000..8b47063 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_mode_switch/setup.py @@ -0,0 +1,33 @@ +from setuptools import setup, find_packages +import os +from glob import glob + +package_name = "saltybot_mode_switch" + +setup( + name=package_name, + version="0.1.0", + packages=find_packages(exclude=["test"]), + data_files=[ + ("share/ament_index/resource_index/packages", + [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (os.path.join("share", package_name, "config"), + glob("config/*.yaml")), + (os.path.join("share", package_name, "launch"), + glob("launch/*.py")), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-controls", + maintainer_email="sl-controls@saltylab.local", + description="Autonomous/RC mode switch with 500 ms blend ramp (Issue #104)", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + f"mode_switch_node = {package_name}.mode_switch_node:main", + f"cmd_vel_mux_node = {package_name}.cmd_vel_mux_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/test/test_mode_logic.py b/jetson/ros2_ws/src/saltybot_mode_switch/test/test_mode_logic.py new file mode 100644 index 0000000..30f1056 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_mode_switch/test/test_mode_logic.py @@ -0,0 +1,481 @@ +""" +test_mode_logic.py — Unit tests for saltybot_mode_switch pure logic. + +Covers: + - ch6_wants_auto / sticks_active helpers + - ramp_alpha / alpha_for_mode + - led_pattern + - step_state state machine (all transitions) + - blend_twist + +No ROS2 runtime required. +""" + +import sys +import os +import math + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +import pytest + +from saltybot_mode_switch.mode_logic import ( + alpha_for_mode, + blend_twist, + ch6_wants_auto, + led_pattern, + ramp_alpha, + step_state, + sticks_active, +) + +# ───────────────────────────────────────────────────────────────────────────── +# Helpers +# ───────────────────────────────────────────────────────────────────────────── + +def _step( + mode="RC", + ramp_elapsed=0.0, + ramp_s=0.5, + ch6_auto=False, + sticks_now_active=False, + neutral_elapsed=0.0, + return_delay=2.0, + slam_ok=True, + rc_link_ok=True, + override_active=False, +): + return step_state( + current_mode=mode, + ramp_elapsed_s=ramp_elapsed, + ramp_s=ramp_s, + ch6_auto=ch6_auto, + sticks_now_active=sticks_now_active, + sticks_neutral_elapsed_s=neutral_elapsed, + return_delay_s=return_delay, + slam_ok=slam_ok, + rc_link_ok=rc_link_ok, + override_active=override_active, + ) + + +# ───────────────────────────────────────────────────────────────────────────── +# ch6_wants_auto +# ───────────────────────────────────────────────────────────────────────────── + +class TestCh6WantsAuto: + + def test_high_value_returns_true(self): + assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 1.0], ch6_idx=5) is True + + def test_low_value_returns_false(self): + assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, -1.0], ch6_idx=5) is False + + def test_mid_value_below_threshold(self): + assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 0.4], ch6_idx=5) is False + + def test_exactly_at_threshold(self): + # 0.5 > 0.5 is False; require strictly above + assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 0.5], ch6_idx=5) is False + + def test_just_above_threshold(self): + assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 0.51], ch6_idx=5) is True + + def test_out_of_range_index(self): + assert ch6_wants_auto([1.0], ch6_idx=10) is False + + def test_empty_axes(self): + assert ch6_wants_auto([], ch6_idx=0) is False + + def test_custom_threshold(self): + assert ch6_wants_auto([0.0, 0.3], ch6_idx=1, threshold=0.2) is True + assert ch6_wants_auto([0.0, 0.3], ch6_idx=1, threshold=0.5) is False + + +# ───────────────────────────────────────────────────────────────────────────── +# sticks_active +# ───────────────────────────────────────────────────────────────────────────── + +class TestSticksActive: + + def test_all_zero_not_active(self): + assert sticks_active([0.0, 0.0, 0.0, 0.0], (0, 1, 2, 3)) is False + + def test_one_stick_above_threshold(self): + assert sticks_active([0.0, 0.15, 0.0, 0.0], (0, 1, 2, 3)) is True + + def test_all_exactly_at_threshold_not_active(self): + axes = [0.10, 0.10, 0.10, 0.10] + assert sticks_active(axes, (0, 1, 2, 3), threshold=0.10) is False + + def test_just_above_threshold_active(self): + assert sticks_active([0.0, 0.0, 0.0, 0.11], (0, 1, 2, 3)) is True + + def test_negative_deflection_detected(self): + assert sticks_active([-0.5, 0.0, 0.0, 0.0], (0,)) is True + + def test_out_of_range_index_skipped(self): + # index 5 is out of range for a 4-element list — should not crash + assert sticks_active([0.0, 0.0, 0.0, 0.0], (0, 1, 5)) is False + + def test_only_monitored_axes_checked(self): + # axes[4] = 0.9 but only (0,1,2,3) are monitored + assert sticks_active([0.0, 0.0, 0.0, 0.0, 0.9], (0, 1, 2, 3)) is False + + +# ───────────────────────────────────────────────────────────────────────────── +# ramp_alpha / alpha_for_mode +# ───────────────────────────────────────────────────────────────────────────── + +class TestRampAlpha: + + def test_zero_elapsed_returns_zero(self): + assert ramp_alpha(0.0, 0.5) == pytest.approx(0.0) + + def test_full_elapsed_returns_one(self): + assert ramp_alpha(0.5, 0.5) == pytest.approx(1.0) + + def test_beyond_duration_clamped_to_one(self): + assert ramp_alpha(1.0, 0.5) == pytest.approx(1.0) + + def test_halfway(self): + assert ramp_alpha(0.25, 0.5) == pytest.approx(0.5) + + def test_zero_duration_returns_one(self): + assert ramp_alpha(0.0, 0.0) == pytest.approx(1.0) + + +class TestAlphaForMode: + + def test_rc_returns_zero(self): + assert alpha_for_mode("RC", 0.3, 0.5) == 0.0 + + def test_auto_returns_one(self): + assert alpha_for_mode("AUTO", 0.3, 0.5) == 1.0 + + def test_ramp_to_auto_mid(self): + assert alpha_for_mode("RAMP_TO_AUTO", 0.25, 0.5) == pytest.approx(0.5) + + def test_ramp_to_rc_mid(self): + assert alpha_for_mode("RAMP_TO_RC", 0.25, 0.5) == pytest.approx(0.5) + + def test_ramp_to_auto_start(self): + assert alpha_for_mode("RAMP_TO_AUTO", 0.0, 0.5) == pytest.approx(0.0) + + def test_ramp_to_rc_start(self): + assert alpha_for_mode("RAMP_TO_RC", 0.0, 0.5) == pytest.approx(1.0) + + def test_ramp_to_auto_complete(self): + assert alpha_for_mode("RAMP_TO_AUTO", 0.5, 0.5) == pytest.approx(1.0) + + def test_ramp_to_rc_complete(self): + assert alpha_for_mode("RAMP_TO_RC", 0.5, 0.5) == pytest.approx(0.0) + + def test_unknown_mode_returns_zero(self): + assert alpha_for_mode("UNKNOWN", 0.3, 0.5) == 0.0 + + +# ───────────────────────────────────────────────────────────────────────────── +# led_pattern +# ───────────────────────────────────────────────────────────────────────────── + +class TestLedPattern: + + def test_no_rc_link_overrides_all(self): + for mode in ("RC", "RAMP_TO_AUTO", "AUTO", "RAMP_TO_RC"): + assert led_pattern(mode, slam_ok=True, rc_link_ok=False) == "blink_red_fast" + + def test_rc_mode_yellow(self): + assert led_pattern("RC", slam_ok=True, rc_link_ok=True) == "solid_yellow" + + def test_ramp_to_rc_yellow(self): + assert led_pattern("RAMP_TO_RC", slam_ok=True, rc_link_ok=True) == "solid_yellow" + + def test_auto_ok_green(self): + assert led_pattern("AUTO", slam_ok=True, rc_link_ok=True) == "solid_green" + + def test_auto_no_slam_orange(self): + assert led_pattern("AUTO", slam_ok=False, rc_link_ok=True) == "blink_orange_fast" + + def test_ramp_to_auto_blink_green(self): + assert led_pattern("RAMP_TO_AUTO", slam_ok=True, rc_link_ok=True) == "blink_green_slow" + + def test_rc_slam_state_irrelevant(self): + p1 = led_pattern("RC", slam_ok=True, rc_link_ok=True) + p2 = led_pattern("RC", slam_ok=False, rc_link_ok=True) + assert p1 == p2 # SLAM doesn't affect RC LED + + +# ───────────────────────────────────────────────────────────────────────────── +# step_state — RC transitions +# ───────────────────────────────────────────────────────────────────────────── + +class TestStepStateFromRC: + + def test_rc_stays_rc_by_default(self): + mode, override, instant = _step(mode="RC") + assert mode == "RC" + assert not instant + + def test_rc_to_ramp_when_ch6_auto(self): + mode, override, instant = _step(mode="RC", ch6_auto=True) + assert mode == "RAMP_TO_AUTO" + assert not instant + + def test_rc_stays_rc_when_ch6_auto_but_no_slam(self): + mode, _, _ = _step(mode="RC", ch6_auto=True, slam_ok=False) + assert mode == "RC" + + def test_rc_stays_rc_when_ch6_auto_but_no_rc_link(self): + mode, _, instant = _step(mode="RC", ch6_auto=True, rc_link_ok=False) + assert mode == "RC" + assert instant # RC link lost → instant + + def test_rc_stays_rc_when_override_active_and_sticks_not_neutral(self): + mode, _, _ = _step( + mode="RC", ch6_auto=True, slam_ok=True, + override_active=True, neutral_elapsed=1.0, return_delay=2.0) + assert mode == "RC" + + def test_rc_to_ramp_after_override_clears(self): + # override clears after sticks neutral >= return_delay + mode, override, _ = _step( + mode="RC", ch6_auto=True, slam_ok=True, + override_active=True, neutral_elapsed=2.5, return_delay=2.0) + assert mode == "RAMP_TO_AUTO" + assert not override + + +# ───────────────────────────────────────────────────────────────────────────── +# step_state — RAMP_TO_AUTO transitions +# ───────────────────────────────────────────────────────────────────────────── + +class TestStepStateFromRampToAuto: + + def test_continues_ramping(self): + mode, _, _ = _step(mode="RAMP_TO_AUTO", ramp_elapsed=0.2, ramp_s=0.5, + ch6_auto=True) + assert mode == "RAMP_TO_AUTO" + + def test_completes_to_auto(self): + mode, _, _ = _step(mode="RAMP_TO_AUTO", ramp_elapsed=0.5, ramp_s=0.5, + ch6_auto=True) + assert mode == "AUTO" + + def test_stick_override_instant_rc(self): + mode, override, instant = _step( + mode="RAMP_TO_AUTO", ch6_auto=True, + sticks_now_active=True) + assert mode == "RC" + assert override is True + assert instant is True + + def test_ch6_low_aborts_to_ramp_rc(self): + mode, _, _ = _step(mode="RAMP_TO_AUTO", ch6_auto=False, ramp_elapsed=0.1) + assert mode == "RAMP_TO_RC" + + def test_slam_lost_aborts_to_ramp_rc(self): + mode, _, _ = _step(mode="RAMP_TO_AUTO", ch6_auto=True, slam_ok=False) + assert mode == "RAMP_TO_RC" + + def test_rc_link_lost_instant_rc(self): + mode, _, instant = _step(mode="RAMP_TO_AUTO", ch6_auto=True, rc_link_ok=False) + assert mode == "RC" + assert instant is True + + +# ───────────────────────────────────────────────────────────────────────────── +# step_state — AUTO transitions +# ───────────────────────────────────────────────────────────────────────────── + +class TestStepStateFromAuto: + + def test_auto_stays_auto(self): + mode, _, _ = _step(mode="AUTO", ch6_auto=True) + assert mode == "AUTO" + + def test_stick_override_instant_rc(self): + mode, override, instant = _step( + mode="AUTO", ch6_auto=True, sticks_now_active=True) + assert mode == "RC" + assert override is True + assert instant is True + + def test_ch6_low_ramp_to_rc(self): + mode, _, instant = _step(mode="AUTO", ch6_auto=False) + assert mode == "RAMP_TO_RC" + assert not instant + + def test_slam_lost_ramp_to_rc(self): + mode, _, instant = _step(mode="AUTO", ch6_auto=True, slam_ok=False) + assert mode == "RAMP_TO_RC" + assert not instant + + def test_rc_link_lost_instant_rc(self): + mode, _, instant = _step(mode="AUTO", ch6_auto=True, rc_link_ok=False) + assert mode == "RC" + assert instant is True + + +# ───────────────────────────────────────────────────────────────────────────── +# step_state — RAMP_TO_RC transitions +# ───────────────────────────────────────────────────────────────────────────── + +class TestStepStateFromRampToRC: + + def test_continues_ramping(self): + mode, _, _ = _step(mode="RAMP_TO_RC", ramp_elapsed=0.2, ramp_s=0.5) + assert mode == "RAMP_TO_RC" + + def test_completes_to_rc(self): + mode, _, _ = _step(mode="RAMP_TO_RC", ramp_elapsed=0.5, ramp_s=0.5) + assert mode == "RC" + + def test_stick_override_instant_rc(self): + mode, override, instant = _step( + mode="RAMP_TO_RC", sticks_now_active=True, ch6_auto=True) + assert mode == "RC" + assert instant is True + + def test_ch6_still_auto_no_override_cancels_back(self): + # CH6 still says AUTO, no override → cancel to RAMP_TO_AUTO + mode, _, _ = _step(mode="RAMP_TO_RC", ch6_auto=True, slam_ok=True, + override_active=False, ramp_elapsed=0.1) + assert mode == "RAMP_TO_AUTO" + + def test_rc_link_lost_instant_rc(self): + _, _, instant = _step(mode="RAMP_TO_RC", rc_link_ok=False) + assert instant is True + + +# ───────────────────────────────────────────────────────────────────────────── +# blend_twist +# ───────────────────────────────────────────────────────────────────────────── + +class TestBlendTwist: + + def test_alpha_zero_zeros_output(self): + lx, az = blend_twist(1.0, 0.5, 0.0) + assert lx == pytest.approx(0.0) + assert az == pytest.approx(0.0) + + def test_alpha_one_passes_through(self): + lx, az = blend_twist(0.4, -0.7, 1.0) + assert lx == pytest.approx(0.4) + assert az == pytest.approx(-0.7) + + def test_alpha_half_scales(self): + lx, az = blend_twist(1.0, 1.0, 0.5) + assert lx == pytest.approx(0.5) + assert az == pytest.approx(0.5) + + def test_alpha_above_one_clamped(self): + lx, az = blend_twist(1.0, 1.0, 1.5) + assert lx == pytest.approx(1.0) + assert az == pytest.approx(1.0) + + def test_alpha_below_zero_clamped(self): + lx, az = blend_twist(1.0, 1.0, -0.5) + assert lx == pytest.approx(0.0) + assert az == pytest.approx(0.0) + + def test_negative_cmd_vel_preserved(self): + lx, az = blend_twist(-0.5, -1.0, 0.5) + assert lx == pytest.approx(-0.25) + assert az == pytest.approx(-0.5) + + +# ───────────────────────────────────────────────────────────────────────────── +# Integration scenarios +# ───────────────────────────────────────────────────────────────────────────── + +class TestIntegrationScenarios: + + def test_full_rc_to_auto_ramp_cycle(self): + """Simulate RC→RAMP_TO_AUTO→AUTO with 0.5s ramp at 20Hz.""" + mode = "RC" + override = False + ramp_s = 0.5 + dt = 1.0 / 20.0 + t_ramp = 0.0 + + # CH6 flipped to AUTO, SLAM ok + mode, override, instant = step_state( + mode, 0.0, ramp_s, True, False, 5.0, 2.0, True, True, override) + assert mode == "RAMP_TO_AUTO" + t_ramp = 0.0 + + # Simulate 10 ticks (0.5s) → should complete + for tick in range(10): + t_ramp += dt + mode, override, instant = step_state( + mode, t_ramp, ramp_s, True, False, 5.0, 2.0, True, True, override) + + assert mode == "AUTO" + assert alpha_for_mode("AUTO", t_ramp, ramp_s) == 1.0 + + def test_stick_override_then_return_to_auto(self): + """Stick override while AUTO → RC → neutral 2s → back to AUTO.""" + # Start in AUTO + mode, override, instant = _step( + mode="AUTO", ch6_auto=True, sticks_now_active=True) + assert mode == "RC" + assert override is True + assert instant is True + + # Sticks neutral but < 2s — stays RC + mode, override, _ = _step( + mode="RC", ch6_auto=True, slam_ok=True, + override_active=True, neutral_elapsed=1.0, return_delay=2.0) + assert mode == "RC" + + # Sticks neutral >= 2s — override clears, enters ramp + mode, override, _ = _step( + mode="RC", ch6_auto=True, slam_ok=True, + override_active=True, neutral_elapsed=2.0, return_delay=2.0) + assert mode == "RAMP_TO_AUTO" + assert override is False + + def test_slam_lost_during_auto_graceful_exit(self): + """SLAM fix lost during AUTO → graceful RAMP_TO_RC (not instant).""" + mode, _, instant = _step(mode="AUTO", ch6_auto=True, slam_ok=False) + assert mode == "RAMP_TO_RC" + assert not instant + + def test_rc_link_lost_always_instant(self): + """RC link lost from any state → instant RC.""" + for start_mode in ("AUTO", "RAMP_TO_AUTO", "RAMP_TO_RC"): + mode, _, instant = _step(mode=start_mode, ch6_auto=True, rc_link_ok=False) + assert mode == "RC", f"from {start_mode}" + assert instant is True, f"from {start_mode}" + + def test_blend_alpha_sequence_through_ramp(self): + """Blend alpha is monotonically increasing during RAMP_TO_AUTO.""" + ramp_s = 0.5 + alphas = [ + alpha_for_mode("RAMP_TO_AUTO", t * 0.05, ramp_s) + for t in range(11) # 0.0 to 0.5 s in 0.05 s steps + ] + for a, b in zip(alphas, alphas[1:]): + assert b >= a + + def test_ramp_to_rc_alpha_decreasing(self): + """Blend alpha is monotonically decreasing during RAMP_TO_RC.""" + ramp_s = 0.5 + alphas = [ + alpha_for_mode("RAMP_TO_RC", t * 0.05, ramp_s) + for t in range(11) + ] + for a, b in zip(alphas, alphas[1:]): + assert b <= a + + def test_no_auto_without_slam(self): + """From RC, ch6=AUTO but slam=False → cannot enter AUTO path.""" + mode, _, _ = _step(mode="RC", ch6_auto=True, slam_ok=False) + assert mode == "RC" + + def test_no_auto_without_rc_link(self): + """From RC, ch6=AUTO but rc_link=False → stays RC (instant).""" + mode, _, instant = _step(mode="RC", ch6_auto=True, rc_link_ok=False) + assert mode == "RC" + assert instant is True -- 2.47.2