feat: RPLIDAR safety zone detector (Issue #575)

Add saltybot_safety_zone — ROS2 Python node that processes the RPLIDAR
A1M8 /scan into three concentric 360° safety zones, latches an e-stop
when DANGER is detected in the forward arc, and overrides /cmd_vel to
zero while the latch is active.

Zone thresholds (default):
  DANGER  < 0.30 m — latching e-stop in forward arc
  WARN    < 1.00 m — advisory (published in sector data)
  CLEAR   otherwise

Sector grid:
  36 sectors of 10° each (sector 0 = robot forward, CCW positive).
  Per-sector: angle_deg, zone, min_range_m, in_forward_arc flag.

E-stop behaviour:
  - Latches after estop_debounce_frames (2) consecutive DANGER scans
    in the forward arc (configurable ±30°, or all-arcs mode).
  - While latched: zero Twist published to /cmd_vel every scan + every
    incoming /cmd_vel_input message is blocked.
  - Clear only via service (obstacle must be gone):
    /saltybot/safety_zone/clear_estop  (std_srvs/Trigger)

Published topics:
  /saltybot/safety_zone          String/JSON  every scan
    — per-sector {sector, angle_deg, zone, min_range_m, forward}
    — estop_active, estop_reason, danger_sectors[], warn_sectors[]
  /saltybot/safety_zone/status   String/JSON  10 Hz
    — forward_zone, closest_obstacle_m, danger/warn counts
  /cmd_vel                       Twist        zero when e-stopped

Subscribed topics:
  /scan           LaserScan  — RPLIDAR A1M8
  /cmd_vel_input  Twist      — upstream velocity (pass-through / block)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-perception 2026-03-14 11:54:52 -04:00
parent 35440b7463
commit 131d85a0d3
8 changed files with 490 additions and 0 deletions

View File

@ -0,0 +1,44 @@
# safety_zone_params.yaml — RPLIDAR 360° safety zone detector (Issue #575)
#
# Node: saltybot_safety_zone
#
# Usage:
# ros2 launch saltybot_safety_zone safety_zone.launch.py
#
# Zone thresholds:
# DANGER < danger_range_m → latching e-stop (if in forward arc)
# WARN < warn_range_m → caution / speed reduction (advisory)
# CLEAR otherwise
#
# E-stop clear:
# ros2 service call /saltybot/safety_zone/clear_estop std_srvs/srv/Trigger
safety_zone:
ros__parameters:
# ── Zone thresholds ──────────────────────────────────────────────────────
danger_range_m: 0.30 # m — obstacle closer than this → DANGER
warn_range_m: 1.00 # m — obstacle closer than this → WARN
# ── Sector grid ──────────────────────────────────────────────────────────
n_sectors: 36 # 360 / 36 = 10° per sector
# ── E-stop trigger arc ───────────────────────────────────────────────────
forward_arc_deg: 60.0 # ±30° from robot forward (+X / 0°)
estop_all_arcs: false # true = any sector triggers (360° e-stop)
estop_debounce_frames: 2 # consecutive DANGER scans before latch
# ── Range validity ───────────────────────────────────────────────────────
min_valid_range_m: 0.05 # ignore readings closer than this (sensor noise)
max_valid_range_m: 12.00 # RPLIDAR A1M8 nominal max range
# ── Publish rate ─────────────────────────────────────────────────────────
publish_rate: 10.0 # Hz — /saltybot/safety_zone/status publish rate
# /saltybot/safety_zone publishes every scan
# ── cmd_vel topics ───────────────────────────────────────────────────────
# Safety zone node intercepts cmd_vel from upstream, overrides to zero on estop.
# Typical chain:
# cmd_vel_mux → /cmd_vel_safe → [safety_zone: cmd_vel_input] → /cmd_vel → STM32
cmd_vel_input_topic: /cmd_vel_input # upstream velocity (remap as needed)
cmd_vel_output_topic: /cmd_vel # downstream (to STM32 bridge)

View File

@ -0,0 +1,28 @@
"""Launch file for saltybot_safety_zone (Issue #575)."""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description() -> LaunchDescription:
config = os.path.join(
get_package_share_directory("saltybot_safety_zone"),
"config",
"safety_zone_params.yaml",
)
safety_zone_node = Node(
package="saltybot_safety_zone",
executable="safety_zone",
name="safety_zone",
parameters=[config],
remappings=[
# Remap if the upstream mux publishes to a different topic:
# ("/cmd_vel_input", "/cmd_vel_safe"),
],
output="screen",
)
return LaunchDescription([safety_zone_node])

View File

@ -0,0 +1,32 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_safety_zone</name>
<version>0.1.0</version>
<description>
RPLIDAR 360° safety zone detector (Issue #575).
Divides the full 360° scan into 10° sectors, classifies each as
DANGER/WARN/CLEAR, latches an e-stop on DANGER in the forward arc,
overrides /cmd_vel to zero while latched, and exposes a service to clear
the latch once obstacles are gone.
</description>
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
<license>MIT</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>
<depend>std_srvs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,351 @@
#!/usr/bin/env python3
"""
safety_zone_node.py RPLIDAR 360° safety zone detector (Issue #575).
Processes /scan into three concentric safety zones and publishes per-sector
classification, a latching e-stop on DANGER in the forward arc, and a zero
cmd_vel override while the e-stop is active.
Zone thresholds (configurable):
DANGER < danger_range_m (default 0.30 m) immediate halt
WARN < warn_range_m (default 1.00 m) caution / slow-down
CLEAR otherwise
Sectors:
360° is divided into N_SECTORS (default 36) sectors of 10° each.
Sector 0 is centred on 0° (robot forward = base_link +X axis).
Sector indices increase counter-clockwise (ROS convention).
E-stop behaviour:
1. If any sector in the forward arc has DANGER for >= estop_debounce_frames
consecutive scans, the e-stop latches.
2. While latched:
- A zero Twist is published to /cmd_vel every scan cycle.
- Incoming cmd_vel_input messages are silently dropped.
3. The latch is cleared ONLY via the ROS service:
/saltybot/safety_zone/clear_estop (std_srvs/Trigger)
and only if no DANGER sectors remain at the time of the call.
Published topics:
/saltybot/safety_zone (std_msgs/String) JSON per-sector data
/saltybot/safety_zone/status (std_msgs/String) JSON summary + e-stop state
/cmd_vel (geometry_msgs/Twist) zero override when e-stopped
Subscribed topics:
/scan (sensor_msgs/LaserScan) RPLIDAR data
/cmd_vel_input (geometry_msgs/Twist) upstream cmd_vel (pass-through or block)
Services:
/saltybot/safety_zone/clear_estop (std_srvs/Trigger)
"""
import json
import math
import threading
from typing import List, Optional, Tuple
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from std_msgs.msg import String
from std_srvs.srv import Trigger
# Zone levels (int)
CLEAR = 0
WARN = 1
DANGER = 2
_ZONE_NAME = {CLEAR: "CLEAR", WARN: "WARN", DANGER: "DANGER"}
class SafetyZoneNode(Node):
"""360° RPLIDAR safety zone detector with latching e-stop."""
def __init__(self) -> None:
super().__init__("safety_zone")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("danger_range_m", 0.30)
self.declare_parameter("warn_range_m", 1.00)
self.declare_parameter("n_sectors", 36) # 360/36 = 10° each
self.declare_parameter("forward_arc_deg", 60.0) # ±30° e-stop window
self.declare_parameter("estop_all_arcs", False) # true = any sector triggers
self.declare_parameter("estop_debounce_frames", 2) # consecutive DANGER frames
self.declare_parameter("min_valid_range_m", 0.05) # ignore closer readings
self.declare_parameter("max_valid_range_m", 12.0) # RPLIDAR A1M8 max
self.declare_parameter("publish_rate", 10.0) # Hz — sector publish rate
self.declare_parameter("cmd_vel_input_topic", "/cmd_vel_input")
self.declare_parameter("cmd_vel_output_topic", "/cmd_vel")
self._danger_r = self.get_parameter("danger_range_m").value
self._warn_r = self.get_parameter("warn_range_m").value
self._n_sectors = self.get_parameter("n_sectors").value
self._fwd_arc = self.get_parameter("forward_arc_deg").value
self._all_arcs = self.get_parameter("estop_all_arcs").value
self._debounce = self.get_parameter("estop_debounce_frames").value
self._min_r = self.get_parameter("min_valid_range_m").value
self._max_r = self.get_parameter("max_valid_range_m").value
self._pub_rate = self.get_parameter("publish_rate").value
_in_topic = self.get_parameter("cmd_vel_input_topic").value
_out_topic = self.get_parameter("cmd_vel_output_topic").value
self._sector_deg = 360.0 / self._n_sectors # degrees per sector
# Precompute which sector indices are in the forward arc
self._forward_sectors = self._compute_forward_sectors()
# ── State ─────────────────────────────────────────────────────────────
self._lock = threading.Lock()
self._sector_zones: List[int] = [CLEAR] * self._n_sectors
self._sector_ranges: List[float] = [float("inf")] * self._n_sectors
self._estop_latched = False
self._estop_reason = ""
self._danger_frame_count = 0 # consecutive DANGER frames in forward arc
self._scan_count = 0
self._last_scan_stamp: Optional[float] = None
# ── Subscriptions ─────────────────────────────────────────────────────
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
self._scan_sub = self.create_subscription(
LaserScan, "/scan", self._on_scan, sensor_qos
)
self._cmd_in_sub = self.create_subscription(
Twist, _in_topic, self._on_cmd_vel_input, 10
)
# ── Publishers ────────────────────────────────────────────────────────
self._zone_pub = self.create_publisher(String, "/saltybot/safety_zone", 10)
self._status_pub = self.create_publisher(String, "/saltybot/safety_zone/status", 10)
self._cmd_pub = self.create_publisher(Twist, _out_topic, 10)
# ── Service ───────────────────────────────────────────────────────────
self._clear_srv = self.create_service(
Trigger,
"/saltybot/safety_zone/clear_estop",
self._handle_clear_estop,
)
# ── Periodic status publish ───────────────────────────────────────────
self.create_timer(1.0 / self._pub_rate, self._publish_status)
self.get_logger().info(
f"SafetyZoneNode ready — "
f"danger={self._danger_r}m warn={self._warn_r}m "
f"sectors={self._n_sectors}({self._sector_deg:.0f}°each) "
f"fwd_arc=±{self._fwd_arc/2:.0f}° "
f"debounce={self._debounce}"
)
# ── Sector geometry ───────────────────────────────────────────────────────
def _compute_forward_sectors(self) -> List[int]:
"""Return sector indices that lie within the forward arc."""
half = self._fwd_arc / 2.0
fwd = []
for i in range(self._n_sectors):
centre_deg = i * self._sector_deg
# Normalise to (180, 180]
if centre_deg > 180.0:
centre_deg -= 360.0
if abs(centre_deg) <= half:
fwd.append(i)
return fwd
@staticmethod
def _angle_to_sector(angle_rad: float, n_sectors: int) -> int:
"""Convert a bearing (rad) to sector index [0, n_sectors)."""
deg = math.degrees(angle_rad) % 360.0
return int(deg / (360.0 / n_sectors)) % n_sectors
# ── Scan processing ───────────────────────────────────────────────────────
def _on_scan(self, msg: LaserScan) -> None:
"""Process incoming LaserScan into per-sector zone classification."""
n = len(msg.ranges)
if n == 0:
return
# Accumulate min range per sector
sector_min = [float("inf")] * self._n_sectors
for i, r in enumerate(msg.ranges):
if not math.isfinite(r) or r < self._min_r or r > self._max_r:
continue
angle_rad = msg.angle_min + i * msg.angle_increment
s = self._angle_to_sector(angle_rad, self._n_sectors)
if r < sector_min[s]:
sector_min[s] = r
# Classify each sector
sector_zones = []
for r in sector_min:
if r < self._danger_r:
sector_zones.append(DANGER)
elif r < self._warn_r:
sector_zones.append(WARN)
else:
sector_zones.append(CLEAR)
with self._lock:
self._sector_zones = sector_zones
self._sector_ranges = sector_min
self._scan_count += 1
self._last_scan_stamp = self.get_clock().now().nanoseconds * 1e-9
# E-stop detection
if not self._estop_latched:
danger_in_trigger = self._has_danger_in_trigger_arc(sector_zones)
if danger_in_trigger:
self._danger_frame_count += 1
if self._danger_frame_count >= self._debounce:
self._estop_latched = True
danger_sectors = [
i for i in (range(self._n_sectors) if self._all_arcs
else self._forward_sectors)
if sector_zones[i] == DANGER
]
self._estop_reason = (
f"DANGER in sectors {danger_sectors} "
f"(min range {min(sector_min[i] for i in danger_sectors if math.isfinite(sector_min[i])):.2f}m)"
)
self.get_logger().error(
f"E-STOP LATCHED: {self._estop_reason}"
)
else:
self._danger_frame_count = 0
# Publish zero cmd_vel immediately if e-stopped (time-critical)
if self._estop_latched:
self._cmd_pub.publish(Twist())
# Publish sector data every scan
self._publish_sectors(sector_zones, sector_min)
def _has_danger_in_trigger_arc(self, zones: List[int]) -> bool:
"""True if any DANGER sector exists in the trigger arc."""
if self._all_arcs:
return any(z == DANGER for z in zones)
return any(zones[i] == DANGER for i in self._forward_sectors)
# ── cmd_vel pass-through / override ──────────────────────────────────────
def _on_cmd_vel_input(self, msg: Twist) -> None:
"""Pass cmd_vel through unless e-stop is latched."""
with self._lock:
latched = self._estop_latched
if latched:
# Override: publish zero (already done in scan callback, belt-and-braces)
self._cmd_pub.publish(Twist())
else:
self._cmd_pub.publish(msg)
# ── Service: clear e-stop ─────────────────────────────────────────────────
def _handle_clear_estop(
self, request: Trigger.Request, response: Trigger.Response
) -> Trigger.Response:
with self._lock:
if not self._estop_latched:
response.success = True
response.message = "E-stop was not active."
return response
# Only allow clear if no current DANGER sectors
if self._has_danger_in_trigger_arc(self._sector_zones):
response.success = False
response.message = (
"Cannot clear: DANGER sectors still present. "
"Remove obstacle first."
)
return response
self._estop_latched = False
self._estop_reason = ""
self._danger_frame_count = 0
self.get_logger().warning("E-stop cleared via service.")
response.success = True
response.message = "E-stop cleared. Resuming normal operation."
return response
# ── Publishers ────────────────────────────────────────────────────────────
def _publish_sectors(self, zones: List[int], ranges: List[float]) -> None:
"""Publish per-sector JSON on /saltybot/safety_zone."""
sectors_data = []
for i, (zone, r) in enumerate(zip(zones, ranges)):
centre_deg = i * self._sector_deg
sectors_data.append({
"sector": i,
"angle_deg": round(centre_deg, 1),
"zone": _ZONE_NAME[zone],
"min_range_m": round(r, 3) if math.isfinite(r) else None,
"forward": i in self._forward_sectors,
})
payload = {
"sectors": sectors_data,
"estop_active": self._estop_latched,
"estop_reason": self._estop_reason,
"danger_sectors": [i for i, z in enumerate(zones) if z == DANGER],
"warn_sectors": [i for i, z in enumerate(zones) if z == WARN],
}
self._zone_pub.publish(String(data=json.dumps(payload)))
def _publish_status(self) -> None:
"""10 Hz JSON summary on /saltybot/safety_zone/status."""
with self._lock:
zones = list(self._sector_zones)
ranges = list(self._sector_ranges)
latched = self._estop_latched
reason = self._estop_reason
scans = self._scan_count
danger_cnt = sum(1 for z in zones if z == DANGER)
warn_cnt = sum(1 for z in zones if z == WARN)
fwd_zone = max(
(zones[i] for i in self._forward_sectors),
default=CLEAR,
)
# Closest obstacle in any direction
all_finite = [r for r in ranges if math.isfinite(r)]
closest_m = min(all_finite) if all_finite else None
status = {
"estop_active": latched,
"estop_reason": reason,
"forward_zone": _ZONE_NAME[fwd_zone],
"danger_sector_count": danger_cnt,
"warn_sector_count": warn_cnt,
"closest_obstacle_m": round(closest_m, 3) if closest_m is not None else None,
"scan_count": scans,
"forward_sector_ids": self._forward_sectors,
}
self._status_pub.publish(String(data=json.dumps(status)))
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None) -> None:
rclpy.init(args=args)
node = SafetyZoneNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_safety_zone
[install]
install_scripts=$base/lib/saltybot_safety_zone

View File

@ -0,0 +1,30 @@
import os
from glob import glob
from setuptools import setup
package_name = "saltybot_safety_zone"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages",
["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
(os.path.join("share", package_name, "launch"), glob("launch/*.py")),
(os.path.join("share", package_name, "config"), glob("config/*.yaml")),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-perception",
maintainer_email="sl-perception@saltylab.local",
description="RPLIDAR 360° safety zone detector with latching e-stop (Issue #575)",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"safety_zone = saltybot_safety_zone.safety_zone_node:main",
],
},
)