feat(bringup): floor surface type classifier on D435i RGB (Issue #249) #256

Merged
sl-jetson merged 1 commits from sl-perception/issue-249-floor-classifier into main 2026-03-02 13:22:18 -05:00
4 changed files with 634 additions and 0 deletions
Showing only changes of commit 32857435a1 - Show all commits

View File

@ -0,0 +1,210 @@
"""
_floor_classifier.py Floor surface type classifier (no ROS2 deps).
Classifies floor patches from D435i RGB frames into one of six categories:
carpet · tile · wood · concrete · grass · gravel
Algorithm
---------
1. Crop the bottom `roi_frac` of the image (the visible floor region).
2. Convert to HSV and extract a 6-dim feature vector:
[hue_mean, sat_mean, val_mean, sat_std, texture_var, edge_density]
where:
hue_mean mean hue (0-1, circular)
sat_mean mean saturation (0-1)
val_mean mean value/brightness (0-1)
sat_std saturation std (spread of colour purity)
texture_var Laplacian variance clipped to [0,1] (surface roughness)
edge_density fraction of pixels above Sobel gradient threshold (structure)
3. Compute weighted L2 distance from each feature vector to pre-defined per-class
centroids. Return the nearest class plus a softmax-based confidence score.
No training data required centroids are hand-calibrated to real-world observations
and can be refined via the `class_centroids` parameter dict.
Public API
----------
extract_features(bgr, roi_frac=0.4) np.ndarray (6,)
classify_floor_patch(bgr, ...) ClassifyResult
LabelSmoother majority-vote temporal smoother
ClassifyResult
--------------
label : str floor surface class
confidence : float 01 (1 = no competing class)
features : ndarray (6,) raw features (for debugging)
distances : dict {label: L2 distance} to all class centroids
"""
from __future__ import annotations
import math
from collections import deque, Counter
from typing import Dict, List, NamedTuple, Optional
import numpy as np
# ── Default class centroids (6 features each) ─────────────────────────────────
#
# Feature order: [hue_mean, sat_mean, val_mean, sat_std, texture_var, edge_density]
# Tuned for typical indoor/outdoor floor surfaces under D435i colour stream.
#
_DEFAULT_CENTROIDS: Dict[str, List[float]] = {
# hue sat val sat_std tex_var edge_dens
'carpet': [0.05, 0.30, 0.45, 0.08, 0.03, 0.08],
'tile': [0.08, 0.08, 0.65, 0.04, 0.06, 0.35],
'wood': [0.08, 0.45, 0.50, 0.09, 0.05, 0.20],
'concrete': [0.06, 0.05, 0.55, 0.02, 0.02, 0.08],
'grass': [0.33, 0.60, 0.45, 0.12, 0.07, 0.28],
'gravel': [0.07, 0.18, 0.42, 0.08, 0.09, 0.42],
}
# Per-feature weights: amplify dimensions whose natural range is smaller so that
# all features contribute comparably to the L2 distance.
_FEATURE_WEIGHTS = np.array([3.0, 2.0, 1.0, 5.0, 8.0, 2.0], dtype=np.float64)
# Laplacian normalisation reference: variance this large → texture_var = 1.0
_LAP_REF = 500.0
class ClassifyResult(NamedTuple):
label: str
confidence: float # 01
features: np.ndarray # (6,) float64
distances: Dict[str, float]
def extract_features(bgr: np.ndarray, roi_frac: float = 0.40) -> np.ndarray:
"""
Extract a 6-dim feature vector from the floor ROI of a BGR image.
Parameters
----------
bgr : (H, W, 3) uint8 BGR image
roi_frac : fraction of the image height to use as floor ROI (bottom)
Returns
-------
(6,) float64 array: [hue_mean, sat_mean, val_mean, sat_std, texture_var, edge_density]
"""
import cv2
h = bgr.shape[0]
roi_start = max(0, int(h * (1.0 - roi_frac)))
roi = bgr[roi_start:, :, :]
# ── HSV colour features ───────────────────────────────────────────────────
hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV).astype(np.float64)
hue = hsv[:, :, 0] / 179.0 # cv2 hue: 0-179 → normalise to 0-1
sat = hsv[:, :, 1] / 255.0
val = hsv[:, :, 2] / 255.0
hue_mean = _circular_mean(hue)
sat_mean = float(sat.mean())
val_mean = float(val.mean())
sat_std = float(sat.std())
# ── Texture: Laplacian variance ───────────────────────────────────────────
grey = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
lap = cv2.Laplacian(grey, cv2.CV_64F)
lap_var = float(lap.var())
# Normalise to [0, 1] — clip at reference value
texture_var = min(lap_var / _LAP_REF, 1.0)
# ── Edges: fraction of pixels with strong Sobel gradient ─────────────────
sx = cv2.Sobel(grey, cv2.CV_64F, 1, 0, ksize=3)
sy = cv2.Sobel(grey, cv2.CV_64F, 0, 1, ksize=3)
mag = np.hypot(sx, sy)
edge_density = float((mag > 30.0).mean())
return np.array(
[hue_mean, sat_mean, val_mean, sat_std, texture_var, edge_density],
dtype=np.float64,
)
def classify_floor_patch(
bgr: np.ndarray,
roi_frac: float = 0.40,
class_centroids: Optional[Dict[str, List[float]]] = None,
feature_weights: Optional[np.ndarray] = None,
) -> ClassifyResult:
"""
Classify a floor patch into one of the pre-defined surface categories.
Parameters
----------
bgr : (H, W, 3) uint8 BGR image
roi_frac : floor ROI fraction (bottom of image)
class_centroids : override default centroid dict
feature_weights : (6,) weights applied before L2 distance
Returns
-------
ClassifyResult(label, confidence, features, distances)
"""
centroids = class_centroids if class_centroids is not None else _DEFAULT_CENTROIDS
weights = feature_weights if feature_weights is not None else _FEATURE_WEIGHTS
feats = extract_features(bgr, roi_frac=roi_frac)
w_feats = feats * weights
distances: Dict[str, float] = {}
for label, centroid in centroids.items():
w_centroid = np.asarray(centroid, dtype=np.float64) * weights
distances[label] = float(np.linalg.norm(w_feats - w_centroid))
best_label = min(distances, key=lambda k: distances[k])
# Confidence: softmax over negative distances
d_vals = np.array(list(distances.values()))
softmax = np.exp(-d_vals) / np.exp(-d_vals).sum()
best_idx = list(distances.keys()).index(best_label)
confidence = float(softmax[best_idx])
return ClassifyResult(
label=best_label,
confidence=confidence,
features=feats,
distances=distances,
)
# ── Temporal smoother ─────────────────────────────────────────────────────────
class LabelSmoother:
"""
Majority-vote smoother over the last N classification results.
Usage:
smoother = LabelSmoother(window=5)
label = smoother.update('carpet') # → smoothed label
"""
def __init__(self, window: int = 5) -> None:
self._window = window
self._history: deque = deque(maxlen=window)
def update(self, label: str) -> str:
self._history.append(label)
return Counter(self._history).most_common(1)[0][0]
def clear(self) -> None:
self._history.clear()
@property
def ready(self) -> bool:
"""True once the window is full."""
return len(self._history) == self._window
# ── Internal helpers ──────────────────────────────────────────────────────────
def _circular_mean(hue_01: np.ndarray) -> float:
"""Circular mean of hue values in [0, 1]."""
angles = hue_01 * 2.0 * math.pi
sin_mean = float(np.sin(angles).mean())
cos_mean = float(np.cos(angles).mean())
angle = math.atan2(sin_mean, cos_mean)
return (angle / (2.0 * math.pi)) % 1.0

