feat(perception): sky detector for outdoor navigation (Issue #307) #313

Merged
sl-jetson merged 1 commits from sl-perception/issue-307-sky-detect into main 2026-03-03 00:20:05 -05:00
4 changed files with 557 additions and 0 deletions

View File

@ -0,0 +1,122 @@
"""
_sky_detector.py Sky detection via HSV thresholding + horizon estimation (no ROS2 deps).
Algorithm
---------
Sky pixels are identified by two HSV bands (OpenCV convention: H[0,180], S/V[0,255]):
1. Blue sky H[90,130], S[40,255], V[80,255]
(captures clear blue sky from cyan-blue through sky-blue to blue-violet)
2. Overcast S[0,50], V[185,255]
(very bright, near-zero saturation: white/grey sky on cloudy days)
The two masks are OR-combined into a single sky mask.
Sky fraction
------------
Computed over the top *scan_frac* of the image (default 60 %). This concentrates
sensitivity on the region where sky is expected to appear and avoids penalising ground
reflections or obstacles in the lower frame.
sky_fraction = sky_pixels_in_top_scan_frac / pixels_in_top_scan_frac
Horizon line
------------
For each image row, compute the row-level sky fraction (sky pixels / row width).
The horizon is the **bottommost row** where that fraction *row_threshold* (default 0.30).
This represents the lower boundary of continuous sky content.
horizon_y = max { r : row_sky_frac[r] row_threshold } or -1 if no sky found.
Returns -1 when no sky is detected at all (indoors, underground, etc.).
Public API
----------
SkyResult(sky_fraction, horizon_y, sky_mask)
detect_sky(bgr, scan_frac=0.60, row_threshold=0.30) -> SkyResult
"""
from __future__ import annotations
from typing import NamedTuple
import numpy as np
# ── HSV sky bands ─────────────────────────────────────────────────────────────
# Blue sky (OpenCV H ∈ [0, 180])
_BLUE_SKY_LO = np.array([90, 40, 80], dtype=np.uint8)
_BLUE_SKY_HI = np.array([130, 255, 255], dtype=np.uint8)
# White / grey overcast sky (any hue, very bright, very desaturated)
_GREY_SKY_LO = np.array([0, 0, 185], dtype=np.uint8)
_GREY_SKY_HI = np.array([180, 50, 255], dtype=np.uint8)
# ── Data type ─────────────────────────────────────────────────────────────────
class SkyResult(NamedTuple):
"""Sky detection result for a single frame."""
sky_fraction: float # fraction of top scan_frac region classified as sky [0, 1]
horizon_y: int # pixel row of horizon (-1 = no sky detected)
sky_mask: np.ndarray # (H, W) uint8 binary mask (255 = sky pixel)
# ── Public API ────────────────────────────────────────────────────────────────
def detect_sky(
bgr: np.ndarray,
scan_frac: float = 0.60,
row_threshold: float = 0.30,
) -> SkyResult:
"""
Detect sky pixels and estimate the horizon row.
Parameters
----------
bgr : (H, W, 3) uint8 BGR ndarray
scan_frac : fraction of image height to use for sky_fraction computation
(top of frame, where sky is expected)
row_threshold : minimum per-row sky fraction to count a row as sky for
horizon estimation
Returns
-------
SkyResult(sky_fraction, horizon_y, sky_mask)
"""
import cv2
bgr = np.asarray(bgr, dtype=np.uint8)
h, w = bgr.shape[:2]
if h == 0 or w == 0:
empty = np.zeros((h, w), dtype=np.uint8)
return SkyResult(sky_fraction=0.0, horizon_y=-1, sky_mask=empty)
hsv = cv2.cvtColor(bgr, cv2.COLOR_BGR2HSV)
# Build combined sky mask (blue OR overcast)
mask_blue = cv2.inRange(hsv, _BLUE_SKY_LO, _BLUE_SKY_HI)
mask_grey = cv2.inRange(hsv, _GREY_SKY_LO, _GREY_SKY_HI)
sky_mask = cv2.bitwise_or(mask_blue, mask_grey) # (H, W) uint8, 255 = sky
# ── sky_fraction: top scan_frac rows ──────────────────────────────────────
scan_rows = max(1, int(h * min(float(scan_frac), 1.0)))
top_mask = sky_mask[:scan_rows, :]
sky_fraction = float(np.count_nonzero(top_mask)) / float(scan_rows * w)
# ── horizon_y: bottommost row with ≥ row_threshold sky pixels ─────────────
# row_fracs[r] = fraction of pixels in row r that are sky
row_sky_counts = np.count_nonzero(sky_mask, axis=1).astype(np.float32) # (H,)
row_fracs = row_sky_counts / float(w)
sky_rows = np.where(row_fracs >= row_threshold)[0]
horizon_y = int(sky_rows.max()) if len(sky_rows) > 0 else -1
return SkyResult(
sky_fraction=sky_fraction,
horizon_y=horizon_y,
sky_mask=sky_mask,
)

View File

@ -0,0 +1,106 @@
"""
sky_detect_node.py D435i sky detector for outdoor navigation (Issue #307).
Classifies the top portion of the D435i colour image as sky vs non-sky using
HSV blue/grey thresholding, then estimates the horizon line.
Useful for:
- Outdoor/indoor scene detection (sky_fraction > 0.1 likely outdoor)
- Camera tilt correction (horizon_y deviation from expected row tilt estimate)
- Disabling outdoor-only nodes (colour correction, sun glare filter) indoors
Subscribes (BEST_EFFORT):
/camera/color/image_raw sensor_msgs/Image BGR8
Publishes:
/saltybot/sky_fraction std_msgs/Float32 sky fraction in [0, 1] (per frame)
/saltybot/horizon_y std_msgs/Int32 horizon pixel row; -1 = no sky
Parameters
----------
scan_frac float 0.60 Top fraction of image analysed for sky_fraction
row_threshold float 0.30 Per-row sky fraction required to count a row as sky
"""
from __future__ import annotations
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
from std_msgs.msg import Float32, Int32
from ._sky_detector import detect_sky
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=4,
)
class SkyDetectNode(Node):
def __init__(self) -> None:
super().__init__('sky_detect_node')
self.declare_parameter('scan_frac', 0.60)
self.declare_parameter('row_threshold', 0.30)
self._scan_frac = float(self.get_parameter('scan_frac').value)
self._row_threshold = float(self.get_parameter('row_threshold').value)
self._bridge = CvBridge()
self._sub = self.create_subscription(
Image, '/camera/color/image_raw', self._on_image, _SENSOR_QOS)
self._pub_frac = self.create_publisher(Float32, '/saltybot/sky_fraction', 10)
self._pub_horizon = self.create_publisher(Int32, '/saltybot/horizon_y', 10)
self.get_logger().info(
f'sky_detect_node ready — scan_frac={self._scan_frac} '
f'row_threshold={self._row_threshold}'
)
# ── Callback ──────────────────────────────────────────────────────────────
def _on_image(self, msg: Image) -> None:
try:
bgr = self._bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
except Exception as exc:
self.get_logger().error(
f'cv_bridge: {exc}', throttle_duration_sec=5.0)
return
result = detect_sky(
bgr,
scan_frac=self._scan_frac,
row_threshold=self._row_threshold,
)
frac_msg = Float32()
frac_msg.data = result.sky_fraction
self._pub_frac.publish(frac_msg)
hz_msg = Int32()
hz_msg.data = result.horizon_y
self._pub_horizon.publish(hz_msg)
def main(args=None) -> None:
rclpy.init(args=args)
node = SkyDetectNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -45,6 +45,8 @@ setup(
'blur_detector = saltybot_bringup.blur_detect_node:main',
# Terrain roughness estimator (Issue #296)
'terrain_roughness = saltybot_bringup.terrain_rough_node:main',
# Sky detector for outdoor navigation (Issue #307)
'sky_detector = saltybot_bringup.sky_detect_node:main',
],
},
)

