diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/.gitignore b/jetson/ros2_ws/src/saltybot_mode_switch/.gitignore
new file mode 100644
index 0000000..4e33d94
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_mode_switch/.gitignore
@@ -0,0 +1,4 @@
+__pycache__/
+*.pyc
+*.egg-info/
+.pytest_cache/
diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/config/mode_switch_params.yaml b/jetson/ros2_ws/src/saltybot_mode_switch/config/mode_switch_params.yaml
new file mode 100644
index 0000000..7ac783d
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_mode_switch/config/mode_switch_params.yaml
@@ -0,0 +1,58 @@
+# mode_switch_params.yaml — saltybot_mode_switch
+#
+# Launch with:
+# ros2 launch saltybot_mode_switch mode_switch.launch.py
+#
+# Topic wiring:
+# /rc/joy → mode_switch_node (CRSF channels)
+# /saltybot/balance_state → mode_switch_node (STM32 state)
+# /slam_toolbox/pose_with_covariance_stamped → mode_switch_node (SLAM fix)
+# /saltybot/control_mode ← mode_switch_node (JSON mode + alpha)
+# /saltybot/led_pattern ← mode_switch_node (LED name)
+#
+# /cmd_vel_auto → cmd_vel_mux_node (from Nav2/follower)
+# /saltybot/control_mode → cmd_vel_mux_node
+# /cmd_vel ← cmd_vel_mux_node (to bridge)
+#
+# IMPORTANT: remap Nav2 and person_follower to publish /cmd_vel_auto:
+# ros2 launch saltybot_bringup nav2.launch.py cmd_vel_topic:=/cmd_vel_auto
+
+# ── CRSF channel mapping ──────────────────────────────────────────────────────
+# Joy axes index for the AUX2 / CH6 mode-toggle switch.
+# Typical CRSF→Joy mapping: CH1=axes[0] … CH6=axes[5].
+ch6_axis: 5
+
+# Space-separated Joy axes indices for the four control sticks (roll/pitch/throttle/yaw).
+# These are monitored for the 10% override threshold.
+stick_axes: "0 1 2 3"
+
+# ── Override / return-to-auto ──────────────────────────────────────────────────
+# Stick magnitude (0–1) that triggers instant RC override while in AUTO.
+# 10% (0.10) per spec — change only if stick noise causes false triggers.
+override_threshold: 0.10
+
+# Seconds sticks must remain neutral before AUTO re-engages after an override.
+return_delay_s: 2.0
+
+# ── Ramp ──────────────────────────────────────────────────────────────────────
+# RC↔AUTO blend ramp duration (seconds). cmd_vel_mux scales by blend_alpha
+# so autonomous commands are eased in/out over this window.
+ramp_duration_s: 0.5
+
+# ── RC link health ────────────────────────────────────────────────────────────
+# Seconds of Joy silence before RC link is declared lost → force RC mode.
+rc_link_timeout_s: 0.5
+
+# ── SLAM fix ──────────────────────────────────────────────────────────────────
+# ROS2 topic providing the SLAM localisation pose. Any message received within
+# slam_fix_timeout_s counts as a valid fix.
+slam_fix_topic: "/slam_toolbox/pose_with_covariance_stamped"
+
+# Seconds after the last SLAM pose before fix is considered lost.
+slam_fix_timeout_s: 3.0
+
+# ── Control loop ──────────────────────────────────────────────────────────────
+control_rate: 20.0 # Hz
+
+# ── cmd_vel_mux ───────────────────────────────────────────────────────────────
+cmd_vel_timeout_s: 0.5 # deadman: zero output if /cmd_vel_auto goes silent
diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/launch/mode_switch.launch.py b/jetson/ros2_ws/src/saltybot_mode_switch/launch/mode_switch.launch.py
new file mode 100644
index 0000000..2a72e3c
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_mode_switch/launch/mode_switch.launch.py
@@ -0,0 +1,59 @@
+"""mode_switch.launch.py — launch mode_switch_node + cmd_vel_mux_node."""
+
+import os
+from ament_index_python.packages import get_package_share_directory
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration
+from launch_ros.actions import Node
+
+
+def generate_launch_description():
+ pkg_share = get_package_share_directory("saltybot_mode_switch")
+ default_cfg = os.path.join(pkg_share, "config", "mode_switch_params.yaml")
+
+ return LaunchDescription([
+ DeclareLaunchArgument(
+ "params_file",
+ default_value=default_cfg,
+ description="Path to mode_switch_params.yaml",
+ ),
+ DeclareLaunchArgument(
+ "ramp_duration_s",
+ default_value="0.5",
+ description="RC↔AUTO blend ramp duration (seconds)",
+ ),
+ DeclareLaunchArgument(
+ "return_delay_s",
+ default_value="2.0",
+ description="Sticks-neutral duration before auto re-engage (seconds)",
+ ),
+ DeclareLaunchArgument(
+ "slam_fix_topic",
+ default_value="/slam_toolbox/pose_with_covariance_stamped",
+ description="SLAM fix pose topic",
+ ),
+
+ Node(
+ package="saltybot_mode_switch",
+ executable="mode_switch_node",
+ name="mode_switch",
+ output="screen",
+ parameters=[
+ LaunchConfiguration("params_file"),
+ {
+ "ramp_duration_s": LaunchConfiguration("ramp_duration_s"),
+ "return_delay_s": LaunchConfiguration("return_delay_s"),
+ "slam_fix_topic": LaunchConfiguration("slam_fix_topic"),
+ },
+ ],
+ ),
+
+ Node(
+ package="saltybot_mode_switch",
+ executable="cmd_vel_mux_node",
+ name="cmd_vel_mux",
+ output="screen",
+ parameters=[LaunchConfiguration("params_file")],
+ ),
+ ])
diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/package.xml b/jetson/ros2_ws/src/saltybot_mode_switch/package.xml
new file mode 100644
index 0000000..330c0ef
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_mode_switch/package.xml
@@ -0,0 +1,31 @@
+
+
+
+ saltybot_mode_switch
+ 0.1.0
+
+ Autonomous/RC mode switch for saltybot (Issue #104).
+ Monitors CRSF CH6 toggle, RC stick override, SLAM fix status, and RC link
+ health to manage RC↔AUTO transitions with a 500 ms blend ramp.
+ Publishes /saltybot/control_mode (JSON) and /saltybot/led_pattern.
+ Includes cmd_vel_mux_node that scales autonomous cmd_vel by blend_alpha.
+
+ sl-controls
+ MIT
+
+ rclpy
+ geometry_msgs
+ sensor_msgs
+ std_msgs
+
+ ament_python
+
+ ament_copyright
+ ament_flake8
+ ament_pep257
+ python3-pytest
+
+
+ ament_python
+
+
diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/resource/saltybot_mode_switch b/jetson/ros2_ws/src/saltybot_mode_switch/resource/saltybot_mode_switch
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/__init__.py b/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/cmd_vel_mux_node.py b/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/cmd_vel_mux_node.py
new file mode 100644
index 0000000..3a05c02
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/cmd_vel_mux_node.py
@@ -0,0 +1,133 @@
+"""
+cmd_vel_mux_node.py — Blend-alpha cmd_vel multiplexer (Issue #104).
+
+Sits between the Nav2 / person-follower stack and the cmd_vel_bridge_node,
+scaling the autonomous cmd_vel output by the blend_alpha published by
+mode_switch_node so that the RC → AUTO handoff is smooth over 500 ms.
+
+Topic graph
+-----------
+ Nav2 / follower → /cmd_vel_auto ──┐
+ ├── cmd_vel_mux → /cmd_vel → bridge
+ mode_switch → /saltybot/control_mode (blend_alpha)
+
+In RC mode (blend_alpha ≈ 0) the node publishes Twist(0,0) so the bridge
+receives zeros — this is harmless because the bridge's mode gate already
+prevents autonomous commands when the STM32 is in RC_MANUAL.
+
+The bridge's existing ESC ramp handles hardware-level smoothing;
+the blend_alpha here provides the higher-level cmd_vel policy ramp.
+
+Parameters
+----------
+ cmd_vel_timeout_s Deadman: zero output if /cmd_vel_auto goes silent (0.5 s)
+ control_rate Output publish rate (Hz) (20.0)
+"""
+
+import json
+import time
+
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
+
+from geometry_msgs.msg import Twist
+from std_msgs.msg import String
+
+from saltybot_mode_switch.mode_logic import blend_twist
+
+_BIG = 1e9
+
+
+class CmdVelMuxNode(Node):
+
+ def __init__(self):
+ super().__init__("cmd_vel_mux")
+
+ self.declare_parameter("cmd_vel_timeout_s", 0.5)
+ self.declare_parameter("control_rate", 20.0)
+
+ self._timeout = self.get_parameter("cmd_vel_timeout_s").value
+ self._rate = self.get_parameter("control_rate").value
+
+ reliable = QoSProfile(
+ reliability=ReliabilityPolicy.RELIABLE,
+ history=HistoryPolicy.KEEP_LAST,
+ depth=10,
+ )
+
+ # ── State ─────────────────────────────────────────────────────────────
+ self._auto_linear_x: float = 0.0
+ self._auto_angular_z: float = 0.0
+ self._blend_alpha: float = 0.0
+ self._last_auto_t: float = 0.0
+
+ # ── Subscriptions ─────────────────────────────────────────────────────
+ self.create_subscription(
+ Twist, "/cmd_vel_auto", self._auto_cb, reliable)
+ self.create_subscription(
+ String, "/saltybot/control_mode", self._mode_cb, reliable)
+
+ # ── Publisher ─────────────────────────────────────────────────────────
+ self._pub = self.create_publisher(Twist, "/cmd_vel", reliable)
+
+ # ── Timer ─────────────────────────────────────────────────────────────
+ self._timer = self.create_timer(1.0 / self._rate, self._publish_cb)
+
+ self.get_logger().info(
+ f"CmdVelMuxNode ready rate={self._rate:.0f}Hz "
+ f"timeout={self._timeout:.1f}s"
+ )
+
+ # ── Callbacks ─────────────────────────────────────────────────────────────
+
+ def _auto_cb(self, msg: Twist) -> None:
+ self._auto_linear_x = msg.linear.x
+ self._auto_angular_z = msg.angular.z
+ self._last_auto_t = time.monotonic()
+
+ def _mode_cb(self, msg: String) -> None:
+ try:
+ data = json.loads(msg.data)
+ self._blend_alpha = float(data.get("blend_alpha", 0.0))
+ except (json.JSONDecodeError, TypeError, ValueError):
+ self._blend_alpha = 0.0
+
+ # ── 20 Hz publish ─────────────────────────────────────────────────────────
+
+ def _publish_cb(self) -> None:
+ now = time.monotonic()
+
+ # Deadman: zero if autonomous source went silent
+ auto_age = (now - self._last_auto_t) if self._last_auto_t > 0.0 else _BIG
+ if auto_age > self._timeout:
+ linear_x, angular_z = 0.0, 0.0
+ else:
+ linear_x, angular_z = blend_twist(
+ self._auto_linear_x,
+ self._auto_angular_z,
+ self._blend_alpha,
+ )
+
+ twist = Twist()
+ twist.linear.x = linear_x
+ twist.angular.z = angular_z
+ self._pub.publish(twist)
+
+
+# ── Entry point ───────────────────────────────────────────────────────────────
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = CmdVelMuxNode()
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ node.destroy_node()
+ rclpy.try_shutdown()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/mode_logic.py b/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/mode_logic.py
new file mode 100644
index 0000000..1a9b2d5
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/mode_logic.py
@@ -0,0 +1,241 @@
+"""
+mode_logic.py — Pure RC/Autonomous mode-switch logic for saltybot.
+
+All functions are stateless or operate on plain Python types so the full
+state machine can be exercised in unit tests without a ROS2 runtime.
+
+Mode vocabulary
+---------------
+ "RC" — STM32 executing RC pilot commands; Jetson cmd_vel blocked.
+ "RAMP_TO_AUTO" — Transitioning RC→AUTO; blend_alpha 0.0→1.0 over ramp_s.
+ "AUTO" — STM32 executing Jetson cmd_vel; RC sticks idle.
+ "RAMP_TO_RC" — Transitioning AUTO→RC; blend_alpha 1.0→0.0 over ramp_s.
+
+Blend alpha
+-----------
+ The node publishes blend_alpha ∈ [0, 1] in /saltybot/control_mode.
+ The cmd_vel_mux_node scales autonomous cmd_vel by alpha before publishing
+ to /cmd_vel (the bridge input). alpha=0 ⟹ no autonomous motion.
+
+RC override
+-----------
+ Any stick magnitude > override_threshold while in AUTO instantly forces RC
+ (alpha → 0 immediately; no ramp). _override_active is set so the
+ return-to-auto path requires sticks to be neutral for return_delay seconds.
+
+CH6 / toggle
+------------
+ ch6_auto=True means the pilot has requested AUTO (switch up/high).
+ ch6_auto=False means the pilot has requested RC (switch down/low).
+ Pure CH6 transitions (no stick activity) always ramp gracefully.
+
+Safety interlocks
+-----------------
+ • No AUTO without slam_ok (valid SLAM fix received recently).
+ • Instant drop to RC when rc_link_ok goes False (RC link lost).
+ These checks are applied at every control tick.
+"""
+
+import math
+
+# ─── Constants ──────────────────────────────────────────────────────────────
+
+MODES = ("RC", "RAMP_TO_AUTO", "AUTO", "RAMP_TO_RC")
+
+LED_PATTERNS = {
+ "rc_normal": "solid_yellow",
+ "rc_no_slam": "solid_yellow", # no slam needed in RC
+ "auto_ok": "solid_green",
+ "auto_no_slam": "blink_orange_fast", # shouldn't happen (interlock), but defensive
+ "ramp_to_auto": "blink_green_slow",
+ "ramp_to_rc": "blink_yellow_slow",
+ "no_rc_link": "blink_red_fast",
+}
+
+# ─── Stick / CH6 helpers ────────────────────────────────────────────────────
+
+def ch6_wants_auto(axes: list, ch6_idx: int, threshold: float = 0.5) -> bool:
+ """
+ Return True when CH6 (aux switch) is in the AUTO position.
+
+ Typical CRSF→Joy mapping: AUX2 = axes[5].
+ Switch up (high) = +1.0 → AUTO; switch down (low) = -1.0 → RC.
+ threshold default 0.5 gives hysteresis against mid-travel noise.
+ """
+ if ch6_idx < 0 or ch6_idx >= len(axes):
+ return False
+ return float(axes[ch6_idx]) > threshold
+
+
+def sticks_active(
+ axes: list,
+ stick_indices: tuple,
+ threshold: float = 0.10,
+) -> bool:
+ """
+ Return True when any monitored stick axis exceeds threshold magnitude.
+
+ stick_indices: tuple of Joy axes indices for the four sticks
+ (roll, pitch, throttle, yaw). Default CRSF mapping:
+ (0, 1, 2, 3).
+ threshold: 10 % of full deflection (issue spec).
+ """
+ return any(
+ abs(float(axes[i])) > threshold
+ for i in stick_indices
+ if i < len(axes)
+ )
+
+
+# ─── Alpha / ramp helpers ───────────────────────────────────────────────────
+
+def ramp_alpha(elapsed_s: float, ramp_s: float) -> float:
+ """
+ Compute forward blend alpha [0, 1] linearly over ramp_s seconds.
+
+ elapsed_s : seconds since ramp start.
+ ramp_s : total ramp duration (seconds).
+ """
+ if ramp_s <= 0.0:
+ return 1.0
+ return min(1.0, elapsed_s / ramp_s)
+
+
+def alpha_for_mode(mode: str, elapsed_s: float, ramp_s: float) -> float:
+ """
+ Return the blend alpha appropriate for `mode`.
+
+ RC → 0.0
+ RAMP_TO_AUTO → 0.0 … 1.0 (linear with elapsed)
+ AUTO → 1.0
+ RAMP_TO_RC → 1.0 … 0.0 (linear with elapsed)
+ """
+ if mode == "RC":
+ return 0.0
+ if mode == "AUTO":
+ return 1.0
+ if mode == "RAMP_TO_AUTO":
+ return ramp_alpha(elapsed_s, ramp_s)
+ if mode == "RAMP_TO_RC":
+ return 1.0 - ramp_alpha(elapsed_s, ramp_s)
+ return 0.0
+
+
+# ─── LED pattern ────────────────────────────────────────────────────────────
+
+def led_pattern(mode: str, slam_ok: bool, rc_link_ok: bool) -> str:
+ """Return the LED pattern name for the current operating state."""
+ if not rc_link_ok:
+ return LED_PATTERNS["no_rc_link"]
+ if mode in ("RC", "RAMP_TO_RC"):
+ return LED_PATTERNS["rc_normal"]
+ if mode == "RAMP_TO_AUTO":
+ return LED_PATTERNS["ramp_to_auto"]
+ if mode == "AUTO":
+ return LED_PATTERNS["auto_ok"] if slam_ok else LED_PATTERNS["auto_no_slam"]
+ return LED_PATTERNS["no_rc_link"]
+
+
+# ─── State machine step ──────────────────────────────────────────────────────
+
+def step_state(
+ current_mode: str,
+ ramp_elapsed_s: float,
+ ramp_s: float,
+ ch6_auto: bool,
+ sticks_now_active: bool,
+ sticks_neutral_elapsed_s: float,
+ return_delay_s: float,
+ slam_ok: bool,
+ rc_link_ok: bool,
+ override_active: bool,
+) -> tuple:
+ """
+ Compute (next_mode, next_override_active, instant_to_rc).
+
+ Parameters
+ ----------
+ current_mode : current mode string
+ ramp_elapsed_s : seconds since last ramp start
+ ramp_s : configured ramp duration
+ ch6_auto : True = pilot switch requests AUTO
+ sticks_now_active : True = at least one stick > override_threshold
+ sticks_neutral_elapsed_s: seconds since all sticks went neutral (0 if still active)
+ return_delay_s : seconds of stick-neutral required to re-engage AUTO
+ slam_ok : True = SLAM fix is valid
+ rc_link_ok : True = CRSF link alive
+ override_active : True = currently in stick-override RC
+
+ Returns
+ -------
+ next_mode : new mode string
+ next_override_active: updated override flag
+ instant_to_rc : True when dropping to RC without ramp (safety / stick override)
+ """
+ instant_to_rc = False
+
+ # ── Safety: RC link lost → instant RC ────────────────────────────────────
+ if not rc_link_ok:
+ return "RC", False, True
+
+ # ── Compute override transitions ──────────────────────────────────────────
+ next_override = override_active
+ if sticks_now_active and current_mode in ("AUTO", "RAMP_TO_AUTO", "RAMP_TO_RC"):
+ # Stick input while AUTO or any transition → instant RC override
+ next_override = True
+ return "RC", True, True
+
+ if not sticks_now_active and sticks_neutral_elapsed_s >= return_delay_s:
+ # Sticks have been neutral long enough — clear override
+ next_override = False
+
+ # ── Ramp completion ───────────────────────────────────────────────────────
+ ramp_done = ramp_elapsed_s >= ramp_s - 1e-9 # epsilon guards float accumulation
+
+ # ── Transitions per state ─────────────────────────────────────────────────
+ if current_mode == "RC":
+ if ch6_auto and slam_ok and not next_override:
+ return "RAMP_TO_AUTO", next_override, False
+ return "RC", next_override, False
+
+ if current_mode == "RAMP_TO_AUTO":
+ if not ch6_auto:
+ return "RAMP_TO_RC", next_override, False
+ if not slam_ok:
+ return "RAMP_TO_RC", next_override, False
+ if ramp_done:
+ return "AUTO", next_override, False
+ return "RAMP_TO_AUTO", next_override, False
+
+ if current_mode == "AUTO":
+ if not ch6_auto:
+ return "RAMP_TO_RC", next_override, False
+ if not slam_ok:
+ return "RAMP_TO_RC", next_override, False
+ return "AUTO", next_override, False
+
+ if current_mode == "RAMP_TO_RC":
+ if ch6_auto and slam_ok and not next_override:
+ return "RAMP_TO_AUTO", next_override, False
+ if ramp_done:
+ return "RC", next_override, False
+ return "RAMP_TO_RC", next_override, False
+
+ return "RC", next_override, True # unknown state → safe fallback
+
+
+# ─── cmd_vel blending ───────────────────────────────────────────────────────
+
+def blend_twist(
+ auto_linear_x: float,
+ auto_angular_z: float,
+ alpha: float,
+) -> tuple:
+ """
+ Scale autonomous cmd_vel by blend alpha.
+
+ Returns (linear_x, angular_z) for the blended output.
+ alpha=0 → zeros; alpha=1 → full autonomous.
+ """
+ a = max(0.0, min(1.0, alpha))
+ return auto_linear_x * a, auto_angular_z * a
diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/mode_switch_node.py b/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/mode_switch_node.py
new file mode 100644
index 0000000..78ed0a1
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_mode_switch/saltybot_mode_switch/mode_switch_node.py
@@ -0,0 +1,307 @@
+"""
+mode_switch_node.py — Autonomous/RC mode switch for saltybot (Issue #104).
+
+Inputs
+------
+ /rc/joy (sensor_msgs/Joy)
+ CRSF channel axes:
+ axes[ch6_axis] AUX2 — mode toggle (+1=AUTO, -1=RC)
+ axes[stick_axes...] Roll/Pitch/Throttle/Yaw — override detection
+
+ /saltybot/balance_state (std_msgs/String JSON)
+ Parsed for RC link health (field "rc_link") and STM32 mode.
+
+ (geometry_msgs/PoseWithCovarianceStamped)
+ Any message received within slam_fix_timeout_s → SLAM fix valid.
+
+Outputs
+-------
+ /saltybot/control_mode (std_msgs/String JSON)
+ {
+ "mode": "RC" | "RAMP_TO_AUTO" | "AUTO" | "RAMP_TO_RC",
+ "blend_alpha": 0.0 … 1.0,
+ "slam_ok": bool,
+ "rc_link_ok": bool,
+ "override_active": bool
+ }
+
+ /saltybot/led_pattern (std_msgs/String)
+ LED pattern name consumed by the LED controller node.
+
+State machine
+-------------
+ See mode_logic.py for the full transition table.
+
+ RC — pilot in control; blend_alpha = 0.0
+ RAMP_TO_AUTO — transitioning; blend_alpha 0→1 over ramp_duration_s (500 ms)
+ AUTO — Jetson in control; blend_alpha = 1.0
+ RAMP_TO_RC — transitioning; blend_alpha 1→0 over ramp_duration_s (500 ms)
+
+Safety interlocks
+-----------------
+ • Any stick > override_threshold while AUTO/RAMP_TO_AUTO → instant RC.
+ • rc_link lost (Joy silent > rc_link_timeout_s) → instant RC.
+ • AUTO not entered without valid SLAM fix.
+ • SLAM fix lost while AUTO → RAMP_TO_RC.
+
+Parameters
+----------
+ ch6_axis Joy axes index for the mode-toggle switch (default 5)
+ stick_axes Space-separated Joy indices for 4 sticks (default "0 1 2 3")
+ override_threshold Stick magnitude for instant RC override (default 0.10)
+ return_delay_s Sticks-neutral duration before re-engage (default 2.0 s)
+ ramp_duration_s RC↔AUTO blend ramp duration (default 0.5 s)
+ rc_link_timeout_s Joy silence threshold → RC link lost (default 0.5 s)
+ slam_fix_topic Pose topic for SLAM fix detection (default /slam_toolbox/pose_with_covariance_stamped)
+ slam_fix_timeout_s Pose silence threshold → SLAM fix lost (default 3.0 s)
+ control_rate State machine tick rate (Hz) (default 20.0)
+"""
+
+import json
+import time
+
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy, DurabilityPolicy
+
+from geometry_msgs.msg import PoseWithCovarianceStamped
+from sensor_msgs.msg import Joy
+from std_msgs.msg import String
+
+from saltybot_mode_switch.mode_logic import (
+ alpha_for_mode,
+ ch6_wants_auto,
+ led_pattern,
+ step_state,
+ sticks_active,
+)
+
+_BIG = 1e9 # sentinel: "never received"
+
+
+class ModeSwitchNode(Node):
+
+ def __init__(self):
+ super().__init__("mode_switch")
+
+ # ── Parameters ────────────────────────────────────────────────────────
+ self.declare_parameter("ch6_axis", 5)
+ self.declare_parameter("stick_axes", "0 1 2 3")
+ self.declare_parameter("override_threshold", 0.10)
+ self.declare_parameter("return_delay_s", 2.0)
+ self.declare_parameter("ramp_duration_s", 0.5)
+ self.declare_parameter("rc_link_timeout_s", 0.5)
+ self.declare_parameter("slam_fix_topic",
+ "/slam_toolbox/pose_with_covariance_stamped")
+ self.declare_parameter("slam_fix_timeout_s", 3.0)
+ self.declare_parameter("control_rate", 20.0)
+
+ self._reload_params()
+
+ # ── State ─────────────────────────────────────────────────────────────
+ self._mode: str = "RC"
+ self._override_active: bool = False
+ self._ramp_start: float = 0.0 # monotonic time ramp began
+ self._sticks_neutral_since: float | None = None
+ self._last_joy_t: float = 0.0 # monotonic; 0 = never
+ self._last_slam_t: float = 0.0
+ self._joy_axes: list = []
+ self._stm32_mode: int = 0 # from balance_state JSON
+
+ # ── QoS ───────────────────────────────────────────────────────────────
+ best_effort = QoSProfile(
+ reliability=ReliabilityPolicy.BEST_EFFORT,
+ history=HistoryPolicy.KEEP_LAST,
+ depth=5,
+ )
+ reliable = QoSProfile(
+ reliability=ReliabilityPolicy.RELIABLE,
+ history=HistoryPolicy.KEEP_LAST,
+ depth=10,
+ )
+ # SLAM toolbox uses transient_local for its pose topic
+ transient = QoSProfile(
+ reliability=ReliabilityPolicy.RELIABLE,
+ history=HistoryPolicy.KEEP_LAST,
+ durability=DurabilityPolicy.TRANSIENT_LOCAL,
+ depth=1,
+ )
+
+ # ── Subscriptions ─────────────────────────────────────────────────────
+ self.create_subscription(Joy, "/rc/joy", self._joy_cb, best_effort)
+ self.create_subscription(
+ String, "/saltybot/balance_state", self._balance_cb, reliable)
+
+ slam_topic = self.get_parameter("slam_fix_topic").value
+ # Try transient_local first (SLAM Toolbox); fall back gracefully if not
+ try:
+ self.create_subscription(
+ PoseWithCovarianceStamped, slam_topic,
+ self._slam_cb, transient)
+ except Exception:
+ self.create_subscription(
+ PoseWithCovarianceStamped, slam_topic,
+ self._slam_cb, reliable)
+
+ # ── Publishers ────────────────────────────────────────────────────────
+ self._mode_pub = self.create_publisher(
+ String, "/saltybot/control_mode", reliable)
+ self._led_pub = self.create_publisher(
+ String, "/saltybot/led_pattern", reliable)
+
+ # ── Control timer ─────────────────────────────────────────────────────
+ self._timer = self.create_timer(
+ 1.0 / self._p["control_rate"], self._tick)
+
+ self.get_logger().info(
+ f"ModeSwitchNode ready "
+ f"ramp={self._p['ramp_duration_s']:.1f}s "
+ f"override_thresh={self._p['override_threshold']:.0%} "
+ f"return_delay={self._p['return_delay_s']:.1f}s "
+ f"slam_topic={slam_topic}"
+ )
+
+ # ── Parameter reload ──────────────────────────────────────────────────────
+
+ def _reload_params(self):
+ raw_stick_axes = self.get_parameter("stick_axes").value
+ self._p = {
+ "ch6_axis": int(self.get_parameter("ch6_axis").value),
+ "stick_indices": tuple(int(x) for x in raw_stick_axes.split()),
+ "override_threshold": float(self.get_parameter("override_threshold").value),
+ "return_delay_s": float(self.get_parameter("return_delay_s").value),
+ "ramp_duration_s": float(self.get_parameter("ramp_duration_s").value),
+ "rc_link_timeout_s": float(self.get_parameter("rc_link_timeout_s").value),
+ "slam_fix_timeout_s": float(self.get_parameter("slam_fix_timeout_s").value),
+ "control_rate": float(self.get_parameter("control_rate").value),
+ }
+
+ # ── Callbacks ─────────────────────────────────────────────────────────────
+
+ def _joy_cb(self, msg: Joy) -> None:
+ self._joy_axes = list(msg.axes)
+ self._last_joy_t = time.monotonic()
+
+ def _balance_cb(self, msg: String) -> None:
+ try:
+ data = json.loads(msg.data)
+ # "mode" is a label string; map back to int for reference
+ mode_label = data.get("mode", "RC_MANUAL")
+ self._stm32_mode = {"RC_MANUAL": 0, "RC_ASSISTED": 1,
+ "AUTONOMOUS": 2}.get(mode_label, 0)
+ except (json.JSONDecodeError, TypeError):
+ pass
+
+ def _slam_cb(self, msg: PoseWithCovarianceStamped) -> None:
+ self._last_slam_t = time.monotonic()
+
+ # ── 20 Hz control tick ────────────────────────────────────────────────────
+
+ def _tick(self) -> None:
+ self._reload_params()
+ now = time.monotonic()
+ p = self._p
+
+ # ── Derive inputs ─────────────────────────────────────────────────────
+ joy_age = (now - self._last_joy_t) if self._last_joy_t > 0.0 else _BIG
+ slam_age = (now - self._last_slam_t) if self._last_slam_t > 0.0 else _BIG
+ rc_link_ok = joy_age <= p["rc_link_timeout_s"]
+ slam_ok = slam_age <= p["slam_fix_timeout_s"]
+
+ axes = self._joy_axes
+ ch6 = ch6_wants_auto(axes, p["ch6_axis"])
+ active = sticks_active(axes, p["stick_indices"], p["override_threshold"])
+
+ # Track neutral timer
+ if active:
+ self._sticks_neutral_since = None
+ elif self._sticks_neutral_since is None:
+ self._sticks_neutral_since = now
+
+ neutral_elapsed = (
+ (now - self._sticks_neutral_since)
+ if self._sticks_neutral_since is not None else 0.0
+ )
+
+ ramp_elapsed = now - self._ramp_start
+
+ # ── State machine step ────────────────────────────────────────────────
+ prev_mode = self._mode
+ next_mode, next_override, instant = step_state(
+ current_mode=self._mode,
+ ramp_elapsed_s=ramp_elapsed,
+ ramp_s=p["ramp_duration_s"],
+ ch6_auto=ch6,
+ sticks_now_active=active,
+ sticks_neutral_elapsed_s=neutral_elapsed,
+ return_delay_s=p["return_delay_s"],
+ slam_ok=slam_ok,
+ rc_link_ok=rc_link_ok,
+ override_active=self._override_active,
+ )
+
+ if next_mode != prev_mode:
+ # Log transitions; reset ramp timer on ramp entry
+ self.get_logger().info(
+ f"Mode: {prev_mode} → {next_mode}"
+ f" slam_ok={slam_ok} rc_link={rc_link_ok}"
+ f" override={next_override}"
+ )
+ if next_mode in ("RAMP_TO_AUTO", "RAMP_TO_RC"):
+ self._ramp_start = now
+ ramp_elapsed = 0.0
+
+ self._mode = next_mode
+ self._override_active = next_override
+
+ # ── Compute blend alpha ───────────────────────────────────────────────
+ current_ramp_elapsed = now - self._ramp_start
+ alpha = alpha_for_mode(next_mode, current_ramp_elapsed, p["ramp_duration_s"])
+ if instant:
+ alpha = 0.0
+
+ # ── Log steady warnings ───────────────────────────────────────────────
+ if next_mode == "AUTO" and not slam_ok:
+ self.get_logger().warn(
+ "In AUTO but SLAM fix lost — will exit AUTO",
+ throttle_duration_sec=2.0)
+ if not rc_link_ok:
+ self.get_logger().warn(
+ "RC link lost — forced RC mode",
+ throttle_duration_sec=1.0)
+
+ # ── Publish /saltybot/control_mode ────────────────────────────────────
+ payload = {
+ "mode": next_mode,
+ "blend_alpha": round(alpha, 4),
+ "slam_ok": slam_ok,
+ "rc_link_ok": rc_link_ok,
+ "override_active": next_override,
+ }
+ mode_msg = String()
+ mode_msg.data = json.dumps(payload)
+ self._mode_pub.publish(mode_msg)
+
+ # ── Publish /saltybot/led_pattern ─────────────────────────────────────
+ pattern = led_pattern(next_mode, slam_ok, rc_link_ok)
+ led_msg = String()
+ led_msg.data = pattern
+ self._led_pub.publish(led_msg)
+
+
+# ── Entry point ───────────────────────────────────────────────────────────────
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = ModeSwitchNode()
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ node.destroy_node()
+ rclpy.try_shutdown()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/setup.cfg b/jetson/ros2_ws/src/saltybot_mode_switch/setup.cfg
new file mode 100644
index 0000000..27fd664
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_mode_switch/setup.cfg
@@ -0,0 +1,5 @@
+[develop]
+script_dir=$base/lib/saltybot_mode_switch
+
+[install]
+install_scripts=$base/lib/saltybot_mode_switch
diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/setup.py b/jetson/ros2_ws/src/saltybot_mode_switch/setup.py
new file mode 100644
index 0000000..8b47063
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_mode_switch/setup.py
@@ -0,0 +1,33 @@
+from setuptools import setup, find_packages
+import os
+from glob import glob
+
+package_name = "saltybot_mode_switch"
+
+setup(
+ name=package_name,
+ version="0.1.0",
+ packages=find_packages(exclude=["test"]),
+ data_files=[
+ ("share/ament_index/resource_index/packages",
+ [f"resource/{package_name}"]),
+ (f"share/{package_name}", ["package.xml"]),
+ (os.path.join("share", package_name, "config"),
+ glob("config/*.yaml")),
+ (os.path.join("share", package_name, "launch"),
+ glob("launch/*.py")),
+ ],
+ install_requires=["setuptools"],
+ zip_safe=True,
+ maintainer="sl-controls",
+ maintainer_email="sl-controls@saltylab.local",
+ description="Autonomous/RC mode switch with 500 ms blend ramp (Issue #104)",
+ license="MIT",
+ tests_require=["pytest"],
+ entry_points={
+ "console_scripts": [
+ f"mode_switch_node = {package_name}.mode_switch_node:main",
+ f"cmd_vel_mux_node = {package_name}.cmd_vel_mux_node:main",
+ ],
+ },
+)
diff --git a/jetson/ros2_ws/src/saltybot_mode_switch/test/test_mode_logic.py b/jetson/ros2_ws/src/saltybot_mode_switch/test/test_mode_logic.py
new file mode 100644
index 0000000..30f1056
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_mode_switch/test/test_mode_logic.py
@@ -0,0 +1,481 @@
+"""
+test_mode_logic.py — Unit tests for saltybot_mode_switch pure logic.
+
+Covers:
+ - ch6_wants_auto / sticks_active helpers
+ - ramp_alpha / alpha_for_mode
+ - led_pattern
+ - step_state state machine (all transitions)
+ - blend_twist
+
+No ROS2 runtime required.
+"""
+
+import sys
+import os
+import math
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+
+import pytest
+
+from saltybot_mode_switch.mode_logic import (
+ alpha_for_mode,
+ blend_twist,
+ ch6_wants_auto,
+ led_pattern,
+ ramp_alpha,
+ step_state,
+ sticks_active,
+)
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Helpers
+# ─────────────────────────────────────────────────────────────────────────────
+
+def _step(
+ mode="RC",
+ ramp_elapsed=0.0,
+ ramp_s=0.5,
+ ch6_auto=False,
+ sticks_now_active=False,
+ neutral_elapsed=0.0,
+ return_delay=2.0,
+ slam_ok=True,
+ rc_link_ok=True,
+ override_active=False,
+):
+ return step_state(
+ current_mode=mode,
+ ramp_elapsed_s=ramp_elapsed,
+ ramp_s=ramp_s,
+ ch6_auto=ch6_auto,
+ sticks_now_active=sticks_now_active,
+ sticks_neutral_elapsed_s=neutral_elapsed,
+ return_delay_s=return_delay,
+ slam_ok=slam_ok,
+ rc_link_ok=rc_link_ok,
+ override_active=override_active,
+ )
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# ch6_wants_auto
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestCh6WantsAuto:
+
+ def test_high_value_returns_true(self):
+ assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 1.0], ch6_idx=5) is True
+
+ def test_low_value_returns_false(self):
+ assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, -1.0], ch6_idx=5) is False
+
+ def test_mid_value_below_threshold(self):
+ assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 0.4], ch6_idx=5) is False
+
+ def test_exactly_at_threshold(self):
+ # 0.5 > 0.5 is False; require strictly above
+ assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 0.5], ch6_idx=5) is False
+
+ def test_just_above_threshold(self):
+ assert ch6_wants_auto([0.0, 0.0, 0.0, 0.0, 0.0, 0.51], ch6_idx=5) is True
+
+ def test_out_of_range_index(self):
+ assert ch6_wants_auto([1.0], ch6_idx=10) is False
+
+ def test_empty_axes(self):
+ assert ch6_wants_auto([], ch6_idx=0) is False
+
+ def test_custom_threshold(self):
+ assert ch6_wants_auto([0.0, 0.3], ch6_idx=1, threshold=0.2) is True
+ assert ch6_wants_auto([0.0, 0.3], ch6_idx=1, threshold=0.5) is False
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# sticks_active
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestSticksActive:
+
+ def test_all_zero_not_active(self):
+ assert sticks_active([0.0, 0.0, 0.0, 0.0], (0, 1, 2, 3)) is False
+
+ def test_one_stick_above_threshold(self):
+ assert sticks_active([0.0, 0.15, 0.0, 0.0], (0, 1, 2, 3)) is True
+
+ def test_all_exactly_at_threshold_not_active(self):
+ axes = [0.10, 0.10, 0.10, 0.10]
+ assert sticks_active(axes, (0, 1, 2, 3), threshold=0.10) is False
+
+ def test_just_above_threshold_active(self):
+ assert sticks_active([0.0, 0.0, 0.0, 0.11], (0, 1, 2, 3)) is True
+
+ def test_negative_deflection_detected(self):
+ assert sticks_active([-0.5, 0.0, 0.0, 0.0], (0,)) is True
+
+ def test_out_of_range_index_skipped(self):
+ # index 5 is out of range for a 4-element list — should not crash
+ assert sticks_active([0.0, 0.0, 0.0, 0.0], (0, 1, 5)) is False
+
+ def test_only_monitored_axes_checked(self):
+ # axes[4] = 0.9 but only (0,1,2,3) are monitored
+ assert sticks_active([0.0, 0.0, 0.0, 0.0, 0.9], (0, 1, 2, 3)) is False
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# ramp_alpha / alpha_for_mode
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestRampAlpha:
+
+ def test_zero_elapsed_returns_zero(self):
+ assert ramp_alpha(0.0, 0.5) == pytest.approx(0.0)
+
+ def test_full_elapsed_returns_one(self):
+ assert ramp_alpha(0.5, 0.5) == pytest.approx(1.0)
+
+ def test_beyond_duration_clamped_to_one(self):
+ assert ramp_alpha(1.0, 0.5) == pytest.approx(1.0)
+
+ def test_halfway(self):
+ assert ramp_alpha(0.25, 0.5) == pytest.approx(0.5)
+
+ def test_zero_duration_returns_one(self):
+ assert ramp_alpha(0.0, 0.0) == pytest.approx(1.0)
+
+
+class TestAlphaForMode:
+
+ def test_rc_returns_zero(self):
+ assert alpha_for_mode("RC", 0.3, 0.5) == 0.0
+
+ def test_auto_returns_one(self):
+ assert alpha_for_mode("AUTO", 0.3, 0.5) == 1.0
+
+ def test_ramp_to_auto_mid(self):
+ assert alpha_for_mode("RAMP_TO_AUTO", 0.25, 0.5) == pytest.approx(0.5)
+
+ def test_ramp_to_rc_mid(self):
+ assert alpha_for_mode("RAMP_TO_RC", 0.25, 0.5) == pytest.approx(0.5)
+
+ def test_ramp_to_auto_start(self):
+ assert alpha_for_mode("RAMP_TO_AUTO", 0.0, 0.5) == pytest.approx(0.0)
+
+ def test_ramp_to_rc_start(self):
+ assert alpha_for_mode("RAMP_TO_RC", 0.0, 0.5) == pytest.approx(1.0)
+
+ def test_ramp_to_auto_complete(self):
+ assert alpha_for_mode("RAMP_TO_AUTO", 0.5, 0.5) == pytest.approx(1.0)
+
+ def test_ramp_to_rc_complete(self):
+ assert alpha_for_mode("RAMP_TO_RC", 0.5, 0.5) == pytest.approx(0.0)
+
+ def test_unknown_mode_returns_zero(self):
+ assert alpha_for_mode("UNKNOWN", 0.3, 0.5) == 0.0
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# led_pattern
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestLedPattern:
+
+ def test_no_rc_link_overrides_all(self):
+ for mode in ("RC", "RAMP_TO_AUTO", "AUTO", "RAMP_TO_RC"):
+ assert led_pattern(mode, slam_ok=True, rc_link_ok=False) == "blink_red_fast"
+
+ def test_rc_mode_yellow(self):
+ assert led_pattern("RC", slam_ok=True, rc_link_ok=True) == "solid_yellow"
+
+ def test_ramp_to_rc_yellow(self):
+ assert led_pattern("RAMP_TO_RC", slam_ok=True, rc_link_ok=True) == "solid_yellow"
+
+ def test_auto_ok_green(self):
+ assert led_pattern("AUTO", slam_ok=True, rc_link_ok=True) == "solid_green"
+
+ def test_auto_no_slam_orange(self):
+ assert led_pattern("AUTO", slam_ok=False, rc_link_ok=True) == "blink_orange_fast"
+
+ def test_ramp_to_auto_blink_green(self):
+ assert led_pattern("RAMP_TO_AUTO", slam_ok=True, rc_link_ok=True) == "blink_green_slow"
+
+ def test_rc_slam_state_irrelevant(self):
+ p1 = led_pattern("RC", slam_ok=True, rc_link_ok=True)
+ p2 = led_pattern("RC", slam_ok=False, rc_link_ok=True)
+ assert p1 == p2 # SLAM doesn't affect RC LED
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# step_state — RC transitions
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestStepStateFromRC:
+
+ def test_rc_stays_rc_by_default(self):
+ mode, override, instant = _step(mode="RC")
+ assert mode == "RC"
+ assert not instant
+
+ def test_rc_to_ramp_when_ch6_auto(self):
+ mode, override, instant = _step(mode="RC", ch6_auto=True)
+ assert mode == "RAMP_TO_AUTO"
+ assert not instant
+
+ def test_rc_stays_rc_when_ch6_auto_but_no_slam(self):
+ mode, _, _ = _step(mode="RC", ch6_auto=True, slam_ok=False)
+ assert mode == "RC"
+
+ def test_rc_stays_rc_when_ch6_auto_but_no_rc_link(self):
+ mode, _, instant = _step(mode="RC", ch6_auto=True, rc_link_ok=False)
+ assert mode == "RC"
+ assert instant # RC link lost → instant
+
+ def test_rc_stays_rc_when_override_active_and_sticks_not_neutral(self):
+ mode, _, _ = _step(
+ mode="RC", ch6_auto=True, slam_ok=True,
+ override_active=True, neutral_elapsed=1.0, return_delay=2.0)
+ assert mode == "RC"
+
+ def test_rc_to_ramp_after_override_clears(self):
+ # override clears after sticks neutral >= return_delay
+ mode, override, _ = _step(
+ mode="RC", ch6_auto=True, slam_ok=True,
+ override_active=True, neutral_elapsed=2.5, return_delay=2.0)
+ assert mode == "RAMP_TO_AUTO"
+ assert not override
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# step_state — RAMP_TO_AUTO transitions
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestStepStateFromRampToAuto:
+
+ def test_continues_ramping(self):
+ mode, _, _ = _step(mode="RAMP_TO_AUTO", ramp_elapsed=0.2, ramp_s=0.5,
+ ch6_auto=True)
+ assert mode == "RAMP_TO_AUTO"
+
+ def test_completes_to_auto(self):
+ mode, _, _ = _step(mode="RAMP_TO_AUTO", ramp_elapsed=0.5, ramp_s=0.5,
+ ch6_auto=True)
+ assert mode == "AUTO"
+
+ def test_stick_override_instant_rc(self):
+ mode, override, instant = _step(
+ mode="RAMP_TO_AUTO", ch6_auto=True,
+ sticks_now_active=True)
+ assert mode == "RC"
+ assert override is True
+ assert instant is True
+
+ def test_ch6_low_aborts_to_ramp_rc(self):
+ mode, _, _ = _step(mode="RAMP_TO_AUTO", ch6_auto=False, ramp_elapsed=0.1)
+ assert mode == "RAMP_TO_RC"
+
+ def test_slam_lost_aborts_to_ramp_rc(self):
+ mode, _, _ = _step(mode="RAMP_TO_AUTO", ch6_auto=True, slam_ok=False)
+ assert mode == "RAMP_TO_RC"
+
+ def test_rc_link_lost_instant_rc(self):
+ mode, _, instant = _step(mode="RAMP_TO_AUTO", ch6_auto=True, rc_link_ok=False)
+ assert mode == "RC"
+ assert instant is True
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# step_state — AUTO transitions
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestStepStateFromAuto:
+
+ def test_auto_stays_auto(self):
+ mode, _, _ = _step(mode="AUTO", ch6_auto=True)
+ assert mode == "AUTO"
+
+ def test_stick_override_instant_rc(self):
+ mode, override, instant = _step(
+ mode="AUTO", ch6_auto=True, sticks_now_active=True)
+ assert mode == "RC"
+ assert override is True
+ assert instant is True
+
+ def test_ch6_low_ramp_to_rc(self):
+ mode, _, instant = _step(mode="AUTO", ch6_auto=False)
+ assert mode == "RAMP_TO_RC"
+ assert not instant
+
+ def test_slam_lost_ramp_to_rc(self):
+ mode, _, instant = _step(mode="AUTO", ch6_auto=True, slam_ok=False)
+ assert mode == "RAMP_TO_RC"
+ assert not instant
+
+ def test_rc_link_lost_instant_rc(self):
+ mode, _, instant = _step(mode="AUTO", ch6_auto=True, rc_link_ok=False)
+ assert mode == "RC"
+ assert instant is True
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# step_state — RAMP_TO_RC transitions
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestStepStateFromRampToRC:
+
+ def test_continues_ramping(self):
+ mode, _, _ = _step(mode="RAMP_TO_RC", ramp_elapsed=0.2, ramp_s=0.5)
+ assert mode == "RAMP_TO_RC"
+
+ def test_completes_to_rc(self):
+ mode, _, _ = _step(mode="RAMP_TO_RC", ramp_elapsed=0.5, ramp_s=0.5)
+ assert mode == "RC"
+
+ def test_stick_override_instant_rc(self):
+ mode, override, instant = _step(
+ mode="RAMP_TO_RC", sticks_now_active=True, ch6_auto=True)
+ assert mode == "RC"
+ assert instant is True
+
+ def test_ch6_still_auto_no_override_cancels_back(self):
+ # CH6 still says AUTO, no override → cancel to RAMP_TO_AUTO
+ mode, _, _ = _step(mode="RAMP_TO_RC", ch6_auto=True, slam_ok=True,
+ override_active=False, ramp_elapsed=0.1)
+ assert mode == "RAMP_TO_AUTO"
+
+ def test_rc_link_lost_instant_rc(self):
+ _, _, instant = _step(mode="RAMP_TO_RC", rc_link_ok=False)
+ assert instant is True
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# blend_twist
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestBlendTwist:
+
+ def test_alpha_zero_zeros_output(self):
+ lx, az = blend_twist(1.0, 0.5, 0.0)
+ assert lx == pytest.approx(0.0)
+ assert az == pytest.approx(0.0)
+
+ def test_alpha_one_passes_through(self):
+ lx, az = blend_twist(0.4, -0.7, 1.0)
+ assert lx == pytest.approx(0.4)
+ assert az == pytest.approx(-0.7)
+
+ def test_alpha_half_scales(self):
+ lx, az = blend_twist(1.0, 1.0, 0.5)
+ assert lx == pytest.approx(0.5)
+ assert az == pytest.approx(0.5)
+
+ def test_alpha_above_one_clamped(self):
+ lx, az = blend_twist(1.0, 1.0, 1.5)
+ assert lx == pytest.approx(1.0)
+ assert az == pytest.approx(1.0)
+
+ def test_alpha_below_zero_clamped(self):
+ lx, az = blend_twist(1.0, 1.0, -0.5)
+ assert lx == pytest.approx(0.0)
+ assert az == pytest.approx(0.0)
+
+ def test_negative_cmd_vel_preserved(self):
+ lx, az = blend_twist(-0.5, -1.0, 0.5)
+ assert lx == pytest.approx(-0.25)
+ assert az == pytest.approx(-0.5)
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Integration scenarios
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestIntegrationScenarios:
+
+ def test_full_rc_to_auto_ramp_cycle(self):
+ """Simulate RC→RAMP_TO_AUTO→AUTO with 0.5s ramp at 20Hz."""
+ mode = "RC"
+ override = False
+ ramp_s = 0.5
+ dt = 1.0 / 20.0
+ t_ramp = 0.0
+
+ # CH6 flipped to AUTO, SLAM ok
+ mode, override, instant = step_state(
+ mode, 0.0, ramp_s, True, False, 5.0, 2.0, True, True, override)
+ assert mode == "RAMP_TO_AUTO"
+ t_ramp = 0.0
+
+ # Simulate 10 ticks (0.5s) → should complete
+ for tick in range(10):
+ t_ramp += dt
+ mode, override, instant = step_state(
+ mode, t_ramp, ramp_s, True, False, 5.0, 2.0, True, True, override)
+
+ assert mode == "AUTO"
+ assert alpha_for_mode("AUTO", t_ramp, ramp_s) == 1.0
+
+ def test_stick_override_then_return_to_auto(self):
+ """Stick override while AUTO → RC → neutral 2s → back to AUTO."""
+ # Start in AUTO
+ mode, override, instant = _step(
+ mode="AUTO", ch6_auto=True, sticks_now_active=True)
+ assert mode == "RC"
+ assert override is True
+ assert instant is True
+
+ # Sticks neutral but < 2s — stays RC
+ mode, override, _ = _step(
+ mode="RC", ch6_auto=True, slam_ok=True,
+ override_active=True, neutral_elapsed=1.0, return_delay=2.0)
+ assert mode == "RC"
+
+ # Sticks neutral >= 2s — override clears, enters ramp
+ mode, override, _ = _step(
+ mode="RC", ch6_auto=True, slam_ok=True,
+ override_active=True, neutral_elapsed=2.0, return_delay=2.0)
+ assert mode == "RAMP_TO_AUTO"
+ assert override is False
+
+ def test_slam_lost_during_auto_graceful_exit(self):
+ """SLAM fix lost during AUTO → graceful RAMP_TO_RC (not instant)."""
+ mode, _, instant = _step(mode="AUTO", ch6_auto=True, slam_ok=False)
+ assert mode == "RAMP_TO_RC"
+ assert not instant
+
+ def test_rc_link_lost_always_instant(self):
+ """RC link lost from any state → instant RC."""
+ for start_mode in ("AUTO", "RAMP_TO_AUTO", "RAMP_TO_RC"):
+ mode, _, instant = _step(mode=start_mode, ch6_auto=True, rc_link_ok=False)
+ assert mode == "RC", f"from {start_mode}"
+ assert instant is True, f"from {start_mode}"
+
+ def test_blend_alpha_sequence_through_ramp(self):
+ """Blend alpha is monotonically increasing during RAMP_TO_AUTO."""
+ ramp_s = 0.5
+ alphas = [
+ alpha_for_mode("RAMP_TO_AUTO", t * 0.05, ramp_s)
+ for t in range(11) # 0.0 to 0.5 s in 0.05 s steps
+ ]
+ for a, b in zip(alphas, alphas[1:]):
+ assert b >= a
+
+ def test_ramp_to_rc_alpha_decreasing(self):
+ """Blend alpha is monotonically decreasing during RAMP_TO_RC."""
+ ramp_s = 0.5
+ alphas = [
+ alpha_for_mode("RAMP_TO_RC", t * 0.05, ramp_s)
+ for t in range(11)
+ ]
+ for a, b in zip(alphas, alphas[1:]):
+ assert b <= a
+
+ def test_no_auto_without_slam(self):
+ """From RC, ch6=AUTO but slam=False → cannot enter AUTO path."""
+ mode, _, _ = _step(mode="RC", ch6_auto=True, slam_ok=False)
+ assert mode == "RC"
+
+ def test_no_auto_without_rc_link(self):
+ """From RC, ch6=AUTO but rc_link=False → stays RC (instant)."""
+ mode, _, instant = _step(mode="RC", ch6_auto=True, rc_link_ok=False)
+ assert mode == "RC"
+ assert instant is True