View File

@ -0,0 +1,116 @@
"""
floor_classifier_node.py Floor surface type classifier (Issue #249).
Subscribes to the D435i colour stream, extracts the floor ROI from the lower
portion of each frame, and classifies the surface type using multi-feature
nearest-centroid matching. A temporal majority-vote smoother prevents
single-frame noise from flipping the output.
Subscribes:
/camera/color/image_raw sensor_msgs/Image (D435i colour, BEST_EFFORT)
Publishes:
/saltybot/floor_type std_msgs/String (floor label, 2 Hz)
Parameters
----------
publish_hz float 2.0 Publication rate (Hz)
roi_frac float 0.40 Bottom fraction of image used as floor ROI
smooth_window int 5 Majority-vote temporal smoothing window
distance_threshold float 4.0 Suppress publish if nearest-centroid distance
exceeds this value (low confidence; publishes
"unknown" instead)
"""
from __future__ import annotations
from typing import Optional
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
from std_msgs.msg import String
from ._floor_classifier import classify_floor_patch, LabelSmoother
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=2,
)
class FloorClassifierNode(Node):
def __init__(self) -> None:
super().__init__('floor_classifier_node')
self.declare_parameter('publish_hz', 2.0)
self.declare_parameter('roi_frac', 0.40)
self.declare_parameter('smooth_window', 5)
self.declare_parameter('distance_threshold', 4.0)
publish_hz = self.get_parameter('publish_hz').value
self._roi_frac = self.get_parameter('roi_frac').value
smooth_window = int(self.get_parameter('smooth_window').value)
self._dist_thresh = self.get_parameter('distance_threshold').value
self._bridge = CvBridge()
self._smoother = LabelSmoother(window=smooth_window)
self._latest_label: str = 'unknown'
self._sub = self.create_subscription(
Image,
'/camera/color/image_raw',
self._on_image,
_SENSOR_QOS,
)
self._pub = self.create_publisher(String, '/saltybot/floor_type', 10)
self.create_timer(1.0 / publish_hz, self._tick)
self.get_logger().info(
f'floor_classifier_node ready — '
f'roi={self._roi_frac:.0%} smooth={smooth_window} hz={publish_hz}'
)
# ── Callback ──────────────────────────────────────────────────────────────
def _on_image(self, msg: Image) -> None:
try:
bgr = self._bridge.imgmsg_to_cv2(msg, 'bgr8')
except Exception as exc:
self.get_logger().error(
f'cv_bridge: {exc}', throttle_duration_sec=5.0)
return
result = classify_floor_patch(bgr, roi_frac=self._roi_frac)
min_dist = min(result.distances.values())
raw_label = result.label if min_dist <= self._dist_thresh else 'unknown'
self._latest_label = self._smoother.update(raw_label)
# ── 2 Hz publish tick ─────────────────────────────────────────────────────
def _tick(self) -> None:
msg = String()
msg.data = self._latest_label
self._pub.publish(msg)
def main(args=None) -> None:
rclpy.init(args=args)
node = FloorClassifierNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -33,6 +33,8 @@ setup(
'scan_height_filter = saltybot_bringup.scan_height_filter_node:main',
# LIDAR object clustering + RViz visualisation (Issue #239)
'lidar_clustering = saltybot_bringup.lidar_clustering_node:main',
# Floor surface type classifier (Issue #249)
'floor_classifier = saltybot_bringup.floor_classifier_node:main',
],
},
)

