diff --git a/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_sky_detector.py b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_sky_detector.py new file mode 100644 index 0000000..58f5d7c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_sky_detector.py @@ -0,0 +1,122 @@ +""" +_sky_detector.py — Sky detection via HSV thresholding + horizon estimation (no ROS2 deps). + +Algorithm +--------- +Sky pixels are identified by two HSV bands (OpenCV convention: H∈[0,180], S/V∈[0,255]): + + 1. Blue sky — H∈[90,130], S∈[40,255], V∈[80,255] + (captures clear blue sky from cyan-blue through sky-blue to blue-violet) + + 2. Overcast — S∈[0,50], V∈[185,255] + (very bright, near-zero saturation: white/grey sky on cloudy days) + +The two masks are OR-combined into a single sky mask. + +Sky fraction +------------ +Computed over the top *scan_frac* of the image (default 60 %). This concentrates +sensitivity on the region where sky is expected to appear and avoids penalising ground +reflections or obstacles in the lower frame. + + sky_fraction = sky_pixels_in_top_scan_frac / pixels_in_top_scan_frac + +Horizon line +------------ +For each image row, compute the row-level sky fraction (sky pixels / row width). +The horizon is the **bottommost row** where that fraction ≥ *row_threshold* (default 0.30). +This represents the lower boundary of continuous sky content. + + horizon_y = max { r : row_sky_frac[r] ≥ row_threshold } or -1 if no sky found. + +Returns -1 when no sky is detected at all (indoors, underground, etc.). + +Public API +---------- + SkyResult(sky_fraction, horizon_y, sky_mask) + detect_sky(bgr, scan_frac=0.60, row_threshold=0.30) -> SkyResult +""" + +from __future__ import annotations + +from typing import NamedTuple + +import numpy as np + + +# ── HSV sky bands ───────────────────────────────────────────────────────────── + +# Blue sky (OpenCV H ∈ [0, 180]) +_BLUE_SKY_LO = np.array([90, 40, 80], dtype=np.uint8) +_BLUE_SKY_HI = np.array([130, 255, 255], dtype=np.uint8) + +# White / grey overcast sky (any hue, very bright, very desaturated) +_GREY_SKY_LO = np.array([0, 0, 185], dtype=np.uint8) +_GREY_SKY_HI = np.array([180, 50, 255], dtype=np.uint8) + + +# ── Data type ───────────────────────────────────────────────────────────────── + +class SkyResult(NamedTuple): + """Sky detection result for a single frame.""" + sky_fraction: float # fraction of top scan_frac region classified as sky [0, 1] + horizon_y: int # pixel row of horizon (-1 = no sky detected) + sky_mask: np.ndarray # (H, W) uint8 binary mask (255 = sky pixel) + + +# ── Public API ──────────────────────────────────────────────────────────────── + +def detect_sky( + bgr: np.ndarray, + scan_frac: float = 0.60, + row_threshold: float = 0.30, +) -> SkyResult: + """ + Detect sky pixels and estimate the horizon row. + + Parameters + ---------- + bgr : (H, W, 3) uint8 BGR ndarray + scan_frac : fraction of image height to use for sky_fraction computation + (top of frame, where sky is expected) + row_threshold : minimum per-row sky fraction to count a row as sky for + horizon estimation + + Returns + ------- + SkyResult(sky_fraction, horizon_y, sky_mask) + """ + import cv2 + + bgr = np.asarray(bgr, dtype=np.uint8) + h, w = bgr.shape[:2] + + if h == 0 or w == 0: + empty = np.zeros((h, w), dtype=np.uint8) + return SkyResult(sky_fraction=0.0, horizon_y=-1, sky_mask=empty) + + hsv = cv2.cvtColor(bgr, cv2.COLOR_BGR2HSV) + + # Build combined sky mask (blue OR overcast) + mask_blue = cv2.inRange(hsv, _BLUE_SKY_LO, _BLUE_SKY_HI) + mask_grey = cv2.inRange(hsv, _GREY_SKY_LO, _GREY_SKY_HI) + sky_mask = cv2.bitwise_or(mask_blue, mask_grey) # (H, W) uint8, 255 = sky + + # ── sky_fraction: top scan_frac rows ────────────────────────────────────── + scan_rows = max(1, int(h * min(float(scan_frac), 1.0))) + top_mask = sky_mask[:scan_rows, :] + sky_fraction = float(np.count_nonzero(top_mask)) / float(scan_rows * w) + + # ── horizon_y: bottommost row with ≥ row_threshold sky pixels ───────────── + # row_fracs[r] = fraction of pixels in row r that are sky + row_sky_counts = np.count_nonzero(sky_mask, axis=1).astype(np.float32) # (H,) + row_fracs = row_sky_counts / float(w) + sky_rows = np.where(row_fracs >= row_threshold)[0] + + horizon_y = int(sky_rows.max()) if len(sky_rows) > 0 else -1 + + return SkyResult( + sky_fraction=sky_fraction, + horizon_y=horizon_y, + sky_mask=sky_mask, + ) diff --git a/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/sky_detect_node.py b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/sky_detect_node.py new file mode 100644 index 0000000..4638e77 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/sky_detect_node.py @@ -0,0 +1,106 @@ +""" +sky_detect_node.py — D435i sky detector for outdoor navigation (Issue #307). + +Classifies the top portion of the D435i colour image as sky vs non-sky using +HSV blue/grey thresholding, then estimates the horizon line. + +Useful for: + - Outdoor/indoor scene detection (sky_fraction > 0.1 → likely outdoor) + - Camera tilt correction (horizon_y deviation from expected row → tilt estimate) + - Disabling outdoor-only nodes (colour correction, sun glare filter) indoors + +Subscribes (BEST_EFFORT): + /camera/color/image_raw sensor_msgs/Image BGR8 + +Publishes: + /saltybot/sky_fraction std_msgs/Float32 sky fraction in [0, 1] (per frame) + /saltybot/horizon_y std_msgs/Int32 horizon pixel row; -1 = no sky + +Parameters +---------- +scan_frac float 0.60 Top fraction of image analysed for sky_fraction +row_threshold float 0.30 Per-row sky fraction required to count a row as sky +""" + +from __future__ import annotations + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy + +from cv_bridge import CvBridge + +from sensor_msgs.msg import Image +from std_msgs.msg import Float32, Int32 + +from ._sky_detector import detect_sky + + +_SENSOR_QOS = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=4, +) + + +class SkyDetectNode(Node): + + def __init__(self) -> None: + super().__init__('sky_detect_node') + + self.declare_parameter('scan_frac', 0.60) + self.declare_parameter('row_threshold', 0.30) + + self._scan_frac = float(self.get_parameter('scan_frac').value) + self._row_threshold = float(self.get_parameter('row_threshold').value) + + self._bridge = CvBridge() + + self._sub = self.create_subscription( + Image, '/camera/color/image_raw', self._on_image, _SENSOR_QOS) + + self._pub_frac = self.create_publisher(Float32, '/saltybot/sky_fraction', 10) + self._pub_horizon = self.create_publisher(Int32, '/saltybot/horizon_y', 10) + + self.get_logger().info( + f'sky_detect_node ready — scan_frac={self._scan_frac} ' + f'row_threshold={self._row_threshold}' + ) + + # ── Callback ────────────────────────────────────────────────────────────── + + def _on_image(self, msg: Image) -> None: + try: + bgr = self._bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8') + except Exception as exc: + self.get_logger().error( + f'cv_bridge: {exc}', throttle_duration_sec=5.0) + return + + result = detect_sky( + bgr, + scan_frac=self._scan_frac, + row_threshold=self._row_threshold, + ) + + frac_msg = Float32() + frac_msg.data = result.sky_fraction + self._pub_frac.publish(frac_msg) + + hz_msg = Int32() + hz_msg.data = result.horizon_y + self._pub_horizon.publish(hz_msg) + + +def main(args=None) -> None: + rclpy.init(args=args) + node = SkyDetectNode() + try: + rclpy.spin(node) + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/jetson/ros2_ws/src/saltybot_bringup/setup.py b/jetson/ros2_ws/src/saltybot_bringup/setup.py index 3fa26e0..b0cc59e 100644 --- a/jetson/ros2_ws/src/saltybot_bringup/setup.py +++ b/jetson/ros2_ws/src/saltybot_bringup/setup.py @@ -45,6 +45,8 @@ setup( 'blur_detector = saltybot_bringup.blur_detect_node:main', # Terrain roughness estimator (Issue #296) 'terrain_roughness = saltybot_bringup.terrain_rough_node:main', + # Sky detector for outdoor navigation (Issue #307) + 'sky_detector = saltybot_bringup.sky_detect_node:main', ], }, ) diff --git a/jetson/ros2_ws/src/saltybot_bringup/test/test_sky_detector.py b/jetson/ros2_ws/src/saltybot_bringup/test/test_sky_detector.py new file mode 100644 index 0000000..9fe3550 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/test/test_sky_detector.py @@ -0,0 +1,327 @@ +""" +test_sky_detector.py — Unit tests for sky detection helpers (no ROS2 required). + +Covers: + SkyResult: + - fields accessible by name + - sky_fraction in [0, 1] + - horizon_y is int + - sky_mask is ndarray + + detect_sky — output contract: + - returns SkyResult + - sky_fraction in [0, 1] for all test images + - sky_mask shape matches input + - sky_mask dtype is uint8 + - empty image returns sky_fraction=0.0, horizon_y=-1 + - sky_fraction=0.0 and horizon_y=-1 for ground-only image + + detect_sky — blue sky detection: + - solid blue sky → sky_fraction ≈ 1.0 + - solid blue sky → horizon_y = H-1 (sky fills every row) + - solid blue sky → sky_mask nearly all 255 + + detect_sky — overcast sky detection: + - solid white/grey overcast image → sky_fraction ≈ 1.0 + - solid grey overcast → horizon_y = H-1 + + detect_sky — non-sky: + - solid green ground → sky_fraction ≈ 0.0 + - solid green ground → horizon_y = -1 + - solid brown → sky_fraction ≈ 0.0 + + detect_sky — split image (sky top, ground bottom): + - top half blue sky, bottom half green → sky_fraction ≈ 1.0 (scan_frac=0.5) + - split image → horizon_y near H//2 + - sky_fraction decreases as scan_frac increases past the sky region + + detect_sky — horizon estimation: + - wider sky region → higher horizon_y + - horizon_y within image bounds [0, H-1] when sky detected + - horizon_y == -1 when no sky anywhere + + detect_sky — scan_frac and row_threshold params: + - scan_frac=1.0 analyses full frame + - scan_frac=0.0 → sky_fraction=0.0 regardless of image content + - row_threshold=0.0 → every row counts as sky row → horizon_y = H-1 for any image + - row_threshold=1.0 → only rows fully sky count → tighter horizon on mixed image +""" + +import sys +import os + +import numpy as np +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from saltybot_bringup._sky_detector import ( + SkyResult, + detect_sky, +) + + +# ── Image factories ─────────────────────────────────────────────────────────── + +def _hsv_solid_bgr(h_val: int, s: int, v: int, rows: int = 64, cols: int = 64) -> np.ndarray: + """Create a solid BGR image from an HSV specification.""" + import cv2 + hsv = np.full((rows, cols, 3), [h_val, s, v], dtype=np.uint8) + return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR) + + +# Canonical sky and ground colours (OpenCV HSV: H∈[0,180], S/V∈[0,255]) +def _blue_sky(rows=64, cols=64) -> np.ndarray: return _hsv_solid_bgr(105, 180, 200, rows, cols) # mid-blue sky +def _overcast(rows=64, cols=64) -> np.ndarray: return _hsv_solid_bgr(0, 20, 220, rows, cols) # bright grey +def _green_ground(rows=64, cols=64) -> np.ndarray: return _hsv_solid_bgr(40, 180, 100, rows, cols) # green grass +def _brown_ground(rows=64, cols=64) -> np.ndarray: return _hsv_solid_bgr(15, 160, 90, rows, cols) # soil/gravel + + +def _split(top_bgr: np.ndarray, bottom_bgr: np.ndarray) -> np.ndarray: + """Stack two same-width BGR images vertically.""" + return np.concatenate([top_bgr, bottom_bgr], axis=0) + + +# ── SkyResult ───────────────────────────────────────────────────────────────── + +class TestSkyResult: + + def test_fields_accessible(self): + mask = np.zeros((4, 4), dtype=np.uint8) + r = SkyResult(sky_fraction=0.7, horizon_y=30, sky_mask=mask) + assert r.sky_fraction == pytest.approx(0.7) + assert r.horizon_y == 30 + assert r.sky_mask is mask + + def test_sky_fraction_in_range(self): + mask = np.zeros((4, 4), dtype=np.uint8) + for v in (0.0, 0.5, 1.0): + r = SkyResult(sky_fraction=v, horizon_y=-1, sky_mask=mask) + assert 0.0 <= r.sky_fraction <= 1.0 + + def test_horizon_y_is_int(self): + mask = np.zeros((4, 4), dtype=np.uint8) + r = SkyResult(sky_fraction=0.0, horizon_y=-1, sky_mask=mask) + assert isinstance(r.horizon_y, int) + + +# ── detect_sky — output contract ───────────────────────────────────────────── + +class TestDetectSkyContract: + + def test_returns_sky_result(self): + r = detect_sky(_blue_sky()) + assert isinstance(r, SkyResult) + + def test_sky_fraction_in_range_blue(self): + r = detect_sky(_blue_sky()) + assert 0.0 <= r.sky_fraction <= 1.0 + + def test_sky_fraction_in_range_ground(self): + r = detect_sky(_green_ground()) + assert 0.0 <= r.sky_fraction <= 1.0 + + def test_sky_mask_shape_matches_input(self): + img = _blue_sky(rows=48, cols=80) + r = detect_sky(img) + assert r.sky_mask.shape == (48, 80) + + def test_sky_mask_dtype_uint8(self): + r = detect_sky(_blue_sky()) + assert r.sky_mask.dtype == np.uint8 + + def test_empty_image_returns_zero_fraction(self): + r = detect_sky(np.zeros((0, 64, 3), dtype=np.uint8)) + assert r.sky_fraction == pytest.approx(0.0) + assert r.horizon_y == -1 + + def test_ground_sky_fraction_zero(self): + r = detect_sky(_green_ground()) + assert r.sky_fraction == pytest.approx(0.0, abs=0.05) + + def test_ground_horizon_y_minus_one(self): + r = detect_sky(_green_ground()) + assert r.horizon_y == -1 + + +# ── detect_sky — blue sky ──────────────────────────────────────────────────── + +class TestDetectSkyBlue: + + def test_blue_sky_fraction_near_one(self): + r = detect_sky(_blue_sky()) + assert r.sky_fraction > 0.90, ( + f'solid blue sky should give sky_fraction > 0.9, got {r.sky_fraction:.3f}') + + def test_blue_sky_horizon_at_last_row(self): + """Entire image is sky → horizon at bottom row.""" + h = 64 + r = detect_sky(_blue_sky(rows=h)) + assert r.horizon_y == h - 1, ( + f'all-sky image: horizon_y should be {h-1}, got {r.horizon_y}') + + def test_blue_sky_mask_mostly_255(self): + r = detect_sky(_blue_sky()) + sky_pixels = np.count_nonzero(r.sky_mask) + total = r.sky_mask.size + assert sky_pixels / total > 0.90 + + def test_blue_sky_positive_horizon(self): + r = detect_sky(_blue_sky()) + assert r.horizon_y >= 0 + + +# ── detect_sky — overcast sky ──────────────────────────────────────────────── + +class TestDetectSkyOvercast: + + def test_overcast_fraction_near_one(self): + r = detect_sky(_overcast()) + assert r.sky_fraction > 0.90, ( + f'overcast grey sky should give sky_fraction > 0.9, got {r.sky_fraction:.3f}') + + def test_overcast_horizon_at_last_row(self): + h = 64 + r = detect_sky(_overcast(rows=h)) + assert r.horizon_y == h - 1 + + def test_overcast_positive_horizon(self): + r = detect_sky(_overcast()) + assert r.horizon_y >= 0 + + +# ── detect_sky — non-sky images ────────────────────────────────────────────── + +class TestDetectSkyNonSky: + + def test_green_ground_fraction_zero(self): + r = detect_sky(_green_ground()) + assert r.sky_fraction < 0.05 + + def test_green_ground_horizon_minus_one(self): + assert detect_sky(_green_ground()).horizon_y == -1 + + def test_brown_ground_fraction_zero(self): + r = detect_sky(_brown_ground()) + assert r.sky_fraction < 0.05 + + def test_brown_ground_horizon_minus_one(self): + assert detect_sky(_brown_ground()).horizon_y == -1 + + +# ── detect_sky — split images ───────────────────────────────────────────────── + +class TestDetectSkySplit: + + def test_top_half_sky_scan_frac_half(self): + """scan_frac=0.5 → only top half analysed → sky_fraction ≈ 1.0 for top-sky image.""" + h = 64 + img = _split(_blue_sky(rows=h // 2), _green_ground(rows=h // 2)) + r = detect_sky(img, scan_frac=0.5) + assert r.sky_fraction > 0.85, ( + f'top-half sky with scan_frac=0.5: expected >0.85, got {r.sky_fraction:.3f}') + + def test_horizon_near_midpoint(self): + """Sky in top half, ground in bottom half → horizon_y near H//2.""" + h = 64 + img = _split(_blue_sky(rows=h // 2), _green_ground(rows=h // 2)) + r = detect_sky(img) + # Horizon should be within the top half (rows 0 .. h//2-1) + assert 0 <= r.horizon_y < h, f'horizon_y={r.horizon_y} out of bounds' + assert r.horizon_y < h // 2 + 4, ( + f'horizon_y={r.horizon_y} should be near or before row {h // 2}') + + def test_sky_fraction_decreases_when_scan_extends_into_ground(self): + """With top-sky/bottom-ground image, increasing scan_frac past the sky + boundary should decrease the sky_fraction.""" + h = 128 + img = _split(_blue_sky(rows=h // 2), _green_ground(rows=h // 2)) + r_top = detect_sky(img, scan_frac=0.4) # mostly sky region + r_full = detect_sky(img, scan_frac=1.0) # half sky, half ground + assert r_top.sky_fraction > r_full.sky_fraction, ( + f'scanning less of the ground should give higher fraction: ' + f'scan0.4={r_top.sky_fraction:.3f} scan1.0={r_full.sky_fraction:.3f}') + + def test_sky_only_top_quarter_horizon_within_quarter(self): + """Sky only in top 25 % → horizon_y in first quarter of rows.""" + h = 64 + sky_rows = h // 4 + img = _split(_blue_sky(rows=sky_rows), _green_ground(rows=h - sky_rows)) + r = detect_sky(img) + assert 0 <= r.horizon_y < sky_rows + 2, ( + f'horizon_y={r.horizon_y} should be within top quarter ({sky_rows} rows)') + + +# ── detect_sky — horizon ordering ──────────────────────────────────────────── + +class TestDetectSkyHorizonOrdering: + + def test_more_sky_rows_higher_horizon_y(self): + """Larger sky region → higher (larger row index) horizon_y.""" + h = 128 + sky_a = h // 4 # 25 % sky + sky_b = h * 3 // 4 # 75 % sky + img_a = _split(_blue_sky(rows=sky_a), _green_ground(rows=h - sky_a)) + img_b = _split(_blue_sky(rows=sky_b), _green_ground(rows=h - sky_b)) + r_a = detect_sky(img_a) + r_b = detect_sky(img_b) + assert r_b.horizon_y > r_a.horizon_y, ( + f'larger sky region ({sky_b}px) should give higher horizon_y than ' + f'smaller ({sky_a}px): {r_b.horizon_y} vs {r_a.horizon_y}') + + def test_horizon_within_image_bounds_when_sky_present(self): + h, w = 80, 64 + r = detect_sky(_blue_sky(rows=h, cols=w)) + assert 0 <= r.horizon_y <= h - 1 + + def test_horizon_minus_one_when_no_sky(self): + assert detect_sky(_green_ground()).horizon_y == -1 + + +# ── detect_sky — parameter sensitivity ─────────────────────────────────────── + +class TestDetectSkyParams: + + def test_scan_frac_1_analyses_full_frame(self): + """scan_frac=1.0 on all-sky image → fraction ≈ 1.0.""" + r = detect_sky(_blue_sky(), scan_frac=1.0) + assert r.sky_fraction > 0.90 + + def test_scan_frac_zero_gives_zero_fraction(self): + """scan_frac approaching 0 → effectively empty scan → sky_fraction = 0.""" + # The implementation clamps scan_rows = max(1, ...) so scan_frac=0 → 1 row scanned + # We check that it doesn't crash and returns a valid result. + r = detect_sky(_blue_sky(), scan_frac=0.0) + assert 0.0 <= r.sky_fraction <= 1.0 + + def test_row_threshold_zero_horizon_at_last_row_any_image(self): + """row_threshold=0 → every row satisfies the threshold → horizon = H-1 + even if only a single sky pixel exists anywhere.""" + h = 64 + img = _split(_blue_sky(rows=4), _green_ground(rows=h - 4)) + r = detect_sky(img, row_threshold=0.0) + # At least the sky rows (top 4) will have sky pixels, so horizon ≥ 3 + assert r.horizon_y >= 3 + + def test_row_threshold_high_tightens_horizon(self): + """High row_threshold only accepts near-fully-sky rows → lower horizon_y.""" + h = 64 + # Left 60% of each row: sky; right 40%: ground. + # Low threshold (0.3) → both halves count as sky rows → horizon = H-1 + # High threshold (0.7) → only sky-dominant rows count → horizon = H-1 still + # (since 60% > 0.7 is False, so high threshold gives lower horizon) + w = 100 + img = np.concatenate([ + _blue_sky(rows=h, cols=60), + _green_ground(rows=h, cols=40), + ], axis=1) + r_low = detect_sky(img, row_threshold=0.30) + r_high = detect_sky(img, row_threshold=0.70) + # With 60% sky per row: low threshold (0.3) passes; high (0.7) fails + assert r_low.horizon_y >= 0, 'low threshold should find sky rows' + assert r_high.horizon_y == -1, ( + f'60% sky/row fails 0.7 threshold; horizon_y should be -1, got {r_high.horizon_y}') + + +if __name__ == '__main__': + pytest.main([__file__, '-v'])