Compare commits

..

3 Commits

Author SHA1 Message Date
c7dd07f9ed feat(social): proximity-based greeting trigger — Issue #270
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (pull_request) Failing after 12s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (pull_request) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (pull_request) Has been cancelled
Adds greeting_trigger_node to saltybot_social:
- Subscribes to /social/faces/detected (FaceDetectionArray) for face arrivals
- Subscribes to /social/person_states (PersonStateArray) to cache face_id→distance
- Fires greeting when face_id is within proximity_m (default 2m) and
  not in per-face_id cooldown window (default 300s)
- Publishes JSON on /saltybot/greeting_trigger:
  {face_id, person_name, distance_m, ts}
- unknown_distance param controls assumed distance for faces with no PersonState yet
- Thread-safe distance cache and greeted map
- 50/50 tests passing

Closes #270
2026-03-02 17:26:40 -05:00
01ee02f837 Merge pull request 'feat(bringup): D435i depth hole filler via bilateral interpolation (Issue #268)' (#271) from sl-perception/issue-268-depth-holes into main 2026-03-02 17:26:22 -05:00
f0e11fe7ca feat(bringup): depth image hole filler via bilateral interpolation (Issue #268)
Adds multi-pass spatial-Gaussian hole filler for D435i depth images.
Each pass replaces zero/NaN pixels with the Gaussian-weighted mean of valid
neighbours in a growing kernel (×1, ×2.5, ×6 default); original valid
pixels are never modified.  Handles uint16 mm → float32 m conversion,
border pixels via BORDER_REFLECT, and above-d_max pixels as holes.
Publishes filled float32 depth on /camera/depth/filled at camera rate.
37/37 pure-Python tests pass (no ROS2 required).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-02 14:19:27 -05:00
4 changed files with 552 additions and 0 deletions

View File

@ -0,0 +1,141 @@
"""
_depth_hole_fill.py Depth image hole filling via bilateral interpolation (no ROS2 deps).
Algorithm
---------
A "hole" is any pixel where depth == 0, depth is NaN, or depth is outside the
valid range [d_min, d_max].
Each pass replaces every hole pixel with the spatial-Gaussian-weighted mean of
valid pixels in a (kernel_size × kernel_size) neighbourhood:
filled[x,y] = Σ G(||p - q||; σ) · d[q] / Σ G(||p - q||; σ)
q valid neighbours of (x,y)
The denominator (sum of spatial weights over valid pixels) normalises correctly
even at image borders and around isolated valid pixels.
Multiple passes with geometrically growing kernels are applied so that:
Pass 1 kernel_size fills small holes ( kernel_size/2 px radius)
Pass 2 kernel_size × 2.5 fills medium holes
Pass 3 kernel_size × 6.0 fills large holes / fronto-parallel surfaces
After all passes any remaining zeros are left as-is (no valid neighbourhood data).
Because only the spatial Gaussian (not a depth range term) is used as the weighting
function, this is equivalent to a bilateral filter with σ_range . In practice
this produces smooth, physically plausible fills in the depth domain.
Public API
----------
fill_holes(depth, kernel_size=5, d_min=0.1, d_max=10.0, max_passes=3) ndarray
valid_mask(depth, d_min=0.1, d_max=10.0) bool ndarray
"""
from __future__ import annotations
import math
from typing import Optional
import numpy as np
# Kernel size multipliers for successive passes
_PASS_SCALE = [1.0, 2.5, 6.0]
def valid_mask(
depth: np.ndarray,
d_min: float = 0.1,
d_max: float = 10.0,
) -> np.ndarray:
"""
Return a boolean mask of valid (non-hole) pixels.
Parameters
----------
depth : (H, W) float32 ndarray, depth in metres
d_min : minimum valid depth (m)
d_max : maximum valid depth (m)
Returns
-------
(H, W) bool ndarray True where depth is finite and in [d_min, d_max]
"""
return np.isfinite(depth) & (depth >= d_min) & (depth <= d_max)
def fill_holes(
depth: np.ndarray,
kernel_size: int = 5,
d_min: float = 0.1,
d_max: float = 10.0,
max_passes: int = 3,
) -> np.ndarray:
"""
Fill zero/NaN depth pixels using multi-pass spatial Gaussian interpolation.
Parameters
----------
depth : (H, W) float32 ndarray, depth in metres
kernel_size : initial kernel side length (pixels, forced odd, 3)
d_min : minimum valid depth pixels below this are treated as holes
d_max : maximum valid depth pixels above this are treated as holes
max_passes : number of fill passes (13); each uses a larger kernel
Returns
-------
(H, W) float32 ndarray same as input, with holes filled where possible.
Pixels with no valid neighbours after all passes remain 0.0.
Original valid pixels are never modified.
"""
import cv2
depth = np.asarray(depth, dtype=np.float32)
# Replace NaN with 0 so arithmetic is clean
depth = np.where(np.isfinite(depth), depth, 0.0).astype(np.float32)
mask = valid_mask(depth, d_min, d_max) # True where already valid
result = depth.copy()
n_passes = max(1, min(max_passes, len(_PASS_SCALE)))
for i in range(n_passes):
if mask.all():
break # no holes left
ks = _odd_kernel_size(kernel_size, _PASS_SCALE[i])
half = ks // 2
sigma = max(half / 2.0, 0.5)
gk = cv2.getGaussianKernel(ks, sigma).astype(np.float32)
kernel = (gk @ gk.T)
# Multiply depth by mask so invalid pixels contribute 0 weight
d_valid = np.where(mask, result, 0.0).astype(np.float32)
w_valid = mask.astype(np.float32)
sum_d = cv2.filter2D(d_valid, ddepth=-1, kernel=kernel,
borderType=cv2.BORDER_REFLECT)
sum_w = cv2.filter2D(w_valid, ddepth=-1, kernel=kernel,
borderType=cv2.BORDER_REFLECT)
# Where we have enough weight, compute the weighted mean
has_data = sum_w > 1e-6
interp = np.where(has_data, sum_d / np.where(has_data, sum_w, 1.0), 0.0)
# Only fill holes — never overwrite original valid pixels
result = np.where(mask, result, interp.astype(np.float32))
# Update mask with newly filled pixels (for the next pass)
newly_filled = (~mask) & (result > 0.0)
mask = mask | newly_filled
return result.astype(np.float32)
# ── Internal helpers ──────────────────────────────────────────────────────────
def _odd_kernel_size(base: int, scale: float) -> int:
"""Return the nearest odd integer to base * scale, minimum 3."""
raw = max(3, int(round(base * scale)))
return raw if raw % 2 == 1 else raw + 1

View File

@ -0,0 +1,128 @@
"""
depth_hole_fill_node.py D435i depth image hole filler (Issue #268).
Subscribes to the raw D435i depth stream, fills zero/NaN pixels using
multi-pass spatial-Gaussian bilateral interpolation, and republishes the
filled image at camera rate.
Subscribes (BEST_EFFORT):
/camera/depth/image_rect_raw sensor_msgs/Image float32 depth (m)
Publishes:
/camera/depth/filled sensor_msgs/Image float32 depth (m), holes filled
The filled image preserves all original valid pixels exactly and only
modifies pixels that had no return (0 or NaN). The output is suitable
for all downstream consumers that expect a dense depth map (VO, RTAB-Map,
collision avoidance, floor classifier).
Parameters
----------
input_topic str /camera/depth/image_rect_raw Input depth topic
output_topic str /camera/depth/filled Output depth topic
kernel_size int 5 Initial Gaussian kernel side length (pixels)
d_min float 0.1 Minimum valid depth (m)
d_max float 10.0 Maximum valid depth (m)
max_passes int 3 Fill passes (growing kernel per pass)
"""
from __future__ import annotations
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
import numpy as np
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
from ._depth_hole_fill import fill_holes
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=4,
)
class DepthHoleFillNode(Node):
def __init__(self) -> None:
super().__init__('depth_hole_fill_node')
self.declare_parameter('input_topic', '/camera/depth/image_rect_raw')
self.declare_parameter('output_topic', '/camera/depth/filled')
self.declare_parameter('kernel_size', 5)
self.declare_parameter('d_min', 0.1)
self.declare_parameter('d_max', 10.0)
self.declare_parameter('max_passes', 3)
input_topic = self.get_parameter('input_topic').value
output_topic = self.get_parameter('output_topic').value
self._ks = int(self.get_parameter('kernel_size').value)
self._d_min = self.get_parameter('d_min').value
self._d_max = self.get_parameter('d_max').value
self._passes = int(self.get_parameter('max_passes').value)
self._bridge = CvBridge()
self._sub = self.create_subscription(
Image, input_topic, self._on_depth, _SENSOR_QOS)
self._pub = self.create_publisher(Image, output_topic, 10)
self.get_logger().info(
f'depth_hole_fill_node ready — '
f'{input_topic}{output_topic} '
f'kernel={self._ks} passes={self._passes} '
f'd=[{self._d_min},{self._d_max}]m'
)
# ── Callback ──────────────────────────────────────────────────────────────
def _on_depth(self, msg: Image) -> None:
try:
depth = self._bridge.imgmsg_to_cv2(msg, desired_encoding='passthrough')
except Exception as exc:
self.get_logger().error(
f'cv_bridge: {exc}', throttle_duration_sec=5.0)
return
depth = depth.astype(np.float32)
# Handle uint16 mm → float32 m conversion (D435i raw stream)
if depth.max() > 100.0:
depth /= 1000.0
filled = fill_holes(
depth,
kernel_size=self._ks,
d_min=self._d_min,
d_max=self._d_max,
max_passes=self._passes,
)
try:
out_msg = self._bridge.cv2_to_imgmsg(filled, encoding='32FC1')
except Exception as exc:
self.get_logger().error(
f'cv2_to_imgmsg: {exc}', throttle_duration_sec=5.0)
return
out_msg.header = msg.header
self._pub.publish(out_msg)
def main(args=None) -> None:
rclpy.init(args=args)
node = DepthHoleFillNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -37,6 +37,8 @@ setup(
'floor_classifier = saltybot_bringup.floor_classifier_node:main', 'floor_classifier = saltybot_bringup.floor_classifier_node:main',
# Visual odometry drift detector (Issue #260) # Visual odometry drift detector (Issue #260)
'vo_drift_detector = saltybot_bringup.vo_drift_node:main', 'vo_drift_detector = saltybot_bringup.vo_drift_node:main',
# Depth image hole filler (Issue #268)
'depth_hole_fill = saltybot_bringup.depth_hole_fill_node:main',
], ],
}, },
) )

