Merge pull request 'feat: RealSense obstacle detection (Issue #611)' (#623) from sl-perception/issue-611-obstacle-detect into main

This commit is contained in:
sl-jetson 2026-03-15 11:02:29 -04:00
commit 96d13052b4
12 changed files with 1243 additions and 0 deletions

View File

@ -0,0 +1,35 @@
obstacle_detect:
ros__parameters:
# ── Depth processing ──────────────────────────────────────────────────────
downsample_factor: 8 # stride-based downsample before processing
# 640x480 / 8 = 80x60 = 4800 pts (fast RANSAC)
# ── RANSAC ground plane fitting ───────────────────────────────────────────
ransac_n_iter: 50 # RANSAC iterations
ransac_inlier_m: 0.06 # inlier distance threshold (m)
# ── Obstacle height filter ────────────────────────────────────────────────
min_obstacle_h_m: 0.05 # min height above ground to count as obstacle (m)
# — avoids false positives from ground noise
max_obstacle_h_m: 0.80 # max height above ground to count as obstacle (m)
# — ignores ceiling / upper structure (D435i ~0.5m mount)
# ── Clustering ────────────────────────────────────────────────────────────
cluster_cell_m: 0.30 # 2D grid cell size for BFS clustering (m)
cluster_min_pts: 5 # minimum points for a cluster to be reported
max_obstacle_dist_m: 5.0 # discard obstacle points beyond this forward distance
# ── Alert / e-stop thresholds ─────────────────────────────────────────────
danger_dist_m: 0.40 # closest obstacle z < this → DANGER alert
warn_dist_m: 1.20 # closest obstacle z < this → WARN alert
# ── Safety zone integration ───────────────────────────────────────────────
# When enabled, DANGER obstacles publish zero Twist to depth_estop_topic.
# Wire this into the cmd_vel safety chain so safety_zone can latch e-stop.
# Typical chain: depth_estop_topic=/cmd_vel_input → safety_zone → /cmd_vel
depth_estop_enabled: false
depth_estop_topic: /cmd_vel_input
# ── Frame / topic configuration ───────────────────────────────────────────
camera_frame: camera_link # frame_id for MarkerArray and PointCloud2
marker_lifetime_s: 0.20 # Marker.lifetime — 0.2 s = 6 frames at 30 Hz

View File

@ -0,0 +1,29 @@
"""obstacle_detect.launch.py — RealSense depth obstacle detection (Issue #611)."""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg_share = get_package_share_directory('saltybot_obstacle_detect')
default_params = os.path.join(pkg_share, 'config', 'obstacle_detect_params.yaml')
params_arg = DeclareLaunchArgument(
'params_file',
default_value=default_params,
description='Path to obstacle_detect_params.yaml',
)
obstacle_detect_node = Node(
package='saltybot_obstacle_detect',
executable='obstacle_detect',
name='obstacle_detect',
output='screen',
parameters=[LaunchConfiguration('params_file')],
)
return LaunchDescription([params_arg, obstacle_detect_node])

View File

@ -0,0 +1,36 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_obstacle_detect</name>
<version>0.1.0</version>
<description>
RealSense D435i depth-based obstacle detection node (Issue #611).
RANSAC ground-plane fitting, 2D grid-BFS obstacle clustering, publishes
MarkerArray centroids (/saltybot/obstacles), PointCloud2 of non-ground
points (/saltybot/obstacles/cloud), and JSON alert (/saltybot/obstacles/alert)
with safety_zone e-stop integration via configurable Twist output.
</description>
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
<license>MIT</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<depend>visualization_msgs</depend>
<depend>cv_bridge</depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-opencv</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,192 @@
"""
ground_plane.py RANSAC ground plane fitting for D435i depth images (Issue #611).
No ROS2 dependencies pure Python + numpy, fully unit-testable.
Algorithm
RANSAC over 3D point cloud in camera optical frame:
+X = right, +Y = down, +Z = forward (depth)
1. Sample 3 random non-collinear points.
2. Fit plane: normal n, offset d (n·p = d, n=1).
3. Count inliers: |n·p - d| < inlier_thresh_m.
4. Repeat n_iter times; keep plane with most inliers.
5. Optionally refine: refit normal via SVD on inlier subset.
Normal orientation:
After RANSAC the normal is flipped so it points in the Y direction
(upward in world / toward the camera). This makes "height above ground"
unambiguous: h = d n·p > 0 means the point is above the plane.
Equivalently: normal.y < 0 after orientation.
Height above ground:
h(p) = d n·p
h > 0 above plane (obstacle candidate)
h 0 on the ground
h < 0 below ground (artefact / noise)
Obstacle filter:
min_obstacle_h < h < max_obstacle_h (configurable, metres)
Typical D435i camera mount ~0.5 m above ground:
min_obstacle_h = 0.05 m (ignore ground noise)
max_obstacle_h = 0.80 m (ignore ceiling / upper structure)
"""
from __future__ import annotations
import math
from typing import Optional, Tuple
import numpy as np
PlaneParams = Tuple[np.ndarray, float] # (normal_3, d_scalar)
# ── RANSAC ─────────────────────────────────────────────────────────────────────
def fit_ground_plane(
points: np.ndarray,
n_iter: int = 50,
inlier_thresh_m: float = 0.06,
min_inlier_frac: float = 0.30,
refine: bool = True,
) -> Optional[PlaneParams]:
"""
RANSAC ground-plane estimation from an (N, 3) point array.
Parameters
----------
points : (N, 3) float32/64 point cloud in camera optical frame
n_iter : RANSAC iterations
inlier_thresh_m : inlier distance threshold (m)
min_inlier_frac : minimum fraction of points that must be inliers
refine : if True, refit plane via SVD on best inlier set
Returns
-------
(normal, d) if a plane was found, else None.
Normal is oriented so that normal.y < 0 (points upward in world).
"""
n = len(points)
if n < 10:
return None
rng = np.random.default_rng(seed=42)
best_n_in = -1
best_params: Optional[PlaneParams] = None
pts = points.astype(np.float64)
for _ in range(n_iter):
idx = rng.integers(0, n, size=3)
p0, p1, p2 = pts[idx[0]], pts[idx[1]], pts[idx[2]]
edge1 = p1 - p0
edge2 = p2 - p0
normal = np.cross(edge1, edge2)
norm_len = float(np.linalg.norm(normal))
if norm_len < 1e-8:
continue
normal /= norm_len
d = float(np.dot(normal, p0))
dists = np.abs(pts @ normal - d)
inliers = dists < inlier_thresh_m
n_in = int(inliers.sum())
if n_in > best_n_in:
best_n_in = n_in
best_params = (normal.copy(), d)
if best_params is None:
return None
if best_n_in < int(n * min_inlier_frac):
return None
normal, d = best_params
# ── SVD refinement ──────────────────────────────────────────────────────
if refine:
dists = np.abs(pts @ normal - d)
in_mask = dists < inlier_thresh_m
if in_mask.sum() >= 4:
in_pts = pts[in_mask]
centroid = in_pts.mean(axis=0)
_, _, Vt = np.linalg.svd(in_pts - centroid)
normal = Vt[-1] # smallest singular value = plane normal
normal /= float(np.linalg.norm(normal))
d = float(np.dot(normal, centroid))
# Orient normal upward: ensure n.y < 0 (Y = up in camera frame)
if normal[1] > 0:
normal = -normal
d = -d
return normal, d
# ── Point classification ───────────────────────────────────────────────────────
def height_above_plane(
points: np.ndarray,
plane: PlaneParams,
) -> np.ndarray:
"""
Signed height of each point above the ground plane.
h > 0 above ground (potential obstacle)
h 0 on ground
h < 0 subsurface artefact / sensor noise
Parameters
----------
points : (N, 3) point cloud
plane : (normal, d) from fit_ground_plane()
Returns
-------
heights : (N,) float64
"""
normal, d = plane
# h = d - n·p (positive when point is "above" the upward-facing normal)
return d - (points.astype(np.float64) @ normal)
def obstacle_mask(
points: np.ndarray,
plane: PlaneParams,
min_height_m: float = 0.05,
max_height_m: float = 0.80,
) -> np.ndarray:
"""
Boolean mask: True where a point is an obstacle (above ground, in height window).
Parameters
----------
points : (N, 3) point cloud
plane : ground plane params
min_height_m : minimum height above ground to count as obstacle
max_height_m : maximum height above ground (ceiling / upper structure cut-off)
Returns
-------
mask : (N,) bool
"""
h = height_above_plane(points, plane)
return (h > min_height_m) & (h < max_height_m)
def ground_mask(
points: np.ndarray,
plane: PlaneParams,
inlier_thresh: float = 0.06,
) -> np.ndarray:
"""Boolean mask: True for ground-plane inlier points."""
normal, d = plane
dists = np.abs(points.astype(np.float64) @ normal - d)
return dists < inlier_thresh

View File

@ -0,0 +1,170 @@
"""
obstacle_clusterer.py 2D grid-based obstacle clustering (Issue #611).
No ROS2 dependencies pure Python + numpy, fully unit-testable.
Algorithm
After RANSAC ground removal, obstacle points (non-ground) are clustered
using a 2D occupancy grid on the horizontal (XZ) plane in camera frame.
Steps:
1. Project obstacle 3D points onto the (X, Z) bird's-eye plane.
2. Discretise into grid cells of side `cell_m`.
3. 4-connected BFS flood-fill to find connected components.
4. For each component with min_pts 3D points:
centroid = mean of the component's 3D points
radius = max distance from centroid (bounding-sphere radius)
Camera frame convention:
+X = right, +Y = down (not used for 2D projection), +Z = forward (depth)
Output per cluster
ObstacleCluster (dataclass):
centroid : (3,) float array (cx, cy, cz) in camera frame
radius_m : float bounding-sphere radius (m)
height_m : float mean height above ground plane (m)
n_pts : int number of 3D points in cluster
"""
from __future__ import annotations
from collections import deque
from dataclasses import dataclass, field
from typing import List, Tuple
import numpy as np
@dataclass
class ObstacleCluster:
centroid: np.ndarray # (3,) float64 in camera optical frame
radius_m: float
height_m: float
n_pts: int
@property
def distance_m(self) -> float:
"""Forward distance (Z in camera frame)."""
return float(self.centroid[2])
@property
def lateral_m(self) -> float:
"""Lateral offset (X in camera frame; positive = right)."""
return float(self.centroid[0])
def cluster_obstacles(
points: np.ndarray,
heights: np.ndarray,
cell_m: float = 0.30,
min_pts: int = 5,
x_range: Tuple[float, float] = (-4.0, 4.0),
z_range: Tuple[float, float] = (0.15, 6.0),
) -> List[ObstacleCluster]:
"""
Cluster obstacle points into connected groups.
Parameters
----------
points : (N, 3) float array obstacle points in camera frame
heights : (N,) float array height above ground for each point
cell_m : grid cell side length (m)
min_pts : minimum points for a cluster to be reported
x_range : (min_x, max_x) bounds for the bird's-eye grid (m)
z_range : (min_z, max_z) forward-distance bounds (m)
Returns
-------
clusters : list of ObstacleCluster, sorted by ascending distance (z)
"""
if len(points) == 0:
return []
pts = points.astype(np.float64)
# ── Discard out-of-range points ─────────────────────────────────────────
x = pts[:, 0]
z = pts[:, 2]
valid = (
(x >= x_range[0]) & (x <= x_range[1]) &
(z >= z_range[0]) & (z <= z_range[1])
)
pts = pts[valid]
heights = heights.astype(np.float64)[valid]
if len(pts) == 0:
return []
# ── Build 2D occupancy grid (X, Z) ──────────────────────────────────────
x_min, x_max = x_range
z_min, z_max = z_range
n_x = max(1, int(math.ceil((x_max - x_min) / cell_m)))
n_z = max(1, int(math.ceil((z_max - z_min) / cell_m)))
xi = np.clip(((pts[:, 0] - x_min) / cell_m).astype(np.int32), 0, n_x - 1)
zi = np.clip(((pts[:, 2] - z_min) / cell_m).astype(np.int32), 0, n_z - 1)
# Map each grid cell to list of 3D-point indices
cell_pts: dict[Tuple[int, int], list] = {}
for idx, (gx, gz) in enumerate(zip(xi.tolist(), zi.tolist())):
key = (gx, gz)
if key not in cell_pts:
cell_pts[key] = []
cell_pts[key].append(idx)
occupied = set(cell_pts.keys())
# ── BFS connected-component labelling ────────────────────────────────────
visited = set()
clusters = []
for start in occupied:
if start in visited:
continue
# BFS flood fill
component_cells = []
queue = deque([start])
visited.add(start)
while queue:
cx, cz = queue.popleft()
component_cells.append((cx, cz))
for dx, dz in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
nb = (cx + dx, cz + dz)
if nb in occupied and nb not in visited:
visited.add(nb)
queue.append(nb)
# Collect 3D points in this component
pt_indices = []
for cell in component_cells:
pt_indices.extend(cell_pts[cell])
if len(pt_indices) < min_pts:
continue
cluster_pts = pts[pt_indices]
cluster_h = heights[pt_indices]
centroid = cluster_pts.mean(axis=0)
dists = np.linalg.norm(cluster_pts - centroid, axis=1)
radius_m = float(dists.max())
mean_height = float(cluster_h.mean())
clusters.append(ObstacleCluster(
centroid = centroid,
radius_m = radius_m,
height_m = mean_height,
n_pts = len(pt_indices),
))
# Sort by forward distance
clusters.sort(key=lambda c: c.distance_m)
return clusters
# ── Local import for math ─────────────────────────────────────────────────────
import math

View File

@ -0,0 +1,512 @@
"""
obstacle_detect_node.py RealSense depth obstacle detection node (Issue #611).
Pipeline per frame (30 Hz)
1. Receive D435i aligned depth image (float32, metres) + camera_info.
2. Downsample by `downsample_factor` for efficiency.
3. Back-project non-zero pixels to 3D (camera optical frame, +Z forward).
4. RANSAC ground-plane fitting on the 3D cloud.
5. Classify points: ground (inlier) vs obstacle (above ground, in height window).
6. Cluster obstacle points using 2D grid BFS on the (X, Z) plane.
7. Publish:
/saltybot/obstacles visualization_msgs/MarkerArray (centroids)
/saltybot/obstacles/cloud sensor_msgs/PointCloud2 (non-ground pts)
/saltybot/obstacles/alert std_msgs/String (JSON alert)
8. When `depth_estop_enabled` and DANGER detected: publish zero Twist to
`depth_estop_topic` (default /cmd_vel_input) for safety_zone integration.
Obstacle alert levels (JSON on /saltybot/obstacles/alert)
DANGER closest obstacle z < danger_dist_m (requires immediate stop)
WARN closest obstacle z < warn_dist_m (caution)
CLEAR no obstacles in range
Safety zone integration
When `depth_estop_enabled: true`, DANGER obstacles trigger a zero Twist
published to `depth_estop_topic` (default: /cmd_vel_input).
This feeds into the safety_zone's cmd_vel chain and triggers its e-stop,
giving independent depth-based stopping complementary to the LIDAR zones.
Subscribes
/camera/depth/image_rect_raw sensor_msgs/Image 30 Hz float32 (m)
/camera/depth/camera_info sensor_msgs/CameraInfo (once)
Publishes
/saltybot/obstacles visualization_msgs/MarkerArray
/saltybot/obstacles/cloud sensor_msgs/PointCloud2
/saltybot/obstacles/alert std_msgs/String (JSON)
[depth_estop_topic] geometry_msgs/Twist (zero, when DANGER)
Parameters (see config/obstacle_detect_params.yaml)
downsample_factor int 8 factor to reduce image before processing
ransac_n_iter int 50 RANSAC iterations for ground plane
ransac_inlier_m float 0.06 inlier threshold (m)
min_obstacle_h_m float 0.05 min height above ground to be obstacle
max_obstacle_h_m float 0.80 max height above ground to be obstacle
cluster_cell_m float 0.30 clustering grid cell size (m)
cluster_min_pts int 5 minimum points per cluster
max_obstacle_dist_m float 5.0 discard obstacles beyond this distance
danger_dist_m float 0.40 obstacle z < this DANGER
warn_dist_m float 1.20 obstacle z < this WARN
depth_estop_enabled bool false publish zero-vel when DANGER
depth_estop_topic str /cmd_vel_input
camera_frame str camera_link
marker_lifetime_s float 0.2 MarkerArray marker lifetime (s)
"""
from __future__ import annotations
import json
import math
import struct
import rclpy
from rclpy.node import Node
from rclpy.qos import (
QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
)
import numpy as np
from cv_bridge import CvBridge
from geometry_msgs.msg import Point, Twist, Vector3
from sensor_msgs.msg import Image, CameraInfo, PointCloud2, PointField
from std_msgs.msg import Header, String, ColorRGBA
from visualization_msgs.msg import Marker, MarkerArray
from .ground_plane import fit_ground_plane, height_above_plane, obstacle_mask
from .obstacle_clusterer import ObstacleCluster, cluster_obstacles
# ── QoS ───────────────────────────────────────────────────────────────────────
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
_LATCHED_QOS = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=1,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
# Alert levels
CLEAR = 'CLEAR'
WARN = 'WARN'
DANGER = 'DANGER'
# Marker colours per alert level
_COLOUR = {
CLEAR: (0.0, 1.0, 0.0, 0.8), # green
WARN: (1.0, 0.8, 0.0, 0.9), # yellow
DANGER: (1.0, 0.1, 0.1, 1.0), # red
}
# D435i depth scale (image is already float32 in metres when using 32FC1)
_DEPTH_ENCODING = '32FC1'
class ObstacleDetectNode(Node):
"""RealSense depth-based obstacle detection — see module docstring."""
def __init__(self) -> None:
super().__init__('obstacle_detect')
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter('downsample_factor', 8)
self.declare_parameter('ransac_n_iter', 50)
self.declare_parameter('ransac_inlier_m', 0.06)
self.declare_parameter('min_obstacle_h_m', 0.05)
self.declare_parameter('max_obstacle_h_m', 0.80)
self.declare_parameter('cluster_cell_m', 0.30)
self.declare_parameter('cluster_min_pts', 5)
self.declare_parameter('max_obstacle_dist_m', 5.0)
self.declare_parameter('danger_dist_m', 0.40)
self.declare_parameter('warn_dist_m', 1.20)
self.declare_parameter('depth_estop_enabled', False)
self.declare_parameter('depth_estop_topic', '/cmd_vel_input')
self.declare_parameter('camera_frame', 'camera_link')
self.declare_parameter('marker_lifetime_s', 0.2)
self._ds = self.get_parameter('downsample_factor').value
self._n_iter = self.get_parameter('ransac_n_iter').value
self._inlier_m = self.get_parameter('ransac_inlier_m').value
self._min_h = self.get_parameter('min_obstacle_h_m').value
self._max_h = self.get_parameter('max_obstacle_h_m').value
self._cell_m = self.get_parameter('cluster_cell_m').value
self._min_pts = self.get_parameter('cluster_min_pts').value
self._max_dist = self.get_parameter('max_obstacle_dist_m').value
self._danger_m = self.get_parameter('danger_dist_m').value
self._warn_m = self.get_parameter('warn_dist_m').value
self._estop_en = self.get_parameter('depth_estop_enabled').value
self._cam_frame = self.get_parameter('camera_frame').value
self._marker_life = self.get_parameter('marker_lifetime_s').value
estop_topic = self.get_parameter('depth_estop_topic').value
# ── Internal state ────────────────────────────────────────────────────
self._bridge = CvBridge()
self._fx: float | None = None
self._fy: float | None = None
self._cx: float | None = None
self._cy: float | None = None
# Ground plane stability: blend current with previous (exponential)
self._plane_normal: np.ndarray | None = None
self._plane_d: float | None = None
self._plane_alpha = 0.3 # blend weight for new plane estimate
# ── Publishers ─────────────────────────────────────────────────────────
self._marker_pub = self.create_publisher(
MarkerArray, '/saltybot/obstacles', 10
)
self._cloud_pub = self.create_publisher(
PointCloud2, '/saltybot/obstacles/cloud', 10
)
self._alert_pub = self.create_publisher(
String, '/saltybot/obstacles/alert', 10
)
if self._estop_en:
self._estop_pub = self.create_publisher(
Twist, estop_topic, 10
)
else:
self._estop_pub = None
# ── Subscriptions ──────────────────────────────────────────────────────
self.create_subscription(
CameraInfo,
'/camera/depth/camera_info',
self._on_camera_info,
_LATCHED_QOS,
)
self.create_subscription(
Image,
'/camera/depth/image_rect_raw',
self._on_depth,
_SENSOR_QOS,
)
self.get_logger().info(
f'obstacle_detect ready — '
f'ds={self._ds} ransac={self._n_iter}it '
f'danger={self._danger_m}m warn={self._warn_m}m '
f'estop={"ON → " + estop_topic if self._estop_en else "OFF"}'
)
# ── Camera info ───────────────────────────────────────────────────────────
def _on_camera_info(self, msg: CameraInfo) -> None:
if self._fx is not None:
return
self._fx = float(msg.k[0])
self._fy = float(msg.k[4])
self._cx = float(msg.k[2])
self._cy = float(msg.k[5])
self.get_logger().info(
f'camera_info: fx={self._fx:.1f} fy={self._fy:.1f} '
f'cx={self._cx:.1f} cy={self._cy:.1f}'
)
# ── Main depth callback ───────────────────────────────────────────────────
def _on_depth(self, msg: Image) -> None:
if self._fx is None:
return # wait for camera_info
# ── Decode depth ──────────────────────────────────────────────────────
try:
depth = self._bridge.imgmsg_to_cv2(msg, desired_encoding=_DEPTH_ENCODING)
except Exception as exc:
self.get_logger().warning(f'depth decode error: {exc}', throttle_duration_sec=5.0)
return
h_img, w_img = depth.shape
stamp = msg.header.stamp
# ── Downsample ────────────────────────────────────────────────────────
ds = self._ds
depth_ds = depth[::ds, ::ds] # stride-based downsample
h_ds, w_ds = depth_ds.shape
# Back-projection intrinsics at downsampled resolution
fx = self._fx / ds
fy = self._fy / ds
cx = self._cx / ds
cy = self._cy / ds
# ── Back-project to 3D ────────────────────────────────────────────────
points = _backproject(depth_ds, fx, fy, cx, cy, max_dist=self._max_dist)
if len(points) < 30:
self._publish_empty(stamp)
return
# ── RANSAC ground plane ───────────────────────────────────────────────
result = fit_ground_plane(
points,
n_iter = self._n_iter,
inlier_thresh_m = self._inlier_m,
)
if result is None:
self._publish_empty(stamp)
return
new_normal, new_d = result
# Blend with previous plane for temporal stability
if self._plane_normal is None:
self._plane_normal = new_normal
self._plane_d = new_d
else:
alpha = self._plane_alpha
blended = (1.0 - alpha) * self._plane_normal + alpha * new_normal
norm_len = float(np.linalg.norm(blended))
if norm_len > 1e-8:
self._plane_normal = blended / norm_len
self._plane_d = (1.0 - alpha) * self._plane_d + alpha * new_d
plane = (self._plane_normal, self._plane_d)
# ── Obstacle mask ─────────────────────────────────────────────────────
obs_mask = obstacle_mask(points, plane, self._min_h, self._max_h)
obs_pts = points[obs_mask]
heights_all = height_above_plane(points, plane)
obs_heights = heights_all[obs_mask]
# ── Cluster obstacles ─────────────────────────────────────────────────
clusters = cluster_obstacles(
obs_pts, obs_heights,
cell_m = self._cell_m,
min_pts = self._min_pts,
z_range = (0.15, self._max_dist),
)
# ── Alert level ───────────────────────────────────────────────────────
alert_level = CLEAR
closest_dist = float('inf')
for cluster in clusters:
d = cluster.distance_m
if d < closest_dist:
closest_dist = d
if d < self._danger_m:
alert_level = DANGER
break
if d < self._warn_m and alert_level != DANGER:
alert_level = WARN
if closest_dist == float('inf'):
closest_dist = -1.0
# ── Publish ───────────────────────────────────────────────────────────
self._publish_markers(clusters, stamp)
self._publish_cloud(obs_pts, stamp)
self._publish_alert(alert_level, clusters, closest_dist, stamp)
# Depth e-stop: feed zero vel to safety zone chain
if self._estop_en and self._estop_pub is not None and alert_level == DANGER:
self._estop_pub.publish(Twist())
# ── Publishers ────────────────────────────────────────────────────────────
def _publish_markers(self, clusters: list, stamp) -> None:
"""Publish obstacle centroids as sphere MarkerArray."""
ma = MarkerArray()
lifetime = _ros_duration(self._marker_life)
# Delete-all marker first for clean slate
del_marker = Marker()
del_marker.action = Marker.DELETEALL
del_marker.header.stamp = stamp
del_marker.header.frame_id = self._cam_frame
ma.markers.append(del_marker)
for idx, cluster in enumerate(clusters):
level = self._cluster_alert_level(cluster)
r, g, b, a = _COLOUR[level]
cx, cy, cz = cluster.centroid
m = Marker()
m.header.stamp = stamp
m.header.frame_id = self._cam_frame
m.ns = 'obstacles'
m.id = idx + 1
m.type = Marker.SPHERE
m.action = Marker.ADD
m.pose.position = Point(x=float(cx), y=float(cy), z=float(cz))
m.pose.orientation.w = 1.0
rad = max(0.05, cluster.radius_m)
m.scale = Vector3(x=rad * 2.0, y=rad * 2.0, z=rad * 2.0)
m.color = ColorRGBA(r=r, g=g, b=b, a=a)
m.lifetime = lifetime
ma.markers.append(m)
# Text label above sphere
lbl = Marker()
lbl.header = m.header
lbl.ns = 'obstacle_labels'
lbl.id = idx + 1
lbl.type = Marker.TEXT_VIEW_FACING
lbl.action = Marker.ADD
lbl.pose.position = Point(x=float(cx), y=float(cy) - 0.15, z=float(cz))
lbl.pose.orientation.w = 1.0
lbl.scale.z = 0.12
lbl.color = ColorRGBA(r=1.0, g=1.0, b=1.0, a=0.9)
lbl.text = f'{cluster.distance_m:.2f}m'
lbl.lifetime = lifetime
ma.markers.append(lbl)
self._marker_pub.publish(ma)
def _publish_cloud(self, points: np.ndarray, stamp) -> None:
"""Publish non-ground obstacle points as PointCloud2 (xyz float32)."""
if len(points) == 0:
# Publish empty cloud
cloud = PointCloud2()
cloud.header.stamp = stamp
cloud.header.frame_id = self._cam_frame
self._cloud_pub.publish(cloud)
return
pts_f32 = points.astype(np.float32)
fields = [
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
]
cloud = PointCloud2()
cloud.header.stamp = stamp
cloud.header.frame_id = self._cam_frame
cloud.height = 1
cloud.width = len(pts_f32)
cloud.fields = fields
cloud.is_bigendian = False
cloud.point_step = 12 # 3 × float32
cloud.row_step = cloud.point_step * cloud.width
cloud.data = pts_f32.tobytes()
cloud.is_dense = True
self._cloud_pub.publish(cloud)
def _publish_alert(
self,
level: str,
clusters: list,
closest_m: float,
stamp,
) -> None:
obstacles_json = [
{
'x': round(float(c.centroid[0]), 3),
'y': round(float(c.centroid[1]), 3),
'z': round(float(c.centroid[2]), 3),
'radius_m': round(c.radius_m, 3),
'height_m': round(c.height_m, 3),
'n_pts': c.n_pts,
'level': self._cluster_alert_level(c),
}
for c in clusters
]
alert = {
'alert_level': level,
'closest_m': round(closest_m, 3),
'obstacle_count': len(clusters),
'obstacles': obstacles_json,
}
msg = String()
msg.data = json.dumps(alert)
self._alert_pub.publish(msg)
def _publish_empty(self, stamp) -> None:
"""Publish empty state (no obstacles detected / ground plane failed)."""
self._publish_markers([], stamp)
self._publish_cloud(np.zeros((0, 3), np.float32), stamp)
alert = {
'alert_level': CLEAR,
'closest_m': -1.0,
'obstacle_count': 0,
'obstacles': [],
}
msg = String()
msg.data = json.dumps(alert)
self._alert_pub.publish(msg)
# ── Helpers ───────────────────────────────────────────────────────────────
def _cluster_alert_level(self, cluster: ObstacleCluster) -> str:
d = cluster.distance_m
if d < self._danger_m:
return DANGER
if d < self._warn_m:
return WARN
return CLEAR
# ── Pure-function helpers ──────────────────────────────────────────────────────
def _backproject(
depth: np.ndarray,
fx: float, fy: float,
cx: float, cy: float,
min_dist: float = 0.15,
max_dist: float = 6.0,
) -> np.ndarray:
"""
Back-project a float32 depth image (metres) to a (N, 3) 3D point array.
Camera optical frame: +X right, +Y down, +Z forward.
Only returns pixels where depth is in [min_dist, max_dist].
"""
h, w = depth.shape
# Build pixel-grid meshgrid
u_arr = np.arange(w, dtype=np.float32)
v_arr = np.arange(h, dtype=np.float32)
uu, vv = np.meshgrid(u_arr, v_arr)
z = depth.astype(np.float32).ravel()
u = uu.ravel()
v = vv.ravel()
valid = (z > min_dist) & (z < max_dist) & np.isfinite(z)
z = z[valid]
u = u[valid]
v = v[valid]
x = (u - cx) * z / fx
y = (v - cy) * z / fy
return np.stack([x, y, z], axis=1).astype(np.float64)
def _ros_duration(seconds: float):
"""Build a rclpy Duration-compatible value for Marker.lifetime."""
from rclpy.duration import Duration
sec = int(seconds)
nsec = int((seconds - sec) * 1e9)
return Duration(seconds=sec, nanoseconds=nsec).to_msg()
def main(args=None) -> None:
rclpy.init(args=args)
node = ObstacleDetectNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_obstacle_detect
[install]
install_scripts=$base/lib/saltybot_obstacle_detect

View File

@ -0,0 +1,30 @@
import os
from glob import glob
from setuptools import setup
package_name = 'saltybot_obstacle_detect'
setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*.py')),
(os.path.join('share', package_name, 'config'), glob('config/*.yaml')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='sl-perception',
maintainer_email='sl-perception@saltylab.local',
description='RealSense depth obstacle detection — RANSAC ground + clustering (Issue #611)',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'obstacle_detect = saltybot_obstacle_detect.obstacle_detect_node:main',
],
},
)

