Merge pull request 'feat(bringup): obstacle height filter with IMU pitch compensation (Issue #211)' (#219) from sl-perception/issue-211-height-filter into main

This commit is contained in:
sl-jetson 2026-03-02 11:51:37 -05:00
commit 3a34ec84e0
4 changed files with 499 additions and 0 deletions

View File

@ -0,0 +1,145 @@
"""
_scan_height_filter.py Pure-Python height filter for 2D LIDAR scans (no ROS2 deps).
Given pitch/roll angles (from IMU accelerometer) and the LIDAR's mount height
above the floor, marks scan rays whose computed world-frame height falls outside
[z_min, z_max] as invalid (range = inf).
Coordinate convention (ROS REP-103):
x = forward
y = left
z = up
pitch > 0 = nose down (front lower than rear)
roll > 0 = left side up (robot leaning right)
"""
from __future__ import annotations
import math
from typing import List, Tuple
def filter_scan_by_height(
ranges: List[float],
angle_min: float,
angle_increment: float,
lidar_height_m: float,
pitch_rad: float,
roll_rad: float,
z_min_m: float = 0.05,
z_max_m: float = 2.0,
range_min: float = 0.0,
range_max: float = float('inf'),
) -> List[float]:
"""
Filter a 2D LaserScan by estimated point height in the world frame.
For each ray at bearing angle θ and range r, the world-frame height is:
z = lidar_height + r·cos(θ)·sin(pitch) r·sin(θ)·sin(roll)
Rays whose estimated z falls outside [z_min_m, z_max_m] are set to inf
(Nav2 treats inf as "no obstacle" / max range).
Parameters
----------
ranges : raw scan ranges; inf / nan = no-return
angle_min : bearing of first ray (rad)
angle_increment : per-step bearing increment (rad)
lidar_height_m : height of the LIDAR above the floor (m)
pitch_rad : robot pitch angle (rad)
roll_rad : robot roll angle (rad)
z_min_m : minimum valid height (m); below ground plane filtered
z_max_m : maximum valid height (m); above ceiling / out-of-range filtered
range_min : minimum valid range from LaserScan message
range_max : maximum valid range from LaserScan message
Returns
-------
List[float] filtered ranges invalid points set to inf
"""
out: List[float] = []
sin_pitch = math.sin(pitch_rad)
sin_roll = math.sin(roll_rad)
for i, r in enumerate(ranges):
# Pass through existing invalid readings unchanged
if math.isinf(r) or math.isnan(r) or r <= range_min or r >= range_max:
out.append(r)
continue
angle = angle_min + i * angle_increment
x_laser = r * math.cos(angle) # forward component in laser frame
y_laser = r * math.sin(angle) # lateral component in laser frame
# Estimated world-frame height of this scan point
z_world = lidar_height_m + x_laser * sin_pitch - y_laser * sin_roll
out.append(float('inf') if z_world < z_min_m or z_world > z_max_m else r)
return out
def pitch_roll_from_accel(ax: float, ay: float, az: float) -> Tuple[float, float]:
"""
Estimate pitch and roll from raw accelerometer readings (static / low-dynamics).
Uses the gravity vector projected onto each tilt axis. The denominator
sqrt(ay²+az²) / sqrt(ax²+az²) is always non-negative, so the result is
independent of the sign convention for the Z axis (works for both
z-up and z-down IMU frames).
Parameters
----------
ax, ay, az : accelerometer components (m/) in body / IMU frame.
Convention-agnostic: works for both az +g and az g at rest.
Returns
-------
(pitch_rad, roll_rad)
pitch : rotation around Y axis; positive when nose is lower than tail
roll : rotation around X axis; positive when left side is higher
"""
pitch = math.atan2(-ax, math.sqrt(ay * ay + az * az))
roll = math.atan2( ay, math.sqrt(ax * ax + az * az))
return pitch, roll
class AttitudeEstimator:
"""
Single-pole low-pass filter applied to pitch and roll derived from
accelerometer readings. Smooths out vibration noise while remaining
responsive to genuine tilt changes.
Parameters
----------
alpha : float (0 < alpha < 1)
Smoothing factor. Higher = faster response, less filtering.
alpha = 1 exp(dt / τ) for desired time constant τ.
"""
def __init__(self, alpha: float = 0.1):
self._alpha = alpha
self._pitch = 0.0
self._roll = 0.0
self._initialised = False
def update(self, ax: float, ay: float, az: float) -> Tuple[float, float]:
"""Update filter with new accelerometer sample; return (pitch, roll)."""
pitch_raw, roll_raw = pitch_roll_from_accel(ax, ay, az)
if not self._initialised:
self._pitch = pitch_raw
self._roll = roll_raw
self._initialised = True
else:
self._pitch += self._alpha * (pitch_raw - self._pitch)
self._roll += self._alpha * (roll_raw - self._roll)
return self._pitch, self._roll
@property
def pitch(self) -> float:
return self._pitch
@property
def roll(self) -> float:
return self._roll

View File

@ -0,0 +1,131 @@
"""
scan_height_filter_node.py Ground-plane LIDAR filter with IMU pitch compensation.
Subscribes:
/scan sensor_msgs/LaserScan raw RPLIDAR A1M8 scan (~5.5 Hz)
/camera/imu sensor_msgs/Imu raw D435i IMU (~200 Hz)
Publishes:
/scan_filtered sensor_msgs/LaserScan ground-filtered scan for Nav2
Algorithm
---------
For each incoming LaserScan, the current robot pitch and roll are estimated
from the D435i accelerometer (low-pass filtered to suppress vibration).
Each ray at bearing θ and range r is assigned an estimated world-frame height:
z = lidar_height + r·cos(θ)·sin(pitch) r·sin(θ)·sin(roll)
Rays with z < z_min (ground plane) or z > z_max (ceiling/spurious) are set to
inf and are therefore treated by Nav2 as free space.
Parameters
----------
lidar_height_m float 0.10 LIDAR mount height above floor (m)
z_min_m float 0.05 Minimum valid obstacle height (m)
z_max_m float 2.00 Maximum valid obstacle height (m)
imu_alpha float 0.10 Accelerometer low-pass factor (01)
imu_topic str /camera/imu
scan_topic str /scan
filtered_topic str /scan_filtered
"""
from __future__ import annotations
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from sensor_msgs.msg import Imu, LaserScan
from ._scan_height_filter import filter_scan_by_height, AttitudeEstimator
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
class ScanHeightFilterNode(Node):
def __init__(self):
super().__init__('scan_height_filter')
self.declare_parameter('lidar_height_m', 0.10)
self.declare_parameter('z_min_m', 0.05)
self.declare_parameter('z_max_m', 2.00)
self.declare_parameter('imu_alpha', 0.10)
self.declare_parameter('imu_topic', '/camera/imu')
self.declare_parameter('scan_topic', '/scan')
self.declare_parameter('filtered_topic', '/scan_filtered')
self._lidar_h = self.get_parameter('lidar_height_m').value
self._z_min = self.get_parameter('z_min_m').value
self._z_max = self.get_parameter('z_max_m').value
alpha = self.get_parameter('imu_alpha').value
imu_topic = self.get_parameter('imu_topic').value
scan_topic = self.get_parameter('scan_topic').value
filtered_topic = self.get_parameter('filtered_topic').value
self._attitude = AttitudeEstimator(alpha=alpha)
self.create_subscription(Imu, imu_topic, self._on_imu, _SENSOR_QOS)
self.create_subscription(LaserScan, scan_topic, self._on_scan, _SENSOR_QOS)
self._pub = self.create_publisher(LaserScan, filtered_topic, 10)
self.get_logger().info(
f'scan_height_filter ready — '
f'lidar_h={self._lidar_h}m z=[{self._z_min},{self._z_max}]m '
f'imu_alpha={alpha}'
)
# ── Callbacks ─────────────────────────────────────────────────────────────
def _on_imu(self, msg: Imu) -> None:
a = msg.linear_acceleration
self._attitude.update(a.x, a.y, a.z)
def _on_scan(self, msg: LaserScan) -> None:
filtered_ranges = filter_scan_by_height(
ranges = list(msg.ranges),
angle_min = msg.angle_min,
angle_increment = msg.angle_increment,
lidar_height_m = self._lidar_h,
pitch_rad = self._attitude.pitch,
roll_rad = self._attitude.roll,
z_min_m = self._z_min,
z_max_m = self._z_max,
range_min = msg.range_min,
range_max = msg.range_max,
)
out = LaserScan()
out.header = msg.header
out.angle_min = msg.angle_min
out.angle_max = msg.angle_max
out.angle_increment = msg.angle_increment
out.time_increment = msg.time_increment
out.scan_time = msg.scan_time
out.range_min = msg.range_min
out.range_max = msg.range_max
out.ranges = filtered_ranges
out.intensities = list(msg.intensities) if msg.intensities else []
self._pub.publish(out)
def main(args=None):
rclpy.init(args=args)
node = ScanHeightFilterNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -30,6 +30,7 @@ setup(
'console_scripts': [
'depth_confidence_filter = saltybot_bringup.depth_confidence_filter_node:main',
'camera_health_monitor = saltybot_bringup.camera_health_node:main',
'scan_height_filter = saltybot_bringup.scan_height_filter_node:main',
],
},
)

