feat: RealSense obstacle detection (Issue #611) #623
@ -0,0 +1,35 @@
|
||||
obstacle_detect:
|
||||
ros__parameters:
|
||||
# ── Depth processing ──────────────────────────────────────────────────────
|
||||
downsample_factor: 8 # stride-based downsample before processing
|
||||
# 640x480 / 8 = 80x60 = 4800 pts (fast RANSAC)
|
||||
|
||||
# ── RANSAC ground plane fitting ───────────────────────────────────────────
|
||||
ransac_n_iter: 50 # RANSAC iterations
|
||||
ransac_inlier_m: 0.06 # inlier distance threshold (m)
|
||||
|
||||
# ── Obstacle height filter ────────────────────────────────────────────────
|
||||
min_obstacle_h_m: 0.05 # min height above ground to count as obstacle (m)
|
||||
# — avoids false positives from ground noise
|
||||
max_obstacle_h_m: 0.80 # max height above ground to count as obstacle (m)
|
||||
# — ignores ceiling / upper structure (D435i ~0.5m mount)
|
||||
|
||||
# ── Clustering ────────────────────────────────────────────────────────────
|
||||
cluster_cell_m: 0.30 # 2D grid cell size for BFS clustering (m)
|
||||
cluster_min_pts: 5 # minimum points for a cluster to be reported
|
||||
max_obstacle_dist_m: 5.0 # discard obstacle points beyond this forward distance
|
||||
|
||||
# ── Alert / e-stop thresholds ─────────────────────────────────────────────
|
||||
danger_dist_m: 0.40 # closest obstacle z < this → DANGER alert
|
||||
warn_dist_m: 1.20 # closest obstacle z < this → WARN alert
|
||||
|
||||
# ── Safety zone integration ───────────────────────────────────────────────
|
||||
# When enabled, DANGER obstacles publish zero Twist to depth_estop_topic.
|
||||
# Wire this into the cmd_vel safety chain so safety_zone can latch e-stop.
|
||||
# Typical chain: depth_estop_topic=/cmd_vel_input → safety_zone → /cmd_vel
|
||||
depth_estop_enabled: false
|
||||
depth_estop_topic: /cmd_vel_input
|
||||
|
||||
# ── Frame / topic configuration ───────────────────────────────────────────
|
||||
camera_frame: camera_link # frame_id for MarkerArray and PointCloud2
|
||||
marker_lifetime_s: 0.20 # Marker.lifetime — 0.2 s = 6 frames at 30 Hz
|
||||
@ -0,0 +1,29 @@
|
||||
"""obstacle_detect.launch.py — RealSense depth obstacle detection (Issue #611)."""
|
||||
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
pkg_share = get_package_share_directory('saltybot_obstacle_detect')
|
||||
default_params = os.path.join(pkg_share, 'config', 'obstacle_detect_params.yaml')
|
||||
|
||||
params_arg = DeclareLaunchArgument(
|
||||
'params_file',
|
||||
default_value=default_params,
|
||||
description='Path to obstacle_detect_params.yaml',
|
||||
)
|
||||
|
||||
obstacle_detect_node = Node(
|
||||
package='saltybot_obstacle_detect',
|
||||
executable='obstacle_detect',
|
||||
name='obstacle_detect',
|
||||
output='screen',
|
||||
parameters=[LaunchConfiguration('params_file')],
|
||||
)
|
||||
|
||||
return LaunchDescription([params_arg, obstacle_detect_node])
|
||||
36
jetson/ros2_ws/src/saltybot_obstacle_detect/package.xml
Normal file
36
jetson/ros2_ws/src/saltybot_obstacle_detect/package.xml
Normal file
@ -0,0 +1,36 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_obstacle_detect</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
RealSense D435i depth-based obstacle detection node (Issue #611).
|
||||
RANSAC ground-plane fitting, 2D grid-BFS obstacle clustering, publishes
|
||||
MarkerArray centroids (/saltybot/obstacles), PointCloud2 of non-ground
|
||||
points (/saltybot/obstacles/cloud), and JSON alert (/saltybot/obstacles/alert)
|
||||
with safety_zone e-stop integration via configurable Twist output.
|
||||
</description>
|
||||
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<buildtool_depend>ament_python</buildtool_depend>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>visualization_msgs</depend>
|
||||
<depend>cv_bridge</depend>
|
||||
|
||||
<exec_depend>python3-numpy</exec_depend>
|
||||
<exec_depend>python3-opencv</exec_depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,192 @@
|
||||
"""
|
||||
ground_plane.py — RANSAC ground plane fitting for D435i depth images (Issue #611).
|
||||
|
||||
No ROS2 dependencies — pure Python + numpy, fully unit-testable.
|
||||
|
||||
Algorithm
|
||||
─────────
|
||||
RANSAC over 3D point cloud in camera optical frame:
|
||||
+X = right, +Y = down, +Z = forward (depth)
|
||||
|
||||
1. Sample 3 random non-collinear points.
|
||||
2. Fit plane: normal n, offset d (n·p = d, ‖n‖=1).
|
||||
3. Count inliers: |n·p - d| < inlier_thresh_m.
|
||||
4. Repeat n_iter times; keep plane with most inliers.
|
||||
5. Optionally refine: refit normal via SVD on inlier subset.
|
||||
|
||||
Normal orientation:
|
||||
After RANSAC the normal is flipped so it points in the −Y direction
|
||||
(upward in world / toward the camera). This makes "height above ground"
|
||||
unambiguous: h = d − n·p > 0 means the point is above the plane.
|
||||
|
||||
Equivalently: normal.y < 0 after orientation.
|
||||
|
||||
Height above ground:
|
||||
h(p) = d − n·p
|
||||
|
||||
h > 0 → above plane (obstacle candidate)
|
||||
h ≈ 0 → on the ground
|
||||
h < 0 → below ground (artefact / noise)
|
||||
|
||||
Obstacle filter:
|
||||
min_obstacle_h < h < max_obstacle_h (configurable, metres)
|
||||
|
||||
Typical D435i camera mount ~0.5 m above ground:
|
||||
min_obstacle_h = 0.05 m (ignore ground noise)
|
||||
max_obstacle_h = 0.80 m (ignore ceiling / upper structure)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
PlaneParams = Tuple[np.ndarray, float] # (normal_3, d_scalar)
|
||||
|
||||
|
||||
# ── RANSAC ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def fit_ground_plane(
|
||||
points: np.ndarray,
|
||||
n_iter: int = 50,
|
||||
inlier_thresh_m: float = 0.06,
|
||||
min_inlier_frac: float = 0.30,
|
||||
refine: bool = True,
|
||||
) -> Optional[PlaneParams]:
|
||||
"""
|
||||
RANSAC ground-plane estimation from an (N, 3) point array.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
points : (N, 3) float32/64 point cloud in camera optical frame
|
||||
n_iter : RANSAC iterations
|
||||
inlier_thresh_m : inlier distance threshold (m)
|
||||
min_inlier_frac : minimum fraction of points that must be inliers
|
||||
refine : if True, refit plane via SVD on best inlier set
|
||||
|
||||
Returns
|
||||
-------
|
||||
(normal, d) if a plane was found, else None.
|
||||
Normal is oriented so that normal.y < 0 (points upward in world).
|
||||
"""
|
||||
n = len(points)
|
||||
if n < 10:
|
||||
return None
|
||||
|
||||
rng = np.random.default_rng(seed=42)
|
||||
best_n_in = -1
|
||||
best_params: Optional[PlaneParams] = None
|
||||
pts = points.astype(np.float64)
|
||||
|
||||
for _ in range(n_iter):
|
||||
idx = rng.integers(0, n, size=3)
|
||||
p0, p1, p2 = pts[idx[0]], pts[idx[1]], pts[idx[2]]
|
||||
|
||||
edge1 = p1 - p0
|
||||
edge2 = p2 - p0
|
||||
normal = np.cross(edge1, edge2)
|
||||
norm_len = float(np.linalg.norm(normal))
|
||||
if norm_len < 1e-8:
|
||||
continue
|
||||
|
||||
normal /= norm_len
|
||||
d = float(np.dot(normal, p0))
|
||||
|
||||
dists = np.abs(pts @ normal - d)
|
||||
inliers = dists < inlier_thresh_m
|
||||
n_in = int(inliers.sum())
|
||||
|
||||
if n_in > best_n_in:
|
||||
best_n_in = n_in
|
||||
best_params = (normal.copy(), d)
|
||||
|
||||
if best_params is None:
|
||||
return None
|
||||
if best_n_in < int(n * min_inlier_frac):
|
||||
return None
|
||||
|
||||
normal, d = best_params
|
||||
|
||||
# ── SVD refinement ──────────────────────────────────────────────────────
|
||||
if refine:
|
||||
dists = np.abs(pts @ normal - d)
|
||||
in_mask = dists < inlier_thresh_m
|
||||
if in_mask.sum() >= 4:
|
||||
in_pts = pts[in_mask]
|
||||
centroid = in_pts.mean(axis=0)
|
||||
_, _, Vt = np.linalg.svd(in_pts - centroid)
|
||||
normal = Vt[-1] # smallest singular value = plane normal
|
||||
normal /= float(np.linalg.norm(normal))
|
||||
d = float(np.dot(normal, centroid))
|
||||
|
||||
# Orient normal upward: ensure n.y < 0 (−Y = up in camera frame)
|
||||
if normal[1] > 0:
|
||||
normal = -normal
|
||||
d = -d
|
||||
|
||||
return normal, d
|
||||
|
||||
|
||||
# ── Point classification ───────────────────────────────────────────────────────
|
||||
|
||||
def height_above_plane(
|
||||
points: np.ndarray,
|
||||
plane: PlaneParams,
|
||||
) -> np.ndarray:
|
||||
"""
|
||||
Signed height of each point above the ground plane.
|
||||
|
||||
h > 0 → above ground (potential obstacle)
|
||||
h ≈ 0 → on ground
|
||||
h < 0 → subsurface artefact / sensor noise
|
||||
|
||||
Parameters
|
||||
----------
|
||||
points : (N, 3) point cloud
|
||||
plane : (normal, d) from fit_ground_plane()
|
||||
|
||||
Returns
|
||||
-------
|
||||
heights : (N,) float64
|
||||
"""
|
||||
normal, d = plane
|
||||
# h = d - n·p (positive when point is "above" the upward-facing normal)
|
||||
return d - (points.astype(np.float64) @ normal)
|
||||
|
||||
|
||||
def obstacle_mask(
|
||||
points: np.ndarray,
|
||||
plane: PlaneParams,
|
||||
min_height_m: float = 0.05,
|
||||
max_height_m: float = 0.80,
|
||||
) -> np.ndarray:
|
||||
"""
|
||||
Boolean mask: True where a point is an obstacle (above ground, in height window).
|
||||
|
||||
Parameters
|
||||
----------
|
||||
points : (N, 3) point cloud
|
||||
plane : ground plane params
|
||||
min_height_m : minimum height above ground to count as obstacle
|
||||
max_height_m : maximum height above ground (ceiling / upper structure cut-off)
|
||||
|
||||
Returns
|
||||
-------
|
||||
mask : (N,) bool
|
||||
"""
|
||||
h = height_above_plane(points, plane)
|
||||
return (h > min_height_m) & (h < max_height_m)
|
||||
|
||||
|
||||
def ground_mask(
|
||||
points: np.ndarray,
|
||||
plane: PlaneParams,
|
||||
inlier_thresh: float = 0.06,
|
||||
) -> np.ndarray:
|
||||
"""Boolean mask: True for ground-plane inlier points."""
|
||||
normal, d = plane
|
||||
dists = np.abs(points.astype(np.float64) @ normal - d)
|
||||
return dists < inlier_thresh
|
||||
@ -0,0 +1,170 @@
|
||||
"""
|
||||
obstacle_clusterer.py — 2D grid-based obstacle clustering (Issue #611).
|
||||
|
||||
No ROS2 dependencies — pure Python + numpy, fully unit-testable.
|
||||
|
||||
Algorithm
|
||||
─────────
|
||||
After RANSAC ground removal, obstacle points (non-ground) are clustered
|
||||
using a 2D occupancy grid on the horizontal (X–Z) plane in camera frame.
|
||||
|
||||
Steps:
|
||||
1. Project obstacle 3D points onto the (X, Z) bird's-eye plane.
|
||||
2. Discretise into grid cells of side `cell_m`.
|
||||
3. 4-connected BFS flood-fill to find connected components.
|
||||
4. For each component with ≥ min_pts 3D points:
|
||||
centroid = mean of the component's 3D points
|
||||
radius = max distance from centroid (bounding-sphere radius)
|
||||
|
||||
Camera frame convention:
|
||||
+X = right, +Y = down (not used for 2D projection), +Z = forward (depth)
|
||||
|
||||
Output per cluster
|
||||
──────────────────
|
||||
ObstacleCluster (dataclass):
|
||||
centroid : (3,) float array — (cx, cy, cz) in camera frame
|
||||
radius_m : float — bounding-sphere radius (m)
|
||||
height_m : float — mean height above ground plane (m)
|
||||
n_pts : int — number of 3D points in cluster
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import deque
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
@dataclass
|
||||
class ObstacleCluster:
|
||||
centroid: np.ndarray # (3,) float64 in camera optical frame
|
||||
radius_m: float
|
||||
height_m: float
|
||||
n_pts: int
|
||||
|
||||
@property
|
||||
def distance_m(self) -> float:
|
||||
"""Forward distance (Z in camera frame)."""
|
||||
return float(self.centroid[2])
|
||||
|
||||
@property
|
||||
def lateral_m(self) -> float:
|
||||
"""Lateral offset (X in camera frame; positive = right)."""
|
||||
return float(self.centroid[0])
|
||||
|
||||
|
||||
def cluster_obstacles(
|
||||
points: np.ndarray,
|
||||
heights: np.ndarray,
|
||||
cell_m: float = 0.30,
|
||||
min_pts: int = 5,
|
||||
x_range: Tuple[float, float] = (-4.0, 4.0),
|
||||
z_range: Tuple[float, float] = (0.15, 6.0),
|
||||
) -> List[ObstacleCluster]:
|
||||
"""
|
||||
Cluster obstacle points into connected groups.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
points : (N, 3) float array — obstacle points in camera frame
|
||||
heights : (N,) float array — height above ground for each point
|
||||
cell_m : grid cell side length (m)
|
||||
min_pts : minimum points for a cluster to be reported
|
||||
x_range : (min_x, max_x) bounds for the bird's-eye grid (m)
|
||||
z_range : (min_z, max_z) forward-distance bounds (m)
|
||||
|
||||
Returns
|
||||
-------
|
||||
clusters : list of ObstacleCluster, sorted by ascending distance (z)
|
||||
"""
|
||||
if len(points) == 0:
|
||||
return []
|
||||
|
||||
pts = points.astype(np.float64)
|
||||
|
||||
# ── Discard out-of-range points ─────────────────────────────────────────
|
||||
x = pts[:, 0]
|
||||
z = pts[:, 2]
|
||||
valid = (
|
||||
(x >= x_range[0]) & (x <= x_range[1]) &
|
||||
(z >= z_range[0]) & (z <= z_range[1])
|
||||
)
|
||||
pts = pts[valid]
|
||||
heights = heights.astype(np.float64)[valid]
|
||||
|
||||
if len(pts) == 0:
|
||||
return []
|
||||
|
||||
# ── Build 2D occupancy grid (X, Z) ──────────────────────────────────────
|
||||
x_min, x_max = x_range
|
||||
z_min, z_max = z_range
|
||||
n_x = max(1, int(math.ceil((x_max - x_min) / cell_m)))
|
||||
n_z = max(1, int(math.ceil((z_max - z_min) / cell_m)))
|
||||
|
||||
xi = np.clip(((pts[:, 0] - x_min) / cell_m).astype(np.int32), 0, n_x - 1)
|
||||
zi = np.clip(((pts[:, 2] - z_min) / cell_m).astype(np.int32), 0, n_z - 1)
|
||||
|
||||
# Map each grid cell to list of 3D-point indices
|
||||
cell_pts: dict[Tuple[int, int], list] = {}
|
||||
for idx, (gx, gz) in enumerate(zip(xi.tolist(), zi.tolist())):
|
||||
key = (gx, gz)
|
||||
if key not in cell_pts:
|
||||
cell_pts[key] = []
|
||||
cell_pts[key].append(idx)
|
||||
|
||||
occupied = set(cell_pts.keys())
|
||||
|
||||
# ── BFS connected-component labelling ────────────────────────────────────
|
||||
visited = set()
|
||||
clusters = []
|
||||
|
||||
for start in occupied:
|
||||
if start in visited:
|
||||
continue
|
||||
|
||||
# BFS flood fill
|
||||
component_cells = []
|
||||
queue = deque([start])
|
||||
visited.add(start)
|
||||
|
||||
while queue:
|
||||
cx, cz = queue.popleft()
|
||||
component_cells.append((cx, cz))
|
||||
|
||||
for dx, dz in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
|
||||
nb = (cx + dx, cz + dz)
|
||||
if nb in occupied and nb not in visited:
|
||||
visited.add(nb)
|
||||
queue.append(nb)
|
||||
|
||||
# Collect 3D points in this component
|
||||
pt_indices = []
|
||||
for cell in component_cells:
|
||||
pt_indices.extend(cell_pts[cell])
|
||||
|
||||
if len(pt_indices) < min_pts:
|
||||
continue
|
||||
|
||||
cluster_pts = pts[pt_indices]
|
||||
cluster_h = heights[pt_indices]
|
||||
centroid = cluster_pts.mean(axis=0)
|
||||
dists = np.linalg.norm(cluster_pts - centroid, axis=1)
|
||||
radius_m = float(dists.max())
|
||||
mean_height = float(cluster_h.mean())
|
||||
|
||||
clusters.append(ObstacleCluster(
|
||||
centroid = centroid,
|
||||
radius_m = radius_m,
|
||||
height_m = mean_height,
|
||||
n_pts = len(pt_indices),
|
||||
))
|
||||
|
||||
# Sort by forward distance
|
||||
clusters.sort(key=lambda c: c.distance_m)
|
||||
return clusters
|
||||
|
||||
|
||||
# ── Local import for math ─────────────────────────────────────────────────────
|
||||
import math
|
||||
@ -0,0 +1,512 @@
|
||||
"""
|
||||
obstacle_detect_node.py — RealSense depth obstacle detection node (Issue #611).
|
||||
|
||||
Pipeline per frame (30 Hz)
|
||||
──────────────────────────
|
||||
1. Receive D435i aligned depth image (float32, metres) + camera_info.
|
||||
2. Downsample by `downsample_factor` for efficiency.
|
||||
3. Back-project non-zero pixels to 3D (camera optical frame, +Z forward).
|
||||
4. RANSAC ground-plane fitting on the 3D cloud.
|
||||
5. Classify points: ground (inlier) vs obstacle (above ground, in height window).
|
||||
6. Cluster obstacle points using 2D grid BFS on the (X, Z) plane.
|
||||
7. Publish:
|
||||
/saltybot/obstacles visualization_msgs/MarkerArray (centroids)
|
||||
/saltybot/obstacles/cloud sensor_msgs/PointCloud2 (non-ground pts)
|
||||
/saltybot/obstacles/alert std_msgs/String (JSON alert)
|
||||
8. When `depth_estop_enabled` and DANGER detected: publish zero Twist to
|
||||
`depth_estop_topic` (default /cmd_vel_input) for safety_zone integration.
|
||||
|
||||
Obstacle alert levels (JSON on /saltybot/obstacles/alert)
|
||||
──────────────────────────────────────────────────────────
|
||||
DANGER — closest obstacle z < danger_dist_m (requires immediate stop)
|
||||
WARN — closest obstacle z < warn_dist_m (caution)
|
||||
CLEAR — no obstacles in range
|
||||
|
||||
Safety zone integration
|
||||
───────────────────────
|
||||
When `depth_estop_enabled: true`, DANGER obstacles trigger a zero Twist
|
||||
published to `depth_estop_topic` (default: /cmd_vel_input).
|
||||
This feeds into the safety_zone's cmd_vel chain and triggers its e-stop,
|
||||
giving independent depth-based stopping complementary to the LIDAR zones.
|
||||
|
||||
Subscribes
|
||||
──────────
|
||||
/camera/depth/image_rect_raw sensor_msgs/Image 30 Hz float32 (m)
|
||||
/camera/depth/camera_info sensor_msgs/CameraInfo (once)
|
||||
|
||||
Publishes
|
||||
─────────
|
||||
/saltybot/obstacles visualization_msgs/MarkerArray
|
||||
/saltybot/obstacles/cloud sensor_msgs/PointCloud2
|
||||
/saltybot/obstacles/alert std_msgs/String (JSON)
|
||||
[depth_estop_topic] geometry_msgs/Twist (zero, when DANGER)
|
||||
|
||||
Parameters (see config/obstacle_detect_params.yaml)
|
||||
────────────────────────────────────────────────────
|
||||
downsample_factor int 8 factor to reduce image before processing
|
||||
ransac_n_iter int 50 RANSAC iterations for ground plane
|
||||
ransac_inlier_m float 0.06 inlier threshold (m)
|
||||
min_obstacle_h_m float 0.05 min height above ground to be obstacle
|
||||
max_obstacle_h_m float 0.80 max height above ground to be obstacle
|
||||
cluster_cell_m float 0.30 clustering grid cell size (m)
|
||||
cluster_min_pts int 5 minimum points per cluster
|
||||
max_obstacle_dist_m float 5.0 discard obstacles beyond this distance
|
||||
danger_dist_m float 0.40 obstacle z < this → DANGER
|
||||
warn_dist_m float 1.20 obstacle z < this → WARN
|
||||
depth_estop_enabled bool false publish zero-vel when DANGER
|
||||
depth_estop_topic str /cmd_vel_input
|
||||
camera_frame str camera_link
|
||||
marker_lifetime_s float 0.2 MarkerArray marker lifetime (s)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import math
|
||||
import struct
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import (
|
||||
QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
|
||||
)
|
||||
|
||||
import numpy as np
|
||||
from cv_bridge import CvBridge
|
||||
|
||||
from geometry_msgs.msg import Point, Twist, Vector3
|
||||
from sensor_msgs.msg import Image, CameraInfo, PointCloud2, PointField
|
||||
from std_msgs.msg import Header, String, ColorRGBA
|
||||
from visualization_msgs.msg import Marker, MarkerArray
|
||||
|
||||
from .ground_plane import fit_ground_plane, height_above_plane, obstacle_mask
|
||||
from .obstacle_clusterer import ObstacleCluster, cluster_obstacles
|
||||
|
||||
|
||||
# ── QoS ───────────────────────────────────────────────────────────────────────
|
||||
_SENSOR_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=5,
|
||||
)
|
||||
_LATCHED_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=1,
|
||||
durability=DurabilityPolicy.TRANSIENT_LOCAL,
|
||||
)
|
||||
|
||||
# Alert levels
|
||||
CLEAR = 'CLEAR'
|
||||
WARN = 'WARN'
|
||||
DANGER = 'DANGER'
|
||||
|
||||
# Marker colours per alert level
|
||||
_COLOUR = {
|
||||
CLEAR: (0.0, 1.0, 0.0, 0.8), # green
|
||||
WARN: (1.0, 0.8, 0.0, 0.9), # yellow
|
||||
DANGER: (1.0, 0.1, 0.1, 1.0), # red
|
||||
}
|
||||
|
||||
# D435i depth scale (image is already float32 in metres when using 32FC1)
|
||||
_DEPTH_ENCODING = '32FC1'
|
||||
|
||||
|
||||
class ObstacleDetectNode(Node):
|
||||
"""RealSense depth-based obstacle detection — see module docstring."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__('obstacle_detect')
|
||||
|
||||
# ── Parameters ────────────────────────────────────────────────────────
|
||||
self.declare_parameter('downsample_factor', 8)
|
||||
self.declare_parameter('ransac_n_iter', 50)
|
||||
self.declare_parameter('ransac_inlier_m', 0.06)
|
||||
self.declare_parameter('min_obstacle_h_m', 0.05)
|
||||
self.declare_parameter('max_obstacle_h_m', 0.80)
|
||||
self.declare_parameter('cluster_cell_m', 0.30)
|
||||
self.declare_parameter('cluster_min_pts', 5)
|
||||
self.declare_parameter('max_obstacle_dist_m', 5.0)
|
||||
self.declare_parameter('danger_dist_m', 0.40)
|
||||
self.declare_parameter('warn_dist_m', 1.20)
|
||||
self.declare_parameter('depth_estop_enabled', False)
|
||||
self.declare_parameter('depth_estop_topic', '/cmd_vel_input')
|
||||
self.declare_parameter('camera_frame', 'camera_link')
|
||||
self.declare_parameter('marker_lifetime_s', 0.2)
|
||||
|
||||
self._ds = self.get_parameter('downsample_factor').value
|
||||
self._n_iter = self.get_parameter('ransac_n_iter').value
|
||||
self._inlier_m = self.get_parameter('ransac_inlier_m').value
|
||||
self._min_h = self.get_parameter('min_obstacle_h_m').value
|
||||
self._max_h = self.get_parameter('max_obstacle_h_m').value
|
||||
self._cell_m = self.get_parameter('cluster_cell_m').value
|
||||
self._min_pts = self.get_parameter('cluster_min_pts').value
|
||||
self._max_dist = self.get_parameter('max_obstacle_dist_m').value
|
||||
self._danger_m = self.get_parameter('danger_dist_m').value
|
||||
self._warn_m = self.get_parameter('warn_dist_m').value
|
||||
self._estop_en = self.get_parameter('depth_estop_enabled').value
|
||||
self._cam_frame = self.get_parameter('camera_frame').value
|
||||
self._marker_life = self.get_parameter('marker_lifetime_s').value
|
||||
|
||||
estop_topic = self.get_parameter('depth_estop_topic').value
|
||||
|
||||
# ── Internal state ────────────────────────────────────────────────────
|
||||
self._bridge = CvBridge()
|
||||
self._fx: float | None = None
|
||||
self._fy: float | None = None
|
||||
self._cx: float | None = None
|
||||
self._cy: float | None = None
|
||||
|
||||
# Ground plane stability: blend current with previous (exponential)
|
||||
self._plane_normal: np.ndarray | None = None
|
||||
self._plane_d: float | None = None
|
||||
self._plane_alpha = 0.3 # blend weight for new plane estimate
|
||||
|
||||
# ── Publishers ─────────────────────────────────────────────────────────
|
||||
self._marker_pub = self.create_publisher(
|
||||
MarkerArray, '/saltybot/obstacles', 10
|
||||
)
|
||||
self._cloud_pub = self.create_publisher(
|
||||
PointCloud2, '/saltybot/obstacles/cloud', 10
|
||||
)
|
||||
self._alert_pub = self.create_publisher(
|
||||
String, '/saltybot/obstacles/alert', 10
|
||||
)
|
||||
if self._estop_en:
|
||||
self._estop_pub = self.create_publisher(
|
||||
Twist, estop_topic, 10
|
||||
)
|
||||
else:
|
||||
self._estop_pub = None
|
||||
|
||||
# ── Subscriptions ──────────────────────────────────────────────────────
|
||||
self.create_subscription(
|
||||
CameraInfo,
|
||||
'/camera/depth/camera_info',
|
||||
self._on_camera_info,
|
||||
_LATCHED_QOS,
|
||||
)
|
||||
self.create_subscription(
|
||||
Image,
|
||||
'/camera/depth/image_rect_raw',
|
||||
self._on_depth,
|
||||
_SENSOR_QOS,
|
||||
)
|
||||
|
||||
self.get_logger().info(
|
||||
f'obstacle_detect ready — '
|
||||
f'ds={self._ds} ransac={self._n_iter}it '
|
||||
f'danger={self._danger_m}m warn={self._warn_m}m '
|
||||
f'estop={"ON → " + estop_topic if self._estop_en else "OFF"}'
|
||||
)
|
||||
|
||||
# ── Camera info ───────────────────────────────────────────────────────────
|
||||
|
||||
def _on_camera_info(self, msg: CameraInfo) -> None:
|
||||
if self._fx is not None:
|
||||
return
|
||||
self._fx = float(msg.k[0])
|
||||
self._fy = float(msg.k[4])
|
||||
self._cx = float(msg.k[2])
|
||||
self._cy = float(msg.k[5])
|
||||
self.get_logger().info(
|
||||
f'camera_info: fx={self._fx:.1f} fy={self._fy:.1f} '
|
||||
f'cx={self._cx:.1f} cy={self._cy:.1f}'
|
||||
)
|
||||
|
||||
# ── Main depth callback ───────────────────────────────────────────────────
|
||||
|
||||
def _on_depth(self, msg: Image) -> None:
|
||||
if self._fx is None:
|
||||
return # wait for camera_info
|
||||
|
||||
# ── Decode depth ──────────────────────────────────────────────────────
|
||||
try:
|
||||
depth = self._bridge.imgmsg_to_cv2(msg, desired_encoding=_DEPTH_ENCODING)
|
||||
except Exception as exc:
|
||||
self.get_logger().warning(f'depth decode error: {exc}', throttle_duration_sec=5.0)
|
||||
return
|
||||
|
||||
h_img, w_img = depth.shape
|
||||
stamp = msg.header.stamp
|
||||
|
||||
# ── Downsample ────────────────────────────────────────────────────────
|
||||
ds = self._ds
|
||||
depth_ds = depth[::ds, ::ds] # stride-based downsample
|
||||
h_ds, w_ds = depth_ds.shape
|
||||
|
||||
# Back-projection intrinsics at downsampled resolution
|
||||
fx = self._fx / ds
|
||||
fy = self._fy / ds
|
||||
cx = self._cx / ds
|
||||
cy = self._cy / ds
|
||||
|
||||
# ── Back-project to 3D ────────────────────────────────────────────────
|
||||
points = _backproject(depth_ds, fx, fy, cx, cy, max_dist=self._max_dist)
|
||||
|
||||
if len(points) < 30:
|
||||
self._publish_empty(stamp)
|
||||
return
|
||||
|
||||
# ── RANSAC ground plane ───────────────────────────────────────────────
|
||||
result = fit_ground_plane(
|
||||
points,
|
||||
n_iter = self._n_iter,
|
||||
inlier_thresh_m = self._inlier_m,
|
||||
)
|
||||
|
||||
if result is None:
|
||||
self._publish_empty(stamp)
|
||||
return
|
||||
|
||||
new_normal, new_d = result
|
||||
|
||||
# Blend with previous plane for temporal stability
|
||||
if self._plane_normal is None:
|
||||
self._plane_normal = new_normal
|
||||
self._plane_d = new_d
|
||||
else:
|
||||
alpha = self._plane_alpha
|
||||
blended = (1.0 - alpha) * self._plane_normal + alpha * new_normal
|
||||
norm_len = float(np.linalg.norm(blended))
|
||||
if norm_len > 1e-8:
|
||||
self._plane_normal = blended / norm_len
|
||||
self._plane_d = (1.0 - alpha) * self._plane_d + alpha * new_d
|
||||
|
||||
plane = (self._plane_normal, self._plane_d)
|
||||
|
||||
# ── Obstacle mask ─────────────────────────────────────────────────────
|
||||
obs_mask = obstacle_mask(points, plane, self._min_h, self._max_h)
|
||||
obs_pts = points[obs_mask]
|
||||
|
||||
heights_all = height_above_plane(points, plane)
|
||||
obs_heights = heights_all[obs_mask]
|
||||
|
||||
# ── Cluster obstacles ─────────────────────────────────────────────────
|
||||
clusters = cluster_obstacles(
|
||||
obs_pts, obs_heights,
|
||||
cell_m = self._cell_m,
|
||||
min_pts = self._min_pts,
|
||||
z_range = (0.15, self._max_dist),
|
||||
)
|
||||
|
||||
# ── Alert level ───────────────────────────────────────────────────────
|
||||
alert_level = CLEAR
|
||||
closest_dist = float('inf')
|
||||
|
||||
for cluster in clusters:
|
||||
d = cluster.distance_m
|
||||
if d < closest_dist:
|
||||
closest_dist = d
|
||||
if d < self._danger_m:
|
||||
alert_level = DANGER
|
||||
break
|
||||
if d < self._warn_m and alert_level != DANGER:
|
||||
alert_level = WARN
|
||||
|
||||
if closest_dist == float('inf'):
|
||||
closest_dist = -1.0
|
||||
|
||||
# ── Publish ───────────────────────────────────────────────────────────
|
||||
self._publish_markers(clusters, stamp)
|
||||
self._publish_cloud(obs_pts, stamp)
|
||||
self._publish_alert(alert_level, clusters, closest_dist, stamp)
|
||||
|
||||
# Depth e-stop: feed zero vel to safety zone chain
|
||||
if self._estop_en and self._estop_pub is not None and alert_level == DANGER:
|
||||
self._estop_pub.publish(Twist())
|
||||
|
||||
# ── Publishers ────────────────────────────────────────────────────────────
|
||||
|
||||
def _publish_markers(self, clusters: list, stamp) -> None:
|
||||
"""Publish obstacle centroids as sphere MarkerArray."""
|
||||
ma = MarkerArray()
|
||||
lifetime = _ros_duration(self._marker_life)
|
||||
|
||||
# Delete-all marker first for clean slate
|
||||
del_marker = Marker()
|
||||
del_marker.action = Marker.DELETEALL
|
||||
del_marker.header.stamp = stamp
|
||||
del_marker.header.frame_id = self._cam_frame
|
||||
ma.markers.append(del_marker)
|
||||
|
||||
for idx, cluster in enumerate(clusters):
|
||||
level = self._cluster_alert_level(cluster)
|
||||
r, g, b, a = _COLOUR[level]
|
||||
cx, cy, cz = cluster.centroid
|
||||
|
||||
m = Marker()
|
||||
m.header.stamp = stamp
|
||||
m.header.frame_id = self._cam_frame
|
||||
m.ns = 'obstacles'
|
||||
m.id = idx + 1
|
||||
m.type = Marker.SPHERE
|
||||
m.action = Marker.ADD
|
||||
m.pose.position = Point(x=float(cx), y=float(cy), z=float(cz))
|
||||
m.pose.orientation.w = 1.0
|
||||
rad = max(0.05, cluster.radius_m)
|
||||
m.scale = Vector3(x=rad * 2.0, y=rad * 2.0, z=rad * 2.0)
|
||||
m.color = ColorRGBA(r=r, g=g, b=b, a=a)
|
||||
m.lifetime = lifetime
|
||||
ma.markers.append(m)
|
||||
|
||||
# Text label above sphere
|
||||
lbl = Marker()
|
||||
lbl.header = m.header
|
||||
lbl.ns = 'obstacle_labels'
|
||||
lbl.id = idx + 1
|
||||
lbl.type = Marker.TEXT_VIEW_FACING
|
||||
lbl.action = Marker.ADD
|
||||
lbl.pose.position = Point(x=float(cx), y=float(cy) - 0.15, z=float(cz))
|
||||
lbl.pose.orientation.w = 1.0
|
||||
lbl.scale.z = 0.12
|
||||
lbl.color = ColorRGBA(r=1.0, g=1.0, b=1.0, a=0.9)
|
||||
lbl.text = f'{cluster.distance_m:.2f}m'
|
||||
lbl.lifetime = lifetime
|
||||
ma.markers.append(lbl)
|
||||
|
||||
self._marker_pub.publish(ma)
|
||||
|
||||
def _publish_cloud(self, points: np.ndarray, stamp) -> None:
|
||||
"""Publish non-ground obstacle points as PointCloud2 (xyz float32)."""
|
||||
if len(points) == 0:
|
||||
# Publish empty cloud
|
||||
cloud = PointCloud2()
|
||||
cloud.header.stamp = stamp
|
||||
cloud.header.frame_id = self._cam_frame
|
||||
self._cloud_pub.publish(cloud)
|
||||
return
|
||||
|
||||
pts_f32 = points.astype(np.float32)
|
||||
|
||||
fields = [
|
||||
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
|
||||
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
|
||||
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
|
||||
]
|
||||
cloud = PointCloud2()
|
||||
cloud.header.stamp = stamp
|
||||
cloud.header.frame_id = self._cam_frame
|
||||
cloud.height = 1
|
||||
cloud.width = len(pts_f32)
|
||||
cloud.fields = fields
|
||||
cloud.is_bigendian = False
|
||||
cloud.point_step = 12 # 3 × float32
|
||||
cloud.row_step = cloud.point_step * cloud.width
|
||||
cloud.data = pts_f32.tobytes()
|
||||
cloud.is_dense = True
|
||||
self._cloud_pub.publish(cloud)
|
||||
|
||||
def _publish_alert(
|
||||
self,
|
||||
level: str,
|
||||
clusters: list,
|
||||
closest_m: float,
|
||||
stamp,
|
||||
) -> None:
|
||||
obstacles_json = [
|
||||
{
|
||||
'x': round(float(c.centroid[0]), 3),
|
||||
'y': round(float(c.centroid[1]), 3),
|
||||
'z': round(float(c.centroid[2]), 3),
|
||||
'radius_m': round(c.radius_m, 3),
|
||||
'height_m': round(c.height_m, 3),
|
||||
'n_pts': c.n_pts,
|
||||
'level': self._cluster_alert_level(c),
|
||||
}
|
||||
for c in clusters
|
||||
]
|
||||
alert = {
|
||||
'alert_level': level,
|
||||
'closest_m': round(closest_m, 3),
|
||||
'obstacle_count': len(clusters),
|
||||
'obstacles': obstacles_json,
|
||||
}
|
||||
msg = String()
|
||||
msg.data = json.dumps(alert)
|
||||
self._alert_pub.publish(msg)
|
||||
|
||||
def _publish_empty(self, stamp) -> None:
|
||||
"""Publish empty state (no obstacles detected / ground plane failed)."""
|
||||
self._publish_markers([], stamp)
|
||||
self._publish_cloud(np.zeros((0, 3), np.float32), stamp)
|
||||
alert = {
|
||||
'alert_level': CLEAR,
|
||||
'closest_m': -1.0,
|
||||
'obstacle_count': 0,
|
||||
'obstacles': [],
|
||||
}
|
||||
msg = String()
|
||||
msg.data = json.dumps(alert)
|
||||
self._alert_pub.publish(msg)
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────
|
||||
|
||||
def _cluster_alert_level(self, cluster: ObstacleCluster) -> str:
|
||||
d = cluster.distance_m
|
||||
if d < self._danger_m:
|
||||
return DANGER
|
||||
if d < self._warn_m:
|
||||
return WARN
|
||||
return CLEAR
|
||||
|
||||
|
||||
# ── Pure-function helpers ──────────────────────────────────────────────────────
|
||||
|
||||
def _backproject(
|
||||
depth: np.ndarray,
|
||||
fx: float, fy: float,
|
||||
cx: float, cy: float,
|
||||
min_dist: float = 0.15,
|
||||
max_dist: float = 6.0,
|
||||
) -> np.ndarray:
|
||||
"""
|
||||
Back-project a float32 depth image (metres) to a (N, 3) 3D point array.
|
||||
|
||||
Camera optical frame: +X right, +Y down, +Z forward.
|
||||
Only returns pixels where depth is in [min_dist, max_dist].
|
||||
"""
|
||||
h, w = depth.shape
|
||||
|
||||
# Build pixel-grid meshgrid
|
||||
u_arr = np.arange(w, dtype=np.float32)
|
||||
v_arr = np.arange(h, dtype=np.float32)
|
||||
uu, vv = np.meshgrid(u_arr, v_arr)
|
||||
|
||||
z = depth.astype(np.float32).ravel()
|
||||
u = uu.ravel()
|
||||
v = vv.ravel()
|
||||
|
||||
valid = (z > min_dist) & (z < max_dist) & np.isfinite(z)
|
||||
z = z[valid]
|
||||
u = u[valid]
|
||||
v = v[valid]
|
||||
|
||||
x = (u - cx) * z / fx
|
||||
y = (v - cy) * z / fy
|
||||
|
||||
return np.stack([x, y, z], axis=1).astype(np.float64)
|
||||
|
||||
|
||||
def _ros_duration(seconds: float):
|
||||
"""Build a rclpy Duration-compatible value for Marker.lifetime."""
|
||||
from rclpy.duration import Duration
|
||||
sec = int(seconds)
|
||||
nsec = int((seconds - sec) * 1e9)
|
||||
return Duration(seconds=sec, nanoseconds=nsec).to_msg()
|
||||
|
||||
|
||||
def main(args=None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = ObstacleDetectNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.try_shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
5
jetson/ros2_ws/src/saltybot_obstacle_detect/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_obstacle_detect/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_obstacle_detect
|
||||
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_obstacle_detect
|
||||
30
jetson/ros2_ws/src/saltybot_obstacle_detect/setup.py
Normal file
30
jetson/ros2_ws/src/saltybot_obstacle_detect/setup.py
Normal file
@ -0,0 +1,30 @@
|
||||
import os
|
||||
from glob import glob
|
||||
from setuptools import setup
|
||||
|
||||
package_name = 'saltybot_obstacle_detect'
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version='0.1.0',
|
||||
packages=[package_name],
|
||||
data_files=[
|
||||
('share/ament_index/resource_index/packages',
|
||||
['resource/' + package_name]),
|
||||
('share/' + package_name, ['package.xml']),
|
||||
(os.path.join('share', package_name, 'launch'), glob('launch/*.py')),
|
||||
(os.path.join('share', package_name, 'config'), glob('config/*.yaml')),
|
||||
],
|
||||
install_requires=['setuptools'],
|
||||
zip_safe=True,
|
||||
maintainer='sl-perception',
|
||||
maintainer_email='sl-perception@saltylab.local',
|
||||
description='RealSense depth obstacle detection — RANSAC ground + clustering (Issue #611)',
|
||||
license='MIT',
|
||||
tests_require=['pytest'],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'obstacle_detect = saltybot_obstacle_detect.obstacle_detect_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
@ -0,0 +1,129 @@
|
||||
"""Unit tests for ground_plane.py — RANSAC fitting and point classification."""
|
||||
|
||||
import math
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from saltybot_obstacle_detect.ground_plane import (
|
||||
fit_ground_plane,
|
||||
height_above_plane,
|
||||
obstacle_mask,
|
||||
ground_mask,
|
||||
)
|
||||
|
||||
|
||||
def make_flat_ground(n=500, y_ground=1.0, noise=0.02, rng=None):
|
||||
"""Flat ground plane at y = y_ground (camera frame: Y down)."""
|
||||
if rng is None:
|
||||
rng = np.random.default_rng(0)
|
||||
x = rng.uniform(-2.0, 2.0, n).astype(np.float64)
|
||||
z = rng.uniform(0.5, 4.0, n).astype(np.float64)
|
||||
y = np.full(n, y_ground) + rng.normal(0, noise, n)
|
||||
return np.stack([x, y, z], axis=1)
|
||||
|
||||
|
||||
def make_obstacle_above_ground(cx=0.0, y_ground=1.0, height=0.3, n=50, rng=None):
|
||||
"""Points forming a box above the ground plane."""
|
||||
if rng is None:
|
||||
rng = np.random.default_rng(1)
|
||||
x = rng.uniform(cx - 0.15, cx + 0.15, n)
|
||||
z = rng.uniform(1.5, 2.0, n)
|
||||
y = np.full(n, y_ground - height) # Y_down: smaller y = higher in world
|
||||
return np.stack([x, y, z], axis=1)
|
||||
|
||||
|
||||
# ── fit_ground_plane ──────────────────────────────────────────────────────────
|
||||
|
||||
def test_fit_flat_ground_returns_result():
|
||||
pts = make_flat_ground(n=400)
|
||||
plane = fit_ground_plane(pts, n_iter=50)
|
||||
assert plane is not None
|
||||
|
||||
|
||||
def test_fit_ground_normal_points_upward():
|
||||
"""Normal should have negative Y component (upward in camera frame)."""
|
||||
pts = make_flat_ground(n=400)
|
||||
plane = fit_ground_plane(pts, n_iter=50)
|
||||
assert plane is not None
|
||||
normal, d = plane
|
||||
assert normal[1] < 0.0, f"Expected normal.y < 0, got {normal[1]:.3f}"
|
||||
|
||||
|
||||
def test_fit_ground_normal_unit_vector():
|
||||
pts = make_flat_ground(n=300)
|
||||
plane = fit_ground_plane(pts, n_iter=50)
|
||||
assert plane is not None
|
||||
normal, _ = plane
|
||||
assert abs(np.linalg.norm(normal) - 1.0) < 1e-6
|
||||
|
||||
|
||||
def test_fit_too_few_points_returns_none():
|
||||
pts = make_flat_ground(n=5)
|
||||
plane = fit_ground_plane(pts, n_iter=10)
|
||||
assert plane is None
|
||||
|
||||
|
||||
def test_fit_empty_returns_none():
|
||||
plane = fit_ground_plane(np.zeros((0, 3)))
|
||||
assert plane is None
|
||||
|
||||
|
||||
def test_fit_tilted_ground():
|
||||
"""Tilted ground plane (robot on slope) should still be found."""
|
||||
rng = np.random.default_rng(42)
|
||||
n = 400
|
||||
x = rng.uniform(-2.0, 2.0, n)
|
||||
z = rng.uniform(0.5, 4.0, n)
|
||||
# Tilt: y = 0.5 + 0.1*z (plane tilted 5.7° forward)
|
||||
y = 0.5 + 0.1 * z + rng.normal(0, 0.02, n)
|
||||
pts = np.stack([x, y, z], axis=1)
|
||||
plane = fit_ground_plane(pts, n_iter=80)
|
||||
assert plane is not None
|
||||
|
||||
|
||||
# ── height_above_plane ────────────────────────────────────────────────────────
|
||||
|
||||
def test_ground_pts_near_zero_height():
|
||||
ground_pts = make_flat_ground(n=200, y_ground=1.0, noise=0.005)
|
||||
plane = fit_ground_plane(ground_pts, n_iter=60)
|
||||
assert plane is not None
|
||||
heights = height_above_plane(ground_pts, plane)
|
||||
assert np.abs(heights).mean() < 0.10 # mean residual < 10 cm
|
||||
|
||||
|
||||
def test_obstacle_pts_positive_height():
|
||||
rng = np.random.default_rng(7)
|
||||
ground_pts = make_flat_ground(n=300, y_ground=1.0, rng=rng)
|
||||
obs_pts = make_obstacle_above_ground(y_ground=1.0, height=0.3, rng=rng)
|
||||
all_pts = np.vstack([ground_pts, obs_pts])
|
||||
plane = fit_ground_plane(all_pts, n_iter=60)
|
||||
assert plane is not None
|
||||
heights = height_above_plane(obs_pts, plane)
|
||||
assert heights.mean() > 0.1, f"Expected obstacles above ground, got mean h={heights.mean():.3f}"
|
||||
|
||||
|
||||
# ── obstacle_mask ─────────────────────────────────────────────────────────────
|
||||
|
||||
def test_obstacle_mask_selects_correct_points():
|
||||
rng = np.random.default_rng(3)
|
||||
ground_pts = make_flat_ground(n=300, y_ground=1.0, rng=rng)
|
||||
obs_pts = make_obstacle_above_ground(y_ground=1.0, height=0.4, n=60, rng=rng)
|
||||
all_pts = np.vstack([ground_pts, obs_pts])
|
||||
plane = fit_ground_plane(all_pts, n_iter=60)
|
||||
assert plane is not None
|
||||
mask = obstacle_mask(all_pts, plane, min_height_m=0.05, max_height_m=0.80)
|
||||
# Most of the obs_pts should be selected
|
||||
n_ground_selected = mask[:300].sum()
|
||||
n_obs_selected = mask[300:].sum()
|
||||
assert n_obs_selected > 40, f"Expected most obs selected, got {n_obs_selected}/60"
|
||||
assert n_ground_selected < 30, f"Expected few ground points selected, got {n_ground_selected}/300"
|
||||
|
||||
|
||||
# ── ground_mask ───────────────────────────────────────────────────────────────
|
||||
|
||||
def test_ground_mask_covers_inliers():
|
||||
pts = make_flat_ground(n=300, noise=0.01)
|
||||
plane = fit_ground_plane(pts, n_iter=60)
|
||||
assert plane is not None
|
||||
gmask = ground_mask(pts, plane, inlier_thresh=0.08)
|
||||
assert gmask.mean() > 0.7 # at least 70% of flat ground should be ground inliers
|
||||
@ -0,0 +1,105 @@
|
||||
"""Unit tests for obstacle_clusterer.py — 2D grid BFS clustering."""
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from saltybot_obstacle_detect.obstacle_clusterer import cluster_obstacles, ObstacleCluster
|
||||
|
||||
|
||||
def _make_cluster_pts(cx, cz, n=30, spread=0.15, rng=None):
|
||||
"""Generate points around a centroid in camera frame (x=lateral, z=forward)."""
|
||||
if rng is None:
|
||||
rng = np.random.default_rng(0)
|
||||
x = rng.uniform(cx - spread, cx + spread, n)
|
||||
y = rng.uniform(-0.3, 0.0, n) # above ground (height ~0.3m)
|
||||
z = rng.uniform(cz - spread, cz + spread, n)
|
||||
return np.stack([x, y, z], axis=1)
|
||||
|
||||
|
||||
def _make_heights(pts, base_h=0.3, noise=0.05, rng=None):
|
||||
if rng is None:
|
||||
rng = np.random.default_rng(1)
|
||||
return np.full(len(pts), base_h) + rng.normal(0, noise, len(pts))
|
||||
|
||||
|
||||
# ── Basic clustering ──────────────────────────────────────────────────────────
|
||||
|
||||
def test_single_cluster_detected():
|
||||
pts = _make_cluster_pts(cx=0.0, cz=2.0, n=40)
|
||||
heights = _make_heights(pts)
|
||||
clusters = cluster_obstacles(pts, heights, cell_m=0.30, min_pts=5)
|
||||
assert len(clusters) == 1
|
||||
|
||||
|
||||
def test_two_clusters_separated():
|
||||
rng = np.random.default_rng(42)
|
||||
pts1 = _make_cluster_pts(cx=-1.0, cz=2.0, n=40, rng=rng)
|
||||
pts2 = _make_cluster_pts(cx=+1.5, cz=3.5, n=40, rng=rng)
|
||||
pts = np.vstack([pts1, pts2])
|
||||
h = _make_heights(pts, rng=rng)
|
||||
clusters = cluster_obstacles(pts, h, cell_m=0.30, min_pts=5)
|
||||
assert len(clusters) == 2
|
||||
|
||||
|
||||
def test_clusters_sorted_by_distance():
|
||||
rng = np.random.default_rng(7)
|
||||
pts_near = _make_cluster_pts(cx=0.0, cz=1.5, n=40, rng=rng)
|
||||
pts_far = _make_cluster_pts(cx=0.0, cz=4.0, n=40, rng=rng)
|
||||
pts = np.vstack([pts_far, pts_near]) # far first
|
||||
h = _make_heights(pts, rng=rng)
|
||||
clusters = cluster_obstacles(pts, h, cell_m=0.30, min_pts=5)
|
||||
assert len(clusters) == 2
|
||||
assert clusters[0].distance_m < clusters[1].distance_m
|
||||
|
||||
|
||||
def test_empty_input():
|
||||
clusters = cluster_obstacles(
|
||||
np.zeros((0, 3), np.float64),
|
||||
np.zeros(0, np.float64),
|
||||
)
|
||||
assert clusters == []
|
||||
|
||||
|
||||
def test_min_pts_filters_small_cluster():
|
||||
rng = np.random.default_rng(9)
|
||||
pts_big = _make_cluster_pts(cx=0.0, cz=2.0, n=50, rng=rng)
|
||||
pts_tiny = _make_cluster_pts(cx=2.0, cz=2.0, n=2, rng=rng)
|
||||
pts = np.vstack([pts_big, pts_tiny])
|
||||
h = _make_heights(pts, rng=rng)
|
||||
clusters = cluster_obstacles(pts, h, cell_m=0.30, min_pts=5)
|
||||
# Only the big cluster should pass
|
||||
assert len(clusters) == 1
|
||||
assert clusters[0].n_pts >= 40
|
||||
|
||||
|
||||
def test_centroid_near_true_center():
|
||||
rng = np.random.default_rng(5)
|
||||
true_cx, true_cz = 0.0, 2.5
|
||||
pts = _make_cluster_pts(cx=true_cx, cz=true_cz, n=100, spread=0.1, rng=rng)
|
||||
h = _make_heights(pts, rng=rng)
|
||||
clusters = cluster_obstacles(pts, h, cell_m=0.20, min_pts=5)
|
||||
assert len(clusters) >= 1
|
||||
c = clusters[0]
|
||||
assert abs(c.centroid[0] - true_cx) < 0.3
|
||||
assert abs(c.centroid[2] - true_cz) < 0.3
|
||||
|
||||
|
||||
def test_out_of_range_points_ignored():
|
||||
rng = np.random.default_rng(3)
|
||||
pts_ok = _make_cluster_pts(cx=0.0, cz=2.0, n=30, rng=rng)
|
||||
# Points beyond max_obstacle_dist_m=5.0
|
||||
pts_far = _make_cluster_pts(cx=0.0, cz=7.0, n=30, rng=rng)
|
||||
pts = np.vstack([pts_ok, pts_far])
|
||||
h = _make_heights(pts, rng=rng)
|
||||
clusters = cluster_obstacles(pts, h, cell_m=0.30, min_pts=5, z_range=(0.15, 5.0))
|
||||
# Only the near cluster should survive
|
||||
assert all(c.distance_m <= 5.0 for c in clusters)
|
||||
|
||||
|
||||
# ── ObstacleCluster accessors ─────────────────────────────────────────────────
|
||||
|
||||
def test_cluster_distance_and_lateral():
|
||||
centroid = np.array([1.5, -0.3, 3.0])
|
||||
c = ObstacleCluster(centroid=centroid, radius_m=0.2, height_m=0.3, n_pts=30)
|
||||
assert abs(c.distance_m - 3.0) < 1e-9
|
||||
assert abs(c.lateral_m - 1.5) < 1e-9
|
||||
Loading…
x
Reference in New Issue
Block a user