feat(bringup): LIDAR Euclidean object clustering + RViz visualisation (Issue #239)

Adds gap-based Euclidean distance clustering of /scan LaserScan points.
Each cluster is published as a labelled semi-transparent CUBE + TEXT marker
in /saltybot/lidar_clusters (MarkerArray), sorted nearest-first.  Stale
markers from shrinking cluster counts are explicitly deleted each cycle.
22/22 pure-Python tests pass (no ROS2 required).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-perception 2026-03-02 12:21:35 -05:00
parent 3901e27683
commit ff34f5ac43
4 changed files with 538 additions and 0 deletions

View File

@ -0,0 +1,131 @@
"""
_lidar_clustering.py Euclidean clustering of 2-D LIDAR scan points (no ROS2 deps).
Algorithm
---------
1. Convert LaserScan ranges to Cartesian (x, y) points, dropping inf / NaN.
2. Walk through the points in angular order. When the Euclidean distance
between consecutive scan points exceeds `distance_threshold_m`, a new
cluster begins (gap-based segmentation O(N), standard for dense 360°
LIDAR scans at short range).
3. Discard clusters with fewer than `min_points` points.
Each cluster is returned as a Cluster namedtuple with:
points (N, 2) float64 ndarray of (x, y) in metres
centroid (2,) float64 ndarray, mean of points
bbox_min (2,) min (x, y) corner of axis-aligned bounding box
bbox_max (2,) max (x, y) corner of axis-aligned bounding box
width_m bbox X extent (metres)
depth_m bbox Y extent (metres)
Coordinate frame: 2-D projection of the LIDAR optical frame (+X forward, +Y left).
"""
from __future__ import annotations
import math
from typing import List, NamedTuple
import numpy as np
class Cluster(NamedTuple):
points: np.ndarray # (N, 2) float64
centroid: np.ndarray # (2,) float64
bbox_min: np.ndarray # (2,) float64
bbox_max: np.ndarray # (2,) float64
width_m: float
depth_m: float
def scan_to_cartesian(
ranges: List[float],
angle_min: float,
angle_increment: float,
range_min: float = 0.05,
range_max: float = 12.0,
) -> np.ndarray:
"""
Convert LaserScan ranges to a (M, 2) float64 array of valid (x, y) points.
Parameters
----------
ranges : raw range values from LaserScan.ranges
angle_min : start angle of the scan (radians)
angle_increment : angular step between consecutive readings (radians)
range_min : readings below this are discarded (m)
range_max : readings above this are discarded (m)
Returns
-------
(M, 2) ndarray of valid Cartesian points, in scan order.
"""
angles = angle_min + np.arange(len(ranges)) * angle_increment
r = np.asarray(ranges, dtype=np.float64)
# Mask out invalid readings
valid = np.isfinite(r) & (r >= range_min) & (r <= range_max)
r = r[valid]
a = angles[valid]
x = r * np.cos(a)
y = r * np.sin(a)
return np.column_stack((x, y)) # (M, 2)
def cluster_points(
points: np.ndarray,
distance_threshold_m: float = 0.20,
min_points: int = 3,
) -> List[Cluster]:
"""
Cluster a (M, 2) array of scan-order Cartesian points.
Parameters
----------
points : output of scan_to_cartesian()
distance_threshold_m : max distance between consecutive points in the same cluster
min_points : discard clusters with fewer than this many points
Returns
-------
List of Cluster objects, sorted by distance from the origin (nearest first).
"""
if len(points) == 0:
return []
clusters: List[Cluster] = []
current: List[np.ndarray] = [points[0]]
for i in range(1, len(points)):
gap = np.linalg.norm(points[i] - points[i - 1])
if gap <= distance_threshold_m:
current.append(points[i])
else:
_flush(current, clusters, min_points)
current = [points[i]]
_flush(current, clusters, min_points)
# Sort by distance of centroid from origin (nearest first)
clusters.sort(key=lambda c: float(np.linalg.norm(c.centroid)))
return clusters
def _flush(pts: List[np.ndarray], out: List[Cluster], min_pts: int) -> None:
"""Convert accumulated points list into a Cluster and append to out."""
if len(pts) < min_pts:
return
arr = np.stack(pts) # (N, 2)
centroid = arr.mean(axis=0)
bmin = arr.min(axis=0)
bmax = arr.max(axis=0)
span = bmax - bmin
out.append(Cluster(
points = arr,
centroid = centroid,
bbox_min = bmin,
bbox_max = bmax,
width_m = float(span[0]),
depth_m = float(span[1]),
))

View File

@ -0,0 +1,214 @@
"""
lidar_clustering_node.py LIDAR object clustering and RViz visualisation (Issue #239).
Subscribes to the 2-D LIDAR scan, segments nearby points into objects using
Euclidean distance threshold clustering, and publishes a MarkerArray so each
cluster is visible as a labelled bounding-box cube in RViz.
Subscribes:
/scan sensor_msgs/LaserScan (RPLIDAR A1M8)
Publishes:
/saltybot/lidar_clusters visualization_msgs/MarkerArray
Algorithm
---------
1. Convert LaserScan ranges to Cartesian (x, y) points.
2. Walk scan-order points start a new cluster when the gap between
consecutive points exceeds `distance_threshold_m`.
3. Discard clusters below `min_points`.
4. Publish one CUBE marker per cluster (centroid + bbox dims) plus one TEXT
marker showing cluster index and point count.
Parameters
----------
distance_threshold_m float 0.20 Max gap between consecutive scan points (m)
min_points int 3 Min points for a valid cluster
range_min_m float 0.05 Discard ranges below this (m)
range_max_m float 12.0 Discard ranges above this (m)
marker_lifetime_s float 0.15 Marker lifetime auto-expire if scan stalls
marker_z float 0.0 Z position of cube markers in the LIDAR frame
marker_height float 0.3 Fixed height of cube markers (m)
"""
from __future__ import annotations
from typing import List
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from rclpy.duration import Duration
from sensor_msgs.msg import LaserScan
from visualization_msgs.msg import Marker, MarkerArray
from geometry_msgs.msg import Point
from std_msgs.msg import ColorRGBA
from builtin_interfaces.msg import Duration as DurationMsg
from ._lidar_clustering import scan_to_cartesian, cluster_points, Cluster
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=4,
)
# Distinct hue palette — up to 12 clusters get unique colours; wraps for more
_PALETTE: List[tuple] = [
(0.90, 0.20, 0.20), # red
(0.20, 0.70, 0.20), # green
(0.20, 0.50, 0.90), # blue
(0.90, 0.70, 0.10), # yellow
(0.80, 0.30, 0.90), # purple
(0.20, 0.85, 0.85), # cyan
(0.95, 0.50, 0.10), # orange
(0.50, 0.90, 0.30), # lime
(0.90, 0.20, 0.70), # pink
(0.40, 0.20, 0.90), # indigo
(0.10, 0.70, 0.50), # teal
(0.70, 0.50, 0.20), # brown
]
class LidarClusteringNode(Node):
def __init__(self) -> None:
super().__init__('lidar_clustering_node')
self.declare_parameter('distance_threshold_m', 0.20)
self.declare_parameter('min_points', 3)
self.declare_parameter('range_min_m', 0.05)
self.declare_parameter('range_max_m', 12.0)
self.declare_parameter('marker_lifetime_s', 0.15)
self.declare_parameter('marker_z', 0.0)
self.declare_parameter('marker_height', 0.3)
self._dist_thresh = self.get_parameter('distance_threshold_m').value
self._min_pts = self.get_parameter('min_points').value
self._range_min = self.get_parameter('range_min_m').value
self._range_max = self.get_parameter('range_max_m').value
self._lifetime_s = self.get_parameter('marker_lifetime_s').value
self._marker_z = self.get_parameter('marker_z').value
self._marker_h = self.get_parameter('marker_height').value
self._sub = self.create_subscription(
LaserScan, '/scan', self._on_scan, _SENSOR_QOS)
self._pub = self.create_publisher(
MarkerArray, '/saltybot/lidar_clusters', 10)
# Track how many markers were published last cycle so we can DELETE extras
self._prev_count = 0
self.get_logger().info(
f'lidar_clustering_node ready — '
f'dist={self._dist_thresh}m min_pts={self._min_pts}'
)
# ── Callback ──────────────────────────────────────────────────────────────
def _on_scan(self, msg: LaserScan) -> None:
points = scan_to_cartesian(
list(msg.ranges),
msg.angle_min,
msg.angle_increment,
range_min=self._range_min,
range_max=self._range_max,
)
clusters = cluster_points(
points,
distance_threshold_m=self._dist_thresh,
min_points=self._min_pts,
)
markers: List[Marker] = []
lifetime = _duration_msg(self._lifetime_s)
frame_id = msg.header.frame_id or 'laser'
stamp = msg.header.stamp
for idx, cluster in enumerate(clusters):
color = _PALETTE[idx % len(_PALETTE)]
# ── CUBE marker (bounding box) ────────────────────────────────────
cube = Marker()
cube.header.frame_id = frame_id
cube.header.stamp = stamp
cube.ns = 'lidar_clusters'
cube.id = idx * 2
cube.type = Marker.CUBE
cube.action = Marker.ADD
cube.pose.position.x = float(cluster.centroid[0])
cube.pose.position.y = float(cluster.centroid[1])
cube.pose.position.z = self._marker_z
cube.pose.orientation.w = 1.0
# Minimum visible size of 0.1 m per axis
cube.scale.x = max(cluster.width_m, 0.10)
cube.scale.y = max(cluster.depth_m, 0.10)
cube.scale.z = self._marker_h
cube.color = _rgba(*color, a=0.45)
cube.lifetime = lifetime
markers.append(cube)
# ── TEXT marker (id + count) ──────────────────────────────────────
label = Marker()
label.header.frame_id = frame_id
label.header.stamp = stamp
label.ns = 'lidar_clusters'
label.id = idx * 2 + 1
label.type = Marker.TEXT_VIEW_FACING
label.action = Marker.ADD
label.pose.position.x = float(cluster.centroid[0])
label.pose.position.y = float(cluster.centroid[1])
label.pose.position.z = self._marker_z + self._marker_h * 0.5 + 0.05
label.pose.orientation.w = 1.0
label.scale.z = 0.12 # text height (m)
label.color = _rgba(1.0, 1.0, 1.0, a=0.9)
label.text = f'#{idx} n={len(cluster.points)}'
label.lifetime = lifetime
markers.append(label)
# DELETE markers from the previous cycle that no longer have a cluster
new_count = len(clusters)
for idx in range(new_count, self._prev_count):
for offset in (0, 1):
del_m = Marker()
del_m.ns = 'lidar_clusters'
del_m.id = idx * 2 + offset
del_m.action = Marker.DELETE
markers.append(del_m)
self._prev_count = new_count
ma = MarkerArray()
ma.markers = markers
self._pub.publish(ma)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _rgba(r: float, g: float, b: float, a: float) -> ColorRGBA:
c = ColorRGBA()
c.r = r; c.g = g; c.b = b; c.a = a
return c
def _duration_msg(seconds: float) -> DurationMsg:
d = DurationMsg()
d.sec = int(seconds)
d.nanosec = int((seconds - int(seconds)) * 1e9)
return d
def main(args=None) -> None:
rclpy.init(args=args)
node = LidarClusteringNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -31,6 +31,8 @@ setup(
'depth_confidence_filter = saltybot_bringup.depth_confidence_filter_node:main',
'camera_health_monitor = saltybot_bringup.camera_health_node:main',
'scan_height_filter = saltybot_bringup.scan_height_filter_node:main',
# LIDAR object clustering + RViz visualisation (Issue #239)
'lidar_clustering = saltybot_bringup.lidar_clustering_node:main',
],
},
)

