Cliff and drop-off detection safety node (Issue #288) #291

Merged
sl-jetson merged 1 commits from sl-controls/issue-288-cliff-stop into main 2026-03-02 21:05:19 -05:00
10 changed files with 403 additions and 0 deletions
Showing only changes of commit dab0f96399 - Show all commits

View File

@ -0,0 +1,12 @@
cliff_detector:
ros__parameters:
# Cliff threshold in meters (readings > this = drop-off)
cliff_threshold: 0.5
# Number of consecutive detections before triggering
# (debouncing to avoid false positives)
debounce_count: 3
# Minimum number of sensors that must exceed threshold
# (majority voting for robustness)
min_sensors: 2

View File

@ -0,0 +1,31 @@
"""Launch file for cliff detector node."""
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch.actions import DeclareLaunchArgument
import os
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
"""Generate launch description for cliff detector."""
pkg_dir = get_package_share_directory("saltybot_cliff_detector")
config_file = os.path.join(pkg_dir, "config", "cliff_detector_config.yaml")
return LaunchDescription(
[
DeclareLaunchArgument(
"config_file",
default_value=config_file,
description="Path to configuration YAML file",
),
Node(
package="saltybot_cliff_detector",
executable="cliff_detector_node",
name="cliff_detector",
output="screen",
parameters=[LaunchConfiguration("config_file")],
),
]
)

View File

@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_cliff_detector</name>
<version>0.1.0</version>
<description>Cliff and drop-off detection safety node for SaltyBot</description>
<maintainer email="sl-controls@saltylab.local">SaltyLab Controls</maintainer>
<license>MIT</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>
<test_depend>pytest</test_depend>
<test_depend>std_msgs</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""Cliff and drop-off detection node for SaltyBot.
Monitors downward-facing IR range sensors for drop-offs/stair edges.
Triggers emergency stop when any sensor reading exceeds threshold.
Subscribed topics:
/saltybot/cliff_sensors (sensor_msgs/LaserScan) - IR range array
Published topics:
/saltybot/cliff_detected (std_msgs/Bool) - Cliff/drop-off detected
"""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from std_msgs.msg import Bool
class CliffDetectorNode(Node):
"""ROS2 node for cliff and drop-off detection."""
def __init__(self):
super().__init__("cliff_detector")
# Parameters
self.declare_parameter("cliff_threshold", 0.5)
self.declare_parameter("debounce_count", 3)
self.declare_parameter("min_sensors", 2)
self.cliff_threshold = self.get_parameter("cliff_threshold").value
self.debounce_count = self.get_parameter("debounce_count").value
self.min_sensors = self.get_parameter("min_sensors").value
# State tracking
self.cliff_detected = False
self.consecutive_detections = 0
# Subscription to cliff sensors
self.sub_cliff = self.create_subscription(
LaserScan, "/saltybot/cliff_sensors", self._on_cliff_sensors, 10
)
# Publisher for cliff detection
self.pub_cliff = self.create_publisher(Bool, "/saltybot/cliff_detected", 10)
self.get_logger().info(
f"Cliff detector initialized. "
f"Threshold: {self.cliff_threshold}m, "
f"Debounce: {self.debounce_count} frames, "
f"Min sensors: {self.min_sensors}"
)
def _on_cliff_sensors(self, msg: LaserScan) -> None:
"""Process cliff sensor readings."""
# Filter out NaN/inf values and check against threshold
valid_ranges = [r for r in msg.ranges if 0 < r < float('inf')]
if len(valid_ranges) == 0:
# No valid readings - assume safe
self._update_cliff_state(False)
return
# Count how many sensors exceed threshold
exceeding_sensors = sum(1 for r in valid_ranges if r > self.cliff_threshold)
# Cliff detected if min_sensors or more exceed threshold
cliff_detected_now = exceeding_sensors >= self.min_sensors
if cliff_detected_now:
self.consecutive_detections += 1
if self.consecutive_detections >= self.debounce_count:
self._update_cliff_state(True)
else:
self.consecutive_detections = 0
self._update_cliff_state(False)
def _update_cliff_state(self, detected: bool) -> None:
"""Update cliff state and publish."""
if detected != self.cliff_detected:
self.cliff_detected = detected
if detected:
self.get_logger().warn("CLIFF DETECTED! Emergency stop triggered!")
else:
self.get_logger().info("Cliff cleared, continuing operation")
# Always publish current state
msg = Bool(data=self.cliff_detected)
self.pub_cliff.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = CliffDetectorNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_cliff_detector
[install]
install_scripts=$base/lib/saltybot_cliff_detector

View File

@ -0,0 +1,24 @@
from setuptools import setup, find_packages
setup(
name='saltybot_cliff_detector',
version='0.1.0',
packages=find_packages(),
data_files=[
('share/ament_index/resource_index/packages', ['resource/saltybot_cliff_detector']),
('share/saltybot_cliff_detector', ['package.xml']),
('share/saltybot_cliff_detector/config', ['config/cliff_detector_config.yaml']),
('share/saltybot_cliff_detector/launch', ['launch/cliff_detector.launch.py']),
],
install_requires=['setuptools'],
zip_safe=True,
author='SaltyLab Controls',
author_email='sl-controls@saltylab.local',
description='Cliff and drop-off detection safety node for SaltyBot',
license='MIT',
entry_points={
'console_scripts': [
'cliff_detector_node=saltybot_cliff_detector.cliff_detector_node:main',
],
},
)

View File

@ -0,0 +1,205 @@
"""Tests for cliff detector node."""
import pytest
from sensor_msgs.msg import LaserScan
from std_msgs.msg import Bool
import rclpy
from saltybot_cliff_detector.cliff_detector_node import CliffDetectorNode
@pytest.fixture
def rclpy_fixture():
rclpy.init()
yield
rclpy.shutdown()
@pytest.fixture
def node(rclpy_fixture):
node = CliffDetectorNode()
yield node
node.destroy_node()
class TestInit:
def test_node_initialization(self, node):
assert node.cliff_threshold == 0.5
assert node.debounce_count == 3
assert node.min_sensors == 2
assert node.cliff_detected is False
assert node.consecutive_detections == 0
class TestCliffDetection:
def test_cliff_threshold_exceeded(self, node):
"""Test detection when sensor exceeds threshold."""
msg = LaserScan()
msg.ranges = [0.3, 0.6, 0.4] # 1 sensor exceeds threshold
node._on_cliff_sensors(msg)
assert node.consecutive_detections == 1
def test_multiple_sensors_exceed(self, node):
"""Test detection with multiple sensors exceeding threshold."""
msg = LaserScan()
msg.ranges = [0.6, 0.7, 0.4] # 2 sensors exceed threshold
node._on_cliff_sensors(msg)
assert node.consecutive_detections == 1
def test_debounce_threshold(self, node):
"""Test debouncing requires multiple consecutive detections."""
node.cliff_threshold = 0.5
node.debounce_count = 3
node.min_sensors = 1
msg = LaserScan()
msg.ranges = [0.6, 0.3, 0.4] # 1 exceeds
# First detection
node._on_cliff_sensors(msg)
assert node.cliff_detected is False
assert node.consecutive_detections == 1
# Second detection
node._on_cliff_sensors(msg)
assert node.cliff_detected is False
assert node.consecutive_detections == 2
# Third detection - should trigger
node._on_cliff_sensors(msg)
assert node.cliff_detected is True
assert node.consecutive_detections == 3
def test_debounce_reset_on_clear(self, node):
"""Test debounce counter resets when cliff clears."""
node.debounce_count = 2
node.min_sensors = 1
msg_cliff = LaserScan()
msg_cliff.ranges = [0.6, 0.3]
msg_clear = LaserScan()
msg_clear.ranges = [0.3, 0.2]
# Detection
node._on_cliff_sensors(msg_cliff)
assert node.consecutive_detections == 1
# Clear
node._on_cliff_sensors(msg_clear)
assert node.consecutive_detections == 0
assert node.cliff_detected is False
def test_min_sensors_voting(self, node):
"""Test minimum sensor threshold (majority voting)."""
node.min_sensors = 2
node.debounce_count = 1
msg = LaserScan()
msg.ranges = [0.6, 0.3, 0.4] # Only 1 exceeds
node._on_cliff_sensors(msg)
assert node.cliff_detected is False
msg.ranges = [0.6, 0.7, 0.4] # 2 exceed
node._on_cliff_sensors(msg)
assert node.cliff_detected is True
def test_nan_values_filtered(self, node):
"""Test that NaN/inf values are filtered out."""
node.debounce_count = 1
node.min_sensors = 1
msg = LaserScan()
msg.ranges = [float('nan'), 0.6, float('inf')]
node._on_cliff_sensors(msg)
# Only 0.6 is valid and exceeds threshold
assert node.cliff_detected is True
def test_empty_ranges(self, node):
"""Test behavior with empty or all-invalid ranges."""
msg = LaserScan()
msg.ranges = []
node._on_cliff_sensors(msg)
assert node.cliff_detected is False
msg.ranges = [float('nan'), float('inf')]
node._on_cliff_sensors(msg)
assert node.cliff_detected is False
class TestStateTransitions:
def test_cliff_to_clear_transition(self, node):
"""Test state transition from cliff to clear."""
node.debounce_count = 1
node.min_sensors = 1
# Trigger cliff
msg_cliff = LaserScan()
msg_cliff.ranges = [0.6, 0.3]
node._on_cliff_sensors(msg_cliff)
assert node.cliff_detected is True
# Clear cliff
msg_clear = LaserScan()
msg_clear.ranges = [0.3, 0.2]
node._on_cliff_sensors(msg_clear)
assert node.cliff_detected is False
def test_clear_to_cliff_transition(self, node):
"""Test state transition from clear to cliff."""
node.debounce_count = 1
node.min_sensors = 1
# Start clear
assert node.cliff_detected is False
# Trigger cliff
msg_cliff = LaserScan()
msg_cliff.ranges = [0.6, 0.3]
node._on_cliff_sensors(msg_cliff)
assert node.cliff_detected is True
class TestEdgeCases:
def test_single_sensor_exceeds(self, node):
"""Test with single sensor exceeding threshold."""
node.min_sensors = 1
node.debounce_count = 1
msg = LaserScan()
msg.ranges = [0.6]
node._on_cliff_sensors(msg)
assert node.cliff_detected is True
def test_zero_readings(self, node):
"""Test that zero readings are filtered (invalid)."""
msg = LaserScan()
msg.ranges = [0.0, 0.6, 0.0] # Only 0.6 is valid
node._on_cliff_sensors(msg)
# 0.6 exceeds 0.5, but need 2 sensors with min_sensors=2
assert node.cliff_detected is False
def test_threshold_boundary(self, node):
"""Test boundary condition at exact threshold."""
node.debounce_count = 1
node.min_sensors = 1
msg = LaserScan()
msg.ranges = [0.5, 0.3] # Exactly at threshold (not exceeding)
node._on_cliff_sensors(msg)
assert node.cliff_detected is False
msg.ranges = [0.50001, 0.3] # Just above threshold
node._on_cliff_sensors(msg)
assert node.cliff_detected is True