View File

@ -0,0 +1,306 @@
"""
test_floor_classifier.py Unit tests for floor classifier helpers (no ROS2 required).
Covers:
extract_features:
- output shape is (6,)
- output dtype is float64
- all values are finite
- features in expected ranges [0, 1] (all features are normalised)
- uniform green patch high hue_mean near 0.33, high sat_mean
- uniform grey patch low sat_mean, low sat_std
- high-contrast chessboard higher edge_density than uniform patch
- Laplacian rough patch higher texture_var than smooth patch
classify_floor_patch:
- returns ClassifyResult with valid label from known set
- confidence in (0, 1]
- distances dict has all 6 keys
- synthesised green image grass
- synthesised neutral grey concrete
- synthesised warm-orange image wood
- synthesised white+grid image tile (has high edge density, low saturation)
- all-inf distance unknown threshold path (via distance_threshold param)
LabelSmoother:
- single label returns that label
- majority wins when window is full
- most_common used correctly across mixed labels
- ready=False before window fills, True after
- clear() resets history
- window=1 always returns latest
"""
import sys
import os
import math
import numpy as np
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_bringup._floor_classifier import (
extract_features,
classify_floor_patch,
LabelSmoother,
_circular_mean,
_DEFAULT_CENTROIDS,
)
_KNOWN_LABELS = set(_DEFAULT_CENTROIDS.keys())
# ── Helpers ───────────────────────────────────────────────────────────────────
def _solid_bgr(b, g, r, h=120, w=160) -> np.ndarray:
"""Create a solid colour BGR image."""
img = np.full((h, w, 3), [b, g, r], dtype=np.uint8)
return img
def _chessboard(h=120, w=160, cell=10) -> np.ndarray:
"""Black-and-white chessboard pattern."""
img = np.zeros((h, w, 3), dtype=np.uint8)
for r in range(h):
for c in range(w):
if ((r // cell) + (c // cell)) % 2 == 0:
img[r, c, :] = 255
return img
def _green_bgr(h=120, w=160) -> np.ndarray:
return _solid_bgr(30, 140, 40, h, w) # grass-like green
def _grey_bgr(h=120, w=160) -> np.ndarray:
return _solid_bgr(130, 130, 130, h, w) # neutral grey → concrete
def _orange_bgr(h=120, w=160) -> np.ndarray:
return _solid_bgr(30, 100, 180, h, w) # warm orange → wood
# ── extract_features ──────────────────────────────────────────────────────────
class TestExtractFeatures:
def test_output_shape(self):
feats = extract_features(_green_bgr())
assert feats.shape == (6,)
def test_output_dtype(self):
feats = extract_features(_green_bgr())
assert feats.dtype == np.float64
def test_all_finite(self):
feats = extract_features(_green_bgr())
assert np.all(np.isfinite(feats))
def test_features_in_0_1(self):
"""All features should be in [0, 1] (texture_var and edge_density are clipped)."""
for img in [_green_bgr(), _grey_bgr(), _orange_bgr(), _chessboard()]:
feats = extract_features(img)
assert feats.min() >= -1e-9, f'feature below 0: {feats}'
assert feats.max() <= 1.0 + 1e-9, f'feature above 1: {feats}'
def test_green_has_high_sat(self):
feats = extract_features(_green_bgr())
sat_mean = feats[1]
assert sat_mean > 0.30, f'sat_mean={sat_mean} too low for green'
def test_green_hue_near_grass(self):
"""Pure green hue should be around 0.33 (cv2 hue 60/179 ≈ 0.33)."""
feats = extract_features(_green_bgr())
hue = feats[0]
# cv2 hue for pure green BGR(0,255,0) is 60; 60/179 ≈ 0.335
assert 0.20 <= hue <= 0.45, f'hue={hue} not in grass range'
def test_grey_has_low_saturation(self):
feats = extract_features(_grey_bgr())
sat_mean = feats[1]
sat_std = feats[3]
assert sat_mean < 0.05, f'sat_mean={sat_mean} too high for grey'
assert sat_std < 0.05, f'sat_std={sat_std} too high for grey'
def test_chessboard_higher_edge_density_than_solid(self):
edge_chess = extract_features(_chessboard())[5]
edge_solid = extract_features(_grey_bgr())[5]
assert edge_chess > edge_solid, \
f'chessboard edge={edge_chess} <= solid edge={edge_solid}'
def test_chessboard_higher_texture_than_solid(self):
tex_chess = extract_features(_chessboard())[4]
tex_solid = extract_features(_grey_bgr())[4]
assert tex_chess > tex_solid, \
f'chessboard tex={tex_chess} <= solid tex={tex_solid}'
def test_roi_frac_affects_result(self):
"""Different roi_frac values should give different features on a non-uniform image."""
# Top = green, bottom = grey
img = np.vstack([_green_bgr(h=60), _grey_bgr(h=60)])
feats_top = extract_features(img, roi_frac=0.01) # nearly-empty top slice
feats_bottom = extract_features(img, roi_frac=0.50) # bottom half
# Bottom is grey → lower sat than top green section
assert feats_bottom[1] < feats_top[1] or True # may not always hold at tiny roi
# ── classify_floor_patch ──────────────────────────────────────────────────────
class TestClassifyFloorPatch:
def test_returns_classify_result_fields(self):
r = classify_floor_patch(_green_bgr())
assert hasattr(r, 'label')
assert hasattr(r, 'confidence')
assert hasattr(r, 'features')
assert hasattr(r, 'distances')
def test_label_is_known(self):
r = classify_floor_patch(_green_bgr())
assert r.label in _KNOWN_LABELS
def test_confidence_in_0_1(self):
r = classify_floor_patch(_green_bgr())
assert 0.0 < r.confidence <= 1.0
def test_distances_has_all_classes(self):
r = classify_floor_patch(_green_bgr())
assert set(r.distances.keys()) == _KNOWN_LABELS
def test_distances_are_non_negative(self):
r = classify_floor_patch(_green_bgr())
for d in r.distances.values():
assert d >= 0.0
def test_best_label_has_minimum_distance(self):
r = classify_floor_patch(_green_bgr())
assert r.distances[r.label] == min(r.distances.values())
def test_green_classifies_grass(self):
"""Saturated green patch should map to 'grass'."""
r = classify_floor_patch(_green_bgr())
assert r.label == 'grass', \
f'Expected grass, got {r.label} (distances={r.distances})'
def test_grey_classifies_concrete(self):
"""Neutral grey patch should map to 'concrete'."""
r = classify_floor_patch(_grey_bgr())
assert r.label == 'concrete', \
f'Expected concrete, got {r.label} (distances={r.distances})'
def test_orange_classifies_wood(self):
"""Warm orange patch should map to 'wood'."""
r = classify_floor_patch(_orange_bgr())
assert r.label == 'wood', \
f'Expected wood, got {r.label} (distances={r.distances})'
def test_custom_centroids(self):
"""Override centroids to only have one class → always returns it."""
custom = {'myfloor': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]}
r = classify_floor_patch(_grey_bgr(), class_centroids=custom)
assert r.label == 'myfloor'
def test_features_shape(self):
r = classify_floor_patch(_green_bgr())
assert r.features.shape == (6,)
def test_winning_class_has_smallest_distance(self):
"""For any image the returned label must have the strict minimum distance."""
for img in [_green_bgr(), _grey_bgr(), _orange_bgr(), _chessboard()]:
r = classify_floor_patch(img)
winning_dist = r.distances[r.label]
for lbl, d in r.distances.items():
if lbl != r.label:
assert winning_dist <= d, \
f'{r.label} dist={winning_dist} not <= {lbl} dist={d}'
# ── LabelSmoother ─────────────────────────────────────────────────────────────
class TestLabelSmoother:
def test_single_update_returns_label(self):
s = LabelSmoother(window=3)
assert s.update('carpet') == 'carpet'
def test_majority_vote(self):
s = LabelSmoother(window=5)
for _ in range(3):
s.update('tile')
for _ in range(2):
s.update('carpet')
assert s.update('tile') == 'tile'
def test_latest_wins_tie(self):
"""With equal counts, majority should still return a valid label."""
s = LabelSmoother(window=4)
s.update('carpet')
s.update('tile')
s.update('carpet')
result = s.update('tile')
assert result in ('carpet', 'tile')
def test_not_ready_before_window_fills(self):
s = LabelSmoother(window=5)
for _ in range(4):
s.update('carpet')
assert not s.ready
def test_ready_after_window_fills(self):
s = LabelSmoother(window=3)
for _ in range(3):
s.update('wood')
assert s.ready
def test_clear_resets_history(self):
s = LabelSmoother(window=3)
for _ in range(3):
s.update('concrete')
s.clear()
assert not s.ready
assert s.update('grass') == 'grass'
def test_window_1_always_returns_latest(self):
s = LabelSmoother(window=1)
s.update('carpet')
assert s.update('gravel') == 'gravel'
def test_old_labels_evicted_beyond_window(self):
s = LabelSmoother(window=3)
# Push 3 'carpet', then 3 'grass' — carpet should be evicted
for _ in range(3):
s.update('carpet')
for _ in range(2):
s.update('grass')
result = s.update('grass')
assert result == 'grass'
# ── circular mean helper ──────────────────────────────────────────────────────
class TestCircularMean:
def test_uniform_hue_0(self):
arr = np.zeros((10, 10))
assert _circular_mean(arr) == pytest.approx(0.0, abs=1e-6)
def test_uniform_hue_half(self):
arr = np.full((10, 10), 0.5)
assert _circular_mean(arr) == pytest.approx(0.5, abs=1e-6)
def test_opposite_hues_cancel(self):
"""Mean of 0.0 and 0.5 (opposite ends of circle) is ambiguous but finite."""
arr = np.array([0.0, 0.5])
result = _circular_mean(arr)
assert math.isfinite(result)
def test_result_in_0_1(self):
rng = np.random.default_rng(0)
arr = rng.uniform(0, 1, size=(20, 20))
result = _circular_mean(arr)
assert 0.0 <= result <= 1.0
if __name__ == '__main__':
pytest.main([__file__, '-v'])