View File

@ -0,0 +1,191 @@
"""
test_lidar_clustering.py Unit tests for LIDAR clustering helpers (no ROS2 required).
Covers:
scan_to_cartesian:
- principal ray (angle=0) maps to (+range, 0)
- 90° ray maps to (0, +range)
- inf / NaN ranges are discarded
- ranges outside [range_min, range_max] are discarded
- all-invalid scan returns empty array
- output shape and dtype
cluster_points:
- two well-separated groups two clusters
- one tight group one cluster
- consecutive points just within threshold stay together
- consecutive points just over threshold split
- fewer than min_points dropped
- empty input returns empty list
- clusters sorted nearest-first
- cluster centroid is mean of points
- cluster bbox correct
"""
import sys
import os
import math
import numpy as np
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_bringup._lidar_clustering import (
scan_to_cartesian,
cluster_points,
Cluster,
)
# ── scan_to_cartesian ─────────────────────────────────────────────────────────
class TestScanToCartesian:
def test_forward_ray(self):
"""angle=0 → (range, 0)"""
pts = scan_to_cartesian([2.0], angle_min=0.0, angle_increment=0.0)
assert pts.shape == (1, 2)
assert pts[0, 0] == pytest.approx(2.0, abs=1e-6)
assert pts[0, 1] == pytest.approx(0.0, abs=1e-6)
def test_left_ray(self):
"""angle=π/2 → (0, range)"""
pts = scan_to_cartesian([3.0], angle_min=math.pi / 2, angle_increment=0.0)
assert pts[0, 0] == pytest.approx(0.0, abs=1e-6)
assert pts[0, 1] == pytest.approx(3.0, abs=1e-6)
def test_inf_discarded(self):
pts = scan_to_cartesian([float('inf'), 1.0], angle_min=0.0, angle_increment=0.1)
assert pts.shape == (1, 2) # only the valid reading
def test_nan_discarded(self):
pts = scan_to_cartesian([float('nan'), 1.0], angle_min=0.0, angle_increment=0.1)
assert pts.shape == (1, 2)
def test_below_range_min_discarded(self):
pts = scan_to_cartesian([0.01, 1.0], angle_min=0.0,
angle_increment=0.1, range_min=0.05)
assert pts.shape == (1, 2)
def test_above_range_max_discarded(self):
pts = scan_to_cartesian([1.0, 50.0], angle_min=0.0,
angle_increment=0.1, range_max=12.0)
assert pts.shape == (1, 2)
def test_all_invalid_returns_empty(self):
pts = scan_to_cartesian([float('inf')] * 10, angle_min=0.0, angle_increment=0.1)
assert pts.shape[0] == 0
def test_output_dtype_float64(self):
pts = scan_to_cartesian([1.0, 2.0], angle_min=0.0, angle_increment=0.1)
assert pts.dtype == np.float64
def test_output_shape_n_by_2(self):
pts = scan_to_cartesian([1.0, 2.0, 3.0], angle_min=0.0, angle_increment=0.1)
assert pts.shape == (3, 2)
def test_negative_angle(self):
"""-π/2 → (0, -range)"""
pts = scan_to_cartesian([2.5], angle_min=-math.pi / 2, angle_increment=0.0)
assert pts[0, 0] == pytest.approx(0.0, abs=1e-6)
assert pts[0, 1] == pytest.approx(-2.5, abs=1e-6)
# ── cluster_points ────────────────────────────────────────────────────────────
def _pts(*xy_pairs) -> np.ndarray:
"""Convenience: _pts(0,0, 1,0, 2,0) → (3,2) array."""
return np.array(xy_pairs, dtype=np.float64).reshape(-1, 2)
class TestClusterPoints:
def test_empty_input_returns_empty_list(self):
result = cluster_points(np.empty((0, 2)))
assert result == []
def test_two_separated_groups(self):
"""Points at x≈0 and x≈10 should form two clusters."""
group_a = _pts(0.0, 0.0, 0.1, 0.0, 0.2, 0.0) # 3 pts near origin
group_b = _pts(10.0, 0.0, 10.1, 0.0, 10.2, 0.0) # 3 pts far away
pts = np.vstack([group_a, group_b])
clusters = cluster_points(pts, distance_threshold_m=0.5, min_points=2)
assert len(clusters) == 2
def test_one_tight_group(self):
pts = _pts(0.0, 0.0, 0.05, 0.0, 0.10, 0.0, 0.15, 0.0)
clusters = cluster_points(pts, distance_threshold_m=0.20, min_points=3)
assert len(clusters) == 1
def test_gap_just_within_threshold_stays_together(self):
# gap = exactly threshold → same cluster
pts = _pts(0.0, 0.0, 0.20, 0.0, 0.40, 0.0)
clusters = cluster_points(pts, distance_threshold_m=0.20, min_points=2)
assert len(clusters) == 1
def test_gap_just_over_threshold_splits(self):
# gap = threshold + epsilon → different clusters
pts = _pts(0.0, 0.0, 0.21, 0.0, 0.42, 0.0)
clusters = cluster_points(pts, distance_threshold_m=0.20, min_points=1)
assert len(clusters) == 3 # every point is its own cluster
def test_fewer_than_min_points_dropped(self):
pts = _pts(0.0, 0.0, 0.05, 0.0) # only 2 pts
clusters = cluster_points(pts, distance_threshold_m=0.20, min_points=3)
assert clusters == []
def test_sorted_nearest_first(self):
far_pts = _pts(8.0, 0.0, 8.1, 0.0, 8.2, 0.0)
near_pts = _pts(1.0, 0.0, 1.1, 0.0, 1.2, 0.0)
pts = np.vstack([far_pts, near_pts]) # far group listed first
clusters = cluster_points(pts, distance_threshold_m=0.5, min_points=2)
assert len(clusters) == 2
dist0 = np.linalg.norm(clusters[0].centroid)
dist1 = np.linalg.norm(clusters[1].centroid)
assert dist0 < dist1
def test_centroid_is_mean(self):
pts = _pts(0.0, 0.0, 2.0, 0.0) # gap=2 > threshold, but with min_pts=1
clusters = cluster_points(pts, distance_threshold_m=0.5, min_points=1)
# two separate clusters; first cluster centroid = (0,0)
c0 = [c for c in clusters if np.linalg.norm(c.centroid) < 0.5][0]
np.testing.assert_allclose(c0.centroid, [0.0, 0.0])
def test_bbox_min_max_correct(self):
pts = _pts(1.0, 2.0, 3.0, 4.0, 2.0, 3.0)
clusters = cluster_points(pts, distance_threshold_m=5.0, min_points=2)
assert len(clusters) == 1
c = clusters[0]
np.testing.assert_allclose(c.bbox_min, [1.0, 2.0])
np.testing.assert_allclose(c.bbox_max, [3.0, 4.0])
def test_width_depth_correct(self):
pts = _pts(0.0, 0.0, 4.0, 0.0, 0.0, 2.0, 4.0, 2.0)
clusters = cluster_points(pts, distance_threshold_m=10.0, min_points=4)
assert len(clusters) == 1
c = clusters[0]
assert c.width_m == pytest.approx(4.0, abs=1e-9)
assert c.depth_m == pytest.approx(2.0, abs=1e-9)
def test_single_point_cluster_with_min_pts_1(self):
pts = _pts(5.0, 5.0)
clusters = cluster_points(pts, distance_threshold_m=0.5, min_points=1)
assert len(clusters) == 1
np.testing.assert_allclose(clusters[0].centroid, [5.0, 5.0])
def test_cluster_fields_are_namedtuple(self):
pts = _pts(0.0, 0.0, 0.1, 0.0, 0.2, 0.0)
clusters = cluster_points(pts, distance_threshold_m=0.5, min_points=3)
assert len(clusters) == 1
c = clusters[0]
assert hasattr(c, 'points')
assert hasattr(c, 'centroid')
assert hasattr(c, 'bbox_min')
assert hasattr(c, 'bbox_max')
assert hasattr(c, 'width_m')
assert hasattr(c, 'depth_m')
if __name__ == '__main__':
pytest.main([__file__, '-v'])