Cliff and drop-off detection safety node (Issue #288) #291
@ -0,0 +1,12 @@
|
|||||||
|
cliff_detector:
|
||||||
|
ros__parameters:
|
||||||
|
# Cliff threshold in meters (readings > this = drop-off)
|
||||||
|
cliff_threshold: 0.5
|
||||||
|
|
||||||
|
# Number of consecutive detections before triggering
|
||||||
|
# (debouncing to avoid false positives)
|
||||||
|
debounce_count: 3
|
||||||
|
|
||||||
|
# Minimum number of sensors that must exceed threshold
|
||||||
|
# (majority voting for robustness)
|
||||||
|
min_sensors: 2
|
||||||
@ -0,0 +1,31 @@
|
|||||||
|
"""Launch file for cliff detector node."""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
"""Generate launch description for cliff detector."""
|
||||||
|
pkg_dir = get_package_share_directory("saltybot_cliff_detector")
|
||||||
|
config_file = os.path.join(pkg_dir, "config", "cliff_detector_config.yaml")
|
||||||
|
|
||||||
|
return LaunchDescription(
|
||||||
|
[
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"config_file",
|
||||||
|
default_value=config_file,
|
||||||
|
description="Path to configuration YAML file",
|
||||||
|
),
|
||||||
|
Node(
|
||||||
|
package="saltybot_cliff_detector",
|
||||||
|
executable="cliff_detector_node",
|
||||||
|
name="cliff_detector",
|
||||||
|
output="screen",
|
||||||
|
parameters=[LaunchConfiguration("config_file")],
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
21
jetson/ros2_ws/src/saltybot_cliff_detector/package.xml
Normal file
21
jetson/ros2_ws/src/saltybot_cliff_detector/package.xml
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_cliff_detector</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>Cliff and drop-off detection safety node for SaltyBot</description>
|
||||||
|
<maintainer email="sl-controls@saltylab.local">SaltyLab Controls</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
|
||||||
|
<test_depend>pytest</test_depend>
|
||||||
|
<test_depend>std_msgs</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,105 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Cliff and drop-off detection node for SaltyBot.
|
||||||
|
|
||||||
|
Monitors downward-facing IR range sensors for drop-offs/stair edges.
|
||||||
|
Triggers emergency stop when any sensor reading exceeds threshold.
|
||||||
|
|
||||||
|
Subscribed topics:
|
||||||
|
/saltybot/cliff_sensors (sensor_msgs/LaserScan) - IR range array
|
||||||
|
|
||||||
|
Published topics:
|
||||||
|
/saltybot/cliff_detected (std_msgs/Bool) - Cliff/drop-off detected
|
||||||
|
"""
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from sensor_msgs.msg import LaserScan
|
||||||
|
from std_msgs.msg import Bool
|
||||||
|
|
||||||
|
|
||||||
|
class CliffDetectorNode(Node):
|
||||||
|
"""ROS2 node for cliff and drop-off detection."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("cliff_detector")
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
self.declare_parameter("cliff_threshold", 0.5)
|
||||||
|
self.declare_parameter("debounce_count", 3)
|
||||||
|
self.declare_parameter("min_sensors", 2)
|
||||||
|
|
||||||
|
self.cliff_threshold = self.get_parameter("cliff_threshold").value
|
||||||
|
self.debounce_count = self.get_parameter("debounce_count").value
|
||||||
|
self.min_sensors = self.get_parameter("min_sensors").value
|
||||||
|
|
||||||
|
# State tracking
|
||||||
|
self.cliff_detected = False
|
||||||
|
self.consecutive_detections = 0
|
||||||
|
|
||||||
|
# Subscription to cliff sensors
|
||||||
|
self.sub_cliff = self.create_subscription(
|
||||||
|
LaserScan, "/saltybot/cliff_sensors", self._on_cliff_sensors, 10
|
||||||
|
)
|
||||||
|
|
||||||
|
# Publisher for cliff detection
|
||||||
|
self.pub_cliff = self.create_publisher(Bool, "/saltybot/cliff_detected", 10)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Cliff detector initialized. "
|
||||||
|
f"Threshold: {self.cliff_threshold}m, "
|
||||||
|
f"Debounce: {self.debounce_count} frames, "
|
||||||
|
f"Min sensors: {self.min_sensors}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _on_cliff_sensors(self, msg: LaserScan) -> None:
|
||||||
|
"""Process cliff sensor readings."""
|
||||||
|
# Filter out NaN/inf values and check against threshold
|
||||||
|
valid_ranges = [r for r in msg.ranges if 0 < r < float('inf')]
|
||||||
|
|
||||||
|
if len(valid_ranges) == 0:
|
||||||
|
# No valid readings - assume safe
|
||||||
|
self._update_cliff_state(False)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Count how many sensors exceed threshold
|
||||||
|
exceeding_sensors = sum(1 for r in valid_ranges if r > self.cliff_threshold)
|
||||||
|
|
||||||
|
# Cliff detected if min_sensors or more exceed threshold
|
||||||
|
cliff_detected_now = exceeding_sensors >= self.min_sensors
|
||||||
|
|
||||||
|
if cliff_detected_now:
|
||||||
|
self.consecutive_detections += 1
|
||||||
|
if self.consecutive_detections >= self.debounce_count:
|
||||||
|
self._update_cliff_state(True)
|
||||||
|
else:
|
||||||
|
self.consecutive_detections = 0
|
||||||
|
self._update_cliff_state(False)
|
||||||
|
|
||||||
|
def _update_cliff_state(self, detected: bool) -> None:
|
||||||
|
"""Update cliff state and publish."""
|
||||||
|
if detected != self.cliff_detected:
|
||||||
|
self.cliff_detected = detected
|
||||||
|
if detected:
|
||||||
|
self.get_logger().warn("CLIFF DETECTED! Emergency stop triggered!")
|
||||||
|
else:
|
||||||
|
self.get_logger().info("Cliff cleared, continuing operation")
|
||||||
|
|
||||||
|
# Always publish current state
|
||||||
|
msg = Bool(data=self.cliff_detected)
|
||||||
|
self.pub_cliff.publish(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = CliffDetectorNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
5
jetson/ros2_ws/src/saltybot_cliff_detector/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_cliff_detector/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_cliff_detector
|
||||||
|
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_cliff_detector
|
||||||
24
jetson/ros2_ws/src/saltybot_cliff_detector/setup.py
Normal file
24
jetson/ros2_ws/src/saltybot_cliff_detector/setup.py
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
from setuptools import setup, find_packages
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name='saltybot_cliff_detector',
|
||||||
|
version='0.1.0',
|
||||||
|
packages=find_packages(),
|
||||||
|
data_files=[
|
||||||
|
('share/ament_index/resource_index/packages', ['resource/saltybot_cliff_detector']),
|
||||||
|
('share/saltybot_cliff_detector', ['package.xml']),
|
||||||
|
('share/saltybot_cliff_detector/config', ['config/cliff_detector_config.yaml']),
|
||||||
|
('share/saltybot_cliff_detector/launch', ['launch/cliff_detector.launch.py']),
|
||||||
|
],
|
||||||
|
install_requires=['setuptools'],
|
||||||
|
zip_safe=True,
|
||||||
|
author='SaltyLab Controls',
|
||||||
|
author_email='sl-controls@saltylab.local',
|
||||||
|
description='Cliff and drop-off detection safety node for SaltyBot',
|
||||||
|
license='MIT',
|
||||||
|
entry_points={
|
||||||
|
'console_scripts': [
|
||||||
|
'cliff_detector_node=saltybot_cliff_detector.cliff_detector_node:main',
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
@ -0,0 +1,205 @@
|
|||||||
|
"""Tests for cliff detector node."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from sensor_msgs.msg import LaserScan
|
||||||
|
from std_msgs.msg import Bool
|
||||||
|
import rclpy
|
||||||
|
|
||||||
|
from saltybot_cliff_detector.cliff_detector_node import CliffDetectorNode
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def rclpy_fixture():
|
||||||
|
rclpy.init()
|
||||||
|
yield
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def node(rclpy_fixture):
|
||||||
|
node = CliffDetectorNode()
|
||||||
|
yield node
|
||||||
|
node.destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
class TestInit:
|
||||||
|
def test_node_initialization(self, node):
|
||||||
|
assert node.cliff_threshold == 0.5
|
||||||
|
assert node.debounce_count == 3
|
||||||
|
assert node.min_sensors == 2
|
||||||
|
assert node.cliff_detected is False
|
||||||
|
assert node.consecutive_detections == 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestCliffDetection:
|
||||||
|
def test_cliff_threshold_exceeded(self, node):
|
||||||
|
"""Test detection when sensor exceeds threshold."""
|
||||||
|
msg = LaserScan()
|
||||||
|
msg.ranges = [0.3, 0.6, 0.4] # 1 sensor exceeds threshold
|
||||||
|
|
||||||
|
node._on_cliff_sensors(msg)
|
||||||
|
assert node.consecutive_detections == 1
|
||||||
|
|
||||||
|
def test_multiple_sensors_exceed(self, node):
|
||||||
|
"""Test detection with multiple sensors exceeding threshold."""
|
||||||
|
msg = LaserScan()
|
||||||
|
msg.ranges = [0.6, 0.7, 0.4] # 2 sensors exceed threshold
|
||||||
|
|
||||||
|
node._on_cliff_sensors(msg)
|
||||||
|
assert node.consecutive_detections == 1
|
||||||
|
|
||||||
|
def test_debounce_threshold(self, node):
|
||||||
|
"""Test debouncing requires multiple consecutive detections."""
|
||||||
|
node.cliff_threshold = 0.5
|
||||||
|
node.debounce_count = 3
|
||||||
|
node.min_sensors = 1
|
||||||
|
|
||||||
|
msg = LaserScan()
|
||||||
|
msg.ranges = [0.6, 0.3, 0.4] # 1 exceeds
|
||||||
|
|
||||||
|
# First detection
|
||||||
|
node._on_cliff_sensors(msg)
|
||||||
|
assert node.cliff_detected is False
|
||||||
|
assert node.consecutive_detections == 1
|
||||||
|
|
||||||
|
# Second detection
|
||||||
|
node._on_cliff_sensors(msg)
|
||||||
|
assert node.cliff_detected is False
|
||||||
|
assert node.consecutive_detections == 2
|
||||||
|
|
||||||
|
# Third detection - should trigger
|
||||||
|
node._on_cliff_sensors(msg)
|
||||||
|
assert node.cliff_detected is True
|
||||||
|
assert node.consecutive_detections == 3
|
||||||
|
|
||||||
|
def test_debounce_reset_on_clear(self, node):
|
||||||
|
"""Test debounce counter resets when cliff clears."""
|
||||||
|
node.debounce_count = 2
|
||||||
|
node.min_sensors = 1
|
||||||
|
|
||||||
|
msg_cliff = LaserScan()
|
||||||
|
msg_cliff.ranges = [0.6, 0.3]
|
||||||
|
|
||||||
|
msg_clear = LaserScan()
|
||||||
|
msg_clear.ranges = [0.3, 0.2]
|
||||||
|
|
||||||
|
# Detection
|
||||||
|
node._on_cliff_sensors(msg_cliff)
|
||||||
|
assert node.consecutive_detections == 1
|
||||||
|
|
||||||
|
# Clear
|
||||||
|
node._on_cliff_sensors(msg_clear)
|
||||||
|
assert node.consecutive_detections == 0
|
||||||
|
assert node.cliff_detected is False
|
||||||
|
|
||||||
|
def test_min_sensors_voting(self, node):
|
||||||
|
"""Test minimum sensor threshold (majority voting)."""
|
||||||
|
node.min_sensors = 2
|
||||||
|
node.debounce_count = 1
|
||||||
|
|
||||||
|
msg = LaserScan()
|
||||||
|
msg.ranges = [0.6, 0.3, 0.4] # Only 1 exceeds
|
||||||
|
|
||||||
|
node._on_cliff_sensors(msg)
|
||||||
|
assert node.cliff_detected is False
|
||||||
|
|
||||||
|
msg.ranges = [0.6, 0.7, 0.4] # 2 exceed
|
||||||
|
node._on_cliff_sensors(msg)
|
||||||
|
assert node.cliff_detected is True
|
||||||
|
|
||||||
|
def test_nan_values_filtered(self, node):
|
||||||
|
"""Test that NaN/inf values are filtered out."""
|
||||||
|
node.debounce_count = 1
|
||||||
|
node.min_sensors = 1
|
||||||
|
|
||||||
|
msg = LaserScan()
|
||||||
|
msg.ranges = [float('nan'), 0.6, float('inf')]
|
||||||
|
|
||||||
|
node._on_cliff_sensors(msg)
|
||||||
|
# Only 0.6 is valid and exceeds threshold
|
||||||
|
assert node.cliff_detected is True
|
||||||
|
|
||||||
|
def test_empty_ranges(self, node):
|
||||||
|
"""Test behavior with empty or all-invalid ranges."""
|
||||||
|
msg = LaserScan()
|
||||||
|
msg.ranges = []
|
||||||
|
|
||||||
|
node._on_cliff_sensors(msg)
|
||||||
|
assert node.cliff_detected is False
|
||||||
|
|
||||||
|
msg.ranges = [float('nan'), float('inf')]
|
||||||
|
node._on_cliff_sensors(msg)
|
||||||
|
assert node.cliff_detected is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestStateTransitions:
|
||||||
|
def test_cliff_to_clear_transition(self, node):
|
||||||
|
"""Test state transition from cliff to clear."""
|
||||||
|
node.debounce_count = 1
|
||||||
|
node.min_sensors = 1
|
||||||
|
|
||||||
|
# Trigger cliff
|
||||||
|
msg_cliff = LaserScan()
|
||||||
|
msg_cliff.ranges = [0.6, 0.3]
|
||||||
|
node._on_cliff_sensors(msg_cliff)
|
||||||
|
|
||||||
|
assert node.cliff_detected is True
|
||||||
|
|
||||||
|
# Clear cliff
|
||||||
|
msg_clear = LaserScan()
|
||||||
|
msg_clear.ranges = [0.3, 0.2]
|
||||||
|
node._on_cliff_sensors(msg_clear)
|
||||||
|
|
||||||
|
assert node.cliff_detected is False
|
||||||
|
|
||||||
|
def test_clear_to_cliff_transition(self, node):
|
||||||
|
"""Test state transition from clear to cliff."""
|
||||||
|
node.debounce_count = 1
|
||||||
|
node.min_sensors = 1
|
||||||
|
|
||||||
|
# Start clear
|
||||||
|
assert node.cliff_detected is False
|
||||||
|
|
||||||
|
# Trigger cliff
|
||||||
|
msg_cliff = LaserScan()
|
||||||
|
msg_cliff.ranges = [0.6, 0.3]
|
||||||
|
node._on_cliff_sensors(msg_cliff)
|
||||||
|
|
||||||
|
assert node.cliff_detected is True
|
||||||
|
|
||||||
|
|
||||||
|
class TestEdgeCases:
|
||||||
|
def test_single_sensor_exceeds(self, node):
|
||||||
|
"""Test with single sensor exceeding threshold."""
|
||||||
|
node.min_sensors = 1
|
||||||
|
node.debounce_count = 1
|
||||||
|
|
||||||
|
msg = LaserScan()
|
||||||
|
msg.ranges = [0.6]
|
||||||
|
|
||||||
|
node._on_cliff_sensors(msg)
|
||||||
|
assert node.cliff_detected is True
|
||||||
|
|
||||||
|
def test_zero_readings(self, node):
|
||||||
|
"""Test that zero readings are filtered (invalid)."""
|
||||||
|
msg = LaserScan()
|
||||||
|
msg.ranges = [0.0, 0.6, 0.0] # Only 0.6 is valid
|
||||||
|
|
||||||
|
node._on_cliff_sensors(msg)
|
||||||
|
# 0.6 exceeds 0.5, but need 2 sensors with min_sensors=2
|
||||||
|
assert node.cliff_detected is False
|
||||||
|
|
||||||
|
def test_threshold_boundary(self, node):
|
||||||
|
"""Test boundary condition at exact threshold."""
|
||||||
|
node.debounce_count = 1
|
||||||
|
node.min_sensors = 1
|
||||||
|
|
||||||
|
msg = LaserScan()
|
||||||
|
msg.ranges = [0.5, 0.3] # Exactly at threshold (not exceeding)
|
||||||
|
|
||||||
|
node._on_cliff_sensors(msg)
|
||||||
|
assert node.cliff_detected is False
|
||||||
|
|
||||||
|
msg.ranges = [0.50001, 0.3] # Just above threshold
|
||||||
|
node._on_cliff_sensors(msg)
|
||||||
|
assert node.cliff_detected is True
|
||||||
Loading…
x
Reference in New Issue
Block a user