feat(bringup): LIDAR Euclidean object clustering + RViz visualisation (Issue #239) #244
@ -0,0 +1,131 @@
|
|||||||
|
"""
|
||||||
|
_lidar_clustering.py — Euclidean clustering of 2-D LIDAR scan points (no ROS2 deps).
|
||||||
|
|
||||||
|
Algorithm
|
||||||
|
---------
|
||||||
|
1. Convert LaserScan ranges to Cartesian (x, y) points, dropping inf / NaN.
|
||||||
|
2. Walk through the points in angular order. When the Euclidean distance
|
||||||
|
between consecutive scan points exceeds `distance_threshold_m`, a new
|
||||||
|
cluster begins (gap-based segmentation — O(N), standard for dense 360°
|
||||||
|
LIDAR scans at short range).
|
||||||
|
3. Discard clusters with fewer than `min_points` points.
|
||||||
|
|
||||||
|
Each cluster is returned as a Cluster namedtuple with:
|
||||||
|
points — (N, 2) float64 ndarray of (x, y) in metres
|
||||||
|
centroid — (2,) float64 ndarray, mean of points
|
||||||
|
bbox_min — (2,) min (x, y) corner of axis-aligned bounding box
|
||||||
|
bbox_max — (2,) max (x, y) corner of axis-aligned bounding box
|
||||||
|
width_m — bbox X extent (metres)
|
||||||
|
depth_m — bbox Y extent (metres)
|
||||||
|
|
||||||
|
Coordinate frame: 2-D projection of the LIDAR optical frame (+X forward, +Y left).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
from typing import List, NamedTuple
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
class Cluster(NamedTuple):
|
||||||
|
points: np.ndarray # (N, 2) float64
|
||||||
|
centroid: np.ndarray # (2,) float64
|
||||||
|
bbox_min: np.ndarray # (2,) float64
|
||||||
|
bbox_max: np.ndarray # (2,) float64
|
||||||
|
width_m: float
|
||||||
|
depth_m: float
|
||||||
|
|
||||||
|
|
||||||
|
def scan_to_cartesian(
|
||||||
|
ranges: List[float],
|
||||||
|
angle_min: float,
|
||||||
|
angle_increment: float,
|
||||||
|
range_min: float = 0.05,
|
||||||
|
range_max: float = 12.0,
|
||||||
|
) -> np.ndarray:
|
||||||
|
"""
|
||||||
|
Convert LaserScan ranges to a (M, 2) float64 array of valid (x, y) points.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
ranges : raw range values from LaserScan.ranges
|
||||||
|
angle_min : start angle of the scan (radians)
|
||||||
|
angle_increment : angular step between consecutive readings (radians)
|
||||||
|
range_min : readings below this are discarded (m)
|
||||||
|
range_max : readings above this are discarded (m)
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
(M, 2) ndarray of valid Cartesian points, in scan order.
|
||||||
|
"""
|
||||||
|
angles = angle_min + np.arange(len(ranges)) * angle_increment
|
||||||
|
r = np.asarray(ranges, dtype=np.float64)
|
||||||
|
|
||||||
|
# Mask out invalid readings
|
||||||
|
valid = np.isfinite(r) & (r >= range_min) & (r <= range_max)
|
||||||
|
r = r[valid]
|
||||||
|
a = angles[valid]
|
||||||
|
|
||||||
|
x = r * np.cos(a)
|
||||||
|
y = r * np.sin(a)
|
||||||
|
return np.column_stack((x, y)) # (M, 2)
|
||||||
|
|
||||||
|
|
||||||
|
def cluster_points(
|
||||||
|
points: np.ndarray,
|
||||||
|
distance_threshold_m: float = 0.20,
|
||||||
|
min_points: int = 3,
|
||||||
|
) -> List[Cluster]:
|
||||||
|
"""
|
||||||
|
Cluster a (M, 2) array of scan-order Cartesian points.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
points : output of scan_to_cartesian()
|
||||||
|
distance_threshold_m : max distance between consecutive points in the same cluster
|
||||||
|
min_points : discard clusters with fewer than this many points
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
List of Cluster objects, sorted by distance from the origin (nearest first).
|
||||||
|
"""
|
||||||
|
if len(points) == 0:
|
||||||
|
return []
|
||||||
|
|
||||||
|
clusters: List[Cluster] = []
|
||||||
|
current: List[np.ndarray] = [points[0]]
|
||||||
|
|
||||||
|
for i in range(1, len(points)):
|
||||||
|
gap = np.linalg.norm(points[i] - points[i - 1])
|
||||||
|
if gap <= distance_threshold_m:
|
||||||
|
current.append(points[i])
|
||||||
|
else:
|
||||||
|
_flush(current, clusters, min_points)
|
||||||
|
current = [points[i]]
|
||||||
|
|
||||||
|
_flush(current, clusters, min_points)
|
||||||
|
|
||||||
|
# Sort by distance of centroid from origin (nearest first)
|
||||||
|
clusters.sort(key=lambda c: float(np.linalg.norm(c.centroid)))
|
||||||
|
return clusters
|
||||||
|
|
||||||
|
|
||||||
|
def _flush(pts: List[np.ndarray], out: List[Cluster], min_pts: int) -> None:
|
||||||
|
"""Convert accumulated points list into a Cluster and append to out."""
|
||||||
|
if len(pts) < min_pts:
|
||||||
|
return
|
||||||
|
arr = np.stack(pts) # (N, 2)
|
||||||
|
centroid = arr.mean(axis=0)
|
||||||
|
bmin = arr.min(axis=0)
|
||||||
|
bmax = arr.max(axis=0)
|
||||||
|
span = bmax - bmin
|
||||||
|
out.append(Cluster(
|
||||||
|
points = arr,
|
||||||
|
centroid = centroid,
|
||||||
|
bbox_min = bmin,
|
||||||
|
bbox_max = bmax,
|
||||||
|
width_m = float(span[0]),
|
||||||
|
depth_m = float(span[1]),
|
||||||
|
))
|
||||||
@ -0,0 +1,214 @@
|
|||||||
|
"""
|
||||||
|
lidar_clustering_node.py — LIDAR object clustering and RViz visualisation (Issue #239).
|
||||||
|
|
||||||
|
Subscribes to the 2-D LIDAR scan, segments nearby points into objects using
|
||||||
|
Euclidean distance threshold clustering, and publishes a MarkerArray so each
|
||||||
|
cluster is visible as a labelled bounding-box cube in RViz.
|
||||||
|
|
||||||
|
Subscribes:
|
||||||
|
/scan sensor_msgs/LaserScan (RPLIDAR A1M8)
|
||||||
|
|
||||||
|
Publishes:
|
||||||
|
/saltybot/lidar_clusters visualization_msgs/MarkerArray
|
||||||
|
|
||||||
|
Algorithm
|
||||||
|
---------
|
||||||
|
1. Convert LaserScan ranges to Cartesian (x, y) points.
|
||||||
|
2. Walk scan-order points — start a new cluster when the gap between
|
||||||
|
consecutive points exceeds `distance_threshold_m`.
|
||||||
|
3. Discard clusters below `min_points`.
|
||||||
|
4. Publish one CUBE marker per cluster (centroid + bbox dims) plus one TEXT
|
||||||
|
marker showing cluster index and point count.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
distance_threshold_m float 0.20 Max gap between consecutive scan points (m)
|
||||||
|
min_points int 3 Min points for a valid cluster
|
||||||
|
range_min_m float 0.05 Discard ranges below this (m)
|
||||||
|
range_max_m float 12.0 Discard ranges above this (m)
|
||||||
|
marker_lifetime_s float 0.15 Marker lifetime — auto-expire if scan stalls
|
||||||
|
marker_z float 0.0 Z position of cube markers in the LIDAR frame
|
||||||
|
marker_height float 0.3 Fixed height of cube markers (m)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||||
|
from rclpy.duration import Duration
|
||||||
|
|
||||||
|
from sensor_msgs.msg import LaserScan
|
||||||
|
from visualization_msgs.msg import Marker, MarkerArray
|
||||||
|
from geometry_msgs.msg import Point
|
||||||
|
from std_msgs.msg import ColorRGBA
|
||||||
|
from builtin_interfaces.msg import Duration as DurationMsg
|
||||||
|
|
||||||
|
from ._lidar_clustering import scan_to_cartesian, cluster_points, Cluster
|
||||||
|
|
||||||
|
|
||||||
|
_SENSOR_QOS = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=4,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Distinct hue palette — up to 12 clusters get unique colours; wraps for more
|
||||||
|
_PALETTE: List[tuple] = [
|
||||||
|
(0.90, 0.20, 0.20), # red
|
||||||
|
(0.20, 0.70, 0.20), # green
|
||||||
|
(0.20, 0.50, 0.90), # blue
|
||||||
|
(0.90, 0.70, 0.10), # yellow
|
||||||
|
(0.80, 0.30, 0.90), # purple
|
||||||
|
(0.20, 0.85, 0.85), # cyan
|
||||||
|
(0.95, 0.50, 0.10), # orange
|
||||||
|
(0.50, 0.90, 0.30), # lime
|
||||||
|
(0.90, 0.20, 0.70), # pink
|
||||||
|
(0.40, 0.20, 0.90), # indigo
|
||||||
|
(0.10, 0.70, 0.50), # teal
|
||||||
|
(0.70, 0.50, 0.20), # brown
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class LidarClusteringNode(Node):
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__('lidar_clustering_node')
|
||||||
|
|
||||||
|
self.declare_parameter('distance_threshold_m', 0.20)
|
||||||
|
self.declare_parameter('min_points', 3)
|
||||||
|
self.declare_parameter('range_min_m', 0.05)
|
||||||
|
self.declare_parameter('range_max_m', 12.0)
|
||||||
|
self.declare_parameter('marker_lifetime_s', 0.15)
|
||||||
|
self.declare_parameter('marker_z', 0.0)
|
||||||
|
self.declare_parameter('marker_height', 0.3)
|
||||||
|
|
||||||
|
self._dist_thresh = self.get_parameter('distance_threshold_m').value
|
||||||
|
self._min_pts = self.get_parameter('min_points').value
|
||||||
|
self._range_min = self.get_parameter('range_min_m').value
|
||||||
|
self._range_max = self.get_parameter('range_max_m').value
|
||||||
|
self._lifetime_s = self.get_parameter('marker_lifetime_s').value
|
||||||
|
self._marker_z = self.get_parameter('marker_z').value
|
||||||
|
self._marker_h = self.get_parameter('marker_height').value
|
||||||
|
|
||||||
|
self._sub = self.create_subscription(
|
||||||
|
LaserScan, '/scan', self._on_scan, _SENSOR_QOS)
|
||||||
|
self._pub = self.create_publisher(
|
||||||
|
MarkerArray, '/saltybot/lidar_clusters', 10)
|
||||||
|
|
||||||
|
# Track how many markers were published last cycle so we can DELETE extras
|
||||||
|
self._prev_count = 0
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f'lidar_clustering_node ready — '
|
||||||
|
f'dist={self._dist_thresh}m min_pts={self._min_pts}'
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Callback ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _on_scan(self, msg: LaserScan) -> None:
|
||||||
|
points = scan_to_cartesian(
|
||||||
|
list(msg.ranges),
|
||||||
|
msg.angle_min,
|
||||||
|
msg.angle_increment,
|
||||||
|
range_min=self._range_min,
|
||||||
|
range_max=self._range_max,
|
||||||
|
)
|
||||||
|
clusters = cluster_points(
|
||||||
|
points,
|
||||||
|
distance_threshold_m=self._dist_thresh,
|
||||||
|
min_points=self._min_pts,
|
||||||
|
)
|
||||||
|
|
||||||
|
markers: List[Marker] = []
|
||||||
|
lifetime = _duration_msg(self._lifetime_s)
|
||||||
|
frame_id = msg.header.frame_id or 'laser'
|
||||||
|
stamp = msg.header.stamp
|
||||||
|
|
||||||
|
for idx, cluster in enumerate(clusters):
|
||||||
|
color = _PALETTE[idx % len(_PALETTE)]
|
||||||
|
|
||||||
|
# ── CUBE marker (bounding box) ────────────────────────────────────
|
||||||
|
cube = Marker()
|
||||||
|
cube.header.frame_id = frame_id
|
||||||
|
cube.header.stamp = stamp
|
||||||
|
cube.ns = 'lidar_clusters'
|
||||||
|
cube.id = idx * 2
|
||||||
|
cube.type = Marker.CUBE
|
||||||
|
cube.action = Marker.ADD
|
||||||
|
cube.pose.position.x = float(cluster.centroid[0])
|
||||||
|
cube.pose.position.y = float(cluster.centroid[1])
|
||||||
|
cube.pose.position.z = self._marker_z
|
||||||
|
cube.pose.orientation.w = 1.0
|
||||||
|
# Minimum visible size of 0.1 m per axis
|
||||||
|
cube.scale.x = max(cluster.width_m, 0.10)
|
||||||
|
cube.scale.y = max(cluster.depth_m, 0.10)
|
||||||
|
cube.scale.z = self._marker_h
|
||||||
|
cube.color = _rgba(*color, a=0.45)
|
||||||
|
cube.lifetime = lifetime
|
||||||
|
markers.append(cube)
|
||||||
|
|
||||||
|
# ── TEXT marker (id + count) ──────────────────────────────────────
|
||||||
|
label = Marker()
|
||||||
|
label.header.frame_id = frame_id
|
||||||
|
label.header.stamp = stamp
|
||||||
|
label.ns = 'lidar_clusters'
|
||||||
|
label.id = idx * 2 + 1
|
||||||
|
label.type = Marker.TEXT_VIEW_FACING
|
||||||
|
label.action = Marker.ADD
|
||||||
|
label.pose.position.x = float(cluster.centroid[0])
|
||||||
|
label.pose.position.y = float(cluster.centroid[1])
|
||||||
|
label.pose.position.z = self._marker_z + self._marker_h * 0.5 + 0.05
|
||||||
|
label.pose.orientation.w = 1.0
|
||||||
|
label.scale.z = 0.12 # text height (m)
|
||||||
|
label.color = _rgba(1.0, 1.0, 1.0, a=0.9)
|
||||||
|
label.text = f'#{idx} n={len(cluster.points)}'
|
||||||
|
label.lifetime = lifetime
|
||||||
|
markers.append(label)
|
||||||
|
|
||||||
|
# DELETE markers from the previous cycle that no longer have a cluster
|
||||||
|
new_count = len(clusters)
|
||||||
|
for idx in range(new_count, self._prev_count):
|
||||||
|
for offset in (0, 1):
|
||||||
|
del_m = Marker()
|
||||||
|
del_m.ns = 'lidar_clusters'
|
||||||
|
del_m.id = idx * 2 + offset
|
||||||
|
del_m.action = Marker.DELETE
|
||||||
|
markers.append(del_m)
|
||||||
|
|
||||||
|
self._prev_count = new_count
|
||||||
|
|
||||||
|
ma = MarkerArray()
|
||||||
|
ma.markers = markers
|
||||||
|
self._pub.publish(ma)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _rgba(r: float, g: float, b: float, a: float) -> ColorRGBA:
|
||||||
|
c = ColorRGBA()
|
||||||
|
c.r = r; c.g = g; c.b = b; c.a = a
|
||||||
|
return c
|
||||||
|
|
||||||
|
|
||||||
|
def _duration_msg(seconds: float) -> DurationMsg:
|
||||||
|
d = DurationMsg()
|
||||||
|
d.sec = int(seconds)
|
||||||
|
d.nanosec = int((seconds - int(seconds)) * 1e9)
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None) -> None:
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = LidarClusteringNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
@ -31,6 +31,8 @@ setup(
|
|||||||
'depth_confidence_filter = saltybot_bringup.depth_confidence_filter_node:main',
|
'depth_confidence_filter = saltybot_bringup.depth_confidence_filter_node:main',
|
||||||
'camera_health_monitor = saltybot_bringup.camera_health_node:main',
|
'camera_health_monitor = saltybot_bringup.camera_health_node:main',
|
||||||
'scan_height_filter = saltybot_bringup.scan_height_filter_node:main',
|
'scan_height_filter = saltybot_bringup.scan_height_filter_node:main',
|
||||||
|
# LIDAR object clustering + RViz visualisation (Issue #239)
|
||||||
|
'lidar_clustering = saltybot_bringup.lidar_clustering_node:main',
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@ -0,0 +1,191 @@
|
|||||||
|
"""
|
||||||
|
test_lidar_clustering.py — Unit tests for LIDAR clustering helpers (no ROS2 required).
|
||||||
|
|
||||||
|
Covers:
|
||||||
|
scan_to_cartesian:
|
||||||
|
- principal ray (angle=0) maps to (+range, 0)
|
||||||
|
- 90° ray maps to (0, +range)
|
||||||
|
- inf / NaN ranges are discarded
|
||||||
|
- ranges outside [range_min, range_max] are discarded
|
||||||
|
- all-invalid scan returns empty array
|
||||||
|
- output shape and dtype
|
||||||
|
|
||||||
|
cluster_points:
|
||||||
|
- two well-separated groups → two clusters
|
||||||
|
- one tight group → one cluster
|
||||||
|
- consecutive points just within threshold stay together
|
||||||
|
- consecutive points just over threshold split
|
||||||
|
- fewer than min_points dropped
|
||||||
|
- empty input returns empty list
|
||||||
|
- clusters sorted nearest-first
|
||||||
|
- cluster centroid is mean of points
|
||||||
|
- cluster bbox correct
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import math
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||||
|
|
||||||
|
from saltybot_bringup._lidar_clustering import (
|
||||||
|
scan_to_cartesian,
|
||||||
|
cluster_points,
|
||||||
|
Cluster,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── scan_to_cartesian ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestScanToCartesian:
|
||||||
|
|
||||||
|
def test_forward_ray(self):
|
||||||
|
"""angle=0 → (range, 0)"""
|
||||||
|
pts = scan_to_cartesian([2.0], angle_min=0.0, angle_increment=0.0)
|
||||||
|
assert pts.shape == (1, 2)
|
||||||
|
assert pts[0, 0] == pytest.approx(2.0, abs=1e-6)
|
||||||
|
assert pts[0, 1] == pytest.approx(0.0, abs=1e-6)
|
||||||
|
|
||||||
|
def test_left_ray(self):
|
||||||
|
"""angle=π/2 → (0, range)"""
|
||||||
|
pts = scan_to_cartesian([3.0], angle_min=math.pi / 2, angle_increment=0.0)
|
||||||
|
assert pts[0, 0] == pytest.approx(0.0, abs=1e-6)
|
||||||
|
assert pts[0, 1] == pytest.approx(3.0, abs=1e-6)
|
||||||
|
|
||||||
|
def test_inf_discarded(self):
|
||||||
|
pts = scan_to_cartesian([float('inf'), 1.0], angle_min=0.0, angle_increment=0.1)
|
||||||
|
assert pts.shape == (1, 2) # only the valid reading
|
||||||
|
|
||||||
|
def test_nan_discarded(self):
|
||||||
|
pts = scan_to_cartesian([float('nan'), 1.0], angle_min=0.0, angle_increment=0.1)
|
||||||
|
assert pts.shape == (1, 2)
|
||||||
|
|
||||||
|
def test_below_range_min_discarded(self):
|
||||||
|
pts = scan_to_cartesian([0.01, 1.0], angle_min=0.0,
|
||||||
|
angle_increment=0.1, range_min=0.05)
|
||||||
|
assert pts.shape == (1, 2)
|
||||||
|
|
||||||
|
def test_above_range_max_discarded(self):
|
||||||
|
pts = scan_to_cartesian([1.0, 50.0], angle_min=0.0,
|
||||||
|
angle_increment=0.1, range_max=12.0)
|
||||||
|
assert pts.shape == (1, 2)
|
||||||
|
|
||||||
|
def test_all_invalid_returns_empty(self):
|
||||||
|
pts = scan_to_cartesian([float('inf')] * 10, angle_min=0.0, angle_increment=0.1)
|
||||||
|
assert pts.shape[0] == 0
|
||||||
|
|
||||||
|
def test_output_dtype_float64(self):
|
||||||
|
pts = scan_to_cartesian([1.0, 2.0], angle_min=0.0, angle_increment=0.1)
|
||||||
|
assert pts.dtype == np.float64
|
||||||
|
|
||||||
|
def test_output_shape_n_by_2(self):
|
||||||
|
pts = scan_to_cartesian([1.0, 2.0, 3.0], angle_min=0.0, angle_increment=0.1)
|
||||||
|
assert pts.shape == (3, 2)
|
||||||
|
|
||||||
|
def test_negative_angle(self):
|
||||||
|
"""-π/2 → (0, -range)"""
|
||||||
|
pts = scan_to_cartesian([2.5], angle_min=-math.pi / 2, angle_increment=0.0)
|
||||||
|
assert pts[0, 0] == pytest.approx(0.0, abs=1e-6)
|
||||||
|
assert pts[0, 1] == pytest.approx(-2.5, abs=1e-6)
|
||||||
|
|
||||||
|
|
||||||
|
# ── cluster_points ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _pts(*xy_pairs) -> np.ndarray:
|
||||||
|
"""Convenience: _pts(0,0, 1,0, 2,0) → (3,2) array."""
|
||||||
|
return np.array(xy_pairs, dtype=np.float64).reshape(-1, 2)
|
||||||
|
|
||||||
|
|
||||||
|
class TestClusterPoints:
|
||||||
|
|
||||||
|
def test_empty_input_returns_empty_list(self):
|
||||||
|
result = cluster_points(np.empty((0, 2)))
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
def test_two_separated_groups(self):
|
||||||
|
"""Points at x≈0 and x≈10 should form two clusters."""
|
||||||
|
group_a = _pts(0.0, 0.0, 0.1, 0.0, 0.2, 0.0) # 3 pts near origin
|
||||||
|
group_b = _pts(10.0, 0.0, 10.1, 0.0, 10.2, 0.0) # 3 pts far away
|
||||||
|
pts = np.vstack([group_a, group_b])
|
||||||
|
clusters = cluster_points(pts, distance_threshold_m=0.5, min_points=2)
|
||||||
|
assert len(clusters) == 2
|
||||||
|
|
||||||
|
def test_one_tight_group(self):
|
||||||
|
pts = _pts(0.0, 0.0, 0.05, 0.0, 0.10, 0.0, 0.15, 0.0)
|
||||||
|
clusters = cluster_points(pts, distance_threshold_m=0.20, min_points=3)
|
||||||
|
assert len(clusters) == 1
|
||||||
|
|
||||||
|
def test_gap_just_within_threshold_stays_together(self):
|
||||||
|
# gap = exactly threshold → same cluster
|
||||||
|
pts = _pts(0.0, 0.0, 0.20, 0.0, 0.40, 0.0)
|
||||||
|
clusters = cluster_points(pts, distance_threshold_m=0.20, min_points=2)
|
||||||
|
assert len(clusters) == 1
|
||||||
|
|
||||||
|
def test_gap_just_over_threshold_splits(self):
|
||||||
|
# gap = threshold + epsilon → different clusters
|
||||||
|
pts = _pts(0.0, 0.0, 0.21, 0.0, 0.42, 0.0)
|
||||||
|
clusters = cluster_points(pts, distance_threshold_m=0.20, min_points=1)
|
||||||
|
assert len(clusters) == 3 # every point is its own cluster
|
||||||
|
|
||||||
|
def test_fewer_than_min_points_dropped(self):
|
||||||
|
pts = _pts(0.0, 0.0, 0.05, 0.0) # only 2 pts
|
||||||
|
clusters = cluster_points(pts, distance_threshold_m=0.20, min_points=3)
|
||||||
|
assert clusters == []
|
||||||
|
|
||||||
|
def test_sorted_nearest_first(self):
|
||||||
|
far_pts = _pts(8.0, 0.0, 8.1, 0.0, 8.2, 0.0)
|
||||||
|
near_pts = _pts(1.0, 0.0, 1.1, 0.0, 1.2, 0.0)
|
||||||
|
pts = np.vstack([far_pts, near_pts]) # far group listed first
|
||||||
|
clusters = cluster_points(pts, distance_threshold_m=0.5, min_points=2)
|
||||||
|
assert len(clusters) == 2
|
||||||
|
dist0 = np.linalg.norm(clusters[0].centroid)
|
||||||
|
dist1 = np.linalg.norm(clusters[1].centroid)
|
||||||
|
assert dist0 < dist1
|
||||||
|
|
||||||
|
def test_centroid_is_mean(self):
|
||||||
|
pts = _pts(0.0, 0.0, 2.0, 0.0) # gap=2 > threshold, but with min_pts=1
|
||||||
|
clusters = cluster_points(pts, distance_threshold_m=0.5, min_points=1)
|
||||||
|
# two separate clusters; first cluster centroid = (0,0)
|
||||||
|
c0 = [c for c in clusters if np.linalg.norm(c.centroid) < 0.5][0]
|
||||||
|
np.testing.assert_allclose(c0.centroid, [0.0, 0.0])
|
||||||
|
|
||||||
|
def test_bbox_min_max_correct(self):
|
||||||
|
pts = _pts(1.0, 2.0, 3.0, 4.0, 2.0, 3.0)
|
||||||
|
clusters = cluster_points(pts, distance_threshold_m=5.0, min_points=2)
|
||||||
|
assert len(clusters) == 1
|
||||||
|
c = clusters[0]
|
||||||
|
np.testing.assert_allclose(c.bbox_min, [1.0, 2.0])
|
||||||
|
np.testing.assert_allclose(c.bbox_max, [3.0, 4.0])
|
||||||
|
|
||||||
|
def test_width_depth_correct(self):
|
||||||
|
pts = _pts(0.0, 0.0, 4.0, 0.0, 0.0, 2.0, 4.0, 2.0)
|
||||||
|
clusters = cluster_points(pts, distance_threshold_m=10.0, min_points=4)
|
||||||
|
assert len(clusters) == 1
|
||||||
|
c = clusters[0]
|
||||||
|
assert c.width_m == pytest.approx(4.0, abs=1e-9)
|
||||||
|
assert c.depth_m == pytest.approx(2.0, abs=1e-9)
|
||||||
|
|
||||||
|
def test_single_point_cluster_with_min_pts_1(self):
|
||||||
|
pts = _pts(5.0, 5.0)
|
||||||
|
clusters = cluster_points(pts, distance_threshold_m=0.5, min_points=1)
|
||||||
|
assert len(clusters) == 1
|
||||||
|
np.testing.assert_allclose(clusters[0].centroid, [5.0, 5.0])
|
||||||
|
|
||||||
|
def test_cluster_fields_are_namedtuple(self):
|
||||||
|
pts = _pts(0.0, 0.0, 0.1, 0.0, 0.2, 0.0)
|
||||||
|
clusters = cluster_points(pts, distance_threshold_m=0.5, min_points=3)
|
||||||
|
assert len(clusters) == 1
|
||||||
|
c = clusters[0]
|
||||||
|
assert hasattr(c, 'points')
|
||||||
|
assert hasattr(c, 'centroid')
|
||||||
|
assert hasattr(c, 'bbox_min')
|
||||||
|
assert hasattr(c, 'bbox_max')
|
||||||
|
assert hasattr(c, 'width_m')
|
||||||
|
assert hasattr(c, 'depth_m')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
pytest.main([__file__, '-v'])
|
||||||
Loading…
x
Reference in New Issue
Block a user