View File

@ -0,0 +1,327 @@
"""
test_sky_detector.py Unit tests for sky detection helpers (no ROS2 required).
Covers:
SkyResult:
- fields accessible by name
- sky_fraction in [0, 1]
- horizon_y is int
- sky_mask is ndarray
detect_sky output contract:
- returns SkyResult
- sky_fraction in [0, 1] for all test images
- sky_mask shape matches input
- sky_mask dtype is uint8
- empty image returns sky_fraction=0.0, horizon_y=-1
- sky_fraction=0.0 and horizon_y=-1 for ground-only image
detect_sky blue sky detection:
- solid blue sky sky_fraction 1.0
- solid blue sky horizon_y = H-1 (sky fills every row)
- solid blue sky sky_mask nearly all 255
detect_sky overcast sky detection:
- solid white/grey overcast image sky_fraction 1.0
- solid grey overcast horizon_y = H-1
detect_sky non-sky:
- solid green ground sky_fraction 0.0
- solid green ground horizon_y = -1
- solid brown sky_fraction 0.0
detect_sky split image (sky top, ground bottom):
- top half blue sky, bottom half green sky_fraction 1.0 (scan_frac=0.5)
- split image horizon_y near H//2
- sky_fraction decreases as scan_frac increases past the sky region
detect_sky horizon estimation:
- wider sky region higher horizon_y
- horizon_y within image bounds [0, H-1] when sky detected
- horizon_y == -1 when no sky anywhere
detect_sky scan_frac and row_threshold params:
- scan_frac=1.0 analyses full frame
- scan_frac=0.0 sky_fraction=0.0 regardless of image content
- row_threshold=0.0 every row counts as sky row horizon_y = H-1 for any image
- row_threshold=1.0 only rows fully sky count tighter horizon on mixed image
"""
import sys
import os
import numpy as np
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_bringup._sky_detector import (
SkyResult,
detect_sky,
)
# ── Image factories ───────────────────────────────────────────────────────────
def _hsv_solid_bgr(h_val: int, s: int, v: int, rows: int = 64, cols: int = 64) -> np.ndarray:
"""Create a solid BGR image from an HSV specification."""
import cv2
hsv = np.full((rows, cols, 3), [h_val, s, v], dtype=np.uint8)
return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
# Canonical sky and ground colours (OpenCV HSV: H∈[0,180], S/V∈[0,255])
def _blue_sky(rows=64, cols=64) -> np.ndarray: return _hsv_solid_bgr(105, 180, 200, rows, cols) # mid-blue sky
def _overcast(rows=64, cols=64) -> np.ndarray: return _hsv_solid_bgr(0, 20, 220, rows, cols) # bright grey
def _green_ground(rows=64, cols=64) -> np.ndarray: return _hsv_solid_bgr(40, 180, 100, rows, cols) # green grass
def _brown_ground(rows=64, cols=64) -> np.ndarray: return _hsv_solid_bgr(15, 160, 90, rows, cols) # soil/gravel
def _split(top_bgr: np.ndarray, bottom_bgr: np.ndarray) -> np.ndarray:
"""Stack two same-width BGR images vertically."""
return np.concatenate([top_bgr, bottom_bgr], axis=0)
# ── SkyResult ─────────────────────────────────────────────────────────────────
class TestSkyResult:
def test_fields_accessible(self):
mask = np.zeros((4, 4), dtype=np.uint8)
r = SkyResult(sky_fraction=0.7, horizon_y=30, sky_mask=mask)
assert r.sky_fraction == pytest.approx(0.7)
assert r.horizon_y == 30
assert r.sky_mask is mask
def test_sky_fraction_in_range(self):
mask = np.zeros((4, 4), dtype=np.uint8)
for v in (0.0, 0.5, 1.0):
r = SkyResult(sky_fraction=v, horizon_y=-1, sky_mask=mask)
assert 0.0 <= r.sky_fraction <= 1.0
def test_horizon_y_is_int(self):
mask = np.zeros((4, 4), dtype=np.uint8)
r = SkyResult(sky_fraction=0.0, horizon_y=-1, sky_mask=mask)
assert isinstance(r.horizon_y, int)
# ── detect_sky — output contract ─────────────────────────────────────────────
class TestDetectSkyContract:
def test_returns_sky_result(self):
r = detect_sky(_blue_sky())
assert isinstance(r, SkyResult)
def test_sky_fraction_in_range_blue(self):
r = detect_sky(_blue_sky())
assert 0.0 <= r.sky_fraction <= 1.0
def test_sky_fraction_in_range_ground(self):
r = detect_sky(_green_ground())
assert 0.0 <= r.sky_fraction <= 1.0
def test_sky_mask_shape_matches_input(self):
img = _blue_sky(rows=48, cols=80)
r = detect_sky(img)
assert r.sky_mask.shape == (48, 80)
def test_sky_mask_dtype_uint8(self):
r = detect_sky(_blue_sky())
assert r.sky_mask.dtype == np.uint8
def test_empty_image_returns_zero_fraction(self):
r = detect_sky(np.zeros((0, 64, 3), dtype=np.uint8))
assert r.sky_fraction == pytest.approx(0.0)
assert r.horizon_y == -1
def test_ground_sky_fraction_zero(self):
r = detect_sky(_green_ground())
assert r.sky_fraction == pytest.approx(0.0, abs=0.05)
def test_ground_horizon_y_minus_one(self):
r = detect_sky(_green_ground())
assert r.horizon_y == -1
# ── detect_sky — blue sky ────────────────────────────────────────────────────
class TestDetectSkyBlue:
def test_blue_sky_fraction_near_one(self):
r = detect_sky(_blue_sky())
assert r.sky_fraction > 0.90, (
f'solid blue sky should give sky_fraction > 0.9, got {r.sky_fraction:.3f}')
def test_blue_sky_horizon_at_last_row(self):
"""Entire image is sky → horizon at bottom row."""
h = 64
r = detect_sky(_blue_sky(rows=h))
assert r.horizon_y == h - 1, (
f'all-sky image: horizon_y should be {h-1}, got {r.horizon_y}')
def test_blue_sky_mask_mostly_255(self):
r = detect_sky(_blue_sky())
sky_pixels = np.count_nonzero(r.sky_mask)
total = r.sky_mask.size
assert sky_pixels / total > 0.90
def test_blue_sky_positive_horizon(self):
r = detect_sky(_blue_sky())
assert r.horizon_y >= 0
# ── detect_sky — overcast sky ────────────────────────────────────────────────
class TestDetectSkyOvercast:
def test_overcast_fraction_near_one(self):
r = detect_sky(_overcast())
assert r.sky_fraction > 0.90, (
f'overcast grey sky should give sky_fraction > 0.9, got {r.sky_fraction:.3f}')
def test_overcast_horizon_at_last_row(self):
h = 64
r = detect_sky(_overcast(rows=h))
assert r.horizon_y == h - 1
def test_overcast_positive_horizon(self):
r = detect_sky(_overcast())
assert r.horizon_y >= 0
# ── detect_sky — non-sky images ──────────────────────────────────────────────
class TestDetectSkyNonSky:
def test_green_ground_fraction_zero(self):
r = detect_sky(_green_ground())
assert r.sky_fraction < 0.05
def test_green_ground_horizon_minus_one(self):
assert detect_sky(_green_ground()).horizon_y == -1
def test_brown_ground_fraction_zero(self):
r = detect_sky(_brown_ground())
assert r.sky_fraction < 0.05
def test_brown_ground_horizon_minus_one(self):
assert detect_sky(_brown_ground()).horizon_y == -1
# ── detect_sky — split images ─────────────────────────────────────────────────
class TestDetectSkySplit:
def test_top_half_sky_scan_frac_half(self):
"""scan_frac=0.5 → only top half analysed → sky_fraction ≈ 1.0 for top-sky image."""
h = 64
img = _split(_blue_sky(rows=h // 2), _green_ground(rows=h // 2))
r = detect_sky(img, scan_frac=0.5)
assert r.sky_fraction > 0.85, (
f'top-half sky with scan_frac=0.5: expected >0.85, got {r.sky_fraction:.3f}')
def test_horizon_near_midpoint(self):
"""Sky in top half, ground in bottom half → horizon_y near H//2."""
h = 64
img = _split(_blue_sky(rows=h // 2), _green_ground(rows=h // 2))
r = detect_sky(img)
# Horizon should be within the top half (rows 0 .. h//2-1)
assert 0 <= r.horizon_y < h, f'horizon_y={r.horizon_y} out of bounds'
assert r.horizon_y < h // 2 + 4, (
f'horizon_y={r.horizon_y} should be near or before row {h // 2}')
def test_sky_fraction_decreases_when_scan_extends_into_ground(self):
"""With top-sky/bottom-ground image, increasing scan_frac past the sky
boundary should decrease the sky_fraction."""
h = 128
img = _split(_blue_sky(rows=h // 2), _green_ground(rows=h // 2))
r_top = detect_sky(img, scan_frac=0.4) # mostly sky region
r_full = detect_sky(img, scan_frac=1.0) # half sky, half ground
assert r_top.sky_fraction > r_full.sky_fraction, (
f'scanning less of the ground should give higher fraction: '
f'scan0.4={r_top.sky_fraction:.3f} scan1.0={r_full.sky_fraction:.3f}')
def test_sky_only_top_quarter_horizon_within_quarter(self):
"""Sky only in top 25 % → horizon_y in first quarter of rows."""
h = 64
sky_rows = h // 4
img = _split(_blue_sky(rows=sky_rows), _green_ground(rows=h - sky_rows))
r = detect_sky(img)
assert 0 <= r.horizon_y < sky_rows + 2, (
f'horizon_y={r.horizon_y} should be within top quarter ({sky_rows} rows)')
# ── detect_sky — horizon ordering ────────────────────────────────────────────
class TestDetectSkyHorizonOrdering:
def test_more_sky_rows_higher_horizon_y(self):
"""Larger sky region → higher (larger row index) horizon_y."""
h = 128
sky_a = h // 4 # 25 % sky
sky_b = h * 3 // 4 # 75 % sky
img_a = _split(_blue_sky(rows=sky_a), _green_ground(rows=h - sky_a))
img_b = _split(_blue_sky(rows=sky_b), _green_ground(rows=h - sky_b))
r_a = detect_sky(img_a)
r_b = detect_sky(img_b)
assert r_b.horizon_y > r_a.horizon_y, (
f'larger sky region ({sky_b}px) should give higher horizon_y than '
f'smaller ({sky_a}px): {r_b.horizon_y} vs {r_a.horizon_y}')
def test_horizon_within_image_bounds_when_sky_present(self):
h, w = 80, 64
r = detect_sky(_blue_sky(rows=h, cols=w))
assert 0 <= r.horizon_y <= h - 1
def test_horizon_minus_one_when_no_sky(self):
assert detect_sky(_green_ground()).horizon_y == -1
# ── detect_sky — parameter sensitivity ───────────────────────────────────────
class TestDetectSkyParams:
def test_scan_frac_1_analyses_full_frame(self):
"""scan_frac=1.0 on all-sky image → fraction ≈ 1.0."""
r = detect_sky(_blue_sky(), scan_frac=1.0)
assert r.sky_fraction > 0.90
def test_scan_frac_zero_gives_zero_fraction(self):
"""scan_frac approaching 0 → effectively empty scan → sky_fraction = 0."""
# The implementation clamps scan_rows = max(1, ...) so scan_frac=0 → 1 row scanned
# We check that it doesn't crash and returns a valid result.
r = detect_sky(_blue_sky(), scan_frac=0.0)
assert 0.0 <= r.sky_fraction <= 1.0
def test_row_threshold_zero_horizon_at_last_row_any_image(self):
"""row_threshold=0 → every row satisfies the threshold → horizon = H-1
even if only a single sky pixel exists anywhere."""
h = 64
img = _split(_blue_sky(rows=4), _green_ground(rows=h - 4))
r = detect_sky(img, row_threshold=0.0)
# At least the sky rows (top 4) will have sky pixels, so horizon ≥ 3
assert r.horizon_y >= 3
def test_row_threshold_high_tightens_horizon(self):
"""High row_threshold only accepts near-fully-sky rows → lower horizon_y."""
h = 64
# Left 60% of each row: sky; right 40%: ground.
# Low threshold (0.3) → both halves count as sky rows → horizon = H-1
# High threshold (0.7) → only sky-dominant rows count → horizon = H-1 still
# (since 60% > 0.7 is False, so high threshold gives lower horizon)
w = 100
img = np.concatenate([
_blue_sky(rows=h, cols=60),
_green_ground(rows=h, cols=40),
], axis=1)
r_low = detect_sky(img, row_threshold=0.30)
r_high = detect_sky(img, row_threshold=0.70)
# With 60% sky per row: low threshold (0.3) passes; high (0.7) fails
assert r_low.horizon_y >= 0, 'low threshold should find sky rows'
assert r_high.horizon_y == -1, (
f'60% sky/row fails 0.7 threshold; horizon_y should be -1, got {r_high.horizon_y}')
if __name__ == '__main__':
pytest.main([__file__, '-v'])