View File

@ -0,0 +1,129 @@
"""Unit tests for ground_plane.py — RANSAC fitting and point classification."""
import math
import numpy as np
import pytest
from saltybot_obstacle_detect.ground_plane import (
fit_ground_plane,
height_above_plane,
obstacle_mask,
ground_mask,
)
def make_flat_ground(n=500, y_ground=1.0, noise=0.02, rng=None):
"""Flat ground plane at y = y_ground (camera frame: Y down)."""
if rng is None:
rng = np.random.default_rng(0)
x = rng.uniform(-2.0, 2.0, n).astype(np.float64)
z = rng.uniform(0.5, 4.0, n).astype(np.float64)
y = np.full(n, y_ground) + rng.normal(0, noise, n)
return np.stack([x, y, z], axis=1)
def make_obstacle_above_ground(cx=0.0, y_ground=1.0, height=0.3, n=50, rng=None):
"""Points forming a box above the ground plane."""
if rng is None:
rng = np.random.default_rng(1)
x = rng.uniform(cx - 0.15, cx + 0.15, n)
z = rng.uniform(1.5, 2.0, n)
y = np.full(n, y_ground - height) # Y_down: smaller y = higher in world
return np.stack([x, y, z], axis=1)
# ── fit_ground_plane ──────────────────────────────────────────────────────────
def test_fit_flat_ground_returns_result():
pts = make_flat_ground(n=400)
plane = fit_ground_plane(pts, n_iter=50)
assert plane is not None
def test_fit_ground_normal_points_upward():
"""Normal should have negative Y component (upward in camera frame)."""
pts = make_flat_ground(n=400)
plane = fit_ground_plane(pts, n_iter=50)
assert plane is not None
normal, d = plane
assert normal[1] < 0.0, f"Expected normal.y < 0, got {normal[1]:.3f}"
def test_fit_ground_normal_unit_vector():
pts = make_flat_ground(n=300)
plane = fit_ground_plane(pts, n_iter=50)
assert plane is not None
normal, _ = plane
assert abs(np.linalg.norm(normal) - 1.0) < 1e-6
def test_fit_too_few_points_returns_none():
pts = make_flat_ground(n=5)
plane = fit_ground_plane(pts, n_iter=10)
assert plane is None
def test_fit_empty_returns_none():
plane = fit_ground_plane(np.zeros((0, 3)))
assert plane is None
def test_fit_tilted_ground():
"""Tilted ground plane (robot on slope) should still be found."""
rng = np.random.default_rng(42)
n = 400
x = rng.uniform(-2.0, 2.0, n)
z = rng.uniform(0.5, 4.0, n)
# Tilt: y = 0.5 + 0.1*z (plane tilted 5.7° forward)
y = 0.5 + 0.1 * z + rng.normal(0, 0.02, n)
pts = np.stack([x, y, z], axis=1)
plane = fit_ground_plane(pts, n_iter=80)
assert plane is not None
# ── height_above_plane ────────────────────────────────────────────────────────
def test_ground_pts_near_zero_height():
ground_pts = make_flat_ground(n=200, y_ground=1.0, noise=0.005)
plane = fit_ground_plane(ground_pts, n_iter=60)
assert plane is not None
heights = height_above_plane(ground_pts, plane)
assert np.abs(heights).mean() < 0.10 # mean residual < 10 cm
def test_obstacle_pts_positive_height():
rng = np.random.default_rng(7)
ground_pts = make_flat_ground(n=300, y_ground=1.0, rng=rng)
obs_pts = make_obstacle_above_ground(y_ground=1.0, height=0.3, rng=rng)
all_pts = np.vstack([ground_pts, obs_pts])
plane = fit_ground_plane(all_pts, n_iter=60)
assert plane is not None
heights = height_above_plane(obs_pts, plane)
assert heights.mean() > 0.1, f"Expected obstacles above ground, got mean h={heights.mean():.3f}"
# ── obstacle_mask ─────────────────────────────────────────────────────────────
def test_obstacle_mask_selects_correct_points():
rng = np.random.default_rng(3)
ground_pts = make_flat_ground(n=300, y_ground=1.0, rng=rng)
obs_pts = make_obstacle_above_ground(y_ground=1.0, height=0.4, n=60, rng=rng)
all_pts = np.vstack([ground_pts, obs_pts])
plane = fit_ground_plane(all_pts, n_iter=60)
assert plane is not None
mask = obstacle_mask(all_pts, plane, min_height_m=0.05, max_height_m=0.80)
# Most of the obs_pts should be selected
n_ground_selected = mask[:300].sum()
n_obs_selected = mask[300:].sum()
assert n_obs_selected > 40, f"Expected most obs selected, got {n_obs_selected}/60"
assert n_ground_selected < 30, f"Expected few ground points selected, got {n_ground_selected}/300"
# ── ground_mask ───────────────────────────────────────────────────────────────
def test_ground_mask_covers_inliers():
pts = make_flat_ground(n=300, noise=0.01)
plane = fit_ground_plane(pts, n_iter=60)
assert plane is not None
gmask = ground_mask(pts, plane, inlier_thresh=0.08)
assert gmask.mean() > 0.7 # at least 70% of flat ground should be ground inliers

