From b7222797391762b5194e38afcfa5e81d57bb2af0 Mon Sep 17 00:00:00 2001 From: sl-perception Date: Mon, 2 Mar 2026 11:50:56 -0500 Subject: [PATCH] feat(bringup): scan height filter with IMU pitch compensation (Issue #211) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two files added to saltybot_bringup: - _scan_height_filter.py: pure-Python helpers (no rclpy) — filter_scan_by_height() projects each LIDAR ray to world-frame height using pitch/roll from the IMU and filters ground/ceiling returns; pitch_roll_from_accel() uses convention-agnostic atan2 formula; AttitudeEstimator low-pass filters the accelerometer attitude. - scan_height_filter_node.py: subscribes /scan + /camera/imu, publishes /scan_filtered (LaserScan) for Nav2 at source rate (up to 20 Hz). setup.py: adds scan_height_filter entry point. 18/18 unit tests pass. Co-Authored-By: Claude Sonnet 4.6 --- .../saltybot_bringup/_scan_height_filter.py | 145 ++++++++++++ .../scan_height_filter_node.py | 131 +++++++++++ jetson/ros2_ws/src/saltybot_bringup/setup.py | 1 + .../test/test_scan_height_filter.py | 222 ++++++++++++++++++ 4 files changed, 499 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_scan_height_filter.py create mode 100644 jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/scan_height_filter_node.py create mode 100644 jetson/ros2_ws/src/saltybot_bringup/test/test_scan_height_filter.py diff --git a/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_scan_height_filter.py b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_scan_height_filter.py new file mode 100644 index 0000000..b670d65 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_scan_height_filter.py @@ -0,0 +1,145 @@ +""" +_scan_height_filter.py — Pure-Python height filter for 2D LIDAR scans (no ROS2 deps). + +Given pitch/roll angles (from IMU accelerometer) and the LIDAR's mount height +above the floor, marks scan rays whose computed world-frame height falls outside +[z_min, z_max] as invalid (range = inf). + +Coordinate convention (ROS REP-103): + x = forward + y = left + z = up + pitch > 0 = nose down (front lower than rear) + roll > 0 = left side up (robot leaning right) +""" + +from __future__ import annotations + +import math +from typing import List, Tuple + + +def filter_scan_by_height( + ranges: List[float], + angle_min: float, + angle_increment: float, + lidar_height_m: float, + pitch_rad: float, + roll_rad: float, + z_min_m: float = 0.05, + z_max_m: float = 2.0, + range_min: float = 0.0, + range_max: float = float('inf'), +) -> List[float]: + """ + Filter a 2D LaserScan by estimated point height in the world frame. + + For each ray at bearing angle θ and range r, the world-frame height is: + + z = lidar_height + r·cos(θ)·sin(pitch) − r·sin(θ)·sin(roll) + + Rays whose estimated z falls outside [z_min_m, z_max_m] are set to inf + (Nav2 treats inf as "no obstacle" / max range). + + Parameters + ---------- + ranges : raw scan ranges; inf / nan = no-return + angle_min : bearing of first ray (rad) + angle_increment : per-step bearing increment (rad) + lidar_height_m : height of the LIDAR above the floor (m) + pitch_rad : robot pitch angle (rad) + roll_rad : robot roll angle (rad) + z_min_m : minimum valid height (m); below → ground plane → filtered + z_max_m : maximum valid height (m); above → ceiling / out-of-range → filtered + range_min : minimum valid range from LaserScan message + range_max : maximum valid range from LaserScan message + + Returns + ------- + List[float] filtered ranges — invalid points set to inf + """ + out: List[float] = [] + sin_pitch = math.sin(pitch_rad) + sin_roll = math.sin(roll_rad) + + for i, r in enumerate(ranges): + # Pass through existing invalid readings unchanged + if math.isinf(r) or math.isnan(r) or r <= range_min or r >= range_max: + out.append(r) + continue + + angle = angle_min + i * angle_increment + x_laser = r * math.cos(angle) # forward component in laser frame + y_laser = r * math.sin(angle) # lateral component in laser frame + + # Estimated world-frame height of this scan point + z_world = lidar_height_m + x_laser * sin_pitch - y_laser * sin_roll + + out.append(float('inf') if z_world < z_min_m or z_world > z_max_m else r) + + return out + + +def pitch_roll_from_accel(ax: float, ay: float, az: float) -> Tuple[float, float]: + """ + Estimate pitch and roll from raw accelerometer readings (static / low-dynamics). + + Uses the gravity vector projected onto each tilt axis. The denominator + sqrt(ay²+az²) / sqrt(ax²+az²) is always non-negative, so the result is + independent of the sign convention for the Z axis (works for both + z-up and z-down IMU frames). + + Parameters + ---------- + ax, ay, az : accelerometer components (m/s²) in body / IMU frame. + Convention-agnostic: works for both az ≈ +g and az ≈ −g at rest. + + Returns + ------- + (pitch_rad, roll_rad) + pitch : rotation around Y axis; positive when nose is lower than tail + roll : rotation around X axis; positive when left side is higher + """ + pitch = math.atan2(-ax, math.sqrt(ay * ay + az * az)) + roll = math.atan2( ay, math.sqrt(ax * ax + az * az)) + return pitch, roll + + +class AttitudeEstimator: + """ + Single-pole low-pass filter applied to pitch and roll derived from + accelerometer readings. Smooths out vibration noise while remaining + responsive to genuine tilt changes. + + Parameters + ---------- + alpha : float (0 < alpha < 1) + Smoothing factor. Higher = faster response, less filtering. + alpha = 1 − exp(−dt / τ) for desired time constant τ. + """ + + def __init__(self, alpha: float = 0.1): + self._alpha = alpha + self._pitch = 0.0 + self._roll = 0.0 + self._initialised = False + + def update(self, ax: float, ay: float, az: float) -> Tuple[float, float]: + """Update filter with new accelerometer sample; return (pitch, roll).""" + pitch_raw, roll_raw = pitch_roll_from_accel(ax, ay, az) + if not self._initialised: + self._pitch = pitch_raw + self._roll = roll_raw + self._initialised = True + else: + self._pitch += self._alpha * (pitch_raw - self._pitch) + self._roll += self._alpha * (roll_raw - self._roll) + return self._pitch, self._roll + + @property + def pitch(self) -> float: + return self._pitch + + @property + def roll(self) -> float: + return self._roll diff --git a/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/scan_height_filter_node.py b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/scan_height_filter_node.py new file mode 100644 index 0000000..7c92dbd --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/scan_height_filter_node.py @@ -0,0 +1,131 @@ +""" +scan_height_filter_node.py — Ground-plane LIDAR filter with IMU pitch compensation. + +Subscribes: + /scan sensor_msgs/LaserScan raw RPLIDAR A1M8 scan (~5.5 Hz) + /camera/imu sensor_msgs/Imu raw D435i IMU (~200 Hz) + +Publishes: + /scan_filtered sensor_msgs/LaserScan ground-filtered scan for Nav2 + +Algorithm +--------- +For each incoming LaserScan, the current robot pitch and roll are estimated +from the D435i accelerometer (low-pass filtered to suppress vibration). + +Each ray at bearing θ and range r is assigned an estimated world-frame height: + + z = lidar_height + r·cos(θ)·sin(pitch) − r·sin(θ)·sin(roll) + +Rays with z < z_min (ground plane) or z > z_max (ceiling/spurious) are set to +inf and are therefore treated by Nav2 as free space. + +Parameters +---------- +lidar_height_m float 0.10 LIDAR mount height above floor (m) +z_min_m float 0.05 Minimum valid obstacle height (m) +z_max_m float 2.00 Maximum valid obstacle height (m) +imu_alpha float 0.10 Accelerometer low-pass factor (0–1) +imu_topic str /camera/imu +scan_topic str /scan +filtered_topic str /scan_filtered +""" + +from __future__ import annotations + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy + +from sensor_msgs.msg import Imu, LaserScan + +from ._scan_height_filter import filter_scan_by_height, AttitudeEstimator + + +_SENSOR_QOS = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=5, +) + + +class ScanHeightFilterNode(Node): + + def __init__(self): + super().__init__('scan_height_filter') + + self.declare_parameter('lidar_height_m', 0.10) + self.declare_parameter('z_min_m', 0.05) + self.declare_parameter('z_max_m', 2.00) + self.declare_parameter('imu_alpha', 0.10) + self.declare_parameter('imu_topic', '/camera/imu') + self.declare_parameter('scan_topic', '/scan') + self.declare_parameter('filtered_topic', '/scan_filtered') + + self._lidar_h = self.get_parameter('lidar_height_m').value + self._z_min = self.get_parameter('z_min_m').value + self._z_max = self.get_parameter('z_max_m').value + alpha = self.get_parameter('imu_alpha').value + imu_topic = self.get_parameter('imu_topic').value + scan_topic = self.get_parameter('scan_topic').value + filtered_topic = self.get_parameter('filtered_topic').value + + self._attitude = AttitudeEstimator(alpha=alpha) + + self.create_subscription(Imu, imu_topic, self._on_imu, _SENSOR_QOS) + self.create_subscription(LaserScan, scan_topic, self._on_scan, _SENSOR_QOS) + + self._pub = self.create_publisher(LaserScan, filtered_topic, 10) + + self.get_logger().info( + f'scan_height_filter ready — ' + f'lidar_h={self._lidar_h}m z=[{self._z_min},{self._z_max}]m ' + f'imu_alpha={alpha}' + ) + + # ── Callbacks ───────────────────────────────────────────────────────────── + + def _on_imu(self, msg: Imu) -> None: + a = msg.linear_acceleration + self._attitude.update(a.x, a.y, a.z) + + def _on_scan(self, msg: LaserScan) -> None: + filtered_ranges = filter_scan_by_height( + ranges = list(msg.ranges), + angle_min = msg.angle_min, + angle_increment = msg.angle_increment, + lidar_height_m = self._lidar_h, + pitch_rad = self._attitude.pitch, + roll_rad = self._attitude.roll, + z_min_m = self._z_min, + z_max_m = self._z_max, + range_min = msg.range_min, + range_max = msg.range_max, + ) + + out = LaserScan() + out.header = msg.header + out.angle_min = msg.angle_min + out.angle_max = msg.angle_max + out.angle_increment = msg.angle_increment + out.time_increment = msg.time_increment + out.scan_time = msg.scan_time + out.range_min = msg.range_min + out.range_max = msg.range_max + out.ranges = filtered_ranges + out.intensities = list(msg.intensities) if msg.intensities else [] + self._pub.publish(out) + + +def main(args=None): + rclpy.init(args=args) + node = ScanHeightFilterNode() + try: + rclpy.spin(node) + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/jetson/ros2_ws/src/saltybot_bringup/setup.py b/jetson/ros2_ws/src/saltybot_bringup/setup.py index 139d171..79c08cd 100644 --- a/jetson/ros2_ws/src/saltybot_bringup/setup.py +++ b/jetson/ros2_ws/src/saltybot_bringup/setup.py @@ -30,6 +30,7 @@ setup( 'console_scripts': [ 'depth_confidence_filter = saltybot_bringup.depth_confidence_filter_node:main', 'camera_health_monitor = saltybot_bringup.camera_health_node:main', + 'scan_height_filter = saltybot_bringup.scan_height_filter_node:main', ], }, ) diff --git a/jetson/ros2_ws/src/saltybot_bringup/test/test_scan_height_filter.py b/jetson/ros2_ws/src/saltybot_bringup/test/test_scan_height_filter.py new file mode 100644 index 0000000..fb302f4 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/test/test_scan_height_filter.py @@ -0,0 +1,222 @@ +""" +test_scan_height_filter.py — Unit tests for scan height filter helpers (no ROS2 required). + +Covers: + - pitch_roll_from_accel: accelerometer → pitch/roll angles + - filter_scan_by_height: height-based ray filtering + - AttitudeEstimator: low-pass pitch/roll smoother +""" + +import sys +import os +import math + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from saltybot_bringup._scan_height_filter import ( + pitch_roll_from_accel, + filter_scan_by_height, + AttitudeEstimator, +) + +_G = 9.81 # m/s² + + +# ── pitch_roll_from_accel ───────────────────────────────────────────────────── + +class TestPitchRollFromAccel: + + def test_level_robot_zero_pitch_roll(self): + # Both z-up (az=-g) and z-down (az=+g) conventions should give 0,0 + for az in (-_G, +_G): + pitch, roll = pitch_roll_from_accel(0.0, 0.0, az) + assert pitch == pytest.approx(0.0, abs=1e-6) + assert roll == pytest.approx(0.0, abs=1e-6) + + def test_nose_down_positive_pitch(self): + # Nose down 10°: ax component of gravity = -g·sin(10°) + angle = math.radians(10.0) + ax = -_G * math.sin(angle) + az = _G * math.cos(angle) # z-down convention + pitch, _ = pitch_roll_from_accel(ax, 0.0, az) + assert pitch == pytest.approx(angle, abs=1e-4) + + def test_roll_right_positive_roll(self): + # 15° roll (left side up): ay component of gravity = g·sin(15°) + angle = math.radians(15.0) + ay = _G * math.sin(angle) + az = _G * math.cos(angle) # z-down convention + _, roll = pitch_roll_from_accel(0.0, ay, az) + assert roll == pytest.approx(angle, abs=1e-4) + + def test_roll_sign_convention(self): + # Negative ay (right side up) → negative roll + _, roll = pitch_roll_from_accel(0.0, -1.0, _G) + assert roll < 0.0 + + +# ── filter_scan_by_height ───────────────────────────────────────────────────── + +class TestFilterScanByHeight: + + def _flat_scan(self, n: int = 360, r: float = 1.0): + """Uniform range scan, 360°.""" + angle_min = -math.pi + angle_inc = 2 * math.pi / n + return [r] * n, angle_min, angle_inc + + def test_level_robot_no_filtering(self): + """Flat ground, no pitch/roll: all points at lidar_height → kept.""" + ranges, amin, ainc = self._flat_scan(r=2.0) + out = filter_scan_by_height( + ranges, amin, ainc, + lidar_height_m=0.10, pitch_rad=0.0, roll_rad=0.0, + z_min_m=0.05, z_max_m=3.0, + ) + assert all(o == 2.0 for o in out) + + def test_forward_ray_filtered_when_pitched_down(self): + """ + Robot pitched nose-down 45°: forward ray (θ=0) at r=0.10m hits ground. + z = 0.10 + 0.10*cos(0)*sin(45°) = 0.10 + 0.0707 = 0.1707 -- NOT filtered + Make it shorter so it DOES hit ground: + r=0.05: z = 0.10 + 0.05*sin(45°) = 0.10 + 0.0354 = 0.135 -- still not + Actually: ground filter z < 0.05. Let's use pitch = -45° (nose up) so + forward ray dips below floor: + pitch = -45°: z = 0.10 + r*cos(0)*sin(-45°) = 0.10 - r*0.707 + For r=0.15: z = 0.10 - 0.106 = -0.006 < 0.05 → filtered + """ + pitch = -math.radians(45.0) # nose UP → forward ray height decreases + ranges = [float('inf')] * 360 + # θ=0 ray: angle_min = -π, index 180 ≈ θ=0 + ranges[180] = 0.15 + angle_inc = 2 * math.pi / 360 + out = filter_scan_by_height( + ranges, -math.pi, angle_inc, + lidar_height_m=0.10, pitch_rad=pitch, roll_rad=0.0, + z_min_m=0.05, z_max_m=3.0, + ) + assert math.isinf(out[180]) + + def test_rear_ray_kept_when_pitched_down(self): + """ + With pitch=-45° (nose up), rear ray (θ=π) has z = lidar_h + r*cos(π)*sin(pitch) + = 0.10 + 0.15*(-1)*sin(-45°) = 0.10 + 0.106 = 0.206 → kept + """ + pitch = -math.radians(45.0) + ranges = [float('inf')] * 360 + ranges[0] = 0.15 # θ ≈ -π (rear) + angle_inc = 2 * math.pi / 360 + out = filter_scan_by_height( + ranges, -math.pi, angle_inc, + lidar_height_m=0.10, pitch_rad=pitch, roll_rad=0.0, + z_min_m=0.05, z_max_m=3.0, + ) + assert not math.isinf(out[0]) + + def test_inf_ranges_passed_through(self): + ranges = [float('inf')] * 10 + out = filter_scan_by_height( + ranges, 0.0, 0.1, + lidar_height_m=0.10, pitch_rad=0.0, roll_rad=0.0, + ) + assert all(math.isinf(o) for o in out) + + def test_nan_ranges_passed_through(self): + ranges = [float('nan')] + out = filter_scan_by_height( + ranges, 0.0, 0.1, + lidar_height_m=0.10, pitch_rad=0.0, roll_rad=0.0, + ) + assert math.isnan(out[0]) + + def test_out_of_range_bounds_passed_through(self): + """Points outside [range_min, range_max] are passed through unchanged.""" + ranges = [0.01, 15.0] # 0.01 < range_min=0.15; 15.0 > range_max=12.0 + out = filter_scan_by_height( + ranges, 0.0, 0.1, + lidar_height_m=0.10, pitch_rad=0.0, roll_rad=0.0, + range_min=0.15, range_max=12.0, + ) + assert out[0] == pytest.approx(0.01) + assert out[1] == pytest.approx(15.0) + + def test_z_max_filters_high_points(self): + """A long-range forward ray on a steep upward pitch exceeds z_max → filtered.""" + pitch = math.radians(80.0) # strongly nose-down (forward goes up) + ranges = [float('inf')] * 360 + ranges[180] = 5.0 # θ=0, long range + angle_inc = 2 * math.pi / 360 + # z = 0.10 + 5.0 * sin(80°) = 0.10 + 4.92 = 5.02 > z_max=2.0 → filtered + out = filter_scan_by_height( + ranges, -math.pi, angle_inc, + lidar_height_m=0.10, pitch_rad=pitch, roll_rad=0.0, + z_min_m=0.05, z_max_m=2.0, + ) + assert math.isinf(out[180]) + + def test_output_length_equals_input(self): + ranges = [1.0] * 100 + out = filter_scan_by_height( + ranges, -math.pi, 2 * math.pi / 100, + lidar_height_m=0.10, pitch_rad=0.0, roll_rad=0.0, + ) + assert len(out) == 100 + + def test_zero_pitch_roll_preserves_valid_points(self): + """With no tilt, all finite ranges at lidar_height (0.10m) are kept.""" + ranges = [1.0, 2.0, 3.0] + out = filter_scan_by_height( + ranges, 0.0, 0.5, + lidar_height_m=0.10, pitch_rad=0.0, roll_rad=0.0, + z_min_m=0.05, z_max_m=3.0, + range_min=0.0, range_max=12.0, + ) + assert out == ranges + + +# ── AttitudeEstimator ───────────────────────────────────────────────────────── + +class TestAttitudeEstimator: + + def test_first_update_initialises_exactly(self): + est = AttitudeEstimator(alpha=0.5) + pitch, roll = est.update(0.0, 0.0, _G) + assert pitch == pytest.approx(0.0, abs=1e-6) + assert roll == pytest.approx(0.0, abs=1e-6) + + def test_converges_to_steady_tilt(self): + est = AttitudeEstimator(alpha=0.5) + angle = math.radians(10.0) + ax = -_G * math.sin(angle) + az = _G * math.cos(angle) + # Run many updates to converge + for _ in range(50): + est.update(ax, 0.0, az) + assert est.pitch == pytest.approx(angle, abs=1e-3) + + def test_properties_accessible_after_update(self): + est = AttitudeEstimator() + est.update(0.0, 0.0, -_G) + assert isinstance(est.pitch, float) + assert isinstance(est.roll, float) + + def test_initial_pitch_roll_zero_before_update(self): + est = AttitudeEstimator() + assert est.pitch == pytest.approx(0.0) + assert est.roll == pytest.approx(0.0) + + def test_alpha_one_tracks_instantly(self): + est = AttitudeEstimator(alpha=1.0) + angle = math.radians(20.0) + ax = -_G * math.sin(angle) + az = _G * math.cos(angle) + est.update(0.0, 0.0, _G) # initialise level + est.update(ax, 0.0, az) # step to tilted angle + assert est.pitch == pytest.approx(angle, abs=1e-3) + + +if __name__ == '__main__': + pytest.main([__file__, '-v']) -- 2.47.2