View File

@ -0,0 +1,281 @@
"""
test_depth_hole_fill.py Unit tests for depth hole fill helpers (no ROS2 required).
Covers:
valid_mask:
- valid range returns True
- zero / below d_min returns False
- NaN returns False
- above d_max returns False
- mixed array has correct mask
_odd_kernel_size:
- result is always odd
- result >= 3
- scales correctly
fill_holes no-hole cases:
- fully valid image is returned unchanged
- output dtype is float32
- output shape matches input
fill_holes basic fills:
- single centre hole in uniform depth filled with correct depth
- single centre hole in uniform depth original valid pixels unchanged
- NaN pixel treated as hole and filled
- row of zeros within uniform depth filled
fill_holes fill quality:
- linear gradient: centre hole filled with interpolated value
- multi-pass fills larger holes than single pass
- all-zero image stays zero (no valid neighbours)
- border hole (edge pixel) is handled without crash
- depth range: pixel above d_max treated as hole
fill_holes valid pixel preservation:
- original valid pixels are never modified
- max_passes=1 still fills small holes
"""
import sys
import os
import math
import numpy as np
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_bringup._depth_hole_fill import (
fill_holes,
valid_mask,
_odd_kernel_size,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _uniform(val=2.0, h=64, w=64) -> np.ndarray:
return np.full((h, w), val, dtype=np.float32)
def _poke_hole(arr, r, c) -> np.ndarray:
arr = arr.copy()
arr[r, c] = 0.0
return arr
def _poke_nan(arr, r, c) -> np.ndarray:
arr = arr.copy()
arr[r, c] = float('nan')
return arr
# ── valid_mask ────────────────────────────────────────────────────────────────
class TestValidMask:
def test_valid_pixel_is_true(self):
d = np.array([[1.0]], dtype=np.float32)
assert valid_mask(d, 0.1, 10.0)[0, 0]
def test_zero_is_false(self):
d = np.array([[0.0]], dtype=np.float32)
assert not valid_mask(d, 0.1, 10.0)[0, 0]
def test_below_dmin_is_false(self):
d = np.array([[0.05]], dtype=np.float32)
assert not valid_mask(d, 0.1, 10.0)[0, 0]
def test_nan_is_false(self):
d = np.array([[float('nan')]], dtype=np.float32)
assert not valid_mask(d, 0.1, 10.0)[0, 0]
def test_above_dmax_is_false(self):
d = np.array([[15.0]], dtype=np.float32)
assert not valid_mask(d, 0.1, 10.0)[0, 0]
def test_at_dmin_is_true(self):
d = np.array([[0.1]], dtype=np.float32)
assert valid_mask(d, 0.1, 10.0)[0, 0]
def test_at_dmax_is_true(self):
d = np.array([[10.0]], dtype=np.float32)
assert valid_mask(d, 0.1, 10.0)[0, 0]
def test_mixed_array(self):
d = np.array([[0.0, 1.0, float('nan'), 5.0, 11.0]], dtype=np.float32)
m = valid_mask(d, 0.1, 10.0)
np.testing.assert_array_equal(m, [[False, True, False, True, False]])
# ── _odd_kernel_size ──────────────────────────────────────────────────────────
class TestOddKernelSize:
@pytest.mark.parametrize('base,scale', [
(5, 1.0), (5, 2.5), (5, 6.0),
(3, 1.0), (7, 2.0), (9, 3.0),
(4, 1.0), # even base → must become odd
])
def test_result_is_odd(self, base, scale):
ks = _odd_kernel_size(base, scale)
assert ks % 2 == 1
@pytest.mark.parametrize('base,scale', [(3, 1.0), (1, 5.0), (2, 0.5)])
def test_result_at_least_3(self, base, scale):
assert _odd_kernel_size(base, scale) >= 3
def test_scale_1_returns_base_or_nearby_odd(self):
ks = _odd_kernel_size(5, 1.0)
assert ks == 5
def test_large_scale_gives_large_kernel(self):
ks = _odd_kernel_size(5, 6.0)
assert ks >= 25 # 5 * 6 = 30 → 31
# ── fill_holes — output contract ──────────────────────────────────────────────
class TestFillHolesOutputContract:
def test_output_dtype_float32(self):
out = fill_holes(_uniform(2.0))
assert out.dtype == np.float32
def test_output_shape_preserved(self):
img = _uniform(2.0, h=48, w=64)
out = fill_holes(img)
assert out.shape == img.shape
def test_fully_valid_image_unchanged(self):
img = _uniform(2.0)
out = fill_holes(img)
np.testing.assert_allclose(out, img, atol=1e-6)
def test_valid_pixels_never_modified(self):
"""Any pixel valid in the input must be identical in the output."""
img = _uniform(3.0, h=32, w=32)
img[16, 16] = 0.0 # one hole
mask_before = valid_mask(img)
out = fill_holes(img)
np.testing.assert_allclose(out[mask_before], img[mask_before], atol=1e-6)
# ── fill_holes — basic hole filling ──────────────────────────────────────────
class TestFillHolesBasic:
def test_centre_zero_filled_uniform(self):
"""Single zero pixel in uniform depth → filled with that depth."""
img = _poke_hole(_uniform(2.0, 32, 32), 16, 16)
out = fill_holes(img, kernel_size=5, max_passes=1)
assert out[16, 16] == pytest.approx(2.0, abs=0.05)
def test_centre_nan_filled_uniform(self):
"""Single NaN pixel in uniform depth → filled."""
img = _poke_nan(_uniform(2.0, 32, 32), 16, 16)
out = fill_holes(img, kernel_size=5, max_passes=1)
assert out[16, 16] == pytest.approx(2.0, abs=0.05)
def test_filled_value_is_positive(self):
img = _poke_hole(_uniform(1.5, 32, 32), 16, 16)
out = fill_holes(img)
assert out[16, 16] > 0.0
def test_row_of_holes_filled(self):
"""Entire middle row zeroed → should be filled from neighbours above/below."""
img = _uniform(3.0, 32, 32)
img[16, :] = 0.0
out = fill_holes(img, kernel_size=7, max_passes=1)
# All pixels in the row should be non-zero after filling
assert (out[16, :] > 0.0).all()
def test_all_zero_stays_zero(self):
"""Image with no valid pixels → stays zero (nothing to interpolate from)."""
img = np.zeros((32, 32), dtype=np.float32)
out = fill_holes(img, d_min=0.1)
assert (out == 0.0).all()
def test_border_hole_no_crash(self):
"""Holes at image corners must not raise exceptions."""
img = _uniform(2.0, 32, 32)
img[0, 0] = 0.0
img[0, -1] = 0.0
img[-1, 0] = 0.0
img[-1, -1] = 0.0
out = fill_holes(img) # must not raise
assert out.shape == img.shape
def test_border_holes_filled(self):
"""Corner holes should be filled from their neighbours."""
img = _uniform(2.0, 32, 32)
img[0, 0] = 0.0
out = fill_holes(img, kernel_size=5, max_passes=1)
assert out[0, 0] == pytest.approx(2.0, abs=0.1)
# ── fill_holes — fill quality ─────────────────────────────────────────────────
class TestFillHolesQuality:
def test_linear_gradient_centre_hole_interpolated(self):
"""
Depth linearly increasing from 1.0 (left) to 3.0 (right).
Centre hole should be filled near the midpoint (~2.0).
"""
h, w = 32, 32
img = np.tile(np.linspace(1.0, 3.0, w, dtype=np.float32), (h, 1))
cx = w // 2
img[:, cx] = 0.0
out = fill_holes(img, kernel_size=5, max_passes=1)
mid = out[h // 2, cx]
assert 1.5 <= mid <= 2.5, f'interpolated value {mid:.3f} not in [1.5, 2.5]'
def test_large_hole_filled_with_more_passes(self):
"""A 9×9 hole in uniform depth: single pass may not fully fill it,
but 3 passes should."""
img = _uniform(2.0, 64, 64)
# Create a 9×9 hole
img[28:37, 28:37] = 0.0
out1 = fill_holes(img, kernel_size=5, max_passes=1)
out3 = fill_holes(img, kernel_size=5, max_passes=3)
# More passes → fewer remaining holes
holes1 = (out1 == 0.0).sum()
holes3 = (out3 == 0.0).sum()
assert holes3 <= holes1, f'more passes should reduce holes: {holes3} vs {holes1}'
def test_3pass_fills_9x9_hole_completely(self):
img = _uniform(2.0, 64, 64)
img[28:37, 28:37] = 0.0
out = fill_holes(img, kernel_size=5, max_passes=3)
assert (out[28:37, 28:37] > 0.0).all()
def test_filled_depth_within_valid_range(self):
"""Filled pixels should have depth within [d_min, d_max]."""
img = _uniform(2.0, 32, 32)
img[10:15, 10:15] = 0.0
out = fill_holes(img, d_min=0.1, d_max=10.0, max_passes=3)
# Only check pixels that were actually filled
was_hole = (img == 0.0)
filled = out[was_hole]
positive = filled[filled > 0.0]
assert (positive >= 0.1).all()
assert (positive <= 10.0).all()
def test_above_dmax_treated_as_hole(self):
"""Pixels above d_max should be treated as holes and filled."""
img = _uniform(2.0, 32, 32)
img[16, 16] = 15.0 # out of range
out = fill_holes(img, d_max=10.0, max_passes=1)
assert out[16, 16] == pytest.approx(2.0, abs=0.1)
def test_max_passes_1_works(self):
img = _poke_hole(_uniform(2.0, 32, 32), 16, 16)
out = fill_holes(img, max_passes=1)
assert out.shape == img.shape
assert out[16, 16] > 0.0
if __name__ == '__main__':
pytest.main([__file__, '-v'])