diff --git a/jetson/ros2_ws/src/saltybot_cliff_detector/config/cliff_detector_config.yaml b/jetson/ros2_ws/src/saltybot_cliff_detector/config/cliff_detector_config.yaml new file mode 100644 index 0000000..90a8fd1 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cliff_detector/config/cliff_detector_config.yaml @@ -0,0 +1,12 @@ +cliff_detector: + ros__parameters: + # Cliff threshold in meters (readings > this = drop-off) + cliff_threshold: 0.5 + + # Number of consecutive detections before triggering + # (debouncing to avoid false positives) + debounce_count: 3 + + # Minimum number of sensors that must exceed threshold + # (majority voting for robustness) + min_sensors: 2 diff --git a/jetson/ros2_ws/src/saltybot_cliff_detector/launch/cliff_detector.launch.py b/jetson/ros2_ws/src/saltybot_cliff_detector/launch/cliff_detector.launch.py new file mode 100644 index 0000000..7632a09 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cliff_detector/launch/cliff_detector.launch.py @@ -0,0 +1,31 @@ +"""Launch file for cliff detector node.""" + +from launch import LaunchDescription +from launch_ros.actions import Node +from launch.substitutions import LaunchConfiguration +from launch.actions import DeclareLaunchArgument +import os +from ament_index_python.packages import get_package_share_directory + + +def generate_launch_description(): + """Generate launch description for cliff detector.""" + pkg_dir = get_package_share_directory("saltybot_cliff_detector") + config_file = os.path.join(pkg_dir, "config", "cliff_detector_config.yaml") + + return LaunchDescription( + [ + DeclareLaunchArgument( + "config_file", + default_value=config_file, + description="Path to configuration YAML file", + ), + Node( + package="saltybot_cliff_detector", + executable="cliff_detector_node", + name="cliff_detector", + output="screen", + parameters=[LaunchConfiguration("config_file")], + ), + ] + ) diff --git a/jetson/ros2_ws/src/saltybot_cliff_detector/package.xml b/jetson/ros2_ws/src/saltybot_cliff_detector/package.xml new file mode 100644 index 0000000..4def308 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cliff_detector/package.xml @@ -0,0 +1,21 @@ + + + + saltybot_cliff_detector + 0.1.0 + Cliff and drop-off detection safety node for SaltyBot + SaltyLab Controls + MIT + + ament_python + rclpy + sensor_msgs + std_msgs + + pytest + std_msgs + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_cliff_detector/resource/saltybot_cliff_detector b/jetson/ros2_ws/src/saltybot_cliff_detector/resource/saltybot_cliff_detector new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_cliff_detector/saltybot_cliff_detector/__init__.py b/jetson/ros2_ws/src/saltybot_cliff_detector/saltybot_cliff_detector/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_cliff_detector/saltybot_cliff_detector/cliff_detector_node.py b/jetson/ros2_ws/src/saltybot_cliff_detector/saltybot_cliff_detector/cliff_detector_node.py new file mode 100644 index 0000000..3fbfd80 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cliff_detector/saltybot_cliff_detector/cliff_detector_node.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 +"""Cliff and drop-off detection node for SaltyBot. + +Monitors downward-facing IR range sensors for drop-offs/stair edges. +Triggers emergency stop when any sensor reading exceeds threshold. + +Subscribed topics: + /saltybot/cliff_sensors (sensor_msgs/LaserScan) - IR range array + +Published topics: + /saltybot/cliff_detected (std_msgs/Bool) - Cliff/drop-off detected +""" + +import rclpy +from rclpy.node import Node +from sensor_msgs.msg import LaserScan +from std_msgs.msg import Bool + + +class CliffDetectorNode(Node): + """ROS2 node for cliff and drop-off detection.""" + + def __init__(self): + super().__init__("cliff_detector") + + # Parameters + self.declare_parameter("cliff_threshold", 0.5) + self.declare_parameter("debounce_count", 3) + self.declare_parameter("min_sensors", 2) + + self.cliff_threshold = self.get_parameter("cliff_threshold").value + self.debounce_count = self.get_parameter("debounce_count").value + self.min_sensors = self.get_parameter("min_sensors").value + + # State tracking + self.cliff_detected = False + self.consecutive_detections = 0 + + # Subscription to cliff sensors + self.sub_cliff = self.create_subscription( + LaserScan, "/saltybot/cliff_sensors", self._on_cliff_sensors, 10 + ) + + # Publisher for cliff detection + self.pub_cliff = self.create_publisher(Bool, "/saltybot/cliff_detected", 10) + + self.get_logger().info( + f"Cliff detector initialized. " + f"Threshold: {self.cliff_threshold}m, " + f"Debounce: {self.debounce_count} frames, " + f"Min sensors: {self.min_sensors}" + ) + + def _on_cliff_sensors(self, msg: LaserScan) -> None: + """Process cliff sensor readings.""" + # Filter out NaN/inf values and check against threshold + valid_ranges = [r for r in msg.ranges if 0 < r < float('inf')] + + if len(valid_ranges) == 0: + # No valid readings - assume safe + self._update_cliff_state(False) + return + + # Count how many sensors exceed threshold + exceeding_sensors = sum(1 for r in valid_ranges if r > self.cliff_threshold) + + # Cliff detected if min_sensors or more exceed threshold + cliff_detected_now = exceeding_sensors >= self.min_sensors + + if cliff_detected_now: + self.consecutive_detections += 1 + if self.consecutive_detections >= self.debounce_count: + self._update_cliff_state(True) + else: + self.consecutive_detections = 0 + self._update_cliff_state(False) + + def _update_cliff_state(self, detected: bool) -> None: + """Update cliff state and publish.""" + if detected != self.cliff_detected: + self.cliff_detected = detected + if detected: + self.get_logger().warn("CLIFF DETECTED! Emergency stop triggered!") + else: + self.get_logger().info("Cliff cleared, continuing operation") + + # Always publish current state + msg = Bool(data=self.cliff_detected) + self.pub_cliff.publish(msg) + + +def main(args=None): + rclpy.init(args=args) + node = CliffDetectorNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_cliff_detector/setup.cfg b/jetson/ros2_ws/src/saltybot_cliff_detector/setup.cfg new file mode 100644 index 0000000..bb21d74 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cliff_detector/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/saltybot_cliff_detector + +[install] +install_scripts=$base/lib/saltybot_cliff_detector diff --git a/jetson/ros2_ws/src/saltybot_cliff_detector/setup.py b/jetson/ros2_ws/src/saltybot_cliff_detector/setup.py new file mode 100644 index 0000000..556342d --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cliff_detector/setup.py @@ -0,0 +1,24 @@ +from setuptools import setup, find_packages + +setup( + name='saltybot_cliff_detector', + version='0.1.0', + packages=find_packages(), + data_files=[ + ('share/ament_index/resource_index/packages', ['resource/saltybot_cliff_detector']), + ('share/saltybot_cliff_detector', ['package.xml']), + ('share/saltybot_cliff_detector/config', ['config/cliff_detector_config.yaml']), + ('share/saltybot_cliff_detector/launch', ['launch/cliff_detector.launch.py']), + ], + install_requires=['setuptools'], + zip_safe=True, + author='SaltyLab Controls', + author_email='sl-controls@saltylab.local', + description='Cliff and drop-off detection safety node for SaltyBot', + license='MIT', + entry_points={ + 'console_scripts': [ + 'cliff_detector_node=saltybot_cliff_detector.cliff_detector_node:main', + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_cliff_detector/test/__init__.py b/jetson/ros2_ws/src/saltybot_cliff_detector/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_cliff_detector/test/test_cliff_detector.py b/jetson/ros2_ws/src/saltybot_cliff_detector/test/test_cliff_detector.py new file mode 100644 index 0000000..a7ecfe3 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_cliff_detector/test/test_cliff_detector.py @@ -0,0 +1,205 @@ +"""Tests for cliff detector node.""" + +import pytest +from sensor_msgs.msg import LaserScan +from std_msgs.msg import Bool +import rclpy + +from saltybot_cliff_detector.cliff_detector_node import CliffDetectorNode + + +@pytest.fixture +def rclpy_fixture(): + rclpy.init() + yield + rclpy.shutdown() + + +@pytest.fixture +def node(rclpy_fixture): + node = CliffDetectorNode() + yield node + node.destroy_node() + + +class TestInit: + def test_node_initialization(self, node): + assert node.cliff_threshold == 0.5 + assert node.debounce_count == 3 + assert node.min_sensors == 2 + assert node.cliff_detected is False + assert node.consecutive_detections == 0 + + +class TestCliffDetection: + def test_cliff_threshold_exceeded(self, node): + """Test detection when sensor exceeds threshold.""" + msg = LaserScan() + msg.ranges = [0.3, 0.6, 0.4] # 1 sensor exceeds threshold + + node._on_cliff_sensors(msg) + assert node.consecutive_detections == 1 + + def test_multiple_sensors_exceed(self, node): + """Test detection with multiple sensors exceeding threshold.""" + msg = LaserScan() + msg.ranges = [0.6, 0.7, 0.4] # 2 sensors exceed threshold + + node._on_cliff_sensors(msg) + assert node.consecutive_detections == 1 + + def test_debounce_threshold(self, node): + """Test debouncing requires multiple consecutive detections.""" + node.cliff_threshold = 0.5 + node.debounce_count = 3 + node.min_sensors = 1 + + msg = LaserScan() + msg.ranges = [0.6, 0.3, 0.4] # 1 exceeds + + # First detection + node._on_cliff_sensors(msg) + assert node.cliff_detected is False + assert node.consecutive_detections == 1 + + # Second detection + node._on_cliff_sensors(msg) + assert node.cliff_detected is False + assert node.consecutive_detections == 2 + + # Third detection - should trigger + node._on_cliff_sensors(msg) + assert node.cliff_detected is True + assert node.consecutive_detections == 3 + + def test_debounce_reset_on_clear(self, node): + """Test debounce counter resets when cliff clears.""" + node.debounce_count = 2 + node.min_sensors = 1 + + msg_cliff = LaserScan() + msg_cliff.ranges = [0.6, 0.3] + + msg_clear = LaserScan() + msg_clear.ranges = [0.3, 0.2] + + # Detection + node._on_cliff_sensors(msg_cliff) + assert node.consecutive_detections == 1 + + # Clear + node._on_cliff_sensors(msg_clear) + assert node.consecutive_detections == 0 + assert node.cliff_detected is False + + def test_min_sensors_voting(self, node): + """Test minimum sensor threshold (majority voting).""" + node.min_sensors = 2 + node.debounce_count = 1 + + msg = LaserScan() + msg.ranges = [0.6, 0.3, 0.4] # Only 1 exceeds + + node._on_cliff_sensors(msg) + assert node.cliff_detected is False + + msg.ranges = [0.6, 0.7, 0.4] # 2 exceed + node._on_cliff_sensors(msg) + assert node.cliff_detected is True + + def test_nan_values_filtered(self, node): + """Test that NaN/inf values are filtered out.""" + node.debounce_count = 1 + node.min_sensors = 1 + + msg = LaserScan() + msg.ranges = [float('nan'), 0.6, float('inf')] + + node._on_cliff_sensors(msg) + # Only 0.6 is valid and exceeds threshold + assert node.cliff_detected is True + + def test_empty_ranges(self, node): + """Test behavior with empty or all-invalid ranges.""" + msg = LaserScan() + msg.ranges = [] + + node._on_cliff_sensors(msg) + assert node.cliff_detected is False + + msg.ranges = [float('nan'), float('inf')] + node._on_cliff_sensors(msg) + assert node.cliff_detected is False + + +class TestStateTransitions: + def test_cliff_to_clear_transition(self, node): + """Test state transition from cliff to clear.""" + node.debounce_count = 1 + node.min_sensors = 1 + + # Trigger cliff + msg_cliff = LaserScan() + msg_cliff.ranges = [0.6, 0.3] + node._on_cliff_sensors(msg_cliff) + + assert node.cliff_detected is True + + # Clear cliff + msg_clear = LaserScan() + msg_clear.ranges = [0.3, 0.2] + node._on_cliff_sensors(msg_clear) + + assert node.cliff_detected is False + + def test_clear_to_cliff_transition(self, node): + """Test state transition from clear to cliff.""" + node.debounce_count = 1 + node.min_sensors = 1 + + # Start clear + assert node.cliff_detected is False + + # Trigger cliff + msg_cliff = LaserScan() + msg_cliff.ranges = [0.6, 0.3] + node._on_cliff_sensors(msg_cliff) + + assert node.cliff_detected is True + + +class TestEdgeCases: + def test_single_sensor_exceeds(self, node): + """Test with single sensor exceeding threshold.""" + node.min_sensors = 1 + node.debounce_count = 1 + + msg = LaserScan() + msg.ranges = [0.6] + + node._on_cliff_sensors(msg) + assert node.cliff_detected is True + + def test_zero_readings(self, node): + """Test that zero readings are filtered (invalid).""" + msg = LaserScan() + msg.ranges = [0.0, 0.6, 0.0] # Only 0.6 is valid + + node._on_cliff_sensors(msg) + # 0.6 exceeds 0.5, but need 2 sensors with min_sensors=2 + assert node.cliff_detected is False + + def test_threshold_boundary(self, node): + """Test boundary condition at exact threshold.""" + node.debounce_count = 1 + node.min_sensors = 1 + + msg = LaserScan() + msg.ranges = [0.5, 0.3] # Exactly at threshold (not exceeding) + + node._on_cliff_sensors(msg) + assert node.cliff_detected is False + + msg.ranges = [0.50001, 0.3] # Just above threshold + node._on_cliff_sensors(msg) + assert node.cliff_detected is True