Merge pull request 'Cliff and drop-off detection safety node (Issue #288)' (#291) from sl-controls/issue-288-cliff-stop into main
This commit is contained in:
commit
6518171d42
@ -0,0 +1,12 @@
|
||||
cliff_detector:
|
||||
ros__parameters:
|
||||
# Cliff threshold in meters (readings > this = drop-off)
|
||||
cliff_threshold: 0.5
|
||||
|
||||
# Number of consecutive detections before triggering
|
||||
# (debouncing to avoid false positives)
|
||||
debounce_count: 3
|
||||
|
||||
# Minimum number of sensors that must exceed threshold
|
||||
# (majority voting for robustness)
|
||||
min_sensors: 2
|
||||
@ -0,0 +1,31 @@
|
||||
"""Launch file for cliff detector node."""
|
||||
|
||||
from launch import LaunchDescription
|
||||
from launch_ros.actions import Node
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
"""Generate launch description for cliff detector."""
|
||||
pkg_dir = get_package_share_directory("saltybot_cliff_detector")
|
||||
config_file = os.path.join(pkg_dir, "config", "cliff_detector_config.yaml")
|
||||
|
||||
return LaunchDescription(
|
||||
[
|
||||
DeclareLaunchArgument(
|
||||
"config_file",
|
||||
default_value=config_file,
|
||||
description="Path to configuration YAML file",
|
||||
),
|
||||
Node(
|
||||
package="saltybot_cliff_detector",
|
||||
executable="cliff_detector_node",
|
||||
name="cliff_detector",
|
||||
output="screen",
|
||||
parameters=[LaunchConfiguration("config_file")],
|
||||
),
|
||||
]
|
||||
)
|
||||
21
jetson/ros2_ws/src/saltybot_cliff_detector/package.xml
Normal file
21
jetson/ros2_ws/src/saltybot_cliff_detector/package.xml
Normal file
@ -0,0 +1,21 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_cliff_detector</name>
|
||||
<version>0.1.0</version>
|
||||
<description>Cliff and drop-off detection safety node for SaltyBot</description>
|
||||
<maintainer email="sl-controls@saltylab.local">SaltyLab Controls</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<buildtool_depend>ament_python</buildtool_depend>
|
||||
<depend>rclpy</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
|
||||
<test_depend>pytest</test_depend>
|
||||
<test_depend>std_msgs</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,105 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Cliff and drop-off detection node for SaltyBot.
|
||||
|
||||
Monitors downward-facing IR range sensors for drop-offs/stair edges.
|
||||
Triggers emergency stop when any sensor reading exceeds threshold.
|
||||
|
||||
Subscribed topics:
|
||||
/saltybot/cliff_sensors (sensor_msgs/LaserScan) - IR range array
|
||||
|
||||
Published topics:
|
||||
/saltybot/cliff_detected (std_msgs/Bool) - Cliff/drop-off detected
|
||||
"""
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from sensor_msgs.msg import LaserScan
|
||||
from std_msgs.msg import Bool
|
||||
|
||||
|
||||
class CliffDetectorNode(Node):
|
||||
"""ROS2 node for cliff and drop-off detection."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("cliff_detector")
|
||||
|
||||
# Parameters
|
||||
self.declare_parameter("cliff_threshold", 0.5)
|
||||
self.declare_parameter("debounce_count", 3)
|
||||
self.declare_parameter("min_sensors", 2)
|
||||
|
||||
self.cliff_threshold = self.get_parameter("cliff_threshold").value
|
||||
self.debounce_count = self.get_parameter("debounce_count").value
|
||||
self.min_sensors = self.get_parameter("min_sensors").value
|
||||
|
||||
# State tracking
|
||||
self.cliff_detected = False
|
||||
self.consecutive_detections = 0
|
||||
|
||||
# Subscription to cliff sensors
|
||||
self.sub_cliff = self.create_subscription(
|
||||
LaserScan, "/saltybot/cliff_sensors", self._on_cliff_sensors, 10
|
||||
)
|
||||
|
||||
# Publisher for cliff detection
|
||||
self.pub_cliff = self.create_publisher(Bool, "/saltybot/cliff_detected", 10)
|
||||
|
||||
self.get_logger().info(
|
||||
f"Cliff detector initialized. "
|
||||
f"Threshold: {self.cliff_threshold}m, "
|
||||
f"Debounce: {self.debounce_count} frames, "
|
||||
f"Min sensors: {self.min_sensors}"
|
||||
)
|
||||
|
||||
def _on_cliff_sensors(self, msg: LaserScan) -> None:
|
||||
"""Process cliff sensor readings."""
|
||||
# Filter out NaN/inf values and check against threshold
|
||||
valid_ranges = [r for r in msg.ranges if 0 < r < float('inf')]
|
||||
|
||||
if len(valid_ranges) == 0:
|
||||
# No valid readings - assume safe
|
||||
self._update_cliff_state(False)
|
||||
return
|
||||
|
||||
# Count how many sensors exceed threshold
|
||||
exceeding_sensors = sum(1 for r in valid_ranges if r > self.cliff_threshold)
|
||||
|
||||
# Cliff detected if min_sensors or more exceed threshold
|
||||
cliff_detected_now = exceeding_sensors >= self.min_sensors
|
||||
|
||||
if cliff_detected_now:
|
||||
self.consecutive_detections += 1
|
||||
if self.consecutive_detections >= self.debounce_count:
|
||||
self._update_cliff_state(True)
|
||||
else:
|
||||
self.consecutive_detections = 0
|
||||
self._update_cliff_state(False)
|
||||
|
||||
def _update_cliff_state(self, detected: bool) -> None:
|
||||
"""Update cliff state and publish."""
|
||||
if detected != self.cliff_detected:
|
||||
self.cliff_detected = detected
|
||||
if detected:
|
||||
self.get_logger().warn("CLIFF DETECTED! Emergency stop triggered!")
|
||||
else:
|
||||
self.get_logger().info("Cliff cleared, continuing operation")
|
||||
|
||||
# Always publish current state
|
||||
msg = Bool(data=self.cliff_detected)
|
||||
self.pub_cliff.publish(msg)
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = CliffDetectorNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
5
jetson/ros2_ws/src/saltybot_cliff_detector/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_cliff_detector/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_cliff_detector
|
||||
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_cliff_detector
|
||||
24
jetson/ros2_ws/src/saltybot_cliff_detector/setup.py
Normal file
24
jetson/ros2_ws/src/saltybot_cliff_detector/setup.py
Normal file
@ -0,0 +1,24 @@
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
setup(
|
||||
name='saltybot_cliff_detector',
|
||||
version='0.1.0',
|
||||
packages=find_packages(),
|
||||
data_files=[
|
||||
('share/ament_index/resource_index/packages', ['resource/saltybot_cliff_detector']),
|
||||
('share/saltybot_cliff_detector', ['package.xml']),
|
||||
('share/saltybot_cliff_detector/config', ['config/cliff_detector_config.yaml']),
|
||||
('share/saltybot_cliff_detector/launch', ['launch/cliff_detector.launch.py']),
|
||||
],
|
||||
install_requires=['setuptools'],
|
||||
zip_safe=True,
|
||||
author='SaltyLab Controls',
|
||||
author_email='sl-controls@saltylab.local',
|
||||
description='Cliff and drop-off detection safety node for SaltyBot',
|
||||
license='MIT',
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'cliff_detector_node=saltybot_cliff_detector.cliff_detector_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
@ -0,0 +1,205 @@
|
||||
"""Tests for cliff detector node."""
|
||||
|
||||
import pytest
|
||||
from sensor_msgs.msg import LaserScan
|
||||
from std_msgs.msg import Bool
|
||||
import rclpy
|
||||
|
||||
from saltybot_cliff_detector.cliff_detector_node import CliffDetectorNode
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def rclpy_fixture():
|
||||
rclpy.init()
|
||||
yield
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def node(rclpy_fixture):
|
||||
node = CliffDetectorNode()
|
||||
yield node
|
||||
node.destroy_node()
|
||||
|
||||
|
||||
class TestInit:
|
||||
def test_node_initialization(self, node):
|
||||
assert node.cliff_threshold == 0.5
|
||||
assert node.debounce_count == 3
|
||||
assert node.min_sensors == 2
|
||||
assert node.cliff_detected is False
|
||||
assert node.consecutive_detections == 0
|
||||
|
||||
|
||||
class TestCliffDetection:
|
||||
def test_cliff_threshold_exceeded(self, node):
|
||||
"""Test detection when sensor exceeds threshold."""
|
||||
msg = LaserScan()
|
||||
msg.ranges = [0.3, 0.6, 0.4] # 1 sensor exceeds threshold
|
||||
|
||||
node._on_cliff_sensors(msg)
|
||||
assert node.consecutive_detections == 1
|
||||
|
||||
def test_multiple_sensors_exceed(self, node):
|
||||
"""Test detection with multiple sensors exceeding threshold."""
|
||||
msg = LaserScan()
|
||||
msg.ranges = [0.6, 0.7, 0.4] # 2 sensors exceed threshold
|
||||
|
||||
node._on_cliff_sensors(msg)
|
||||
assert node.consecutive_detections == 1
|
||||
|
||||
def test_debounce_threshold(self, node):
|
||||
"""Test debouncing requires multiple consecutive detections."""
|
||||
node.cliff_threshold = 0.5
|
||||
node.debounce_count = 3
|
||||
node.min_sensors = 1
|
||||
|
||||
msg = LaserScan()
|
||||
msg.ranges = [0.6, 0.3, 0.4] # 1 exceeds
|
||||
|
||||
# First detection
|
||||
node._on_cliff_sensors(msg)
|
||||
assert node.cliff_detected is False
|
||||
assert node.consecutive_detections == 1
|
||||
|
||||
# Second detection
|
||||
node._on_cliff_sensors(msg)
|
||||
assert node.cliff_detected is False
|
||||
assert node.consecutive_detections == 2
|
||||
|
||||
# Third detection - should trigger
|
||||
node._on_cliff_sensors(msg)
|
||||
assert node.cliff_detected is True
|
||||
assert node.consecutive_detections == 3
|
||||
|
||||
def test_debounce_reset_on_clear(self, node):
|
||||
"""Test debounce counter resets when cliff clears."""
|
||||
node.debounce_count = 2
|
||||
node.min_sensors = 1
|
||||
|
||||
msg_cliff = LaserScan()
|
||||
msg_cliff.ranges = [0.6, 0.3]
|
||||
|
||||
msg_clear = LaserScan()
|
||||
msg_clear.ranges = [0.3, 0.2]
|
||||
|
||||
# Detection
|
||||
node._on_cliff_sensors(msg_cliff)
|
||||
assert node.consecutive_detections == 1
|
||||
|
||||
# Clear
|
||||
node._on_cliff_sensors(msg_clear)
|
||||
assert node.consecutive_detections == 0
|
||||
assert node.cliff_detected is False
|
||||
|
||||
def test_min_sensors_voting(self, node):
|
||||
"""Test minimum sensor threshold (majority voting)."""
|
||||
node.min_sensors = 2
|
||||
node.debounce_count = 1
|
||||
|
||||
msg = LaserScan()
|
||||
msg.ranges = [0.6, 0.3, 0.4] # Only 1 exceeds
|
||||
|
||||
node._on_cliff_sensors(msg)
|
||||
assert node.cliff_detected is False
|
||||
|
||||
msg.ranges = [0.6, 0.7, 0.4] # 2 exceed
|
||||
node._on_cliff_sensors(msg)
|
||||
assert node.cliff_detected is True
|
||||
|
||||
def test_nan_values_filtered(self, node):
|
||||
"""Test that NaN/inf values are filtered out."""
|
||||
node.debounce_count = 1
|
||||
node.min_sensors = 1
|
||||
|
||||
msg = LaserScan()
|
||||
msg.ranges = [float('nan'), 0.6, float('inf')]
|
||||
|
||||
node._on_cliff_sensors(msg)
|
||||
# Only 0.6 is valid and exceeds threshold
|
||||
assert node.cliff_detected is True
|
||||
|
||||
def test_empty_ranges(self, node):
|
||||
"""Test behavior with empty or all-invalid ranges."""
|
||||
msg = LaserScan()
|
||||
msg.ranges = []
|
||||
|
||||
node._on_cliff_sensors(msg)
|
||||
assert node.cliff_detected is False
|
||||
|
||||
msg.ranges = [float('nan'), float('inf')]
|
||||
node._on_cliff_sensors(msg)
|
||||
assert node.cliff_detected is False
|
||||
|
||||
|
||||
class TestStateTransitions:
|
||||
def test_cliff_to_clear_transition(self, node):
|
||||
"""Test state transition from cliff to clear."""
|
||||
node.debounce_count = 1
|
||||
node.min_sensors = 1
|
||||
|
||||
# Trigger cliff
|
||||
msg_cliff = LaserScan()
|
||||
msg_cliff.ranges = [0.6, 0.3]
|
||||
node._on_cliff_sensors(msg_cliff)
|
||||
|
||||
assert node.cliff_detected is True
|
||||
|
||||
# Clear cliff
|
||||
msg_clear = LaserScan()
|
||||
msg_clear.ranges = [0.3, 0.2]
|
||||
node._on_cliff_sensors(msg_clear)
|
||||
|
||||
assert node.cliff_detected is False
|
||||
|
||||
def test_clear_to_cliff_transition(self, node):
|
||||
"""Test state transition from clear to cliff."""
|
||||
node.debounce_count = 1
|
||||
node.min_sensors = 1
|
||||
|
||||
# Start clear
|
||||
assert node.cliff_detected is False
|
||||
|
||||
# Trigger cliff
|
||||
msg_cliff = LaserScan()
|
||||
msg_cliff.ranges = [0.6, 0.3]
|
||||
node._on_cliff_sensors(msg_cliff)
|
||||
|
||||
assert node.cliff_detected is True
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
def test_single_sensor_exceeds(self, node):
|
||||
"""Test with single sensor exceeding threshold."""
|
||||
node.min_sensors = 1
|
||||
node.debounce_count = 1
|
||||
|
||||
msg = LaserScan()
|
||||
msg.ranges = [0.6]
|
||||
|
||||
node._on_cliff_sensors(msg)
|
||||
assert node.cliff_detected is True
|
||||
|
||||
def test_zero_readings(self, node):
|
||||
"""Test that zero readings are filtered (invalid)."""
|
||||
msg = LaserScan()
|
||||
msg.ranges = [0.0, 0.6, 0.0] # Only 0.6 is valid
|
||||
|
||||
node._on_cliff_sensors(msg)
|
||||
# 0.6 exceeds 0.5, but need 2 sensors with min_sensors=2
|
||||
assert node.cliff_detected is False
|
||||
|
||||
def test_threshold_boundary(self, node):
|
||||
"""Test boundary condition at exact threshold."""
|
||||
node.debounce_count = 1
|
||||
node.min_sensors = 1
|
||||
|
||||
msg = LaserScan()
|
||||
msg.ranges = [0.5, 0.3] # Exactly at threshold (not exceeding)
|
||||
|
||||
node._on_cliff_sensors(msg)
|
||||
assert node.cliff_detected is False
|
||||
|
||||
msg.ranges = [0.50001, 0.3] # Just above threshold
|
||||
node._on_cliff_sensors(msg)
|
||||
assert node.cliff_detected is True
|
||||
Loading…
x
Reference in New Issue
Block a user