feat(controls): Priority-based cmd_vel multiplexer (Issue #228)
Implement priority-based velocity command routing with timeout-based fallback: - Teleop (priority 1) > Nav2 (priority 2) > Docking (priority 3) - 0.5s timeout per source: inactive sources are skipped in priority selection - When no source is active, publish zero command for safety Features: - 50Hz multiplexing frequency with configurable parameters - JSON status publishing (/saltybot/cmd_vel_mux_status) for telemetry - Automatic priority escalation: teleop preempts nav2, nav2 preempts docking - Fallback chain: if teleop times out, nav2 takes over; if nav2 times out, docking active - Zero command safety: all sources timeout = immediate motor stop Test Coverage: - 20+ unit tests covering enum, initialization, subscriptions - Priority selection logic (teleop preemption, nav2 preemption) - Timeout detection and source fallback - Realistic scenario tests (teleop release, priority escalation chains) - JSON status format validation Config: frequency=50Hz, source_timeout=0.5s Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
3a7cd3649a
commit
173b538f44
@ -0,0 +1,10 @@
|
|||||||
|
# cmd_vel multiplexer configuration
|
||||||
|
|
||||||
|
cmd_vel_mux:
|
||||||
|
ros__parameters:
|
||||||
|
# Multiplexing frequency (Hz)
|
||||||
|
frequency: 50 # 50 Hz = 20ms cycle
|
||||||
|
|
||||||
|
# Source timeout (seconds)
|
||||||
|
# If no message from a source for this duration, it becomes inactive
|
||||||
|
source_timeout: 0.5
|
||||||
@ -0,0 +1,36 @@
|
|||||||
|
"""Launch file for cmd_vel_mux_node."""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
"""Generate launch description for cmd_vel multiplexer."""
|
||||||
|
# Package directory
|
||||||
|
pkg_dir = get_package_share_directory("saltybot_cmd_vel_mux")
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
config_file = os.path.join(pkg_dir, "config", "mux_config.yaml")
|
||||||
|
|
||||||
|
# Declare launch arguments
|
||||||
|
return LaunchDescription(
|
||||||
|
[
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"config_file",
|
||||||
|
default_value=config_file,
|
||||||
|
description="Path to configuration YAML file",
|
||||||
|
),
|
||||||
|
# cmd_vel multiplexer node
|
||||||
|
Node(
|
||||||
|
package="saltybot_cmd_vel_mux",
|
||||||
|
executable="cmd_vel_mux_node",
|
||||||
|
name="cmd_vel_mux",
|
||||||
|
output="screen",
|
||||||
|
parameters=[LaunchConfiguration("config_file")],
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
28
jetson/ros2_ws/src/saltybot_cmd_vel_mux/package.xml
Normal file
28
jetson/ros2_ws/src/saltybot_cmd_vel_mux/package.xml
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_cmd_vel_mux</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
cmd_vel multiplexer for SaltyBot: priority-based routing with 0.5s timeout.
|
||||||
|
Routes cmd_vel from teleop (priority 1), nav2 (priority 2), or docking (priority 3)
|
||||||
|
based on source activity and priority. Publishes mux status at 50Hz.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,186 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""cmd_vel multiplexer for SaltyBot.
|
||||||
|
|
||||||
|
Priority-based routing with 0.5s timeout per source.
|
||||||
|
Teleop (1) > Nav2 (2) > Docking (3)
|
||||||
|
|
||||||
|
Published topics:
|
||||||
|
/cmd_vel (geometry_msgs/Twist) - Multiplexed velocity command
|
||||||
|
/saltybot/cmd_vel_mux_status (std_msgs/String) - JSON mux status
|
||||||
|
|
||||||
|
Subscribed topics:
|
||||||
|
/cmd_vel_teleop (geometry_msgs/Twist) - Teleop command (priority 1)
|
||||||
|
/cmd_vel_nav2 (geometry_msgs/Twist) - Nav2 command (priority 2)
|
||||||
|
/cmd_vel_docking (geometry_msgs/Twist) - Docking command (priority 3)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
from enum import Enum
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.timer import Timer
|
||||||
|
from geometry_msgs.msg import Twist
|
||||||
|
from std_msgs.msg import String
|
||||||
|
|
||||||
|
|
||||||
|
class CmdSource(Enum):
|
||||||
|
"""Command velocity sources with priority."""
|
||||||
|
TELEOP = 1
|
||||||
|
NAV2 = 2
|
||||||
|
DOCKING = 3
|
||||||
|
NONE = 0
|
||||||
|
|
||||||
|
|
||||||
|
class CmdVelMuxNode(Node):
|
||||||
|
"""ROS2 node for priority-based cmd_vel multiplexing."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("cmd_vel_mux")
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
self.declare_parameter("frequency", 50) # Hz
|
||||||
|
self.declare_parameter("source_timeout", 0.5) # seconds
|
||||||
|
|
||||||
|
frequency = self.get_parameter("frequency").value
|
||||||
|
self.source_timeout = self.get_parameter("source_timeout").value
|
||||||
|
|
||||||
|
# Last update times for each source
|
||||||
|
self.last_update = {
|
||||||
|
CmdSource.TELEOP: None,
|
||||||
|
CmdSource.NAV2: None,
|
||||||
|
CmdSource.DOCKING: None,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Last received commands
|
||||||
|
self.last_cmd = {
|
||||||
|
CmdSource.TELEOP: None,
|
||||||
|
CmdSource.NAV2: None,
|
||||||
|
CmdSource.DOCKING: None,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Current active source
|
||||||
|
self.active_source = CmdSource.NONE
|
||||||
|
|
||||||
|
# Subscriptions
|
||||||
|
self.create_subscription(
|
||||||
|
Twist, "/cmd_vel_teleop", self._on_cmd_vel_teleop, 10
|
||||||
|
)
|
||||||
|
self.create_subscription(
|
||||||
|
Twist, "/cmd_vel_nav2", self._on_cmd_vel_nav2, 10
|
||||||
|
)
|
||||||
|
self.create_subscription(
|
||||||
|
Twist, "/cmd_vel_docking", self._on_cmd_vel_docking, 10
|
||||||
|
)
|
||||||
|
|
||||||
|
# Publications
|
||||||
|
self.pub_mux = self.create_publisher(Twist, "/cmd_vel", 10)
|
||||||
|
self.pub_status = self.create_publisher(String, "/saltybot/cmd_vel_mux_status", 10)
|
||||||
|
|
||||||
|
# Timer for multiplexing at 50Hz
|
||||||
|
period = 1.0 / frequency
|
||||||
|
self.timer: Timer = self.create_timer(period, self._timer_callback)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"cmd_vel mux initialized at {frequency}Hz. "
|
||||||
|
f"Source timeout: {self.source_timeout}s. "
|
||||||
|
f"Priority: Teleop(1) > Nav2(2) > Docking(3)"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _on_cmd_vel_teleop(self, msg: Twist) -> None:
|
||||||
|
"""Update teleop command."""
|
||||||
|
self.last_update[CmdSource.TELEOP] = self.get_clock().now()
|
||||||
|
self.last_cmd[CmdSource.TELEOP] = msg
|
||||||
|
|
||||||
|
def _on_cmd_vel_nav2(self, msg: Twist) -> None:
|
||||||
|
"""Update Nav2 command."""
|
||||||
|
self.last_update[CmdSource.NAV2] = self.get_clock().now()
|
||||||
|
self.last_cmd[CmdSource.NAV2] = msg
|
||||||
|
|
||||||
|
def _on_cmd_vel_docking(self, msg: Twist) -> None:
|
||||||
|
"""Update docking command."""
|
||||||
|
self.last_update[CmdSource.DOCKING] = self.get_clock().now()
|
||||||
|
self.last_cmd[CmdSource.DOCKING] = msg
|
||||||
|
|
||||||
|
def _timer_callback(self) -> None:
|
||||||
|
"""Multiplex cmd_vel based on priority and timeout at 50Hz."""
|
||||||
|
now = self.get_clock().now()
|
||||||
|
|
||||||
|
# Check which sources are active (within timeout)
|
||||||
|
active_sources = []
|
||||||
|
|
||||||
|
for source in [CmdSource.TELEOP, CmdSource.NAV2, CmdSource.DOCKING]:
|
||||||
|
last_time = self.last_update[source]
|
||||||
|
|
||||||
|
if last_time is not None:
|
||||||
|
elapsed = (now - last_time).nanoseconds / 1e9
|
||||||
|
|
||||||
|
if elapsed <= self.source_timeout:
|
||||||
|
active_sources.append(source)
|
||||||
|
|
||||||
|
# Select highest priority active source
|
||||||
|
if active_sources:
|
||||||
|
# Sources are already in priority order
|
||||||
|
self.active_source = active_sources[0]
|
||||||
|
else:
|
||||||
|
self.active_source = CmdSource.NONE
|
||||||
|
|
||||||
|
# Publish multiplexed command
|
||||||
|
if self.active_source != CmdSource.NONE:
|
||||||
|
cmd = self.last_cmd[self.active_source]
|
||||||
|
self.pub_mux.publish(cmd)
|
||||||
|
else:
|
||||||
|
# No active source: publish zero command
|
||||||
|
self.pub_mux.publish(Twist())
|
||||||
|
|
||||||
|
# Publish mux status
|
||||||
|
self._publish_status()
|
||||||
|
|
||||||
|
def _publish_status(self) -> None:
|
||||||
|
"""Publish multiplexer status as JSON."""
|
||||||
|
now = self.get_clock().now()
|
||||||
|
|
||||||
|
# Build source status
|
||||||
|
sources_status = {}
|
||||||
|
for source in [CmdSource.TELEOP, CmdSource.NAV2, CmdSource.DOCKING]:
|
||||||
|
last_time = self.last_update[source]
|
||||||
|
|
||||||
|
if last_time is not None:
|
||||||
|
elapsed = (now - last_time).nanoseconds / 1e9
|
||||||
|
is_active = elapsed <= self.source_timeout
|
||||||
|
else:
|
||||||
|
elapsed = None
|
||||||
|
is_active = False
|
||||||
|
|
||||||
|
sources_status[source.name] = {
|
||||||
|
"priority": source.value,
|
||||||
|
"active": is_active,
|
||||||
|
"elapsed_s": elapsed,
|
||||||
|
}
|
||||||
|
|
||||||
|
status = {
|
||||||
|
"timestamp": now.nanoseconds / 1e9,
|
||||||
|
"active_source": self.active_source.name,
|
||||||
|
"active_priority": self.active_source.value,
|
||||||
|
"sources": sources_status,
|
||||||
|
}
|
||||||
|
|
||||||
|
msg = String(data=json.dumps(status))
|
||||||
|
self.pub_status.publish(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = CmdVelMuxNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
4
jetson/ros2_ws/src/saltybot_cmd_vel_mux/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_cmd_vel_mux/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_cmd_vel_mux
|
||||||
|
[egg_info]
|
||||||
|
tag_date = 0
|
||||||
29
jetson/ros2_ws/src/saltybot_cmd_vel_mux/setup.py
Normal file
29
jetson/ros2_ws/src/saltybot_cmd_vel_mux/setup.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
package_name = "saltybot_cmd_vel_mux"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(f"share/{package_name}/launch", ["launch/cmd_vel_mux.launch.py"]),
|
||||||
|
(f"share/{package_name}/config", ["config/mux_config.yaml"]),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-controls",
|
||||||
|
maintainer_email="sl-controls@saltylab.local",
|
||||||
|
description=(
|
||||||
|
"cmd_vel multiplexer: priority-based routing with 0.5s timeout"
|
||||||
|
),
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"cmd_vel_mux_node = saltybot_cmd_vel_mux.cmd_vel_mux_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
373
jetson/ros2_ws/src/saltybot_cmd_vel_mux/test/test_cmd_vel_mux.py
Normal file
373
jetson/ros2_ws/src/saltybot_cmd_vel_mux/test/test_cmd_vel_mux.py
Normal file
@ -0,0 +1,373 @@
|
|||||||
|
"""Unit tests for cmd_vel_mux_node."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import json
|
||||||
|
from geometry_msgs.msg import Twist
|
||||||
|
from std_msgs.msg import String
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.time import Time
|
||||||
|
|
||||||
|
# Import the node and classes under test
|
||||||
|
from saltybot_cmd_vel_mux.cmd_vel_mux_node import (
|
||||||
|
CmdVelMuxNode,
|
||||||
|
CmdSource,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def rclpy_fixture():
|
||||||
|
"""Initialize and cleanup rclpy."""
|
||||||
|
rclpy.init()
|
||||||
|
yield
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def node(rclpy_fixture):
|
||||||
|
"""Create a cmd_vel mux node instance."""
|
||||||
|
node = CmdVelMuxNode()
|
||||||
|
yield node
|
||||||
|
node.destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
class TestCmdSource:
|
||||||
|
"""Test suite for CmdSource enum."""
|
||||||
|
|
||||||
|
def test_cmd_source_values(self):
|
||||||
|
"""Test CmdSource enum values."""
|
||||||
|
assert CmdSource.TELEOP.value == 1
|
||||||
|
assert CmdSource.NAV2.value == 2
|
||||||
|
assert CmdSource.DOCKING.value == 3
|
||||||
|
assert CmdSource.NONE.value == 0
|
||||||
|
|
||||||
|
def test_cmd_source_names(self):
|
||||||
|
"""Test CmdSource enum names."""
|
||||||
|
assert CmdSource.TELEOP.name == "TELEOP"
|
||||||
|
assert CmdSource.NAV2.name == "NAV2"
|
||||||
|
assert CmdSource.DOCKING.name == "DOCKING"
|
||||||
|
assert CmdSource.NONE.name == "NONE"
|
||||||
|
|
||||||
|
|
||||||
|
class TestCmdVelMuxNode:
|
||||||
|
"""Test suite for CmdVelMuxNode."""
|
||||||
|
|
||||||
|
def test_node_initialization(self, node):
|
||||||
|
"""Test that node initializes with correct defaults."""
|
||||||
|
assert node.source_timeout == 0.5
|
||||||
|
assert node.active_source == CmdSource.NONE
|
||||||
|
|
||||||
|
for source in [CmdSource.TELEOP, CmdSource.NAV2, CmdSource.DOCKING]:
|
||||||
|
assert node.last_update[source] is None
|
||||||
|
assert node.last_cmd[source] is None
|
||||||
|
|
||||||
|
def test_teleop_subscription(self, node):
|
||||||
|
"""Test teleop command subscription."""
|
||||||
|
cmd = Twist()
|
||||||
|
cmd.linear.x = 1.0
|
||||||
|
cmd.angular.z = 0.5
|
||||||
|
|
||||||
|
node._on_cmd_vel_teleop(cmd)
|
||||||
|
|
||||||
|
assert node.last_update[CmdSource.TELEOP] is not None
|
||||||
|
assert node.last_cmd[CmdSource.TELEOP] is not None
|
||||||
|
assert node.last_cmd[CmdSource.TELEOP].linear.x == 1.0
|
||||||
|
|
||||||
|
def test_nav2_subscription(self, node):
|
||||||
|
"""Test Nav2 command subscription."""
|
||||||
|
cmd = Twist()
|
||||||
|
cmd.linear.x = 0.5
|
||||||
|
cmd.angular.z = 0.2
|
||||||
|
|
||||||
|
node._on_cmd_vel_nav2(cmd)
|
||||||
|
|
||||||
|
assert node.last_update[CmdSource.NAV2] is not None
|
||||||
|
assert node.last_cmd[CmdSource.NAV2].linear.x == 0.5
|
||||||
|
|
||||||
|
def test_docking_subscription(self, node):
|
||||||
|
"""Test docking command subscription."""
|
||||||
|
cmd = Twist()
|
||||||
|
cmd.linear.x = 0.2
|
||||||
|
|
||||||
|
node._on_cmd_vel_docking(cmd)
|
||||||
|
|
||||||
|
assert node.last_update[CmdSource.DOCKING] is not None
|
||||||
|
assert node.last_cmd[CmdSource.DOCKING].linear.x == 0.2
|
||||||
|
|
||||||
|
def test_no_active_source(self, node):
|
||||||
|
"""Test with no active sources."""
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
assert node.active_source == CmdSource.NONE
|
||||||
|
|
||||||
|
def test_teleop_only(self, node):
|
||||||
|
"""Test teleop as only active source."""
|
||||||
|
teleop_cmd = Twist()
|
||||||
|
teleop_cmd.linear.x = 1.5
|
||||||
|
|
||||||
|
node._on_cmd_vel_teleop(teleop_cmd)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
assert node.active_source == CmdSource.TELEOP
|
||||||
|
|
||||||
|
def test_priority_teleop_over_nav2(self, node):
|
||||||
|
"""Test teleop has priority over Nav2."""
|
||||||
|
teleop_cmd = Twist()
|
||||||
|
teleop_cmd.linear.x = 1.0
|
||||||
|
|
||||||
|
nav2_cmd = Twist()
|
||||||
|
nav2_cmd.linear.x = 0.5
|
||||||
|
|
||||||
|
node._on_cmd_vel_teleop(teleop_cmd)
|
||||||
|
node._on_cmd_vel_nav2(nav2_cmd)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
assert node.active_source == CmdSource.TELEOP
|
||||||
|
|
||||||
|
def test_priority_nav2_over_docking(self, node):
|
||||||
|
"""Test Nav2 has priority over docking."""
|
||||||
|
nav2_cmd = Twist()
|
||||||
|
nav2_cmd.linear.x = 0.5
|
||||||
|
|
||||||
|
docking_cmd = Twist()
|
||||||
|
docking_cmd.linear.x = 0.2
|
||||||
|
|
||||||
|
node._on_cmd_vel_nav2(nav2_cmd)
|
||||||
|
node._on_cmd_vel_docking(docking_cmd)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
assert node.active_source == CmdSource.NAV2
|
||||||
|
|
||||||
|
def test_fallback_to_lower_priority(self, node):
|
||||||
|
"""Test fallback to lower priority when higher priority times out."""
|
||||||
|
nav2_cmd = Twist()
|
||||||
|
nav2_cmd.linear.x = 0.5
|
||||||
|
|
||||||
|
docking_cmd = Twist()
|
||||||
|
docking_cmd.linear.x = 0.2
|
||||||
|
|
||||||
|
# Update both sources
|
||||||
|
node._on_cmd_vel_nav2(nav2_cmd)
|
||||||
|
node._on_cmd_vel_docking(docking_cmd)
|
||||||
|
|
||||||
|
# Both should be active
|
||||||
|
node._timer_callback()
|
||||||
|
assert node.active_source == CmdSource.NAV2
|
||||||
|
|
||||||
|
# Simulate Nav2 timeout (>0.5s)
|
||||||
|
now = node.get_clock().now()
|
||||||
|
old_time = Time(nanoseconds=now.nanoseconds - int(0.6 * 1e9))
|
||||||
|
node.last_update[CmdSource.NAV2] = old_time
|
||||||
|
|
||||||
|
# Should fall back to docking
|
||||||
|
node._timer_callback()
|
||||||
|
assert node.active_source == CmdSource.DOCKING
|
||||||
|
|
||||||
|
def test_source_timeout_detection(self, node):
|
||||||
|
"""Test that sources timeout after 0.5s."""
|
||||||
|
cmd = Twist()
|
||||||
|
cmd.linear.x = 1.0
|
||||||
|
|
||||||
|
# Just updated teleop
|
||||||
|
node._on_cmd_vel_teleop(cmd)
|
||||||
|
|
||||||
|
# Immediately, it should be active
|
||||||
|
node._timer_callback()
|
||||||
|
assert node.active_source == CmdSource.TELEOP
|
||||||
|
|
||||||
|
# Simulate time passage: >0.5s
|
||||||
|
now = node.get_clock().now()
|
||||||
|
old_time = Time(nanoseconds=now.nanoseconds - int(0.6 * 1e9))
|
||||||
|
node.last_update[CmdSource.TELEOP] = old_time
|
||||||
|
|
||||||
|
# Should timeout and become inactive
|
||||||
|
node._timer_callback()
|
||||||
|
assert node.active_source == CmdSource.NONE
|
||||||
|
|
||||||
|
def test_all_three_sources_active(self, node):
|
||||||
|
"""Test with all three sources active - teleop should win."""
|
||||||
|
teleop_cmd = Twist()
|
||||||
|
teleop_cmd.linear.x = 1.0
|
||||||
|
|
||||||
|
nav2_cmd = Twist()
|
||||||
|
nav2_cmd.linear.x = 0.5
|
||||||
|
|
||||||
|
docking_cmd = Twist()
|
||||||
|
docking_cmd.linear.x = 0.2
|
||||||
|
|
||||||
|
node._on_cmd_vel_teleop(teleop_cmd)
|
||||||
|
node._on_cmd_vel_nav2(nav2_cmd)
|
||||||
|
node._on_cmd_vel_docking(docking_cmd)
|
||||||
|
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
assert node.active_source == CmdSource.TELEOP
|
||||||
|
|
||||||
|
def test_mux_status_json_format(self, node):
|
||||||
|
"""Test multiplexer status JSON format."""
|
||||||
|
cmd = Twist()
|
||||||
|
cmd.linear.x = 1.0
|
||||||
|
|
||||||
|
node._on_cmd_vel_teleop(cmd)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
now = node.get_clock().now()
|
||||||
|
|
||||||
|
# Manually create status as node would
|
||||||
|
sources_status = {}
|
||||||
|
for source in [CmdSource.TELEOP, CmdSource.NAV2, CmdSource.DOCKING]:
|
||||||
|
last_time = node.last_update[source]
|
||||||
|
|
||||||
|
if last_time is not None:
|
||||||
|
elapsed = (now - last_time).nanoseconds / 1e9
|
||||||
|
is_active = elapsed <= node.source_timeout
|
||||||
|
else:
|
||||||
|
elapsed = None
|
||||||
|
is_active = False
|
||||||
|
|
||||||
|
sources_status[source.name] = {
|
||||||
|
"priority": source.value,
|
||||||
|
"active": is_active,
|
||||||
|
"elapsed_s": elapsed,
|
||||||
|
}
|
||||||
|
|
||||||
|
status = {
|
||||||
|
"active_source": node.active_source.name,
|
||||||
|
"active_priority": node.active_source.value,
|
||||||
|
"sources": sources_status,
|
||||||
|
}
|
||||||
|
|
||||||
|
json_str = json.dumps(status)
|
||||||
|
parsed = json.loads(json_str)
|
||||||
|
|
||||||
|
assert parsed["active_source"] == "TELEOP"
|
||||||
|
assert parsed["active_priority"] == 1
|
||||||
|
assert "TELEOP" in parsed["sources"]
|
||||||
|
assert parsed["sources"]["TELEOP"]["priority"] == 1
|
||||||
|
|
||||||
|
|
||||||
|
class TestCmdVelMuxScenarios:
|
||||||
|
"""Integration-style tests for realistic scenarios."""
|
||||||
|
|
||||||
|
def test_scenario_teleop_control(self, node):
|
||||||
|
"""Scenario: user teleop control only."""
|
||||||
|
cmd = Twist()
|
||||||
|
cmd.linear.x = 1.5
|
||||||
|
cmd.angular.z = 0.3
|
||||||
|
|
||||||
|
node._on_cmd_vel_teleop(cmd)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
assert node.active_source == CmdSource.TELEOP
|
||||||
|
|
||||||
|
def test_scenario_nav2_autonomous(self, node):
|
||||||
|
"""Scenario: Nav2 autonomous navigation."""
|
||||||
|
nav2_cmd = Twist()
|
||||||
|
nav2_cmd.linear.x = 0.5
|
||||||
|
nav2_cmd.angular.z = 0.1
|
||||||
|
|
||||||
|
node._on_cmd_vel_nav2(nav2_cmd)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
assert node.active_source == CmdSource.NAV2
|
||||||
|
|
||||||
|
def test_scenario_docking_sequence(self, node):
|
||||||
|
"""Scenario: docking operation with no higher priority."""
|
||||||
|
docking_cmd = Twist()
|
||||||
|
docking_cmd.linear.x = 0.1
|
||||||
|
|
||||||
|
node._on_cmd_vel_docking(docking_cmd)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
assert node.active_source == CmdSource.DOCKING
|
||||||
|
|
||||||
|
def test_scenario_teleop_preempts_nav2(self, node):
|
||||||
|
"""Scenario: user teleop preempts Nav2 navigation."""
|
||||||
|
# Nav2 is running
|
||||||
|
nav2_cmd = Twist()
|
||||||
|
nav2_cmd.linear.x = 0.5
|
||||||
|
|
||||||
|
node._on_cmd_vel_nav2(nav2_cmd)
|
||||||
|
node._timer_callback()
|
||||||
|
assert node.active_source == CmdSource.NAV2
|
||||||
|
|
||||||
|
# User takes teleop control
|
||||||
|
teleop_cmd = Twist()
|
||||||
|
teleop_cmd.linear.x = 1.0
|
||||||
|
|
||||||
|
node._on_cmd_vel_teleop(teleop_cmd)
|
||||||
|
node._timer_callback()
|
||||||
|
assert node.active_source == CmdSource.TELEOP
|
||||||
|
|
||||||
|
def test_scenario_teleop_release_back_to_nav2(self, node):
|
||||||
|
"""Scenario: user releases teleop, Nav2 resumes."""
|
||||||
|
# Both sources active
|
||||||
|
nav2_cmd = Twist()
|
||||||
|
nav2_cmd.linear.x = 0.5
|
||||||
|
teleop_cmd = Twist()
|
||||||
|
teleop_cmd.linear.x = 1.0
|
||||||
|
|
||||||
|
node._on_cmd_vel_nav2(nav2_cmd)
|
||||||
|
node._on_cmd_vel_teleop(teleop_cmd)
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Teleop is active
|
||||||
|
assert node.active_source == CmdSource.TELEOP
|
||||||
|
|
||||||
|
# Simulate teleop timeout
|
||||||
|
now = node.get_clock().now()
|
||||||
|
old_time = Time(nanoseconds=now.nanoseconds - int(0.6 * 1e9))
|
||||||
|
node.last_update[CmdSource.TELEOP] = old_time
|
||||||
|
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Nav2 should take over
|
||||||
|
assert node.active_source == CmdSource.NAV2
|
||||||
|
|
||||||
|
def test_scenario_timeout_all_sources(self, node):
|
||||||
|
"""Scenario: all sources timeout, publish zero command."""
|
||||||
|
# Update all sources
|
||||||
|
cmd = Twist()
|
||||||
|
cmd.linear.x = 1.0
|
||||||
|
|
||||||
|
node._on_cmd_vel_teleop(cmd)
|
||||||
|
node._on_cmd_vel_nav2(cmd)
|
||||||
|
node._on_cmd_vel_docking(cmd)
|
||||||
|
|
||||||
|
# Simulate timeout for all
|
||||||
|
now = node.get_clock().now()
|
||||||
|
old_time = Time(nanoseconds=now.nanoseconds - int(1.0 * 1e9))
|
||||||
|
|
||||||
|
node.last_update[CmdSource.TELEOP] = old_time
|
||||||
|
node.last_update[CmdSource.NAV2] = old_time
|
||||||
|
node.last_update[CmdSource.DOCKING] = old_time
|
||||||
|
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
assert node.active_source == CmdSource.NONE
|
||||||
|
|
||||||
|
def test_scenario_docking_to_nav2_to_teleop(self, node):
|
||||||
|
"""Scenario: priority escalation through modes."""
|
||||||
|
# Start with docking
|
||||||
|
docking_cmd = Twist()
|
||||||
|
docking_cmd.linear.x = 0.1
|
||||||
|
|
||||||
|
node._on_cmd_vel_docking(docking_cmd)
|
||||||
|
node._timer_callback()
|
||||||
|
assert node.active_source == CmdSource.DOCKING
|
||||||
|
|
||||||
|
# Nav2 starts
|
||||||
|
nav2_cmd = Twist()
|
||||||
|
nav2_cmd.linear.x = 0.5
|
||||||
|
|
||||||
|
node._on_cmd_vel_nav2(nav2_cmd)
|
||||||
|
node._timer_callback()
|
||||||
|
assert node.active_source == CmdSource.NAV2
|
||||||
|
|
||||||
|
# User takes control
|
||||||
|
teleop_cmd = Twist()
|
||||||
|
teleop_cmd.linear.x = 1.0
|
||||||
|
|
||||||
|
node._on_cmd_vel_teleop(teleop_cmd)
|
||||||
|
node._timer_callback()
|
||||||
|
assert node.active_source == CmdSource.TELEOP
|
||||||
Loading…
x
Reference in New Issue
Block a user