View File

@ -0,0 +1,222 @@
"""
test_scan_height_filter.py Unit tests for scan height filter helpers (no ROS2 required).
Covers:
- pitch_roll_from_accel: accelerometer pitch/roll angles
- filter_scan_by_height: height-based ray filtering
- AttitudeEstimator: low-pass pitch/roll smoother
"""
import sys
import os
import math
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_bringup._scan_height_filter import (
pitch_roll_from_accel,
filter_scan_by_height,
AttitudeEstimator,
)
_G = 9.81 # m/s²
# ── pitch_roll_from_accel ─────────────────────────────────────────────────────
class TestPitchRollFromAccel:
def test_level_robot_zero_pitch_roll(self):
# Both z-up (az=-g) and z-down (az=+g) conventions should give 0,0
for az in (-_G, +_G):
pitch, roll = pitch_roll_from_accel(0.0, 0.0, az)
assert pitch == pytest.approx(0.0, abs=1e-6)
assert roll == pytest.approx(0.0, abs=1e-6)
def test_nose_down_positive_pitch(self):
# Nose down 10°: ax component of gravity = -g·sin(10°)
angle = math.radians(10.0)
ax = -_G * math.sin(angle)
az = _G * math.cos(angle) # z-down convention
pitch, _ = pitch_roll_from_accel(ax, 0.0, az)
assert pitch == pytest.approx(angle, abs=1e-4)
def test_roll_right_positive_roll(self):
# 15° roll (left side up): ay component of gravity = g·sin(15°)
angle = math.radians(15.0)
ay = _G * math.sin(angle)
az = _G * math.cos(angle) # z-down convention
_, roll = pitch_roll_from_accel(0.0, ay, az)
assert roll == pytest.approx(angle, abs=1e-4)
def test_roll_sign_convention(self):
# Negative ay (right side up) → negative roll
_, roll = pitch_roll_from_accel(0.0, -1.0, _G)
assert roll < 0.0
# ── filter_scan_by_height ─────────────────────────────────────────────────────
class TestFilterScanByHeight:
def _flat_scan(self, n: int = 360, r: float = 1.0):
"""Uniform range scan, 360°."""
angle_min = -math.pi
angle_inc = 2 * math.pi / n
return [r] * n, angle_min, angle_inc
def test_level_robot_no_filtering(self):
"""Flat ground, no pitch/roll: all points at lidar_height → kept."""
ranges, amin, ainc = self._flat_scan(r=2.0)
out = filter_scan_by_height(
ranges, amin, ainc,
lidar_height_m=0.10, pitch_rad=0.0, roll_rad=0.0,
z_min_m=0.05, z_max_m=3.0,
)
assert all(o == 2.0 for o in out)
def test_forward_ray_filtered_when_pitched_down(self):
"""
Robot pitched nose-down 45°: forward ray (θ=0) at r=0.10m hits ground.
z = 0.10 + 0.10*cos(0)*sin(45°) = 0.10 + 0.0707 = 0.1707 -- NOT filtered
Make it shorter so it DOES hit ground:
r=0.05: z = 0.10 + 0.05*sin(45°) = 0.10 + 0.0354 = 0.135 -- still not
Actually: ground filter z < 0.05. Let's use pitch = -45° (nose up) so
forward ray dips below floor:
pitch = -45°: z = 0.10 + r*cos(0)*sin(-45°) = 0.10 - r*0.707
For r=0.15: z = 0.10 - 0.106 = -0.006 < 0.05 filtered
"""
pitch = -math.radians(45.0) # nose UP → forward ray height decreases
ranges = [float('inf')] * 360
# θ=0 ray: angle_min = -π, index 180 ≈ θ=0
ranges[180] = 0.15
angle_inc = 2 * math.pi / 360
out = filter_scan_by_height(
ranges, -math.pi, angle_inc,
lidar_height_m=0.10, pitch_rad=pitch, roll_rad=0.0,
z_min_m=0.05, z_max_m=3.0,
)
assert math.isinf(out[180])
def test_rear_ray_kept_when_pitched_down(self):
"""
With pitch=-45° (nose up), rear ray (θ=π) has z = lidar_h + r*cos(π)*sin(pitch)
= 0.10 + 0.15*(-1)*sin(-45°) = 0.10 + 0.106 = 0.206 kept
"""
pitch = -math.radians(45.0)
ranges = [float('inf')] * 360
ranges[0] = 0.15 # θ ≈ -π (rear)
angle_inc = 2 * math.pi / 360
out = filter_scan_by_height(
ranges, -math.pi, angle_inc,
lidar_height_m=0.10, pitch_rad=pitch, roll_rad=0.0,
z_min_m=0.05, z_max_m=3.0,
)
assert not math.isinf(out[0])
def test_inf_ranges_passed_through(self):
ranges = [float('inf')] * 10
out = filter_scan_by_height(
ranges, 0.0, 0.1,
lidar_height_m=0.10, pitch_rad=0.0, roll_rad=0.0,
)
assert all(math.isinf(o) for o in out)
def test_nan_ranges_passed_through(self):
ranges = [float('nan')]
out = filter_scan_by_height(
ranges, 0.0, 0.1,
lidar_height_m=0.10, pitch_rad=0.0, roll_rad=0.0,
)
assert math.isnan(out[0])
def test_out_of_range_bounds_passed_through(self):
"""Points outside [range_min, range_max] are passed through unchanged."""
ranges = [0.01, 15.0] # 0.01 < range_min=0.15; 15.0 > range_max=12.0
out = filter_scan_by_height(
ranges, 0.0, 0.1,
lidar_height_m=0.10, pitch_rad=0.0, roll_rad=0.0,
range_min=0.15, range_max=12.0,
)
assert out[0] == pytest.approx(0.01)
assert out[1] == pytest.approx(15.0)
def test_z_max_filters_high_points(self):
"""A long-range forward ray on a steep upward pitch exceeds z_max → filtered."""
pitch = math.radians(80.0) # strongly nose-down (forward goes up)
ranges = [float('inf')] * 360
ranges[180] = 5.0 # θ=0, long range
angle_inc = 2 * math.pi / 360
# z = 0.10 + 5.0 * sin(80°) = 0.10 + 4.92 = 5.02 > z_max=2.0 → filtered
out = filter_scan_by_height(
ranges, -math.pi, angle_inc,
lidar_height_m=0.10, pitch_rad=pitch, roll_rad=0.0,
z_min_m=0.05, z_max_m=2.0,
)
assert math.isinf(out[180])
def test_output_length_equals_input(self):
ranges = [1.0] * 100
out = filter_scan_by_height(
ranges, -math.pi, 2 * math.pi / 100,
lidar_height_m=0.10, pitch_rad=0.0, roll_rad=0.0,
)
assert len(out) == 100
def test_zero_pitch_roll_preserves_valid_points(self):
"""With no tilt, all finite ranges at lidar_height (0.10m) are kept."""
ranges = [1.0, 2.0, 3.0]
out = filter_scan_by_height(
ranges, 0.0, 0.5,
lidar_height_m=0.10, pitch_rad=0.0, roll_rad=0.0,
z_min_m=0.05, z_max_m=3.0,
range_min=0.0, range_max=12.0,
)
assert out == ranges
# ── AttitudeEstimator ─────────────────────────────────────────────────────────
class TestAttitudeEstimator:
def test_first_update_initialises_exactly(self):
est = AttitudeEstimator(alpha=0.5)
pitch, roll = est.update(0.0, 0.0, _G)
assert pitch == pytest.approx(0.0, abs=1e-6)
assert roll == pytest.approx(0.0, abs=1e-6)
def test_converges_to_steady_tilt(self):
est = AttitudeEstimator(alpha=0.5)
angle = math.radians(10.0)
ax = -_G * math.sin(angle)
az = _G * math.cos(angle)
# Run many updates to converge
for _ in range(50):
est.update(ax, 0.0, az)
assert est.pitch == pytest.approx(angle, abs=1e-3)
def test_properties_accessible_after_update(self):
est = AttitudeEstimator()
est.update(0.0, 0.0, -_G)
assert isinstance(est.pitch, float)
assert isinstance(est.roll, float)
def test_initial_pitch_roll_zero_before_update(self):
est = AttitudeEstimator()
assert est.pitch == pytest.approx(0.0)
assert est.roll == pytest.approx(0.0)
def test_alpha_one_tracks_instantly(self):
est = AttitudeEstimator(alpha=1.0)
angle = math.radians(20.0)
ax = -_G * math.sin(angle)
az = _G * math.cos(angle)
est.update(0.0, 0.0, _G) # initialise level
est.update(ax, 0.0, az) # step to tilted angle
assert est.pitch == pytest.approx(angle, abs=1e-3)
if __name__ == '__main__':
pytest.main([__file__, '-v'])