feat(perception): depth-based obstacle size estimator (Issue #348) #351
@ -0,0 +1,361 @@
|
|||||||
|
"""
|
||||||
|
_obstacle_size.py — Depth-based obstacle size estimation by projecting LIDAR
|
||||||
|
clusters into the D435i depth image (no ROS2 deps).
|
||||||
|
|
||||||
|
Algorithm
|
||||||
|
---------
|
||||||
|
1. Transform each LIDAR cluster centroid from LIDAR frame to camera frame
|
||||||
|
using a configurable translation-only extrinsic (typical co-mounted setup).
|
||||||
|
2. Project to depth-image pixel coordinates via the pinhole camera model.
|
||||||
|
3. Sample the D435i depth image in a window around the projected pixel to
|
||||||
|
obtain a robust Z estimate (uint16 mm → metres).
|
||||||
|
4. Derive horizontal width (metres) directly from the LIDAR cluster bounding
|
||||||
|
box, which is more reliable than image-based re-measurement.
|
||||||
|
5. Derive vertical height (metres) by scanning a vertical strip in the depth
|
||||||
|
image: collect pixels whose depth is within `z_tol` of the sampled Z, then
|
||||||
|
compute the row extent and back-project using the camera fy.
|
||||||
|
6. Return an ObstacleSizeEstimate per cluster.
|
||||||
|
|
||||||
|
Coordinate frames
|
||||||
|
-----------------
|
||||||
|
LIDAR frame (+X forward, +Y left, +Z up — 2-D scan at z=0):
|
||||||
|
x_lidar = forward distance (metres)
|
||||||
|
y_lidar = lateral distance (metres, positive = left)
|
||||||
|
z_lidar = 0 (2-D LIDAR always lies in the horizontal plane)
|
||||||
|
|
||||||
|
Camera frame (+X right, +Y down, +Z forward — standard pinhole):
|
||||||
|
x_cam = −y_lidar + ex
|
||||||
|
y_cam = −z_lidar + ey = ey (z_lidar=0 for 2-D LIDAR)
|
||||||
|
z_cam = x_lidar + ez
|
||||||
|
|
||||||
|
Extrinsic parameters (CameraParams.ex/ey/ez) give the LIDAR origin position
|
||||||
|
in the camera coordinate frame:
|
||||||
|
ex — lateral offset (positive = LIDAR origin is to the right of camera)
|
||||||
|
ey — vertical offset (positive = LIDAR origin is below camera)
|
||||||
|
ez — forward offset (positive = LIDAR origin is in front of camera)
|
||||||
|
|
||||||
|
Public API
|
||||||
|
----------
|
||||||
|
CameraParams dataclass — camera intrinsics + co-mount extrinsics
|
||||||
|
ObstacleSizeEstimate NamedTuple — per-cluster result
|
||||||
|
|
||||||
|
lidar_to_camera(x_l, y_l, params) → (x_cam, y_cam, z_cam)
|
||||||
|
project_to_pixel(x_cam, y_cam, z_cam, p) → (u, v) or None if out-of-bounds
|
||||||
|
sample_depth_median(depth_u16, u, v, win, scale) → (depth_m, n_valid)
|
||||||
|
estimate_height(depth_u16, u_c, v_c, z_ref, params,
|
||||||
|
search_rows, col_hw, z_tol) → height_m
|
||||||
|
estimate_cluster_size(cluster, depth_u16, params,
|
||||||
|
depth_window, search_rows, col_hw, z_tol,
|
||||||
|
cluster_id) → ObstacleSizeEstimate
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import NamedTuple, Optional, Tuple
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
# ── Camera parameters ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class CameraParams:
|
||||||
|
"""Pinhole intrinsics and LIDAR→camera extrinsics for the D435i.
|
||||||
|
|
||||||
|
Intrinsics (D435i 640×480 depth stream defaults — override from CameraInfo):
|
||||||
|
fx, fy : focal lengths in pixels
|
||||||
|
cx, cy : principal point in pixels
|
||||||
|
width, height : image dimensions in pixels
|
||||||
|
depth_scale : multiply raw uint16 value by this to get metres
|
||||||
|
D435i default = 0.001 (raw value is millimetres)
|
||||||
|
|
||||||
|
Extrinsics (LIDAR origin position in camera frame):
|
||||||
|
ex, ey, ez : translation (metres) — see module docstring
|
||||||
|
"""
|
||||||
|
fx: float = 383.0 # D435i 640×480 depth approx
|
||||||
|
fy: float = 383.0
|
||||||
|
cx: float = 320.0
|
||||||
|
cy: float = 240.0
|
||||||
|
width: int = 640
|
||||||
|
height: int = 480
|
||||||
|
depth_scale: float = 0.001 # mm → m
|
||||||
|
|
||||||
|
# LIDAR→camera extrinsics: LIDAR origin position in camera frame
|
||||||
|
ex: float = 0.0 # lateral (m); 0 = LIDAR directly in front of camera
|
||||||
|
ey: float = 0.05 # vertical (m); +0.05 = LIDAR 5 cm below camera
|
||||||
|
ez: float = 0.0 # forward (m); 0 = LIDAR at same depth as camera
|
||||||
|
|
||||||
|
|
||||||
|
# ── Result type ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class ObstacleSizeEstimate(NamedTuple):
|
||||||
|
"""Size estimate for one LIDAR cluster projected into the depth image."""
|
||||||
|
obstacle_id: int # user-supplied cluster index (0-based) or track ID
|
||||||
|
centroid_x: float # LIDAR-frame forward distance (metres)
|
||||||
|
centroid_y: float # LIDAR-frame lateral distance (metres, +left)
|
||||||
|
depth_z: float # D435i sampled depth at projected centroid (metres)
|
||||||
|
width_m: float # horizontal size, LIDAR-derived (metres)
|
||||||
|
height_m: float # vertical size, depth-image-derived (metres)
|
||||||
|
pixel_u: int # projected centroid column (-1 = out of image)
|
||||||
|
pixel_v: int # projected centroid row (-1 = out of image)
|
||||||
|
lidar_range: float # range from LIDAR origin to centroid (metres)
|
||||||
|
confidence: float # 0.0–1.0; based on depth sample quality
|
||||||
|
|
||||||
|
|
||||||
|
# ── Coordinate transform ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def lidar_to_camera(
|
||||||
|
x_lidar: float,
|
||||||
|
y_lidar: float,
|
||||||
|
params: CameraParams,
|
||||||
|
) -> Tuple[float, float, float]:
|
||||||
|
"""
|
||||||
|
Transform a 2-D LIDAR point to the camera coordinate frame.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
x_lidar : forward distance in LIDAR frame (metres)
|
||||||
|
y_lidar : lateral distance in LIDAR frame (metres, +left)
|
||||||
|
params : CameraParams with extrinsic translation ex/ey/ez
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
(x_cam, y_cam, z_cam) in camera frame (metres)
|
||||||
|
x_cam = right, y_cam = down, z_cam = forward
|
||||||
|
"""
|
||||||
|
x_cam = -y_lidar + params.ex
|
||||||
|
y_cam = params.ey # LIDAR z=0 for 2-D scan
|
||||||
|
z_cam = x_lidar + params.ez
|
||||||
|
return float(x_cam), float(y_cam), float(z_cam)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Pinhole projection ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def project_to_pixel(
|
||||||
|
x_cam: float,
|
||||||
|
y_cam: float,
|
||||||
|
z_cam: float,
|
||||||
|
params: CameraParams,
|
||||||
|
) -> Optional[Tuple[int, int]]:
|
||||||
|
"""
|
||||||
|
Project a 3-D camera-frame point to image pixel coordinates.
|
||||||
|
|
||||||
|
Returns None if z_cam ≤ 0 or the projection falls outside the image.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
(u, v) integer pixel coordinates (column, row), or None.
|
||||||
|
"""
|
||||||
|
if z_cam <= 0.0:
|
||||||
|
return None
|
||||||
|
u = params.fx * x_cam / z_cam + params.cx
|
||||||
|
v = params.fy * y_cam / z_cam + params.cy
|
||||||
|
ui, vi = int(round(u)), int(round(v))
|
||||||
|
if 0 <= ui < params.width and 0 <= vi < params.height:
|
||||||
|
return ui, vi
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# ── Depth sampling ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def sample_depth_median(
|
||||||
|
depth_u16: np.ndarray,
|
||||||
|
u: int,
|
||||||
|
v: int,
|
||||||
|
window_px: int = 5,
|
||||||
|
depth_scale: float = 0.001,
|
||||||
|
) -> Tuple[float, int]:
|
||||||
|
"""
|
||||||
|
Median depth in a square window centred on (u, v) in a uint16 depth image.
|
||||||
|
|
||||||
|
Zeros (invalid readings) are excluded from the median.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
depth_u16 : (H, W) uint16 depth image (raw units, e.g. mm for D435i)
|
||||||
|
u, v : centre pixel (column, row)
|
||||||
|
window_px : half-side of the sampling square (window of 2*window_px+1)
|
||||||
|
depth_scale : scale factor to convert raw units to metres (0.001 for mm)
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
(depth_m, n_valid) — median depth in metres and count of valid pixels.
|
||||||
|
depth_m = 0.0 when n_valid == 0.
|
||||||
|
"""
|
||||||
|
h, w = depth_u16.shape
|
||||||
|
r0 = max(0, v - window_px)
|
||||||
|
r1 = min(h, v + window_px + 1)
|
||||||
|
c0 = max(0, u - window_px)
|
||||||
|
c1 = min(w, u + window_px + 1)
|
||||||
|
|
||||||
|
patch = depth_u16[r0:r1, c0:c1].astype(np.float64)
|
||||||
|
valid = patch[patch > 0]
|
||||||
|
if len(valid) == 0:
|
||||||
|
return 0.0, 0
|
||||||
|
return float(np.median(valid) * depth_scale), int(len(valid))
|
||||||
|
|
||||||
|
|
||||||
|
# ── Height estimation ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def estimate_height(
|
||||||
|
depth_u16: np.ndarray,
|
||||||
|
u_c: int,
|
||||||
|
v_c: int,
|
||||||
|
z_ref: float,
|
||||||
|
params: CameraParams,
|
||||||
|
search_rows: int = 120,
|
||||||
|
col_hw: int = 10,
|
||||||
|
z_tol: float = 0.30,
|
||||||
|
) -> float:
|
||||||
|
"""
|
||||||
|
Estimate the vertical height (metres) of an obstacle in the depth image.
|
||||||
|
|
||||||
|
Scans a vertical strip centred on (u_c, v_c) and finds the row extent of
|
||||||
|
pixels whose depth is within `z_tol` metres of `z_ref`.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
depth_u16 : (H, W) uint16 depth image
|
||||||
|
u_c, v_c : projected centroid pixel
|
||||||
|
z_ref : reference depth (metres) — from sample_depth_median
|
||||||
|
params : CameraParams (needs fy, depth_scale)
|
||||||
|
search_rows : half-height of the search strip (rows above/below v_c)
|
||||||
|
col_hw : half-width of the search strip (columns left/right of u_c)
|
||||||
|
z_tol : depth tolerance — pixels within z_ref ± z_tol are included
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
height_m — vertical extent in metres; 0.0 if fewer than 2 valid rows found.
|
||||||
|
"""
|
||||||
|
if z_ref <= 0.0:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
h, w = depth_u16.shape
|
||||||
|
r0 = max(0, v_c - search_rows)
|
||||||
|
r1 = min(h, v_c + search_rows + 1)
|
||||||
|
c0 = max(0, u_c - col_hw)
|
||||||
|
c1 = min(w, u_c + col_hw + 1)
|
||||||
|
|
||||||
|
strip = depth_u16[r0:r1, c0:c1].astype(np.float64) * params.depth_scale
|
||||||
|
|
||||||
|
# For each row in the strip, check if any valid pixel is within z_tol of z_ref
|
||||||
|
z_min = z_ref - z_tol
|
||||||
|
z_max = z_ref + z_tol
|
||||||
|
valid_mask = (strip > 0) & (strip >= z_min) & (strip <= z_max)
|
||||||
|
valid_rows = np.any(valid_mask, axis=1) # (n_rows,) bool
|
||||||
|
|
||||||
|
if valid_rows.sum() < 2:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
row_indices = np.where(valid_rows)[0] # rows within the strip
|
||||||
|
v_top_strip = int(row_indices[0])
|
||||||
|
v_bottom_strip = int(row_indices[-1])
|
||||||
|
|
||||||
|
# Convert to absolute image rows
|
||||||
|
v_top = r0 + v_top_strip
|
||||||
|
v_bottom = r0 + v_bottom_strip
|
||||||
|
|
||||||
|
row_extent = v_bottom - v_top
|
||||||
|
if row_extent <= 0:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
# Back-project row extent to metres at depth z_ref
|
||||||
|
height_m = float(row_extent) * z_ref / params.fy
|
||||||
|
return float(height_m)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Main entry point ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def estimate_cluster_size(
|
||||||
|
cluster, # _lidar_clustering.Cluster NamedTuple
|
||||||
|
depth_u16: np.ndarray, # (H, W) uint16 depth image
|
||||||
|
params: CameraParams,
|
||||||
|
depth_window: int = 5, # half-side for depth median sampling
|
||||||
|
search_rows: int = 120, # half-height of height search strip
|
||||||
|
col_hw: int = 10, # half-width of height search strip
|
||||||
|
z_tol: float = 0.30, # depth tolerance for height estimation
|
||||||
|
obstacle_id: int = 0, # caller-assigned ID
|
||||||
|
) -> ObstacleSizeEstimate:
|
||||||
|
"""
|
||||||
|
Estimate the 3-D size of one LIDAR cluster using the D435i depth image.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
cluster : Cluster from _lidar_clustering.cluster_points()
|
||||||
|
depth_u16 : uint16 depth image (H × W), values in millimetres
|
||||||
|
params : CameraParams with intrinsics + extrinsics
|
||||||
|
depth_window : half-side of median depth sampling window (pixels)
|
||||||
|
search_rows : half-height for vertical height search (rows)
|
||||||
|
col_hw : half-width for vertical height search (columns)
|
||||||
|
z_tol : depth tolerance when collecting obstacle pixels (metres)
|
||||||
|
obstacle_id : arbitrary integer ID for this estimate
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
ObstacleSizeEstimate
|
||||||
|
"""
|
||||||
|
cx, cy = float(cluster.centroid[0]), float(cluster.centroid[1])
|
||||||
|
lidar_range = float(math.sqrt(cx * cx + cy * cy))
|
||||||
|
|
||||||
|
# Transform centroid to camera frame
|
||||||
|
x_cam, y_cam, z_cam = lidar_to_camera(cx, cy, params)
|
||||||
|
|
||||||
|
# Project to depth image pixel
|
||||||
|
px = project_to_pixel(x_cam, y_cam, z_cam, params)
|
||||||
|
if px is None:
|
||||||
|
return ObstacleSizeEstimate(
|
||||||
|
obstacle_id = obstacle_id,
|
||||||
|
centroid_x = cx,
|
||||||
|
centroid_y = cy,
|
||||||
|
depth_z = 0.0,
|
||||||
|
width_m = float(cluster.width_m),
|
||||||
|
height_m = 0.0,
|
||||||
|
pixel_u = -1,
|
||||||
|
pixel_v = -1,
|
||||||
|
lidar_range = lidar_range,
|
||||||
|
confidence = 0.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
u, v = px
|
||||||
|
|
||||||
|
# Sample depth
|
||||||
|
depth_m, n_valid = sample_depth_median(
|
||||||
|
depth_u16, u, v, window_px=depth_window,
|
||||||
|
depth_scale=params.depth_scale,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Use LIDAR range as fallback when depth image has no valid reading
|
||||||
|
if depth_m <= 0.0:
|
||||||
|
depth_m = z_cam
|
||||||
|
depth_conf = 0.0
|
||||||
|
else:
|
||||||
|
# Confidence: ratio of valid pixels in window + how close to expected Z
|
||||||
|
max_valid = (2 * depth_window + 1) ** 2
|
||||||
|
fill_ratio = min(1.0, n_valid / max(max_valid * 0.25, 1.0))
|
||||||
|
z_err = abs(depth_m - z_cam)
|
||||||
|
z_conf = max(0.0, 1.0 - z_err / max(z_cam, 0.1))
|
||||||
|
depth_conf = float(0.6 * fill_ratio + 0.4 * z_conf)
|
||||||
|
|
||||||
|
# Width: LIDAR gives reliable horizontal measurement
|
||||||
|
width_m = float(cluster.width_m)
|
||||||
|
|
||||||
|
# Height: from depth image vertical strip
|
||||||
|
height_m = estimate_height(
|
||||||
|
depth_u16, u, v, depth_m, params,
|
||||||
|
search_rows=search_rows, col_hw=col_hw, z_tol=z_tol,
|
||||||
|
)
|
||||||
|
|
||||||
|
return ObstacleSizeEstimate(
|
||||||
|
obstacle_id = obstacle_id,
|
||||||
|
centroid_x = cx,
|
||||||
|
centroid_y = cy,
|
||||||
|
depth_z = float(depth_m),
|
||||||
|
width_m = width_m,
|
||||||
|
height_m = float(height_m),
|
||||||
|
pixel_u = int(u),
|
||||||
|
pixel_v = int(v),
|
||||||
|
lidar_range = lidar_range,
|
||||||
|
confidence = float(depth_conf),
|
||||||
|
)
|
||||||
@ -0,0 +1,262 @@
|
|||||||
|
"""
|
||||||
|
obstacle_size_node.py — Depth-based obstacle size estimator (Issue #348).
|
||||||
|
|
||||||
|
Fuses 2-D LIDAR clusters with the D435i depth image to estimate the full
|
||||||
|
3-D width and height (metres) of each obstacle.
|
||||||
|
|
||||||
|
Subscribes
|
||||||
|
----------
|
||||||
|
/scan sensor_msgs/LaserScan (BEST_EFFORT, ~10 Hz)
|
||||||
|
/camera/depth/image_rect_raw sensor_msgs/Image (BEST_EFFORT, ~15 Hz)
|
||||||
|
/camera/depth/camera_info sensor_msgs/CameraInfo (RELIABLE)
|
||||||
|
|
||||||
|
Publishes
|
||||||
|
---------
|
||||||
|
/saltybot/obstacle_sizes saltybot_scene_msgs/ObstacleSizeArray
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
Clustering
|
||||||
|
distance_threshold_m float 0.20 Max gap between consecutive scan pts (m)
|
||||||
|
min_points int 3 Min LIDAR points per cluster
|
||||||
|
range_min_m float 0.05 Discard ranges below this (m)
|
||||||
|
range_max_m float 12.0 Discard ranges above this (m)
|
||||||
|
|
||||||
|
Depth sampling
|
||||||
|
depth_window_px int 5 Half-side of median sampling window
|
||||||
|
search_rows_px int 120 Half-height of height search strip
|
||||||
|
col_hw_px int 10 Half-width of height search strip
|
||||||
|
z_tol_m float 0.30 Depth tolerance for height collection
|
||||||
|
|
||||||
|
Extrinsics (LIDAR origin in camera frame — metres)
|
||||||
|
lidar_ex float 0.0 Camera-frame X (right) offset
|
||||||
|
lidar_ey float 0.05 Camera-frame Y (down) offset
|
||||||
|
lidar_ez float 0.0 Camera-frame Z (forward) offset
|
||||||
|
|
||||||
|
Notes
|
||||||
|
-----
|
||||||
|
Camera intrinsics are read from /camera/depth/camera_info and override the
|
||||||
|
defaults in CameraParams once the first CameraInfo message arrives.
|
||||||
|
|
||||||
|
If the depth image is unavailable, the node publishes with depth_z=0 and
|
||||||
|
height_m=0, confidence=0 (LIDAR-only width estimate).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||||
|
|
||||||
|
from sensor_msgs.msg import CameraInfo, Image, LaserScan
|
||||||
|
from saltybot_scene_msgs.msg import ObstacleSize, ObstacleSizeArray
|
||||||
|
|
||||||
|
from ._lidar_clustering import scan_to_cartesian, cluster_points
|
||||||
|
from ._obstacle_size import CameraParams, estimate_cluster_size
|
||||||
|
|
||||||
|
|
||||||
|
_SENSOR_QOS = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=4,
|
||||||
|
)
|
||||||
|
_RELIABLE_QOS = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.RELIABLE,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=5,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ObstacleSizeNode(Node):
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__('obstacle_size_node')
|
||||||
|
|
||||||
|
# ── Parameters ──────────────────────────────────────────────────────
|
||||||
|
# Clustering
|
||||||
|
self.declare_parameter('distance_threshold_m', 0.20)
|
||||||
|
self.declare_parameter('min_points', 3)
|
||||||
|
self.declare_parameter('range_min_m', 0.05)
|
||||||
|
self.declare_parameter('range_max_m', 12.0)
|
||||||
|
# Depth sampling
|
||||||
|
self.declare_parameter('depth_window_px', 5)
|
||||||
|
self.declare_parameter('search_rows_px', 120)
|
||||||
|
self.declare_parameter('col_hw_px', 10)
|
||||||
|
self.declare_parameter('z_tol_m', 0.30)
|
||||||
|
# Extrinsics
|
||||||
|
self.declare_parameter('lidar_ex', 0.0)
|
||||||
|
self.declare_parameter('lidar_ey', 0.05)
|
||||||
|
self.declare_parameter('lidar_ez', 0.0)
|
||||||
|
|
||||||
|
p = self.get_parameter
|
||||||
|
self._dist_thresh = float(p('distance_threshold_m').value)
|
||||||
|
self._min_pts = int(p('min_points').value)
|
||||||
|
self._range_min = float(p('range_min_m').value)
|
||||||
|
self._range_max = float(p('range_max_m').value)
|
||||||
|
self._win_px = int(p('depth_window_px').value)
|
||||||
|
self._rows_px = int(p('search_rows_px').value)
|
||||||
|
self._col_hw = int(p('col_hw_px').value)
|
||||||
|
self._z_tol = float(p('z_tol_m').value)
|
||||||
|
|
||||||
|
# Build default CameraParams (overridden when CameraInfo arrives)
|
||||||
|
self._cam = CameraParams(
|
||||||
|
ex=float(p('lidar_ex').value),
|
||||||
|
ey=float(p('lidar_ey').value),
|
||||||
|
ez=float(p('lidar_ez').value),
|
||||||
|
)
|
||||||
|
self._cam_info_received = False
|
||||||
|
|
||||||
|
# Latest depth frame
|
||||||
|
self._depth_image: Optional[np.ndarray] = None
|
||||||
|
|
||||||
|
# ── Subscribers ──────────────────────────────────────────────────────
|
||||||
|
self._scan_sub = self.create_subscription(
|
||||||
|
LaserScan, '/scan', self._on_scan, _SENSOR_QOS
|
||||||
|
)
|
||||||
|
self._depth_sub = self.create_subscription(
|
||||||
|
Image, '/camera/depth/image_rect_raw',
|
||||||
|
self._on_depth, _SENSOR_QOS
|
||||||
|
)
|
||||||
|
self._info_sub = self.create_subscription(
|
||||||
|
CameraInfo, '/camera/depth/camera_info',
|
||||||
|
self._on_camera_info, _RELIABLE_QOS
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Publisher ────────────────────────────────────────────────────────
|
||||||
|
self._pub = self.create_publisher(
|
||||||
|
ObstacleSizeArray, '/saltybot/obstacle_sizes', 10
|
||||||
|
)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f'obstacle_size_node ready — '
|
||||||
|
f'dist_thresh={self._dist_thresh}m, z_tol={self._z_tol}m'
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _on_camera_info(self, msg: CameraInfo) -> None:
|
||||||
|
"""Latch camera intrinsics on first message."""
|
||||||
|
if self._cam_info_received:
|
||||||
|
return
|
||||||
|
ex = self._cam.ex
|
||||||
|
ey = self._cam.ey
|
||||||
|
ez = self._cam.ez
|
||||||
|
self._cam = CameraParams(
|
||||||
|
fx=float(msg.k[0]),
|
||||||
|
fy=float(msg.k[4]),
|
||||||
|
cx=float(msg.k[2]),
|
||||||
|
cy=float(msg.k[5]),
|
||||||
|
width=int(msg.width),
|
||||||
|
height=int(msg.height),
|
||||||
|
depth_scale=self._cam.depth_scale,
|
||||||
|
ex=ex, ey=ey, ez=ez,
|
||||||
|
)
|
||||||
|
self._cam_info_received = True
|
||||||
|
self.get_logger().info(
|
||||||
|
f'Camera intrinsics loaded: '
|
||||||
|
f'fx={self._cam.fx:.1f} fy={self._cam.fy:.1f} '
|
||||||
|
f'cx={self._cam.cx:.1f} cy={self._cam.cy:.1f} '
|
||||||
|
f'{self._cam.width}×{self._cam.height}'
|
||||||
|
)
|
||||||
|
|
||||||
|
def _on_depth(self, msg: Image) -> None:
|
||||||
|
"""Cache the latest depth frame as a numpy uint16 array."""
|
||||||
|
enc = msg.encoding.lower()
|
||||||
|
if enc not in ('16uc1', 'mono16'):
|
||||||
|
self.get_logger().warn(
|
||||||
|
f'Unexpected depth encoding {msg.encoding!r} — expected 16UC1',
|
||||||
|
throttle_duration_sec=10.0,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
data = np.frombuffer(msg.data, dtype=np.uint16)
|
||||||
|
try:
|
||||||
|
self._depth_image = data.reshape((msg.height, msg.width))
|
||||||
|
except ValueError as exc:
|
||||||
|
self.get_logger().warn(f'Depth reshape failed: {exc}')
|
||||||
|
|
||||||
|
def _on_scan(self, msg: LaserScan) -> None:
|
||||||
|
"""Cluster LIDAR scan, project clusters to depth image, publish sizes."""
|
||||||
|
points = scan_to_cartesian(
|
||||||
|
list(msg.ranges),
|
||||||
|
msg.angle_min,
|
||||||
|
msg.angle_increment,
|
||||||
|
range_min=self._range_min,
|
||||||
|
range_max=self._range_max,
|
||||||
|
)
|
||||||
|
clusters = cluster_points(
|
||||||
|
points,
|
||||||
|
distance_threshold_m=self._dist_thresh,
|
||||||
|
min_points=self._min_pts,
|
||||||
|
)
|
||||||
|
|
||||||
|
depth = self._depth_image
|
||||||
|
|
||||||
|
# Build output message
|
||||||
|
out = ObstacleSizeArray()
|
||||||
|
out.header.stamp = msg.header.stamp
|
||||||
|
out.header.frame_id = msg.header.frame_id or 'laser'
|
||||||
|
|
||||||
|
for idx, cluster in enumerate(clusters):
|
||||||
|
if depth is not None:
|
||||||
|
est = estimate_cluster_size(
|
||||||
|
cluster,
|
||||||
|
depth,
|
||||||
|
self._cam,
|
||||||
|
depth_window = self._win_px,
|
||||||
|
search_rows = self._rows_px,
|
||||||
|
col_hw = self._col_hw,
|
||||||
|
z_tol = self._z_tol,
|
||||||
|
obstacle_id = idx,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# No depth image available — publish LIDAR-only width
|
||||||
|
from ._obstacle_size import ObstacleSizeEstimate
|
||||||
|
import math as _math
|
||||||
|
cx = float(cluster.centroid[0])
|
||||||
|
cy = float(cluster.centroid[1])
|
||||||
|
est = ObstacleSizeEstimate(
|
||||||
|
obstacle_id = idx,
|
||||||
|
centroid_x = cx,
|
||||||
|
centroid_y = cy,
|
||||||
|
depth_z = 0.0,
|
||||||
|
width_m = float(cluster.width_m),
|
||||||
|
height_m = 0.0,
|
||||||
|
pixel_u = -1,
|
||||||
|
pixel_v = -1,
|
||||||
|
lidar_range = float(_math.sqrt(cx * cx + cy * cy)),
|
||||||
|
confidence = 0.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
obs = ObstacleSize()
|
||||||
|
obs.header = out.header
|
||||||
|
obs.obstacle_id = idx
|
||||||
|
obs.centroid_x = float(est.centroid_x)
|
||||||
|
obs.centroid_y = float(est.centroid_y)
|
||||||
|
obs.depth_z = float(est.depth_z)
|
||||||
|
obs.width_m = float(est.width_m)
|
||||||
|
obs.height_m = float(est.height_m)
|
||||||
|
obs.pixel_u = int(est.pixel_u)
|
||||||
|
obs.pixel_v = int(est.pixel_v)
|
||||||
|
obs.lidar_range = float(est.lidar_range)
|
||||||
|
obs.confidence = float(est.confidence)
|
||||||
|
out.obstacles.append(obs)
|
||||||
|
|
||||||
|
self._pub.publish(out)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None) -> None:
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = ObstacleSizeNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
@ -55,6 +55,8 @@ setup(
|
|||||||
'obstacle_velocity = saltybot_bringup.obstacle_velocity_node:main',
|
'obstacle_velocity = saltybot_bringup.obstacle_velocity_node:main',
|
||||||
# Lane/path edge detector (Issue #339)
|
# Lane/path edge detector (Issue #339)
|
||||||
'path_edges = saltybot_bringup.path_edges_node:main',
|
'path_edges = saltybot_bringup.path_edges_node:main',
|
||||||
|
# Depth-based obstacle size estimator (Issue #348)
|
||||||
|
'obstacle_size = saltybot_bringup.obstacle_size_node:main',
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
364
jetson/ros2_ws/src/saltybot_bringup/test/test_obstacle_size.py
Normal file
364
jetson/ros2_ws/src/saltybot_bringup/test/test_obstacle_size.py
Normal file
@ -0,0 +1,364 @@
|
|||||||
|
"""
|
||||||
|
test_obstacle_size.py — pytest tests for _obstacle_size.py (no ROS2 required).
|
||||||
|
|
||||||
|
Tests cover:
|
||||||
|
CameraParams — defaults and construction
|
||||||
|
lidar_to_camera — coordinate transform
|
||||||
|
project_to_pixel — pinhole projection + bounds checks
|
||||||
|
sample_depth_median — empty/uniform/sparse depth images
|
||||||
|
estimate_height — flat/obstacle/edge cases
|
||||||
|
estimate_cluster_size — full pipeline with synthetic cluster + depth
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from saltybot_bringup._obstacle_size import (
|
||||||
|
CameraParams,
|
||||||
|
ObstacleSizeEstimate,
|
||||||
|
estimate_cluster_size,
|
||||||
|
estimate_height,
|
||||||
|
lidar_to_camera,
|
||||||
|
project_to_pixel,
|
||||||
|
sample_depth_median,
|
||||||
|
)
|
||||||
|
# Import Cluster helper from lidar_clustering for building test fixtures
|
||||||
|
from saltybot_bringup._lidar_clustering import Cluster
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _default_params(**kw) -> CameraParams:
|
||||||
|
p = CameraParams()
|
||||||
|
for k, v in kw.items():
|
||||||
|
object.__setattr__(p, k, v)
|
||||||
|
return p
|
||||||
|
|
||||||
|
|
||||||
|
def _blank_depth(h: int = 480, w: int = 640, val: int = 0) -> np.ndarray:
|
||||||
|
return np.full((h, w), val, dtype=np.uint16)
|
||||||
|
|
||||||
|
|
||||||
|
def _obstacle_depth(
|
||||||
|
h: int = 480, w: int = 640,
|
||||||
|
depth_mm: int = 2000,
|
||||||
|
u_c: int = 320, v_c: int = 240,
|
||||||
|
half_w: int = 30, half_h: int = 60,
|
||||||
|
) -> np.ndarray:
|
||||||
|
"""Create a synthetic depth image with a rectangular obstacle."""
|
||||||
|
img = _blank_depth(h, w)
|
||||||
|
r0, r1 = max(0, v_c - half_h), min(h, v_c + half_h + 1)
|
||||||
|
c0, c1 = max(0, u_c - half_w), min(w, u_c + half_w + 1)
|
||||||
|
img[r0:r1, c0:c1] = depth_mm
|
||||||
|
return img
|
||||||
|
|
||||||
|
|
||||||
|
def _make_cluster(
|
||||||
|
cx: float = 2.0, cy: float = 0.0,
|
||||||
|
width_m: float = 0.4, depth_m: float = 0.3,
|
||||||
|
n_pts: int = 10,
|
||||||
|
) -> Cluster:
|
||||||
|
"""Construct a minimal Cluster NamedTuple for testing."""
|
||||||
|
pts = np.column_stack([
|
||||||
|
np.full(n_pts, cx),
|
||||||
|
np.full(n_pts, cy),
|
||||||
|
]).astype(np.float64)
|
||||||
|
centroid = np.array([cx, cy], dtype=np.float64)
|
||||||
|
bbox_min = centroid - np.array([width_m / 2, depth_m / 2])
|
||||||
|
bbox_max = centroid + np.array([width_m / 2, depth_m / 2])
|
||||||
|
return Cluster(
|
||||||
|
points=pts, centroid=centroid,
|
||||||
|
bbox_min=bbox_min, bbox_max=bbox_max,
|
||||||
|
width_m=width_m, depth_m=depth_m,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── CameraParams ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestCameraParams:
|
||||||
|
def test_defaults(self):
|
||||||
|
p = CameraParams()
|
||||||
|
assert p.fx == pytest.approx(383.0)
|
||||||
|
assert p.fy == pytest.approx(383.0)
|
||||||
|
assert p.cx == pytest.approx(320.0)
|
||||||
|
assert p.cy == pytest.approx(240.0)
|
||||||
|
assert p.width == 640
|
||||||
|
assert p.height == 480
|
||||||
|
assert p.depth_scale == pytest.approx(0.001)
|
||||||
|
assert p.ey == pytest.approx(0.05)
|
||||||
|
|
||||||
|
def test_custom(self):
|
||||||
|
p = CameraParams(fx=400.0, fy=400.0, cx=330.0, cy=250.0)
|
||||||
|
assert p.fx == pytest.approx(400.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── lidar_to_camera ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestLidarToCamera:
|
||||||
|
def test_forward_no_offset(self):
|
||||||
|
"""A point directly in front of the LIDAR (y_lidar=0) → x_cam=0."""
|
||||||
|
p = CameraParams(ex=0.0, ey=0.0, ez=0.0)
|
||||||
|
x_cam, y_cam, z_cam = lidar_to_camera(3.0, 0.0, p)
|
||||||
|
assert x_cam == pytest.approx(0.0)
|
||||||
|
assert y_cam == pytest.approx(0.0)
|
||||||
|
assert z_cam == pytest.approx(3.0)
|
||||||
|
|
||||||
|
def test_lateral_maps_to_x(self):
|
||||||
|
"""y_lidar positive (left) → x_cam negative (right convention)."""
|
||||||
|
p = CameraParams(ex=0.0, ey=0.0, ez=0.0)
|
||||||
|
x_cam, y_cam, z_cam = lidar_to_camera(0.0, 1.0, p)
|
||||||
|
assert x_cam == pytest.approx(-1.0)
|
||||||
|
assert z_cam == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_extrinsic_offset_applied(self):
|
||||||
|
"""Extrinsic translation is added to camera-frame coords."""
|
||||||
|
p = CameraParams(ex=0.1, ey=0.05, ez=-0.02)
|
||||||
|
x_cam, y_cam, z_cam = lidar_to_camera(2.0, 0.0, p)
|
||||||
|
assert x_cam == pytest.approx(0.1) # -y_lidar + ex = 0 + 0.1
|
||||||
|
assert y_cam == pytest.approx(0.05) # 0 + ey
|
||||||
|
assert z_cam == pytest.approx(1.98) # 2.0 + (-0.02)
|
||||||
|
|
||||||
|
def test_zero_point(self):
|
||||||
|
p = CameraParams(ex=0.0, ey=0.0, ez=0.0)
|
||||||
|
x_cam, y_cam, z_cam = lidar_to_camera(0.0, 0.0, p)
|
||||||
|
assert z_cam == pytest.approx(0.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── project_to_pixel ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestProjectToPixel:
|
||||||
|
def test_centre_projects_to_principal(self):
|
||||||
|
"""A point exactly on the optical axis projects to (cx, cy)."""
|
||||||
|
p = CameraParams(fx=400.0, fy=400.0, cx=320.0, cy=240.0)
|
||||||
|
px = project_to_pixel(0.0, 0.0, 2.0, p)
|
||||||
|
assert px is not None
|
||||||
|
u, v = px
|
||||||
|
assert u == 320
|
||||||
|
assert v == 240
|
||||||
|
|
||||||
|
def test_negative_z_returns_none(self):
|
||||||
|
p = CameraParams()
|
||||||
|
assert project_to_pixel(0.0, 0.0, -1.0, p) is None
|
||||||
|
|
||||||
|
def test_zero_z_returns_none(self):
|
||||||
|
p = CameraParams()
|
||||||
|
assert project_to_pixel(0.0, 0.0, 0.0, p) is None
|
||||||
|
|
||||||
|
def test_off_to_right(self):
|
||||||
|
"""A point to the right of axis should land to the right of cx."""
|
||||||
|
p = CameraParams(fx=400.0, cx=320.0, cy=240.0)
|
||||||
|
px = project_to_pixel(0.5, 0.0, 2.0, p)
|
||||||
|
assert px is not None
|
||||||
|
u, _ = px
|
||||||
|
assert u > 320
|
||||||
|
|
||||||
|
def test_out_of_image_returns_none(self):
|
||||||
|
p = CameraParams(fx=100.0, fy=100.0, cx=50.0, cy=50.0,
|
||||||
|
width=100, height=100)
|
||||||
|
# Very far off-axis → outside image
|
||||||
|
px = project_to_pixel(10.0, 0.0, 1.0, p)
|
||||||
|
assert px is None
|
||||||
|
|
||||||
|
def test_known_projection(self):
|
||||||
|
"""Verify exact pixel for known 3D point."""
|
||||||
|
p = CameraParams(fx=400.0, fy=400.0, cx=320.0, cy=240.0,
|
||||||
|
width=640, height=480)
|
||||||
|
# x_cam=1.0, z_cam=4.0 → u = 400*(1/4)+320 = 420
|
||||||
|
# y_cam=0.0 → v = 400*(0/4)+240 = 240
|
||||||
|
px = project_to_pixel(1.0, 0.0, 4.0, p)
|
||||||
|
assert px == (420, 240)
|
||||||
|
|
||||||
|
|
||||||
|
# ── sample_depth_median ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestSampleDepthMedian:
|
||||||
|
def test_all_zeros_returns_zero(self):
|
||||||
|
img = _blank_depth()
|
||||||
|
d, n = sample_depth_median(img, 320, 240, window_px=5)
|
||||||
|
assert d == pytest.approx(0.0)
|
||||||
|
assert n == 0
|
||||||
|
|
||||||
|
def test_uniform_image(self):
|
||||||
|
"""All pixels set to 2000 mm → median = 2.0 m."""
|
||||||
|
img = _blank_depth(val=2000)
|
||||||
|
d, n = sample_depth_median(img, 320, 240, window_px=5, depth_scale=0.001)
|
||||||
|
assert d == pytest.approx(2.0)
|
||||||
|
assert n > 0
|
||||||
|
|
||||||
|
def test_scale_applied(self):
|
||||||
|
img = _blank_depth(val=3000)
|
||||||
|
d, _ = sample_depth_median(img, 320, 240, window_px=3, depth_scale=0.001)
|
||||||
|
assert d == pytest.approx(3.0)
|
||||||
|
|
||||||
|
def test_window_clips_at_image_edge(self):
|
||||||
|
"""Sampling near edge should not crash."""
|
||||||
|
img = _blank_depth(val=1500)
|
||||||
|
d, n = sample_depth_median(img, 0, 0, window_px=10)
|
||||||
|
assert d > 0.0
|
||||||
|
assert n > 0
|
||||||
|
|
||||||
|
def test_sparse_window(self):
|
||||||
|
"""Only some pixels are valid — median should reflect valid ones."""
|
||||||
|
img = _blank_depth()
|
||||||
|
img[240, 320] = 1000 # single valid pixel
|
||||||
|
d, n = sample_depth_median(img, 320, 240, window_px=5, depth_scale=0.001)
|
||||||
|
assert d == pytest.approx(1.0)
|
||||||
|
assert n == 1
|
||||||
|
|
||||||
|
def test_mixed_window_median(self):
|
||||||
|
"""Median of [1000, 2000, 3000] mm = 2000 mm = 2.0 m."""
|
||||||
|
img = _blank_depth()
|
||||||
|
img[240, 319] = 1000
|
||||||
|
img[240, 320] = 2000
|
||||||
|
img[240, 321] = 3000
|
||||||
|
d, n = sample_depth_median(img, 320, 240, window_px=1, depth_scale=0.001)
|
||||||
|
assert d == pytest.approx(2.0)
|
||||||
|
assert n == 3
|
||||||
|
|
||||||
|
|
||||||
|
# ── estimate_height ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestEstimateHeight:
|
||||||
|
def test_blank_image_returns_zero(self):
|
||||||
|
p = CameraParams(fy=383.0)
|
||||||
|
img = _blank_depth()
|
||||||
|
h = estimate_height(img, 320, 240, z_ref=2.0, params=p)
|
||||||
|
assert h == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_zero_z_ref_returns_zero(self):
|
||||||
|
p = CameraParams()
|
||||||
|
img = _blank_depth(val=2000)
|
||||||
|
h = estimate_height(img, 320, 240, z_ref=0.0, params=p)
|
||||||
|
assert h == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_known_height(self):
|
||||||
|
"""
|
||||||
|
Obstacle occupies rows 180–300 (120 row-pixel span) at depth 2.0m.
|
||||||
|
Expected height ≈ 120 * 2.0 / 383.0 ≈ 0.627 m.
|
||||||
|
"""
|
||||||
|
p = CameraParams(fy=383.0, depth_scale=0.001)
|
||||||
|
img = _blank_depth()
|
||||||
|
# Obstacle at z=2000 mm, rows 180–300, cols 300–340
|
||||||
|
img[180:301, 300:341] = 2000
|
||||||
|
h = estimate_height(img, 320, 240, z_ref=2.0, params=p,
|
||||||
|
search_rows=200, col_hw=30, z_tol=0.3)
|
||||||
|
expected = 120 * 2.0 / 383.0
|
||||||
|
assert h == pytest.approx(expected, rel=0.05)
|
||||||
|
|
||||||
|
def test_single_row_returns_zero(self):
|
||||||
|
"""Only one valid row → span=0 → height=0."""
|
||||||
|
p = CameraParams(fy=383.0, depth_scale=0.001)
|
||||||
|
img = _blank_depth()
|
||||||
|
img[240, 315:326] = 2000 # single row
|
||||||
|
h = estimate_height(img, 320, 240, z_ref=2.0, params=p,
|
||||||
|
search_rows=20, col_hw=10, z_tol=0.3)
|
||||||
|
assert h == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_depth_outside_tolerance_excluded(self):
|
||||||
|
"""Pixels far from z_ref should not contribute to height."""
|
||||||
|
p = CameraParams(fy=383.0, depth_scale=0.001)
|
||||||
|
img = _blank_depth()
|
||||||
|
# Two rows at very different depths — only one within z_tol
|
||||||
|
img[220, 315:326] = 2000 # z = 2.0 m (within tolerance)
|
||||||
|
img[260, 315:326] = 5000 # z = 5.0 m (outside tolerance)
|
||||||
|
h = estimate_height(img, 320, 240, z_ref=2.0, params=p,
|
||||||
|
search_rows=100, col_hw=10, z_tol=0.3)
|
||||||
|
assert h == pytest.approx(0.0) # only 1 valid row → 0
|
||||||
|
|
||||||
|
|
||||||
|
# ── estimate_cluster_size ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestEstimateClusterSize:
|
||||||
|
def test_returns_named_tuple(self):
|
||||||
|
cluster = _make_cluster()
|
||||||
|
img = _blank_depth(val=2000)
|
||||||
|
est = estimate_cluster_size(cluster, img, CameraParams())
|
||||||
|
assert isinstance(est, ObstacleSizeEstimate)
|
||||||
|
|
||||||
|
def test_width_comes_from_lidar(self):
|
||||||
|
"""Width should equal the LIDAR cluster width_m."""
|
||||||
|
cluster = _make_cluster(cx=3.0, cy=0.0, width_m=0.5)
|
||||||
|
img = _blank_depth(val=3000)
|
||||||
|
est = estimate_cluster_size(cluster, img, CameraParams())
|
||||||
|
assert est.width_m == pytest.approx(0.5)
|
||||||
|
|
||||||
|
def test_centroid_preserved(self):
|
||||||
|
cluster = _make_cluster(cx=2.0, cy=0.3)
|
||||||
|
img = _blank_depth(val=2000)
|
||||||
|
est = estimate_cluster_size(cluster, img, CameraParams())
|
||||||
|
assert est.centroid_x == pytest.approx(2.0)
|
||||||
|
assert est.centroid_y == pytest.approx(0.3)
|
||||||
|
|
||||||
|
def test_lidar_range_correct(self):
|
||||||
|
cluster = _make_cluster(cx=3.0, cy=4.0) # range = 5.0
|
||||||
|
img = _blank_depth(val=5000)
|
||||||
|
est = estimate_cluster_size(cluster, img, CameraParams())
|
||||||
|
assert est.lidar_range == pytest.approx(5.0)
|
||||||
|
|
||||||
|
def test_blank_depth_gives_zero_confidence(self):
|
||||||
|
"""No valid depth pixels → confidence=0, depth_z falls back to z_cam."""
|
||||||
|
cluster = _make_cluster(cx=2.0, cy=0.0)
|
||||||
|
img = _blank_depth()
|
||||||
|
p = CameraParams(ex=0.0, ey=0.0, ez=0.0)
|
||||||
|
est = estimate_cluster_size(cluster, img, p)
|
||||||
|
assert est.confidence == pytest.approx(0.0)
|
||||||
|
assert est.depth_z > 0.0 # LIDAR-fallback z_cam
|
||||||
|
|
||||||
|
def test_obstacle_in_depth_image(self):
|
||||||
|
"""With a real depth patch, confidence > 0 and pixel is in-image."""
|
||||||
|
# Cluster at x=2m forward, y=0 → z_cam=2m, x_cam=0, y_cam=ey
|
||||||
|
p = CameraParams(fy=383.0, fx=383.0, cx=320.0, cy=240.0,
|
||||||
|
depth_scale=0.001, ex=0.0, ey=0.05, ez=0.0)
|
||||||
|
cluster = _make_cluster(cx=2.0, cy=0.0)
|
||||||
|
# z_cam=2m, x_cam=0, y_cam=0.05
|
||||||
|
# u = 383*0/2+320 = 320, v = 383*0.05/2+240 ≈ 249
|
||||||
|
img = _obstacle_depth(depth_mm=2000, u_c=320, v_c=249,
|
||||||
|
half_w=30, half_h=80)
|
||||||
|
est = estimate_cluster_size(cluster, img, p,
|
||||||
|
depth_window=5, search_rows=120,
|
||||||
|
col_hw=10, z_tol=0.3, obstacle_id=1)
|
||||||
|
assert est.confidence > 0.0
|
||||||
|
assert 0 <= est.pixel_u < 640
|
||||||
|
assert 0 <= est.pixel_v < 480
|
||||||
|
assert est.depth_z == pytest.approx(2.0, abs=0.1)
|
||||||
|
assert est.obstacle_id == 1
|
||||||
|
|
||||||
|
def test_behind_camera_returns_zero_confidence(self):
|
||||||
|
"""Cluster behind camera (x_lidar < 0 with no ez) → not projected."""
|
||||||
|
p = CameraParams(ex=0.0, ey=0.0, ez=0.0)
|
||||||
|
# x_lidar = -1.0 → z_cam = -1 + 0 = -1 → behind camera
|
||||||
|
cluster = _make_cluster(cx=-1.0, cy=0.0)
|
||||||
|
img = _blank_depth(val=1000)
|
||||||
|
est = estimate_cluster_size(cluster, img, p)
|
||||||
|
assert est.confidence == pytest.approx(0.0)
|
||||||
|
assert est.pixel_u == -1
|
||||||
|
assert est.pixel_v == -1
|
||||||
|
|
||||||
|
def test_height_estimated_from_depth(self):
|
||||||
|
"""When obstacle spans rows, height_m > 0."""
|
||||||
|
p = CameraParams(fy=383.0, fx=383.0, cx=320.0, cy=240.0,
|
||||||
|
depth_scale=0.001, ex=0.0, ey=0.05, ez=0.0)
|
||||||
|
cluster = _make_cluster(cx=2.0, cy=0.0)
|
||||||
|
# Expected pixel: u≈320, v≈249; give obstacle a 100-row span
|
||||||
|
img = _obstacle_depth(depth_mm=2000, u_c=320, v_c=249,
|
||||||
|
half_w=20, half_h=50)
|
||||||
|
est = estimate_cluster_size(cluster, img, p,
|
||||||
|
search_rows=150, col_hw=15, z_tol=0.3)
|
||||||
|
assert est.height_m > 0.0
|
||||||
|
|
||||||
|
def test_obstacle_id_passed_through(self):
|
||||||
|
cluster = _make_cluster()
|
||||||
|
img = _blank_depth(val=2000)
|
||||||
|
est = estimate_cluster_size(cluster, img, CameraParams(), obstacle_id=42)
|
||||||
|
assert est.obstacle_id == 42
|
||||||
|
|
||||||
|
def test_confidence_bounded(self):
|
||||||
|
cluster = _make_cluster(cx=2.0)
|
||||||
|
img = _blank_depth(val=2000)
|
||||||
|
est = estimate_cluster_size(cluster, img, CameraParams())
|
||||||
|
assert 0.0 <= est.confidence <= 1.0
|
||||||
@ -27,6 +27,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
|
|||||||
"msg/ObstacleVelocityArray.msg"
|
"msg/ObstacleVelocityArray.msg"
|
||||||
# Issue #339 — lane/path edge detector
|
# Issue #339 — lane/path edge detector
|
||||||
"msg/PathEdges.msg"
|
"msg/PathEdges.msg"
|
||||||
|
# Issue #348 — depth-based obstacle size estimator
|
||||||
|
"msg/ObstacleSize.msg"
|
||||||
|
"msg/ObstacleSizeArray.msg"
|
||||||
DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
|
DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
27
jetson/ros2_ws/src/saltybot_scene_msgs/msg/ObstacleSize.msg
Normal file
27
jetson/ros2_ws/src/saltybot_scene_msgs/msg/ObstacleSize.msg
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
# ObstacleSize.msg — Depth-projected LIDAR cluster size estimate (Issue #348)
|
||||||
|
#
|
||||||
|
# Fuses a 2-D LIDAR cluster with the D435i depth image to estimate the full
|
||||||
|
# 3-D size (width × height) of each detected obstacle.
|
||||||
|
#
|
||||||
|
# obstacle_id : matches the obstacle_id in ObstacleVelocity (same LIDAR cluster)
|
||||||
|
# centroid_x : LIDAR-frame forward distance to centroid (metres)
|
||||||
|
# centroid_y : LIDAR-frame lateral distance to centroid (metres, +Y = left)
|
||||||
|
# depth_z : sampled D435i depth at projected centroid (metres, 0 = unknown)
|
||||||
|
# width_m : horizontal size from LIDAR bbox (metres)
|
||||||
|
# height_m : vertical size from D435i depth strip (metres, 0 = unknown)
|
||||||
|
# pixel_u : projected centroid column in depth image (pixels)
|
||||||
|
# pixel_v : projected centroid row in depth image (pixels)
|
||||||
|
# lidar_range : range from LIDAR origin to centroid (metres)
|
||||||
|
# confidence : 0.0–1.0; based on depth sample quality and cluster track age
|
||||||
|
#
|
||||||
|
std_msgs/Header header
|
||||||
|
uint32 obstacle_id
|
||||||
|
float32 centroid_x
|
||||||
|
float32 centroid_y
|
||||||
|
float32 depth_z
|
||||||
|
float32 width_m
|
||||||
|
float32 height_m
|
||||||
|
int32 pixel_u
|
||||||
|
int32 pixel_v
|
||||||
|
float32 lidar_range
|
||||||
|
float32 confidence
|
||||||
@ -0,0 +1,4 @@
|
|||||||
|
# ObstacleSizeArray.msg — All depth-projected obstacle size estimates (Issue #348)
|
||||||
|
|
||||||
|
std_msgs/Header header
|
||||||
|
ObstacleSize[] obstacles
|
||||||
Loading…
x
Reference in New Issue
Block a user