feat(controls): speed limiter node (Issue #194) #200
@ -0,0 +1,6 @@
|
|||||||
|
# Speed limiter node configuration
|
||||||
|
|
||||||
|
speed_limiter:
|
||||||
|
ros__parameters:
|
||||||
|
# Publishing frequency in Hz
|
||||||
|
frequency: 50 # 50 Hz = 20ms cycle
|
||||||
@ -0,0 +1,36 @@
|
|||||||
|
"""Launch file for speed_limiter_node."""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
"""Generate launch description for speed limiter node."""
|
||||||
|
# Package directory
|
||||||
|
pkg_dir = get_package_share_directory("saltybot_speed_limiter")
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
config_file = os.path.join(pkg_dir, "config", "limiter_params.yaml")
|
||||||
|
|
||||||
|
# Declare launch arguments
|
||||||
|
return LaunchDescription(
|
||||||
|
[
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"config_file",
|
||||||
|
default_value=config_file,
|
||||||
|
description="Path to configuration YAML file",
|
||||||
|
),
|
||||||
|
# Speed limiter node
|
||||||
|
Node(
|
||||||
|
package="saltybot_speed_limiter",
|
||||||
|
executable="speed_limiter_node",
|
||||||
|
name="speed_limiter",
|
||||||
|
output="screen",
|
||||||
|
parameters=[LaunchConfiguration("config_file")],
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
29
jetson/ros2_ws/src/saltybot_speed_limiter/package.xml
Normal file
29
jetson/ros2_ws/src/saltybot_speed_limiter/package.xml
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_speed_limiter</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
Speed limiter node for SaltyBot: enforces safety constraints from terrain type,
|
||||||
|
battery level, and emergency system. Computes minimum scale factor from multiple
|
||||||
|
sources and applies to cmd_vel with JSON telemetry. Runs at 50Hz.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>saltybot_emergency_msgs</depend>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,155 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Speed limiter node for SaltyBot.
|
||||||
|
|
||||||
|
Enforces safety constraints from terrain type, battery level, and emergency system.
|
||||||
|
Computes minimum scale factor and applies to cmd_vel at 50Hz (20ms).
|
||||||
|
|
||||||
|
Published topics:
|
||||||
|
/saltybot/cmd_vel_limited (geometry_msgs/Twist)
|
||||||
|
/saltybot/speed_limit_info (std_msgs/String) - JSON telemetry
|
||||||
|
|
||||||
|
Subscribed topics:
|
||||||
|
/cmd_vel (geometry_msgs/Twist)
|
||||||
|
/saltybot/terrain_speed_scale (std_msgs/Float32) - 0.0 to 1.0
|
||||||
|
/saltybot/speed_limit (std_msgs/Float32) - 0.0 to 1.0, from battery level
|
||||||
|
/saltybot/emergency (saltybot_emergency_msgs/EmergencyEvent)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.timer import Timer
|
||||||
|
from geometry_msgs.msg import Twist
|
||||||
|
from std_msgs.msg import Float32, String
|
||||||
|
from saltybot_emergency_msgs.msg import EmergencyEvent
|
||||||
|
|
||||||
|
|
||||||
|
class SpeedLimiterNode(Node):
|
||||||
|
"""ROS2 node that enforces speed limits from multiple constraints."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("speed_limiter")
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
self.declare_parameter("frequency", 50) # Hz
|
||||||
|
frequency = self.get_parameter("frequency").value
|
||||||
|
|
||||||
|
# Last received values (with defaults)
|
||||||
|
self.terrain_scale = 1.0
|
||||||
|
self.battery_limit = 1.0
|
||||||
|
self.emergency_scale = 1.0
|
||||||
|
self.last_cmd_vel = None
|
||||||
|
|
||||||
|
# Subscriptions
|
||||||
|
self.sub_cmd_vel = self.create_subscription(
|
||||||
|
Twist, "/cmd_vel", self._on_cmd_vel, 10
|
||||||
|
)
|
||||||
|
self.sub_terrain = self.create_subscription(
|
||||||
|
Float32, "/saltybot/terrain_speed_scale", self._on_terrain_scale, 10
|
||||||
|
)
|
||||||
|
self.sub_battery = self.create_subscription(
|
||||||
|
Float32, "/saltybot/speed_limit", self._on_battery_limit, 10
|
||||||
|
)
|
||||||
|
self.sub_emergency = self.create_subscription(
|
||||||
|
EmergencyEvent, "/saltybot/emergency", self._on_emergency, 10
|
||||||
|
)
|
||||||
|
|
||||||
|
# Publications
|
||||||
|
self.pub_cmd_vel_limited = self.create_publisher(
|
||||||
|
Twist, "/saltybot/cmd_vel_limited", 10
|
||||||
|
)
|
||||||
|
self.pub_info = self.create_publisher(
|
||||||
|
String, "/saltybot/speed_limit_info", 10
|
||||||
|
)
|
||||||
|
|
||||||
|
# Timer for periodic publishing at specified frequency
|
||||||
|
period = 1.0 / frequency
|
||||||
|
self.timer: Timer = self.create_timer(period, self._timer_callback)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Speed limiter initialized at {frequency}Hz. "
|
||||||
|
"Awaiting terrain_speed_scale, speed_limit, and emergency signals."
|
||||||
|
)
|
||||||
|
|
||||||
|
def _on_cmd_vel(self, msg: Twist) -> None:
|
||||||
|
"""Cache the last received cmd_vel."""
|
||||||
|
self.last_cmd_vel = msg
|
||||||
|
|
||||||
|
def _on_terrain_scale(self, msg: Float32) -> None:
|
||||||
|
"""Update terrain speed scale (0.0 to 1.0)."""
|
||||||
|
self.terrain_scale = max(0.0, min(1.0, msg.data))
|
||||||
|
|
||||||
|
def _on_battery_limit(self, msg: Float32) -> None:
|
||||||
|
"""Update battery-driven speed limit (0.0 to 1.0)."""
|
||||||
|
self.battery_limit = max(0.0, min(1.0, msg.data))
|
||||||
|
|
||||||
|
def _on_emergency(self, msg: EmergencyEvent) -> None:
|
||||||
|
"""Update emergency-driven speed scale based on system state."""
|
||||||
|
# Map emergency state to speed reduction factor
|
||||||
|
if msg.state == "ESCALATED":
|
||||||
|
self.emergency_scale = 0.0 # Complete stop
|
||||||
|
elif msg.state == "STOPPING":
|
||||||
|
self.emergency_scale = 0.1 # Minimal movement for safe deceleration
|
||||||
|
elif msg.state == "RECOVERING":
|
||||||
|
self.emergency_scale = 0.3 # Cautious recovery
|
||||||
|
else: # NOMINAL
|
||||||
|
self.emergency_scale = 1.0 # Full speed allowed
|
||||||
|
|
||||||
|
def _timer_callback(self) -> None:
|
||||||
|
"""Process and publish speed-limited cmd_vel at 50Hz."""
|
||||||
|
if self.last_cmd_vel is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Compute minimum scale factor from all constraints
|
||||||
|
scale_factor = min(self.terrain_scale, self.battery_limit, self.emergency_scale)
|
||||||
|
|
||||||
|
# Apply scale factor to cmd_vel
|
||||||
|
limited_cmd_vel = self._scale_twist(self.last_cmd_vel, scale_factor)
|
||||||
|
|
||||||
|
# Publish limited command
|
||||||
|
self.pub_cmd_vel_limited.publish(limited_cmd_vel)
|
||||||
|
|
||||||
|
# Publish telemetry
|
||||||
|
info = {
|
||||||
|
"timestamp": self.get_clock().now().nanoseconds / 1e9,
|
||||||
|
"scale_factor": float(scale_factor),
|
||||||
|
"terrain_scale": float(self.terrain_scale),
|
||||||
|
"battery_limit": float(self.battery_limit),
|
||||||
|
"emergency_scale": float(self.emergency_scale),
|
||||||
|
"linear_x": float(limited_cmd_vel.linear.x),
|
||||||
|
"linear_y": float(limited_cmd_vel.linear.y),
|
||||||
|
"linear_z": float(limited_cmd_vel.linear.z),
|
||||||
|
"angular_x": float(limited_cmd_vel.angular.x),
|
||||||
|
"angular_y": float(limited_cmd_vel.angular.y),
|
||||||
|
"angular_z": float(limited_cmd_vel.angular.z),
|
||||||
|
}
|
||||||
|
info_msg = String(data=json.dumps(info))
|
||||||
|
self.pub_info.publish(info_msg)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _scale_twist(msg: Twist, scale: float) -> Twist:
|
||||||
|
"""Scale all velocities in a Twist message by a factor."""
|
||||||
|
scaled = Twist()
|
||||||
|
scaled.linear.x = msg.linear.x * scale
|
||||||
|
scaled.linear.y = msg.linear.y * scale
|
||||||
|
scaled.linear.z = msg.linear.z * scale
|
||||||
|
scaled.angular.x = msg.angular.x * scale
|
||||||
|
scaled.angular.y = msg.angular.y * scale
|
||||||
|
scaled.angular.z = msg.angular.z * scale
|
||||||
|
return scaled
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = SpeedLimiterNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
4
jetson/ros2_ws/src/saltybot_speed_limiter/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_speed_limiter/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_speed_limiter
|
||||||
|
[egg_info]
|
||||||
|
tag_date = 0
|
||||||
29
jetson/ros2_ws/src/saltybot_speed_limiter/setup.py
Normal file
29
jetson/ros2_ws/src/saltybot_speed_limiter/setup.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
package_name = "saltybot_speed_limiter"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(f"share/{package_name}/launch", ["launch/speed_limiter.launch.py"]),
|
||||||
|
(f"share/{package_name}/config", ["config/limiter_params.yaml"]),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-controls",
|
||||||
|
maintainer_email="sl-controls@saltylab.local",
|
||||||
|
description=(
|
||||||
|
"Speed limiter: enforces terrain, battery, and emergency constraints on cmd_vel"
|
||||||
|
),
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"speed_limiter_node = saltybot_speed_limiter.speed_limiter_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
@ -0,0 +1,269 @@
|
|||||||
|
"""Unit tests for speed_limiter_node."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import json
|
||||||
|
from geometry_msgs.msg import Twist
|
||||||
|
from std_msgs.msg import Float32, String
|
||||||
|
from saltybot_emergency_msgs.msg import EmergencyEvent
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.executors import SingleThreadedExecutor
|
||||||
|
|
||||||
|
# Import the node under test
|
||||||
|
from saltybot_speed_limiter.speed_limiter_node import SpeedLimiterNode
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def rclpy_fixture():
|
||||||
|
"""Initialize and cleanup rclpy."""
|
||||||
|
rclpy.init()
|
||||||
|
yield
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def node(rclpy_fixture):
|
||||||
|
"""Create a speed limiter node instance."""
|
||||||
|
node = SpeedLimiterNode()
|
||||||
|
yield node
|
||||||
|
node.destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
class TestSpeedLimiterNode:
|
||||||
|
"""Test suite for SpeedLimiterNode."""
|
||||||
|
|
||||||
|
def test_node_initialization(self, node):
|
||||||
|
"""Test that node initializes with correct defaults."""
|
||||||
|
assert node.terrain_scale == 1.0
|
||||||
|
assert node.battery_limit == 1.0
|
||||||
|
assert node.emergency_scale == 1.0
|
||||||
|
assert node.last_cmd_vel is None
|
||||||
|
|
||||||
|
def test_terrain_scale_subscription(self, node):
|
||||||
|
"""Test terrain scale value update."""
|
||||||
|
msg = Float32(data=0.5)
|
||||||
|
node._on_terrain_scale(msg)
|
||||||
|
assert node.terrain_scale == 0.5
|
||||||
|
|
||||||
|
def test_terrain_scale_clamping(self, node):
|
||||||
|
"""Test that terrain scale is clamped to [0, 1]."""
|
||||||
|
node._on_terrain_scale(Float32(data=1.5))
|
||||||
|
assert node.terrain_scale == 1.0
|
||||||
|
|
||||||
|
node._on_terrain_scale(Float32(data=-0.5))
|
||||||
|
assert node.terrain_scale == 0.0
|
||||||
|
|
||||||
|
def test_battery_limit_subscription(self, node):
|
||||||
|
"""Test battery limit value update."""
|
||||||
|
msg = Float32(data=0.7)
|
||||||
|
node._on_battery_limit(msg)
|
||||||
|
assert node.battery_limit == 0.7
|
||||||
|
|
||||||
|
def test_battery_limit_clamping(self, node):
|
||||||
|
"""Test that battery limit is clamped to [0, 1]."""
|
||||||
|
node._on_battery_limit(Float32(data=2.0))
|
||||||
|
assert node.battery_limit == 1.0
|
||||||
|
|
||||||
|
node._on_battery_limit(Float32(data=-1.0))
|
||||||
|
assert node.battery_limit == 0.0
|
||||||
|
|
||||||
|
def test_emergency_nominal_state(self, node):
|
||||||
|
"""Test emergency scale in NOMINAL state."""
|
||||||
|
msg = EmergencyEvent()
|
||||||
|
msg.state = "NOMINAL"
|
||||||
|
node._on_emergency(msg)
|
||||||
|
assert node.emergency_scale == 1.0
|
||||||
|
|
||||||
|
def test_emergency_recovering_state(self, node):
|
||||||
|
"""Test emergency scale in RECOVERING state."""
|
||||||
|
msg = EmergencyEvent()
|
||||||
|
msg.state = "RECOVERING"
|
||||||
|
node._on_emergency(msg)
|
||||||
|
assert node.emergency_scale == 0.3
|
||||||
|
|
||||||
|
def test_emergency_stopping_state(self, node):
|
||||||
|
"""Test emergency scale in STOPPING state."""
|
||||||
|
msg = EmergencyEvent()
|
||||||
|
msg.state = "STOPPING"
|
||||||
|
node._on_emergency(msg)
|
||||||
|
assert node.emergency_scale == 0.1
|
||||||
|
|
||||||
|
def test_emergency_escalated_state(self, node):
|
||||||
|
"""Test emergency scale in ESCALATED state (complete stop)."""
|
||||||
|
msg = EmergencyEvent()
|
||||||
|
msg.state = "ESCALATED"
|
||||||
|
node._on_emergency(msg)
|
||||||
|
assert node.emergency_scale == 0.0
|
||||||
|
|
||||||
|
def test_cmd_vel_caching(self, node):
|
||||||
|
"""Test that cmd_vel messages are cached."""
|
||||||
|
msg = Twist()
|
||||||
|
msg.linear.x = 1.0
|
||||||
|
node._on_cmd_vel(msg)
|
||||||
|
assert node.last_cmd_vel is not None
|
||||||
|
assert node.last_cmd_vel.linear.x == 1.0
|
||||||
|
|
||||||
|
def test_scale_twist_zero(self, node):
|
||||||
|
"""Test scaling a twist by zero."""
|
||||||
|
msg = Twist()
|
||||||
|
msg.linear.x = 1.0
|
||||||
|
msg.angular.z = 0.5
|
||||||
|
result = node._scale_twist(msg, 0.0)
|
||||||
|
assert result.linear.x == 0.0
|
||||||
|
assert result.angular.z == 0.0
|
||||||
|
|
||||||
|
def test_scale_twist_half(self, node):
|
||||||
|
"""Test scaling a twist by 0.5."""
|
||||||
|
msg = Twist()
|
||||||
|
msg.linear.x = 2.0
|
||||||
|
msg.linear.y = 4.0
|
||||||
|
msg.angular.z = 1.0
|
||||||
|
result = node._scale_twist(msg, 0.5)
|
||||||
|
assert abs(result.linear.x - 1.0) < 1e-6
|
||||||
|
assert abs(result.linear.y - 2.0) < 1e-6
|
||||||
|
assert abs(result.angular.z - 0.5) < 1e-6
|
||||||
|
|
||||||
|
def test_scale_twist_full(self, node):
|
||||||
|
"""Test scaling a twist by 1.0 (no change)."""
|
||||||
|
msg = Twist()
|
||||||
|
msg.linear.x = 1.5
|
||||||
|
msg.angular.z = 0.8
|
||||||
|
result = node._scale_twist(msg, 1.0)
|
||||||
|
assert abs(result.linear.x - 1.5) < 1e-6
|
||||||
|
assert abs(result.angular.z - 0.8) < 1e-6
|
||||||
|
|
||||||
|
def test_scale_twist_all_components(self, node):
|
||||||
|
"""Test that all components of twist are scaled."""
|
||||||
|
msg = Twist()
|
||||||
|
msg.linear.x = 1.0
|
||||||
|
msg.linear.y = 2.0
|
||||||
|
msg.linear.z = 3.0
|
||||||
|
msg.angular.x = 0.1
|
||||||
|
msg.angular.y = 0.2
|
||||||
|
msg.angular.z = 0.3
|
||||||
|
result = node._scale_twist(msg, 0.5)
|
||||||
|
assert abs(result.linear.x - 0.5) < 1e-6
|
||||||
|
assert abs(result.linear.y - 1.0) < 1e-6
|
||||||
|
assert abs(result.linear.z - 1.5) < 1e-6
|
||||||
|
assert abs(result.angular.x - 0.05) < 1e-6
|
||||||
|
assert abs(result.angular.y - 0.1) < 1e-6
|
||||||
|
assert abs(result.angular.z - 0.15) < 1e-6
|
||||||
|
|
||||||
|
def test_min_scale_factor_all_one(self, node):
|
||||||
|
"""Test min-of-three logic with all scales = 1.0."""
|
||||||
|
node.terrain_scale = 1.0
|
||||||
|
node.battery_limit = 1.0
|
||||||
|
node.emergency_scale = 1.0
|
||||||
|
# This would be tested via _timer_callback, but we'll test the logic directly
|
||||||
|
scale = min(node.terrain_scale, node.battery_limit, node.emergency_scale)
|
||||||
|
assert scale == 1.0
|
||||||
|
|
||||||
|
def test_min_scale_factor_terrain_limiting(self, node):
|
||||||
|
"""Test that terrain scale limits when it's the minimum."""
|
||||||
|
node.terrain_scale = 0.3
|
||||||
|
node.battery_limit = 0.8
|
||||||
|
node.emergency_scale = 0.9
|
||||||
|
scale = min(node.terrain_scale, node.battery_limit, node.emergency_scale)
|
||||||
|
assert scale == 0.3
|
||||||
|
|
||||||
|
def test_min_scale_factor_battery_limiting(self, node):
|
||||||
|
"""Test that battery limit constrains when it's the minimum."""
|
||||||
|
node.terrain_scale = 0.9
|
||||||
|
node.battery_limit = 0.2
|
||||||
|
node.emergency_scale = 0.8
|
||||||
|
scale = min(node.terrain_scale, node.battery_limit, node.emergency_scale)
|
||||||
|
assert scale == 0.2
|
||||||
|
|
||||||
|
def test_min_scale_factor_emergency_limiting(self, node):
|
||||||
|
"""Test that emergency scale limits when it's the minimum."""
|
||||||
|
node.terrain_scale = 0.9
|
||||||
|
node.battery_limit = 0.8
|
||||||
|
node.emergency_scale = 0.0
|
||||||
|
scale = min(node.terrain_scale, node.battery_limit, node.emergency_scale)
|
||||||
|
assert scale == 0.0
|
||||||
|
|
||||||
|
def test_timer_callback_no_cmd_vel(self, node):
|
||||||
|
"""Test timer callback with no cmd_vel received."""
|
||||||
|
# Should not crash or publish if last_cmd_vel is None
|
||||||
|
node.last_cmd_vel = None
|
||||||
|
node._timer_callback() # Should return early
|
||||||
|
|
||||||
|
def test_info_json_structure(self, node):
|
||||||
|
"""Test that published info has correct JSON structure."""
|
||||||
|
node.last_cmd_vel = Twist()
|
||||||
|
node.last_cmd_vel.linear.x = 1.0
|
||||||
|
node.terrain_scale = 0.5
|
||||||
|
node.battery_limit = 0.8
|
||||||
|
node.emergency_scale = 1.0
|
||||||
|
|
||||||
|
# Manually create info dict as the node would
|
||||||
|
scale_factor = min(0.5, 0.8, 1.0)
|
||||||
|
limited = node._scale_twist(node.last_cmd_vel, scale_factor)
|
||||||
|
|
||||||
|
info = {
|
||||||
|
"timestamp": node.get_clock().now().nanoseconds / 1e9,
|
||||||
|
"scale_factor": float(scale_factor),
|
||||||
|
"terrain_scale": float(0.5),
|
||||||
|
"battery_limit": float(0.8),
|
||||||
|
"emergency_scale": float(1.0),
|
||||||
|
"linear_x": float(limited.linear.x),
|
||||||
|
"linear_y": float(limited.linear.y),
|
||||||
|
"linear_z": float(limited.linear.z),
|
||||||
|
"angular_x": float(limited.angular.x),
|
||||||
|
"angular_y": float(limited.angular.y),
|
||||||
|
"angular_z": float(limited.angular.z),
|
||||||
|
}
|
||||||
|
|
||||||
|
# Verify it can be serialized/deserialized
|
||||||
|
json_str = json.dumps(info)
|
||||||
|
parsed = json.loads(json_str)
|
||||||
|
|
||||||
|
assert parsed["scale_factor"] == 0.5
|
||||||
|
assert parsed["terrain_scale"] == 0.5
|
||||||
|
assert parsed["battery_limit"] == 0.8
|
||||||
|
assert parsed["emergency_scale"] == 1.0
|
||||||
|
assert abs(parsed["linear_x"] - 0.5) < 1e-6
|
||||||
|
|
||||||
|
|
||||||
|
class TestSpeedLimiterScenarios:
|
||||||
|
"""Integration-style tests for realistic scenarios."""
|
||||||
|
|
||||||
|
def test_scenario_rocky_terrain_low_battery(self, node):
|
||||||
|
"""Scenario: rocky terrain with low battery."""
|
||||||
|
# Rocky terrain severely limits speed
|
||||||
|
node._on_terrain_scale(Float32(data=0.2))
|
||||||
|
# Battery is critical
|
||||||
|
node._on_battery_limit(Float32(data=0.3))
|
||||||
|
# No emergency
|
||||||
|
msg = EmergencyEvent()
|
||||||
|
msg.state = "NOMINAL"
|
||||||
|
node._on_emergency(msg)
|
||||||
|
|
||||||
|
# Scale should be min(0.2, 0.3, 1.0) = 0.2
|
||||||
|
scale = min(node.terrain_scale, node.battery_limit, node.emergency_scale)
|
||||||
|
assert scale == 0.2
|
||||||
|
|
||||||
|
def test_scenario_emergency_stop(self, node):
|
||||||
|
"""Scenario: emergency escalation triggers immediate stop."""
|
||||||
|
# Nominal terrain and battery levels
|
||||||
|
node._on_terrain_scale(Float32(data=1.0))
|
||||||
|
node._on_battery_limit(Float32(data=1.0))
|
||||||
|
# Emergency escalation
|
||||||
|
msg = EmergencyEvent()
|
||||||
|
msg.state = "ESCALATED"
|
||||||
|
node._on_emergency(msg)
|
||||||
|
|
||||||
|
# Scale should be 0.0 due to emergency override
|
||||||
|
scale = min(node.terrain_scale, node.battery_limit, node.emergency_scale)
|
||||||
|
assert scale == 0.0
|
||||||
|
|
||||||
|
def test_scenario_smooth_terrain_good_battery(self, node):
|
||||||
|
"""Scenario: ideal conditions."""
|
||||||
|
node._on_terrain_scale(Float32(data=1.0))
|
||||||
|
node._on_battery_limit(Float32(data=0.9))
|
||||||
|
msg = EmergencyEvent()
|
||||||
|
msg.state = "NOMINAL"
|
||||||
|
node._on_emergency(msg)
|
||||||
|
|
||||||
|
scale = min(node.terrain_scale, node.battery_limit, node.emergency_scale)
|
||||||
|
assert scale == 0.9
|
||||||
Loading…
x
Reference in New Issue
Block a user