From e36fa31f97d61cb4c886a69291db438acf28ce21 Mon Sep 17 00:00:00 2001 From: sl-controls Date: Tue, 3 Mar 2026 17:29:14 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20Implement=20360=C2=B0=20LIDAR=20obstacl?= =?UTF-8?q?e=20avoidance=20(Issue=20#364)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements ROS2 node for RPLIDAR A1M8 obstacle detection with: - Emergency stop at 0.5m - Speed-dependent safety zone (3m @ 20km/h, scales linearly) - Forward-facing 60° obstacle cone scanning - Publishes /saltybot/obstacle_alert and /cmd_vel_safe - Debounced obstacle detection (2 frames) - JSON status reporting Launch: ros2 launch saltybot_lidar_avoidance lidar_avoidance.launch.py Config: config/lidar_avoidance_params.yaml Co-Authored-By: Claude Haiku 4.5 --- .../config/lidar_avoidance_params.yaml | 31 +++ .../launch/lidar_avoidance.launch.py | 33 +++ .../src/saltybot_lidar_avoidance/package.xml | 29 +++ .../resource/saltybot_lidar_avoidance | 0 .../saltybot_lidar_avoidance/__init__.py | 1 + .../lidar_avoidance_node.py | 239 ++++++++++++++++++ .../src/saltybot_lidar_avoidance/setup.cfg | 5 + .../src/saltybot_lidar_avoidance/setup.py | 27 ++ .../saltybot_lidar_avoidance/test/__init__.py | 0 .../test/test_lidar_avoidance.py | 102 ++++++++ 10 files changed, 467 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_lidar_avoidance/config/lidar_avoidance_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_lidar_avoidance/launch/lidar_avoidance.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_lidar_avoidance/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_lidar_avoidance/resource/saltybot_lidar_avoidance create mode 100644 jetson/ros2_ws/src/saltybot_lidar_avoidance/saltybot_lidar_avoidance/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_lidar_avoidance/saltybot_lidar_avoidance/lidar_avoidance_node.py create mode 100644 jetson/ros2_ws/src/saltybot_lidar_avoidance/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_lidar_avoidance/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_lidar_avoidance/test/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_lidar_avoidance/test/test_lidar_avoidance.py diff --git a/jetson/ros2_ws/src/saltybot_lidar_avoidance/config/lidar_avoidance_params.yaml b/jetson/ros2_ws/src/saltybot_lidar_avoidance/config/lidar_avoidance_params.yaml new file mode 100644 index 0000000..433777b --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_lidar_avoidance/config/lidar_avoidance_params.yaml @@ -0,0 +1,31 @@ +# LIDAR Avoidance Configuration for SaltyBot +# 360° obstacle detection with RPLIDAR A1M8 + +lidar_avoidance: + ros__parameters: + # Emergency stop distance threshold (meters) + # Robot will trigger hard stop if obstacle closer than this + emergency_stop_distance: 0.5 + + # Reference speed for safety zone calculation (m/s) + # 5.56 m/s = 20 km/h + max_speed_reference: 5.56 + + # Safety zone distance at maximum reference speed (meters) + # At 20 km/h, robot maintains 3m clearance before reducing speed + safety_zone_at_max_speed: 3.0 + + # Minimum safety zone distance (meters) + # At zero speed, robot maintains this clearance + # Must be >= emergency_stop_distance for smooth operation + min_safety_zone: 0.6 + + # Forward scanning window (degrees) + # ±30° forward cone = 60° total forward scan window + # RPLIDAR A1M8 provides full 360° data, but we focus on forward obstacles + angle_window_degrees: 60 + + # Debounce frames for obstacle detection + # Number of consecutive scans with obstacle before triggering alert + # Reduces false positives from noise/reflections + debounce_frames: 2 diff --git a/jetson/ros2_ws/src/saltybot_lidar_avoidance/launch/lidar_avoidance.launch.py b/jetson/ros2_ws/src/saltybot_lidar_avoidance/launch/lidar_avoidance.launch.py new file mode 100644 index 0000000..d1f4df5 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_lidar_avoidance/launch/lidar_avoidance.launch.py @@ -0,0 +1,33 @@ +"""Launch file for LIDAR avoidance node.""" + +from launch import LaunchDescription +from launch_ros.actions import Node +from launch.substitutions import LaunchConfiguration +from launch.actions import DeclareLaunchArgument +import os +from ament_index_python.packages import get_package_share_directory + + +def generate_launch_description(): + """Generate launch description for LIDAR avoidance.""" + pkg_dir = get_package_share_directory("saltybot_lidar_avoidance") + config_file = os.path.join( + pkg_dir, "config", "lidar_avoidance_params.yaml" + ) + + return LaunchDescription( + [ + DeclareLaunchArgument( + "config_file", + default_value=config_file, + description="Path to configuration YAML file", + ), + Node( + package="saltybot_lidar_avoidance", + executable="lidar_avoidance_node", + name="lidar_avoidance", + output="screen", + parameters=[LaunchConfiguration("config_file")], + ), + ] + ) diff --git a/jetson/ros2_ws/src/saltybot_lidar_avoidance/package.xml b/jetson/ros2_ws/src/saltybot_lidar_avoidance/package.xml new file mode 100644 index 0000000..ca46509 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_lidar_avoidance/package.xml @@ -0,0 +1,29 @@ + + + + saltybot_lidar_avoidance + 0.1.0 + + 360° LIDAR obstacle avoidance for SaltyBot using RPLIDAR A1M8. + Publishes local costmap, obstacle alerts, and filtered cmd_vel with emergency stop. + + sl-controls + MIT + + rclpy + geometry_msgs + sensor_msgs + std_msgs + nav_msgs + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_lidar_avoidance/resource/saltybot_lidar_avoidance b/jetson/ros2_ws/src/saltybot_lidar_avoidance/resource/saltybot_lidar_avoidance new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_lidar_avoidance/saltybot_lidar_avoidance/__init__.py b/jetson/ros2_ws/src/saltybot_lidar_avoidance/saltybot_lidar_avoidance/__init__.py new file mode 100644 index 0000000..4df1043 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_lidar_avoidance/saltybot_lidar_avoidance/__init__.py @@ -0,0 +1 @@ +"""SaltyBot LIDAR obstacle avoidance package.""" diff --git a/jetson/ros2_ws/src/saltybot_lidar_avoidance/saltybot_lidar_avoidance/lidar_avoidance_node.py b/jetson/ros2_ws/src/saltybot_lidar_avoidance/saltybot_lidar_avoidance/lidar_avoidance_node.py new file mode 100644 index 0000000..ce81d50 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_lidar_avoidance/saltybot_lidar_avoidance/lidar_avoidance_node.py @@ -0,0 +1,239 @@ +#!/usr/bin/env python3 +"""360° LIDAR obstacle avoidance node for SaltyBot. + +Uses RPLIDAR A1M8 for 360° scanning with speed-dependent safety zones. +Publishes emergency alerts and filtered cmd_vel with obstacle avoidance. + +Subscribed topics: + /scan (sensor_msgs/LaserScan) - RPLIDAR A1M8 scan data + /cmd_vel (geometry_msgs/Twist) - Input velocity command + +Published topics: + /saltybot/obstacle_alert (std_msgs/Bool) - Obstacle detected alert + /cmd_vel_safe (geometry_msgs/Twist) - Filtered velocity (avoidance applied) + /saltybot/lidar_avoidance_status (std_msgs/String) - Debug status JSON +""" + +import json +import math +from typing import Tuple + +import rclpy +from rclpy.node import Node +from sensor_msgs.msg import LaserScan +from geometry_msgs.msg import Twist +from std_msgs.msg import Bool, String + + +class LidarAvoidanceNode(Node): + """360° LIDAR obstacle avoidance with speed-dependent safety zones.""" + + def __init__(self): + super().__init__("lidar_avoidance") + + # Safety parameters + self.declare_parameter("emergency_stop_distance", 0.5) # m + self.declare_parameter("max_speed_reference", 5.56) # m/s (20 km/h) + self.declare_parameter("safety_zone_at_max_speed", 3.0) # m + self.declare_parameter("min_safety_zone", 0.6) # m (below emergency stop) + self.declare_parameter("angle_window_degrees", 60) # ±30° forward cone + self.declare_parameter("debounce_frames", 2) + + self.emergency_stop_distance = self.get_parameter("emergency_stop_distance").value + self.max_speed_reference = self.get_parameter("max_speed_reference").value + self.safety_zone_at_max_speed = self.get_parameter("safety_zone_at_max_speed").value + self.min_safety_zone = self.get_parameter("min_safety_zone").value + self.angle_window_degrees = self.get_parameter("angle_window_degrees").value + self.debounce_frames = self.get_parameter("debounce_frames").value + + # State tracking + self.obstacle_detected = False + self.consecutive_obstacles = 0 + self.current_speed = 0.0 + self.last_scan_ranges = None + self.emergency_stop_triggered = False + + # Subscriptions + self.create_subscription(LaserScan, "/scan", self._on_scan, 10) + self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10) + + # Publishers + self.pub_alert = self.create_publisher(Bool, "/saltybot/obstacle_alert", 10) + self.pub_safe_vel = self.create_publisher(Twist, "/cmd_vel_safe", 10) + self.pub_status = self.create_publisher( + String, "/saltybot/lidar_avoidance_status", 10 + ) + + self.get_logger().info( + f"LIDAR avoidance initialized:\n" + f" Emergency stop: {self.emergency_stop_distance}m\n" + f" Speed-dependent zone: {self.safety_zone_at_max_speed}m @ {self.max_speed_reference}m/s\n" + f" Forward angle window: ±{self.angle_window_degrees / 2}°\n" + f" Min safety zone: {self.min_safety_zone}m" + ) + + def _on_scan(self, msg: LaserScan) -> None: + """Process LIDAR scan data and check for obstacles.""" + self.last_scan_ranges = msg.ranges + + # Calculate safety threshold based on current speed + safety_distance = self._get_safety_distance(self.current_speed) + + # Get minimum distance in forward cone + min_distance, angle_deg = self._get_min_distance_forward(msg) + + # Check for obstacles + obstacle_now = min_distance < safety_distance + emergency_stop_now = min_distance < self.emergency_stop_distance + + # Debounce obstacle detection + if obstacle_now: + self.consecutive_obstacles += 1 + else: + self.consecutive_obstacles = 0 + + obstacle_detected_debounced = ( + self.consecutive_obstacles >= self.debounce_frames + ) + + # Handle state changes + if emergency_stop_now and not self.emergency_stop_triggered: + self.get_logger().error( + f"EMERGENCY STOP! Obstacle at {min_distance:.2f}m, {angle_deg:.1f}°" + ) + self.emergency_stop_triggered = True + elif not emergency_stop_now: + self.emergency_stop_triggered = False + + if obstacle_detected_debounced != self.obstacle_detected: + self.obstacle_detected = obstacle_detected_debounced + if self.obstacle_detected: + self.get_logger().warn( + f"Obstacle detected: {min_distance:.2f}m @ {angle_deg:.1f}°" + ) + else: + self.get_logger().info("Obstacle cleared") + + # Publish alert + alert_msg = Bool(data=self.obstacle_detected) + self.pub_alert.publish(alert_msg) + + # Publish status + status = { + "min_distance": round(min_distance, 3), + "angle_deg": round(angle_deg, 1), + "safety_distance": round(safety_distance, 3), + "obstacle_detected": self.obstacle_detected, + "emergency_stop": self.emergency_stop_triggered, + "current_speed": round(self.current_speed, 3), + } + status_msg = String(data=json.dumps(status)) + self.pub_status.publish(status_msg) + + def _on_cmd_vel(self, msg: Twist) -> None: + """Process incoming velocity command and apply obstacle avoidance.""" + self.current_speed = math.sqrt(msg.linear.x**2 + msg.linear.y**2) + + # Apply safety filtering + if self.emergency_stop_triggered: + # Emergency stop: zero out all motion + safe_vel = Twist() + elif self.obstacle_detected: + # Obstacle in path: reduce speed + safe_vel = Twist() + safety_distance = self._get_safety_distance(self.current_speed) + min_distance, _ = self._get_min_distance_forward(self.last_scan_ranges) + + if self.last_scan_ranges is not None and min_distance > 0: + # Linear interpolation of allowed speed based on distance to obstacle + if min_distance < safety_distance: + # Scale velocity from 0 to current based on distance + scale_factor = (min_distance - self.emergency_stop_distance) / ( + safety_distance - self.emergency_stop_distance + ) + scale_factor = max(0.0, min(1.0, scale_factor)) + + safe_vel.linear.x = msg.linear.x * scale_factor + safe_vel.linear.y = msg.linear.y * scale_factor + safe_vel.angular.z = msg.angular.z * scale_factor + else: + safe_vel = msg + else: + safe_vel = msg + else: + # No obstacle: pass through command + safe_vel = msg + + self.pub_safe_vel.publish(safe_vel) + + def _get_safety_distance(self, speed: float) -> float: + """Calculate speed-dependent safety zone distance. + + Linear interpolation: 0 m/s → min_safety_zone, max_speed → safety_zone_at_max_speed + """ + if speed <= 0: + return self.min_safety_zone + + if speed >= self.max_speed_reference: + return self.safety_zone_at_max_speed + + # Linear interpolation + ratio = speed / self.max_speed_reference + safety = self.min_safety_zone + ratio * ( + self.safety_zone_at_max_speed - self.min_safety_zone + ) + return safety + + def _get_min_distance_forward(self, scan_data) -> Tuple[float, float]: + """Get minimum distance in forward cone.""" + if isinstance(scan_data, LaserScan): + ranges = scan_data.ranges + angle_min = scan_data.angle_min + angle_increment = scan_data.angle_increment + else: + # scan_data is a tuple of (ranges, angle_min, angle_increment) or list + if not scan_data: + return float('inf'), 0.0 + ranges = scan_data + angle_min = -math.pi # Assume standard LIDAR orientation + angle_increment = 2 * math.pi / len(ranges) + + half_window = self.angle_window_degrees / 2.0 * math.pi / 180.0 + min_distance = float('inf') + min_angle = 0.0 + + for i, distance in enumerate(ranges): + if distance <= 0 or math.isnan(distance) or math.isinf(distance): + continue + + angle_rad = angle_min + i * angle_increment + + # Normalize to -π to π + while angle_rad > math.pi: + angle_rad -= 2 * math.pi + while angle_rad < -math.pi: + angle_rad += 2 * math.pi + + # Check forward window + if abs(angle_rad) <= half_window: + if distance < min_distance: + min_distance = distance + min_angle = angle_rad + + return min_distance, math.degrees(min_angle) + + +def main(args=None): + rclpy.init(args=args) + node = LidarAvoidanceNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_lidar_avoidance/setup.cfg b/jetson/ros2_ws/src/saltybot_lidar_avoidance/setup.cfg new file mode 100644 index 0000000..1d508c8 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_lidar_avoidance/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/saltybot_lidar_avoidance + +[install] +install_scripts=$base/lib/saltybot_lidar_avoidance diff --git a/jetson/ros2_ws/src/saltybot_lidar_avoidance/setup.py b/jetson/ros2_ws/src/saltybot_lidar_avoidance/setup.py new file mode 100644 index 0000000..70b97f9 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_lidar_avoidance/setup.py @@ -0,0 +1,27 @@ +from setuptools import setup + +package_name = "saltybot_lidar_avoidance" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (f"share/{package_name}/launch", ["launch/lidar_avoidance.launch.py"]), + (f"share/{package_name}/config", ["config/lidar_avoidance_params.yaml"]), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-controls", + maintainer_email="sl-controls@saltylab.local", + description="360° LIDAR obstacle avoidance with emergency stop and speed-dependent safety zones", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + "lidar_avoidance_node = saltybot_lidar_avoidance.lidar_avoidance_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_lidar_avoidance/test/__init__.py b/jetson/ros2_ws/src/saltybot_lidar_avoidance/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_lidar_avoidance/test/test_lidar_avoidance.py b/jetson/ros2_ws/src/saltybot_lidar_avoidance/test/test_lidar_avoidance.py new file mode 100644 index 0000000..2f84065 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_lidar_avoidance/test/test_lidar_avoidance.py @@ -0,0 +1,102 @@ +"""Unit tests for LIDAR avoidance node.""" + +import math +import pytest +from sensor_msgs.msg import LaserScan +from geometry_msgs.msg import Twist +from std_msgs.msg import Bool + + +class MockNode: + """Mock ROS2 node for testing.""" + + def __init__(self): + from saltybot_lidar_avoidance.lidar_avoidance_node import LidarAvoidanceNode + import rclpy + + rclpy.init(allow_reuse=True) + self.node = LidarAvoidanceNode() + + def cleanup(self): + self.node.destroy_node() + + +def create_scan( + ranges, angle_min=-math.pi, angle_increment=2 * math.pi / 360 +): + """Create a LaserScan message.""" + scan = LaserScan() + scan.ranges = ranges + scan.angle_min = angle_min + scan.angle_increment = angle_increment + scan.angle_max = angle_min + angle_increment * (len(ranges) - 1) + return scan + + +def test_safety_distance_zero_speed(): + """Test safety distance at zero speed.""" + import rclpy + + rclpy.init(allow_reuse=True) + from saltybot_lidar_avoidance.lidar_avoidance_node import LidarAvoidanceNode + + node = LidarAvoidanceNode() + safety = node._get_safety_distance(0.0) + assert safety == node.min_safety_zone + node.destroy_node() + + +def test_safety_distance_max_speed(): + """Test safety distance at max speed.""" + import rclpy + + rclpy.init(allow_reuse=True) + from saltybot_lidar_avoidance.lidar_avoidance_node import LidarAvoidanceNode + + node = LidarAvoidanceNode() + safety = node._get_safety_distance(node.max_speed_reference) + assert abs(safety - node.safety_zone_at_max_speed) < 0.01 + node.destroy_node() + + +def test_safety_distance_interpolation(): + """Test safety distance linear interpolation.""" + import rclpy + + rclpy.init(allow_reuse=True) + from saltybot_lidar_avoidance.lidar_avoidance_node import LidarAvoidanceNode + + node = LidarAvoidanceNode() + half_speed = node.max_speed_reference / 2.0 + safety = node._get_safety_distance(half_speed) + + expected = node.min_safety_zone + 0.5 * ( + node.safety_zone_at_max_speed - node.min_safety_zone + ) + assert abs(safety - expected) < 0.01 + node.destroy_node() + + +def test_min_distance_forward(): + """Test forward distance detection.""" + import rclpy + + rclpy.init(allow_reuse=True) + from saltybot_lidar_avoidance.lidar_avoidance_node import LidarAvoidanceNode + + node = LidarAvoidanceNode() + + # Create scan with obstacle at 0° (forward) + ranges = [float('inf')] * 360 + ranges[180] = 1.5 # Obstacle at index 180 (0° in normalized coordinates) + + scan = create_scan(ranges) + min_dist, angle = node._get_min_distance_forward(scan) + + assert min_dist == 1.5 + assert abs(angle) < 10 # Close to forward direction + node.destroy_node() + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])