View File

@ -0,0 +1,105 @@
"""Unit tests for obstacle_clusterer.py — 2D grid BFS clustering."""
import numpy as np
import pytest
from saltybot_obstacle_detect.obstacle_clusterer import cluster_obstacles, ObstacleCluster
def _make_cluster_pts(cx, cz, n=30, spread=0.15, rng=None):
"""Generate points around a centroid in camera frame (x=lateral, z=forward)."""
if rng is None:
rng = np.random.default_rng(0)
x = rng.uniform(cx - spread, cx + spread, n)
y = rng.uniform(-0.3, 0.0, n) # above ground (height ~0.3m)
z = rng.uniform(cz - spread, cz + spread, n)
return np.stack([x, y, z], axis=1)
def _make_heights(pts, base_h=0.3, noise=0.05, rng=None):
if rng is None:
rng = np.random.default_rng(1)
return np.full(len(pts), base_h) + rng.normal(0, noise, len(pts))
# ── Basic clustering ──────────────────────────────────────────────────────────
def test_single_cluster_detected():
pts = _make_cluster_pts(cx=0.0, cz=2.0, n=40)
heights = _make_heights(pts)
clusters = cluster_obstacles(pts, heights, cell_m=0.30, min_pts=5)
assert len(clusters) == 1
def test_two_clusters_separated():
rng = np.random.default_rng(42)
pts1 = _make_cluster_pts(cx=-1.0, cz=2.0, n=40, rng=rng)
pts2 = _make_cluster_pts(cx=+1.5, cz=3.5, n=40, rng=rng)
pts = np.vstack([pts1, pts2])
h = _make_heights(pts, rng=rng)
clusters = cluster_obstacles(pts, h, cell_m=0.30, min_pts=5)
assert len(clusters) == 2
def test_clusters_sorted_by_distance():
rng = np.random.default_rng(7)
pts_near = _make_cluster_pts(cx=0.0, cz=1.5, n=40, rng=rng)
pts_far = _make_cluster_pts(cx=0.0, cz=4.0, n=40, rng=rng)
pts = np.vstack([pts_far, pts_near]) # far first
h = _make_heights(pts, rng=rng)
clusters = cluster_obstacles(pts, h, cell_m=0.30, min_pts=5)
assert len(clusters) == 2
assert clusters[0].distance_m < clusters[1].distance_m
def test_empty_input():
clusters = cluster_obstacles(
np.zeros((0, 3), np.float64),
np.zeros(0, np.float64),
)
assert clusters == []
def test_min_pts_filters_small_cluster():
rng = np.random.default_rng(9)
pts_big = _make_cluster_pts(cx=0.0, cz=2.0, n=50, rng=rng)
pts_tiny = _make_cluster_pts(cx=2.0, cz=2.0, n=2, rng=rng)
pts = np.vstack([pts_big, pts_tiny])
h = _make_heights(pts, rng=rng)
clusters = cluster_obstacles(pts, h, cell_m=0.30, min_pts=5)
# Only the big cluster should pass
assert len(clusters) == 1
assert clusters[0].n_pts >= 40
def test_centroid_near_true_center():
rng = np.random.default_rng(5)
true_cx, true_cz = 0.0, 2.5
pts = _make_cluster_pts(cx=true_cx, cz=true_cz, n=100, spread=0.1, rng=rng)
h = _make_heights(pts, rng=rng)
clusters = cluster_obstacles(pts, h, cell_m=0.20, min_pts=5)
assert len(clusters) >= 1
c = clusters[0]
assert abs(c.centroid[0] - true_cx) < 0.3
assert abs(c.centroid[2] - true_cz) < 0.3
def test_out_of_range_points_ignored():
rng = np.random.default_rng(3)
pts_ok = _make_cluster_pts(cx=0.0, cz=2.0, n=30, rng=rng)
# Points beyond max_obstacle_dist_m=5.0
pts_far = _make_cluster_pts(cx=0.0, cz=7.0, n=30, rng=rng)
pts = np.vstack([pts_ok, pts_far])
h = _make_heights(pts, rng=rng)
clusters = cluster_obstacles(pts, h, cell_m=0.30, min_pts=5, z_range=(0.15, 5.0))
# Only the near cluster should survive
assert all(c.distance_m <= 5.0 for c in clusters)
# ── ObstacleCluster accessors ─────────────────────────────────────────────────
def test_cluster_distance_and_lateral():
centroid = np.array([1.5, -0.3, 3.0])
c = ObstacleCluster(centroid=centroid, radius_m=0.2, height_m=0.3, n_pts=30)
assert abs(c.distance_m - 3.0) < 1e-9
assert abs(c.lateral_m - 1.5) < 1e-9