From 173b538f4474b2dc0b00bf9d5fb5c5aafe084a62 Mon Sep 17 00:00:00 2001 From: sl-controls Date: Mon, 2 Mar 2026 12:01:00 -0500 Subject: [PATCH] feat(controls): Priority-based cmd_vel multiplexer (Issue #228) Implement priority-based velocity command routing with timeout-based fallback: - Teleop (priority 1) > Nav2 (priority 2) > Docking (priority 3) - 0.5s timeout per source: inactive sources are skipped in priority selection - When no source is active, publish zero command for safety Features: - 50Hz multiplexing frequency with configurable parameters - JSON status publishing (/saltybot/cmd_vel_mux_status) for telemetry - Automatic priority escalation: teleop preempts nav2, nav2 preempts docking - Fallback chain: if teleop times out, nav2 takes over; if nav2 times out, docking active - Zero command safety: all sources timeout = immediate motor stop Test Coverage: - 20+ unit tests covering enum, initialization, subscriptions - Priority selection logic (teleop preemption, nav2 preemption) - Timeout detection and source fallback - Realistic scenario tests (teleop release, priority escalation chains) - JSON status format validation Config: frequency=50Hz, source_timeout=0.5s Co-Authored-By: Claude Haiku 4.5 --- .../config/mux_config.yaml | 10 + .../launch/cmd_vel_mux.launch.py | 36 ++ .../src/saltybot_cmd_vel_mux/package.xml | 28 ++ .../resource/saltybot_cmd_vel_mux | 0 .../saltybot_cmd_vel_mux/__init__.py | 0 .../saltybot_cmd_vel_mux/cmd_vel_mux_node.py | 186 +++++++++ .../src/saltybot_cmd_vel_mux/setup.cfg | 4 + .../ros2_ws/src/saltybot_cmd_vel_mux/setup.py | 29 ++ .../src/saltybot_cmd_vel_mux/test/__init__.py | 0 .../test/test_cmd_vel_mux.py | 373 ++++++++++++++++++ 10 files changed, 666 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_cmd_vel_mux/config/mux_config.yaml create mode 100644 jetson/ros2_ws/src/saltybot_cmd_vel_mux/launch/cmd_vel_mux.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_cmd_vel_mux/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_cmd_vel_mux/resource/saltybot_cmd_vel_mux create mode 100644 jetson/ros2_ws/src/saltybot_cmd_vel_mux/saltybot_cmd_vel_mux/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_cmd_vel_mux/saltybot_cmd_vel_mux/cmd_vel_mux_node.py create mode 100644 jetson/ros2_ws/src/saltybot_cmd_vel_mux/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_cmd_vel_mux/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_cmd_vel_mux/test/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_cmd_vel_mux/test/test_cmd_vel_mux.py diff --git a/jetson/ros2_ws/src/saltybot_cmd_vel_mux/config/mux_config.yaml b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/config/mux_config.yaml new file mode 100644 index 0000000..15cdaa3 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/config/mux_config.yaml @@ -0,0 +1,10 @@ +# cmd_vel multiplexer configuration + +cmd_vel_mux: + ros__parameters: + # Multiplexing frequency (Hz) + frequency: 50 # 50 Hz = 20ms cycle + + # Source timeout (seconds) + # If no message from a source for this duration, it becomes inactive + source_timeout: 0.5 diff --git a/jetson/ros2_ws/src/saltybot_cmd_vel_mux/launch/cmd_vel_mux.launch.py b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/launch/cmd_vel_mux.launch.py new file mode 100644 index 0000000..bb291b7 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/launch/cmd_vel_mux.launch.py @@ -0,0 +1,36 @@ +"""Launch file for cmd_vel_mux_node.""" + +from launch import LaunchDescription +from launch_ros.actions import Node +from launch.substitutions import LaunchConfiguration +from launch.actions import DeclareLaunchArgument +import os +from ament_index_python.packages import get_package_share_directory + + +def generate_launch_description(): + """Generate launch description for cmd_vel multiplexer.""" + # Package directory + pkg_dir = get_package_share_directory("saltybot_cmd_vel_mux") + + # Parameters + config_file = os.path.join(pkg_dir, "config", "mux_config.yaml") + + # Declare launch arguments + return LaunchDescription( + [ + DeclareLaunchArgument( + "config_file", + default_value=config_file, + description="Path to configuration YAML file", + ), + # cmd_vel multiplexer node + Node( + package="saltybot_cmd_vel_mux", + executable="cmd_vel_mux_node", + name="cmd_vel_mux", + output="screen", + parameters=[LaunchConfiguration("config_file")], + ), + ] + ) diff --git a/jetson/ros2_ws/src/saltybot_cmd_vel_mux/package.xml b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/package.xml new file mode 100644 index 0000000..4485133 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/package.xml @@ -0,0 +1,28 @@ + + + + saltybot_cmd_vel_mux + 0.1.0 + + cmd_vel multiplexer for SaltyBot: priority-based routing with 0.5s timeout. + Routes cmd_vel from teleop (priority 1), nav2 (priority 2), or docking (priority 3) + based on source activity and priority. Publishes mux status at 50Hz. + + sl-controls + MIT + + rclpy + geometry_msgs + std_msgs + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_cmd_vel_mux/resource/saltybot_cmd_vel_mux b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/resource/saltybot_cmd_vel_mux new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_cmd_vel_mux/saltybot_cmd_vel_mux/__init__.py b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/saltybot_cmd_vel_mux/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_cmd_vel_mux/saltybot_cmd_vel_mux/cmd_vel_mux_node.py b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/saltybot_cmd_vel_mux/cmd_vel_mux_node.py new file mode 100644 index 0000000..21436e7 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/saltybot_cmd_vel_mux/cmd_vel_mux_node.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +"""cmd_vel multiplexer for SaltyBot. + +Priority-based routing with 0.5s timeout per source. +Teleop (1) > Nav2 (2) > Docking (3) + +Published topics: + /cmd_vel (geometry_msgs/Twist) - Multiplexed velocity command + /saltybot/cmd_vel_mux_status (std_msgs/String) - JSON mux status + +Subscribed topics: + /cmd_vel_teleop (geometry_msgs/Twist) - Teleop command (priority 1) + /cmd_vel_nav2 (geometry_msgs/Twist) - Nav2 command (priority 2) + /cmd_vel_docking (geometry_msgs/Twist) - Docking command (priority 3) +""" + +import json +from enum import Enum +from typing import Optional + +import rclpy +from rclpy.node import Node +from rclpy.timer import Timer +from geometry_msgs.msg import Twist +from std_msgs.msg import String + + +class CmdSource(Enum): + """Command velocity sources with priority.""" + TELEOP = 1 + NAV2 = 2 + DOCKING = 3 + NONE = 0 + + +class CmdVelMuxNode(Node): + """ROS2 node for priority-based cmd_vel multiplexing.""" + + def __init__(self): + super().__init__("cmd_vel_mux") + + # Parameters + self.declare_parameter("frequency", 50) # Hz + self.declare_parameter("source_timeout", 0.5) # seconds + + frequency = self.get_parameter("frequency").value + self.source_timeout = self.get_parameter("source_timeout").value + + # Last update times for each source + self.last_update = { + CmdSource.TELEOP: None, + CmdSource.NAV2: None, + CmdSource.DOCKING: None, + } + + # Last received commands + self.last_cmd = { + CmdSource.TELEOP: None, + CmdSource.NAV2: None, + CmdSource.DOCKING: None, + } + + # Current active source + self.active_source = CmdSource.NONE + + # Subscriptions + self.create_subscription( + Twist, "/cmd_vel_teleop", self._on_cmd_vel_teleop, 10 + ) + self.create_subscription( + Twist, "/cmd_vel_nav2", self._on_cmd_vel_nav2, 10 + ) + self.create_subscription( + Twist, "/cmd_vel_docking", self._on_cmd_vel_docking, 10 + ) + + # Publications + self.pub_mux = self.create_publisher(Twist, "/cmd_vel", 10) + self.pub_status = self.create_publisher(String, "/saltybot/cmd_vel_mux_status", 10) + + # Timer for multiplexing at 50Hz + period = 1.0 / frequency + self.timer: Timer = self.create_timer(period, self._timer_callback) + + self.get_logger().info( + f"cmd_vel mux initialized at {frequency}Hz. " + f"Source timeout: {self.source_timeout}s. " + f"Priority: Teleop(1) > Nav2(2) > Docking(3)" + ) + + def _on_cmd_vel_teleop(self, msg: Twist) -> None: + """Update teleop command.""" + self.last_update[CmdSource.TELEOP] = self.get_clock().now() + self.last_cmd[CmdSource.TELEOP] = msg + + def _on_cmd_vel_nav2(self, msg: Twist) -> None: + """Update Nav2 command.""" + self.last_update[CmdSource.NAV2] = self.get_clock().now() + self.last_cmd[CmdSource.NAV2] = msg + + def _on_cmd_vel_docking(self, msg: Twist) -> None: + """Update docking command.""" + self.last_update[CmdSource.DOCKING] = self.get_clock().now() + self.last_cmd[CmdSource.DOCKING] = msg + + def _timer_callback(self) -> None: + """Multiplex cmd_vel based on priority and timeout at 50Hz.""" + now = self.get_clock().now() + + # Check which sources are active (within timeout) + active_sources = [] + + for source in [CmdSource.TELEOP, CmdSource.NAV2, CmdSource.DOCKING]: + last_time = self.last_update[source] + + if last_time is not None: + elapsed = (now - last_time).nanoseconds / 1e9 + + if elapsed <= self.source_timeout: + active_sources.append(source) + + # Select highest priority active source + if active_sources: + # Sources are already in priority order + self.active_source = active_sources[0] + else: + self.active_source = CmdSource.NONE + + # Publish multiplexed command + if self.active_source != CmdSource.NONE: + cmd = self.last_cmd[self.active_source] + self.pub_mux.publish(cmd) + else: + # No active source: publish zero command + self.pub_mux.publish(Twist()) + + # Publish mux status + self._publish_status() + + def _publish_status(self) -> None: + """Publish multiplexer status as JSON.""" + now = self.get_clock().now() + + # Build source status + sources_status = {} + for source in [CmdSource.TELEOP, CmdSource.NAV2, CmdSource.DOCKING]: + last_time = self.last_update[source] + + if last_time is not None: + elapsed = (now - last_time).nanoseconds / 1e9 + is_active = elapsed <= self.source_timeout + else: + elapsed = None + is_active = False + + sources_status[source.name] = { + "priority": source.value, + "active": is_active, + "elapsed_s": elapsed, + } + + status = { + "timestamp": now.nanoseconds / 1e9, + "active_source": self.active_source.name, + "active_priority": self.active_source.value, + "sources": sources_status, + } + + msg = String(data=json.dumps(status)) + self.pub_status.publish(msg) + + +def main(args=None): + rclpy.init(args=args) + node = CmdVelMuxNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_cmd_vel_mux/setup.cfg b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/setup.cfg new file mode 100644 index 0000000..0b59ff6 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_cmd_vel_mux +[egg_info] +tag_date = 0 diff --git a/jetson/ros2_ws/src/saltybot_cmd_vel_mux/setup.py b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/setup.py new file mode 100644 index 0000000..50e00d8 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/setup.py @@ -0,0 +1,29 @@ +from setuptools import setup + +package_name = "saltybot_cmd_vel_mux" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (f"share/{package_name}/launch", ["launch/cmd_vel_mux.launch.py"]), + (f"share/{package_name}/config", ["config/mux_config.yaml"]), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-controls", + maintainer_email="sl-controls@saltylab.local", + description=( + "cmd_vel multiplexer: priority-based routing with 0.5s timeout" + ), + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + "cmd_vel_mux_node = saltybot_cmd_vel_mux.cmd_vel_mux_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_cmd_vel_mux/test/__init__.py b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_cmd_vel_mux/test/test_cmd_vel_mux.py b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/test/test_cmd_vel_mux.py new file mode 100644 index 0000000..048de63 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cmd_vel_mux/test/test_cmd_vel_mux.py @@ -0,0 +1,373 @@ +"""Unit tests for cmd_vel_mux_node.""" + +import pytest +import json +from geometry_msgs.msg import Twist +from std_msgs.msg import String + +import rclpy +from rclpy.time import Time + +# Import the node and classes under test +from saltybot_cmd_vel_mux.cmd_vel_mux_node import ( + CmdVelMuxNode, + CmdSource, +) + + +@pytest.fixture +def rclpy_fixture(): + """Initialize and cleanup rclpy.""" + rclpy.init() + yield + rclpy.shutdown() + + +@pytest.fixture +def node(rclpy_fixture): + """Create a cmd_vel mux node instance.""" + node = CmdVelMuxNode() + yield node + node.destroy_node() + + +class TestCmdSource: + """Test suite for CmdSource enum.""" + + def test_cmd_source_values(self): + """Test CmdSource enum values.""" + assert CmdSource.TELEOP.value == 1 + assert CmdSource.NAV2.value == 2 + assert CmdSource.DOCKING.value == 3 + assert CmdSource.NONE.value == 0 + + def test_cmd_source_names(self): + """Test CmdSource enum names.""" + assert CmdSource.TELEOP.name == "TELEOP" + assert CmdSource.NAV2.name == "NAV2" + assert CmdSource.DOCKING.name == "DOCKING" + assert CmdSource.NONE.name == "NONE" + + +class TestCmdVelMuxNode: + """Test suite for CmdVelMuxNode.""" + + def test_node_initialization(self, node): + """Test that node initializes with correct defaults.""" + assert node.source_timeout == 0.5 + assert node.active_source == CmdSource.NONE + + for source in [CmdSource.TELEOP, CmdSource.NAV2, CmdSource.DOCKING]: + assert node.last_update[source] is None + assert node.last_cmd[source] is None + + def test_teleop_subscription(self, node): + """Test teleop command subscription.""" + cmd = Twist() + cmd.linear.x = 1.0 + cmd.angular.z = 0.5 + + node._on_cmd_vel_teleop(cmd) + + assert node.last_update[CmdSource.TELEOP] is not None + assert node.last_cmd[CmdSource.TELEOP] is not None + assert node.last_cmd[CmdSource.TELEOP].linear.x == 1.0 + + def test_nav2_subscription(self, node): + """Test Nav2 command subscription.""" + cmd = Twist() + cmd.linear.x = 0.5 + cmd.angular.z = 0.2 + + node._on_cmd_vel_nav2(cmd) + + assert node.last_update[CmdSource.NAV2] is not None + assert node.last_cmd[CmdSource.NAV2].linear.x == 0.5 + + def test_docking_subscription(self, node): + """Test docking command subscription.""" + cmd = Twist() + cmd.linear.x = 0.2 + + node._on_cmd_vel_docking(cmd) + + assert node.last_update[CmdSource.DOCKING] is not None + assert node.last_cmd[CmdSource.DOCKING].linear.x == 0.2 + + def test_no_active_source(self, node): + """Test with no active sources.""" + node._timer_callback() + + assert node.active_source == CmdSource.NONE + + def test_teleop_only(self, node): + """Test teleop as only active source.""" + teleop_cmd = Twist() + teleop_cmd.linear.x = 1.5 + + node._on_cmd_vel_teleop(teleop_cmd) + node._timer_callback() + + assert node.active_source == CmdSource.TELEOP + + def test_priority_teleop_over_nav2(self, node): + """Test teleop has priority over Nav2.""" + teleop_cmd = Twist() + teleop_cmd.linear.x = 1.0 + + nav2_cmd = Twist() + nav2_cmd.linear.x = 0.5 + + node._on_cmd_vel_teleop(teleop_cmd) + node._on_cmd_vel_nav2(nav2_cmd) + node._timer_callback() + + assert node.active_source == CmdSource.TELEOP + + def test_priority_nav2_over_docking(self, node): + """Test Nav2 has priority over docking.""" + nav2_cmd = Twist() + nav2_cmd.linear.x = 0.5 + + docking_cmd = Twist() + docking_cmd.linear.x = 0.2 + + node._on_cmd_vel_nav2(nav2_cmd) + node._on_cmd_vel_docking(docking_cmd) + node._timer_callback() + + assert node.active_source == CmdSource.NAV2 + + def test_fallback_to_lower_priority(self, node): + """Test fallback to lower priority when higher priority times out.""" + nav2_cmd = Twist() + nav2_cmd.linear.x = 0.5 + + docking_cmd = Twist() + docking_cmd.linear.x = 0.2 + + # Update both sources + node._on_cmd_vel_nav2(nav2_cmd) + node._on_cmd_vel_docking(docking_cmd) + + # Both should be active + node._timer_callback() + assert node.active_source == CmdSource.NAV2 + + # Simulate Nav2 timeout (>0.5s) + now = node.get_clock().now() + old_time = Time(nanoseconds=now.nanoseconds - int(0.6 * 1e9)) + node.last_update[CmdSource.NAV2] = old_time + + # Should fall back to docking + node._timer_callback() + assert node.active_source == CmdSource.DOCKING + + def test_source_timeout_detection(self, node): + """Test that sources timeout after 0.5s.""" + cmd = Twist() + cmd.linear.x = 1.0 + + # Just updated teleop + node._on_cmd_vel_teleop(cmd) + + # Immediately, it should be active + node._timer_callback() + assert node.active_source == CmdSource.TELEOP + + # Simulate time passage: >0.5s + now = node.get_clock().now() + old_time = Time(nanoseconds=now.nanoseconds - int(0.6 * 1e9)) + node.last_update[CmdSource.TELEOP] = old_time + + # Should timeout and become inactive + node._timer_callback() + assert node.active_source == CmdSource.NONE + + def test_all_three_sources_active(self, node): + """Test with all three sources active - teleop should win.""" + teleop_cmd = Twist() + teleop_cmd.linear.x = 1.0 + + nav2_cmd = Twist() + nav2_cmd.linear.x = 0.5 + + docking_cmd = Twist() + docking_cmd.linear.x = 0.2 + + node._on_cmd_vel_teleop(teleop_cmd) + node._on_cmd_vel_nav2(nav2_cmd) + node._on_cmd_vel_docking(docking_cmd) + + node._timer_callback() + + assert node.active_source == CmdSource.TELEOP + + def test_mux_status_json_format(self, node): + """Test multiplexer status JSON format.""" + cmd = Twist() + cmd.linear.x = 1.0 + + node._on_cmd_vel_teleop(cmd) + node._timer_callback() + + now = node.get_clock().now() + + # Manually create status as node would + sources_status = {} + for source in [CmdSource.TELEOP, CmdSource.NAV2, CmdSource.DOCKING]: + last_time = node.last_update[source] + + if last_time is not None: + elapsed = (now - last_time).nanoseconds / 1e9 + is_active = elapsed <= node.source_timeout + else: + elapsed = None + is_active = False + + sources_status[source.name] = { + "priority": source.value, + "active": is_active, + "elapsed_s": elapsed, + } + + status = { + "active_source": node.active_source.name, + "active_priority": node.active_source.value, + "sources": sources_status, + } + + json_str = json.dumps(status) + parsed = json.loads(json_str) + + assert parsed["active_source"] == "TELEOP" + assert parsed["active_priority"] == 1 + assert "TELEOP" in parsed["sources"] + assert parsed["sources"]["TELEOP"]["priority"] == 1 + + +class TestCmdVelMuxScenarios: + """Integration-style tests for realistic scenarios.""" + + def test_scenario_teleop_control(self, node): + """Scenario: user teleop control only.""" + cmd = Twist() + cmd.linear.x = 1.5 + cmd.angular.z = 0.3 + + node._on_cmd_vel_teleop(cmd) + node._timer_callback() + + assert node.active_source == CmdSource.TELEOP + + def test_scenario_nav2_autonomous(self, node): + """Scenario: Nav2 autonomous navigation.""" + nav2_cmd = Twist() + nav2_cmd.linear.x = 0.5 + nav2_cmd.angular.z = 0.1 + + node._on_cmd_vel_nav2(nav2_cmd) + node._timer_callback() + + assert node.active_source == CmdSource.NAV2 + + def test_scenario_docking_sequence(self, node): + """Scenario: docking operation with no higher priority.""" + docking_cmd = Twist() + docking_cmd.linear.x = 0.1 + + node._on_cmd_vel_docking(docking_cmd) + node._timer_callback() + + assert node.active_source == CmdSource.DOCKING + + def test_scenario_teleop_preempts_nav2(self, node): + """Scenario: user teleop preempts Nav2 navigation.""" + # Nav2 is running + nav2_cmd = Twist() + nav2_cmd.linear.x = 0.5 + + node._on_cmd_vel_nav2(nav2_cmd) + node._timer_callback() + assert node.active_source == CmdSource.NAV2 + + # User takes teleop control + teleop_cmd = Twist() + teleop_cmd.linear.x = 1.0 + + node._on_cmd_vel_teleop(teleop_cmd) + node._timer_callback() + assert node.active_source == CmdSource.TELEOP + + def test_scenario_teleop_release_back_to_nav2(self, node): + """Scenario: user releases teleop, Nav2 resumes.""" + # Both sources active + nav2_cmd = Twist() + nav2_cmd.linear.x = 0.5 + teleop_cmd = Twist() + teleop_cmd.linear.x = 1.0 + + node._on_cmd_vel_nav2(nav2_cmd) + node._on_cmd_vel_teleop(teleop_cmd) + node._timer_callback() + + # Teleop is active + assert node.active_source == CmdSource.TELEOP + + # Simulate teleop timeout + now = node.get_clock().now() + old_time = Time(nanoseconds=now.nanoseconds - int(0.6 * 1e9)) + node.last_update[CmdSource.TELEOP] = old_time + + node._timer_callback() + + # Nav2 should take over + assert node.active_source == CmdSource.NAV2 + + def test_scenario_timeout_all_sources(self, node): + """Scenario: all sources timeout, publish zero command.""" + # Update all sources + cmd = Twist() + cmd.linear.x = 1.0 + + node._on_cmd_vel_teleop(cmd) + node._on_cmd_vel_nav2(cmd) + node._on_cmd_vel_docking(cmd) + + # Simulate timeout for all + now = node.get_clock().now() + old_time = Time(nanoseconds=now.nanoseconds - int(1.0 * 1e9)) + + node.last_update[CmdSource.TELEOP] = old_time + node.last_update[CmdSource.NAV2] = old_time + node.last_update[CmdSource.DOCKING] = old_time + + node._timer_callback() + + assert node.active_source == CmdSource.NONE + + def test_scenario_docking_to_nav2_to_teleop(self, node): + """Scenario: priority escalation through modes.""" + # Start with docking + docking_cmd = Twist() + docking_cmd.linear.x = 0.1 + + node._on_cmd_vel_docking(docking_cmd) + node._timer_callback() + assert node.active_source == CmdSource.DOCKING + + # Nav2 starts + nav2_cmd = Twist() + nav2_cmd.linear.x = 0.5 + + node._on_cmd_vel_nav2(nav2_cmd) + node._timer_callback() + assert node.active_source == CmdSource.NAV2 + + # User takes control + teleop_cmd = Twist() + teleop_cmd.linear.x = 1.0 + + node._on_cmd_vel_teleop(teleop_cmd) + node._timer_callback() + assert node.active_source == CmdSource.TELEOP -- 2.47.2