feat: Terrain classification for speed adaptation (Issue #469)
Implement multi-sensor terrain classification using RealSense D435i depth and RPLIDAR A1M8: - saltybot_terrain_classification: New ROS2 package for terrain classification - TerrainClassifier: Rule-based classifier matching depth variance + reflectance to terrain type (smooth/carpet/grass/gravel) with hysteresis + confidence scoring - DepthExtractor: Extracts roughness from depth discontinuities and surface gradients - LidarExtractor: Extracts reflectance from RPLIDAR scan intensities - terrain_classification_node: 10Hz node fusing both sensors, publishes: - /saltybot/terrain_type (JSON with type, confidence, speed_scale) - /saltybot/terrain_type_string (human-readable type) - /saltybot/terrain_speed_scale (0.0-1.0 speed multiplier for smooth/carpet/grass/gravel) Speed scales: smooth=1.0, carpet=0.9, grass=0.75, gravel=0.6 Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
5add2cab51
commit
8d58d5e34c
@ -0,0 +1,10 @@
|
|||||||
|
terrain_classification:
|
||||||
|
ros__parameters:
|
||||||
|
control_rate: 10.0
|
||||||
|
depth_topic: "/camera/depth/image_rect_raw"
|
||||||
|
lidar_topic: "/scan"
|
||||||
|
depth_std_threshold: 0.02
|
||||||
|
min_depth_m: 0.1
|
||||||
|
max_depth_m: 3.0
|
||||||
|
confidence_threshold: 0.5
|
||||||
|
hysteresis_samples: 5
|
||||||
@ -0,0 +1,43 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Launch file for terrain classification node."""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
"""Generate launch description."""
|
||||||
|
depth_topic = LaunchConfiguration("depth_topic", default="/camera/depth/image_rect_raw")
|
||||||
|
lidar_topic = LaunchConfiguration("lidar_topic", default="/scan")
|
||||||
|
control_rate = LaunchConfiguration("control_rate", default="10.0")
|
||||||
|
confidence_threshold = LaunchConfiguration("confidence_threshold", default="0.5")
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument("depth_topic", default_value="/camera/depth/image_rect_raw",
|
||||||
|
description="RealSense depth topic"),
|
||||||
|
DeclareLaunchArgument("lidar_topic", default_value="/scan",
|
||||||
|
description="RPLIDAR scan topic"),
|
||||||
|
DeclareLaunchArgument("control_rate", default_value="10.0",
|
||||||
|
description="Control loop rate (Hz)"),
|
||||||
|
DeclareLaunchArgument("confidence_threshold", default_value="0.5",
|
||||||
|
description="Minimum confidence to publish"),
|
||||||
|
|
||||||
|
Node(
|
||||||
|
package="saltybot_terrain_classification",
|
||||||
|
executable="terrain_classification_node",
|
||||||
|
name="terrain_classification",
|
||||||
|
output="screen",
|
||||||
|
parameters=[
|
||||||
|
{"depth_topic": depth_topic},
|
||||||
|
{"lidar_topic": lidar_topic},
|
||||||
|
{"control_rate": control_rate},
|
||||||
|
{"confidence_threshold": confidence_threshold},
|
||||||
|
{"depth_std_threshold": 0.02},
|
||||||
|
{"min_depth_m": 0.1},
|
||||||
|
{"max_depth_m": 3.0},
|
||||||
|
{"hysteresis_samples": 5},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
])
|
||||||
@ -0,0 +1,29 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_terrain_classification</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>Terrain classification using RealSense depth and RPLIDAR (Issue #469)</description>
|
||||||
|
<maintainer email="seb@example.com">SaltyLab</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
<depend>message_filters</depend>
|
||||||
|
<depend>numpy</depend>
|
||||||
|
<depend>opencv-python</depend>
|
||||||
|
<depend>cv_bridge</depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,64 @@
|
|||||||
|
"""Depth Feature Extractor — Surface roughness from RealSense depth."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
import numpy as np
|
||||||
|
from typing import Optional
|
||||||
|
import cv2
|
||||||
|
|
||||||
|
|
||||||
|
class DepthExtractor:
|
||||||
|
"""Extract terrain roughness features from RealSense depth images."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
roi_width_px: int = 320,
|
||||||
|
roi_height_px: int = 240,
|
||||||
|
min_depth_m: float = 0.1,
|
||||||
|
max_depth_m: float = 3.0,
|
||||||
|
bilateral_d: int = 5,
|
||||||
|
):
|
||||||
|
self._roi_w = roi_width_px
|
||||||
|
self._roi_h = roi_height_px
|
||||||
|
self._min_depth = min_depth_m
|
||||||
|
self._max_depth = max_depth_m
|
||||||
|
self._bilateral_d = bilateral_d
|
||||||
|
|
||||||
|
def extract_roughness(self, depth_image: np.ndarray) -> Optional[float]:
|
||||||
|
"""Extract surface roughness from depth image."""
|
||||||
|
if depth_image is None or depth_image.size == 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if depth_image.dtype == np.uint16:
|
||||||
|
depth_m = depth_image.astype(np.float32) / 1000.0
|
||||||
|
else:
|
||||||
|
depth_m = depth_image.astype(np.float32)
|
||||||
|
|
||||||
|
h, w = depth_m.shape
|
||||||
|
x_start = (w - self._roi_w) // 2
|
||||||
|
y_start = (h - self._roi_h) // 2
|
||||||
|
x_end = x_start + self._roi_w
|
||||||
|
y_end = y_start + self._roi_h
|
||||||
|
|
||||||
|
roi = depth_m[max(0, y_start):min(h, y_end), max(0, x_start):min(w, x_end)]
|
||||||
|
|
||||||
|
valid_mask = (roi > self._min_depth) & (roi < self._max_depth)
|
||||||
|
valid_depths = roi[valid_mask]
|
||||||
|
|
||||||
|
if len(valid_depths) < 10:
|
||||||
|
return None
|
||||||
|
|
||||||
|
roi_smooth = cv2.bilateralFilter(roi.astype(np.float32), self._bilateral_d, 0.1, 1.0)
|
||||||
|
depth_variance = np.var(valid_depths)
|
||||||
|
|
||||||
|
grad_x = cv2.Sobel(roi_smooth, cv2.CV_32F, 1, 0, ksize=3)
|
||||||
|
grad_y = cv2.Sobel(roi_smooth, cv2.CV_32F, 0, 1, ksize=3)
|
||||||
|
grad_magnitude = np.sqrt(grad_x**2 + grad_y**2)
|
||||||
|
|
||||||
|
valid_grad = grad_magnitude[valid_mask]
|
||||||
|
grad_mean = np.mean(valid_grad) if len(valid_grad) > 0 else 0.0
|
||||||
|
|
||||||
|
depth_roughness = min(1.0, depth_variance / 0.05)
|
||||||
|
grad_roughness = min(1.0, grad_mean / 0.02)
|
||||||
|
|
||||||
|
roughness = 0.6 * depth_roughness + 0.4 * grad_roughness
|
||||||
|
return float(roughness)
|
||||||
@ -0,0 +1,58 @@
|
|||||||
|
"""LIDAR Feature Extractor — Surface characteristics from RPLIDAR."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
import numpy as np
|
||||||
|
from typing import Optional
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
|
|
||||||
|
class LidarExtractor:
|
||||||
|
"""Extract terrain reflectance features from RPLIDAR scans."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
ground_angle_front: float = 10.0,
|
||||||
|
ground_angle_rear: float = 170.0,
|
||||||
|
reflectance_window: int = 20,
|
||||||
|
):
|
||||||
|
self._front_angle = ground_angle_front
|
||||||
|
self._rear_angle = ground_angle_rear
|
||||||
|
self._refl_window = deque(maxlen=reflectance_window)
|
||||||
|
|
||||||
|
def extract_reflectance(self, ranges: np.ndarray, intensities: np.ndarray) -> Optional[float]:
|
||||||
|
"""Extract mean reflectance from ground-hitting rays."""
|
||||||
|
if len(ranges) == 0 or len(intensities) == 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
intensities_norm = intensities.astype(np.float32)
|
||||||
|
if np.max(intensities_norm) > 1.5:
|
||||||
|
intensities_norm /= 255.0
|
||||||
|
else:
|
||||||
|
intensities_norm = np.clip(intensities_norm, 0.0, 1.0)
|
||||||
|
|
||||||
|
valid_mask = (ranges > 0.2) & (ranges < 5.0)
|
||||||
|
valid_intensities = intensities_norm[valid_mask]
|
||||||
|
|
||||||
|
if len(valid_intensities) < 5:
|
||||||
|
return None
|
||||||
|
|
||||||
|
mean_reflectance = float(np.mean(valid_intensities))
|
||||||
|
self._refl_window.append(mean_reflectance)
|
||||||
|
|
||||||
|
if self._refl_window:
|
||||||
|
return float(np.mean(list(self._refl_window)))
|
||||||
|
return mean_reflectance
|
||||||
|
|
||||||
|
def extract_range_variance(self, ranges: np.ndarray) -> Optional[float]:
|
||||||
|
"""Extract surface variance from range measurements."""
|
||||||
|
if len(ranges) == 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
valid_ranges = ranges[(ranges > 0.2) & (ranges < 5.0)]
|
||||||
|
|
||||||
|
if len(valid_ranges) < 5:
|
||||||
|
return None
|
||||||
|
|
||||||
|
range_variance = float(np.var(valid_ranges))
|
||||||
|
normalized = min(1.0, range_variance / 0.2)
|
||||||
|
return float(normalized)
|
||||||
@ -0,0 +1,220 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
terrain_classification_node.py — Terrain classification (Issue #469).
|
||||||
|
|
||||||
|
Fuses RealSense D435i depth + RPLIDAR A1M8 to classify terrain and recommend speed.
|
||||||
|
|
||||||
|
Pipeline (10 Hz)
|
||||||
|
────────────────
|
||||||
|
1. Extract depth-based roughness features
|
||||||
|
2. Extract lidar-based reflectance features
|
||||||
|
3. Classify terrain using rule-based matcher
|
||||||
|
4. Publish /saltybot/terrain_type (JSON + string)
|
||||||
|
5. Publish speed recommendations
|
||||||
|
|
||||||
|
Publishes
|
||||||
|
─────────
|
||||||
|
/saltybot/terrain_type std_msgs/String — JSON: type, confidence, speed_scale
|
||||||
|
/saltybot/terrain_type_string std_msgs/String — Human-readable type name
|
||||||
|
/saltybot/terrain_speed_scale std_msgs/Float32 — Speed multiplier [0.0, 1.0]
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||||
|
|
||||||
|
from sensor_msgs.msg import Image, LaserScan
|
||||||
|
from std_msgs.msg import String, Float32
|
||||||
|
import cv2
|
||||||
|
from cv_bridge import CvBridge
|
||||||
|
|
||||||
|
from saltybot_terrain_classification.terrain_classifier import TerrainClassifier
|
||||||
|
from saltybot_terrain_classification.depth_extractor import DepthExtractor
|
||||||
|
from saltybot_terrain_classification.lidar_extractor import LidarExtractor
|
||||||
|
|
||||||
|
|
||||||
|
class TerrainClassificationNode(Node):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("terrain_classification")
|
||||||
|
|
||||||
|
self._declare_params()
|
||||||
|
p = self._load_params()
|
||||||
|
|
||||||
|
self._classifier = TerrainClassifier(
|
||||||
|
depth_std_threshold=p["depth_std_threshold"],
|
||||||
|
hysteresis_count=p["hysteresis_samples"],
|
||||||
|
)
|
||||||
|
self._depth_extractor = DepthExtractor(
|
||||||
|
roi_width_px=320,
|
||||||
|
roi_height_px=240,
|
||||||
|
min_depth_m=p["min_depth_m"],
|
||||||
|
max_depth_m=p["max_depth_m"],
|
||||||
|
)
|
||||||
|
self._lidar_extractor = LidarExtractor()
|
||||||
|
self._cv_bridge = CvBridge()
|
||||||
|
|
||||||
|
self._latest_depth_image = None
|
||||||
|
self._latest_scan = None
|
||||||
|
self._last_depth_t = 0.0
|
||||||
|
self._last_scan_t = 0.0
|
||||||
|
|
||||||
|
depth_qos = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=1,
|
||||||
|
)
|
||||||
|
lidar_qos = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=1,
|
||||||
|
)
|
||||||
|
pub_qos = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.RELIABLE,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=10,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.create_subscription(
|
||||||
|
Image, p["depth_topic"], self._depth_cb, depth_qos
|
||||||
|
)
|
||||||
|
self.create_subscription(
|
||||||
|
LaserScan, p["lidar_topic"], self._scan_cb, lidar_qos
|
||||||
|
)
|
||||||
|
|
||||||
|
self._terrain_type_pub = self.create_publisher(
|
||||||
|
String, "/saltybot/terrain_type", pub_qos
|
||||||
|
)
|
||||||
|
self._terrain_type_string_pub = self.create_publisher(
|
||||||
|
String, "/saltybot/terrain_type_string", pub_qos
|
||||||
|
)
|
||||||
|
self._speed_scale_pub = self.create_publisher(
|
||||||
|
Float32, "/saltybot/terrain_speed_scale", pub_qos
|
||||||
|
)
|
||||||
|
|
||||||
|
rate = p["control_rate"]
|
||||||
|
self._timer = self.create_timer(1.0 / rate, self._control_cb)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"TerrainClassificationNode ready rate={rate}Hz "
|
||||||
|
f"depth={p['depth_topic']} lidar={p['lidar_topic']}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _declare_params(self) -> None:
|
||||||
|
self.declare_parameter("control_rate", 10.0)
|
||||||
|
self.declare_parameter("depth_topic", "/camera/depth/image_rect_raw")
|
||||||
|
self.declare_parameter("lidar_topic", "/scan")
|
||||||
|
self.declare_parameter("depth_std_threshold", 0.02)
|
||||||
|
self.declare_parameter("min_depth_m", 0.1)
|
||||||
|
self.declare_parameter("max_depth_m", 3.0)
|
||||||
|
self.declare_parameter("confidence_threshold", 0.5)
|
||||||
|
self.declare_parameter("hysteresis_samples", 5)
|
||||||
|
|
||||||
|
def _load_params(self) -> dict:
|
||||||
|
g = self.get_parameter
|
||||||
|
return {k: g(k).value for k in [
|
||||||
|
"control_rate", "depth_topic", "lidar_topic",
|
||||||
|
"depth_std_threshold", "min_depth_m", "max_depth_m",
|
||||||
|
"confidence_threshold", "hysteresis_samples",
|
||||||
|
]}
|
||||||
|
|
||||||
|
def _depth_cb(self, msg: Image) -> None:
|
||||||
|
self._latest_depth_image = msg
|
||||||
|
self._last_depth_t = time.monotonic()
|
||||||
|
|
||||||
|
def _scan_cb(self, msg: LaserScan) -> None:
|
||||||
|
self._latest_scan = msg
|
||||||
|
self._last_scan_t = time.monotonic()
|
||||||
|
|
||||||
|
def _control_cb(self) -> None:
|
||||||
|
p = self._load_params()
|
||||||
|
now = time.monotonic()
|
||||||
|
|
||||||
|
depth_age = (now - self._last_depth_t) if self._last_depth_t > 0.0 else 1e9
|
||||||
|
scan_age = (now - self._last_scan_t) if self._last_scan_t > 0.0 else 1e9
|
||||||
|
|
||||||
|
if depth_age > 1.0 or scan_age > 1.0:
|
||||||
|
return
|
||||||
|
|
||||||
|
depth_roughness = None
|
||||||
|
lidar_reflectance = None
|
||||||
|
|
||||||
|
if self._latest_depth_image is not None:
|
||||||
|
try:
|
||||||
|
depth_cv = self._cv_bridge.imgmsg_to_cv2(
|
||||||
|
self._latest_depth_image, desired_encoding="passthrough"
|
||||||
|
)
|
||||||
|
depth_roughness = self._depth_extractor.extract_roughness(depth_cv)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if self._latest_scan is not None:
|
||||||
|
try:
|
||||||
|
ranges = np.array(self._latest_scan.ranges, dtype=np.float32)
|
||||||
|
intensities = np.array(self._latest_scan.intensities, dtype=np.float32)
|
||||||
|
lidar_reflectance = self._lidar_extractor.extract_reflectance(
|
||||||
|
ranges, intensities
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if depth_roughness is None:
|
||||||
|
depth_roughness = 0.3
|
||||||
|
if lidar_reflectance is None:
|
||||||
|
lidar_reflectance = 0.5
|
||||||
|
|
||||||
|
classification = self._classifier.update(depth_roughness, lidar_reflectance)
|
||||||
|
|
||||||
|
if classification.confidence < p["confidence_threshold"]:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._publish_terrain_type(classification)
|
||||||
|
self._publish_speed_scale(classification.speed_scale)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Terrain: {classification.surface_type} "
|
||||||
|
f"confidence={classification.confidence:.2f} "
|
||||||
|
f"speed_scale={classification.speed_scale:.2f}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _publish_terrain_type(self, classification) -> None:
|
||||||
|
"""Publish terrain classification JSON."""
|
||||||
|
msg = String()
|
||||||
|
msg.data = json.dumps({
|
||||||
|
"surface_type": classification.surface_type,
|
||||||
|
"confidence": round(classification.confidence, 3),
|
||||||
|
"roughness": round(classification.roughness, 3),
|
||||||
|
"reflectance": round(classification.reflectance, 3),
|
||||||
|
"speed_scale": round(classification.speed_scale, 3),
|
||||||
|
})
|
||||||
|
self._terrain_type_pub.publish(msg)
|
||||||
|
|
||||||
|
msg_str = String()
|
||||||
|
msg_str.data = classification.surface_type
|
||||||
|
self._terrain_type_string_pub.publish(msg_str)
|
||||||
|
|
||||||
|
def _publish_speed_scale(self, scale: float) -> None:
|
||||||
|
"""Publish speed scale multiplier."""
|
||||||
|
msg = Float32()
|
||||||
|
msg.data = float(scale)
|
||||||
|
self._speed_scale_pub.publish(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = TerrainClassificationNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -0,0 +1,127 @@
|
|||||||
|
"""Terrain Classifier — Multi-sensor classification (Issue #469)."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from collections import deque
|
||||||
|
import numpy as np
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TerrainClassification:
|
||||||
|
"""Classification result with confidence."""
|
||||||
|
surface_type: str
|
||||||
|
confidence: float
|
||||||
|
roughness: float
|
||||||
|
reflectance: float
|
||||||
|
speed_scale: float
|
||||||
|
|
||||||
|
|
||||||
|
class TerrainClassifier:
|
||||||
|
"""Multi-sensor terrain classifier using depth variance + reflectance."""
|
||||||
|
|
||||||
|
_SURFACE_RULES = [
|
||||||
|
("smooth", 0.10, 0.70, 1.00, 1.00),
|
||||||
|
("carpet", 0.25, 0.60, 0.90, 0.90),
|
||||||
|
("grass", 0.40, 0.40, 0.70, 0.75),
|
||||||
|
("gravel", 1.00, 0.20, 0.60, 0.60),
|
||||||
|
]
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
depth_std_threshold: float = 0.02,
|
||||||
|
reflectance_window_size: int = 10,
|
||||||
|
hysteresis_count: int = 5,
|
||||||
|
control_rate_hz: float = 10.0,
|
||||||
|
):
|
||||||
|
self._depth_std_threshold = depth_std_threshold
|
||||||
|
self._hysteresis_count = hysteresis_count
|
||||||
|
self._depth_vars = deque(maxlen=reflectance_window_size)
|
||||||
|
self._reflectances = deque(maxlen=reflectance_window_size)
|
||||||
|
self._current_type = "unknown"
|
||||||
|
self._current_confidence = 0.0
|
||||||
|
self._candidate_type = "unknown"
|
||||||
|
self._candidate_count = 0
|
||||||
|
|
||||||
|
@property
|
||||||
|
def current_surface_type(self) -> str:
|
||||||
|
return self._current_type
|
||||||
|
|
||||||
|
@property
|
||||||
|
def current_confidence(self) -> float:
|
||||||
|
return self._current_confidence
|
||||||
|
|
||||||
|
def update(self, depth_variance: float, reflectance_mean: float) -> TerrainClassification:
|
||||||
|
"""Update classifier with sensor measurements."""
|
||||||
|
roughness = min(1.0, depth_variance / (self._depth_std_threshold + 1e-6))
|
||||||
|
reflectance = np.clip(reflectance_mean, 0.0, 1.0)
|
||||||
|
|
||||||
|
self._depth_vars.append(roughness)
|
||||||
|
self._reflectances.append(reflectance)
|
||||||
|
|
||||||
|
avg_roughness = np.mean(list(self._depth_vars)) if self._depth_vars else 0.0
|
||||||
|
avg_reflectance = np.mean(list(self._reflectances)) if self._reflectances else 0.5
|
||||||
|
|
||||||
|
candidate_type, candidate_confidence, speed_scale = self._classify(
|
||||||
|
avg_roughness, avg_reflectance
|
||||||
|
)
|
||||||
|
|
||||||
|
if candidate_type == self._candidate_type:
|
||||||
|
self._candidate_count += 1
|
||||||
|
else:
|
||||||
|
self._candidate_type = candidate_type
|
||||||
|
self._candidate_count = 1
|
||||||
|
|
||||||
|
if self._candidate_count >= self._hysteresis_count:
|
||||||
|
self._current_type = candidate_type
|
||||||
|
self._current_confidence = candidate_confidence
|
||||||
|
|
||||||
|
return TerrainClassification(
|
||||||
|
surface_type=self._current_type,
|
||||||
|
confidence=self._current_confidence,
|
||||||
|
roughness=avg_roughness,
|
||||||
|
reflectance=avg_reflectance,
|
||||||
|
speed_scale=speed_scale,
|
||||||
|
)
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
"""Clear buffers and reset to unknown."""
|
||||||
|
self._depth_vars.clear()
|
||||||
|
self._reflectances.clear()
|
||||||
|
self._current_type = "unknown"
|
||||||
|
self._current_confidence = 0.0
|
||||||
|
self._candidate_type = "unknown"
|
||||||
|
self._candidate_count = 0
|
||||||
|
|
||||||
|
def _classify(self, roughness: float, reflectance: float) -> tuple[str, float, float]:
|
||||||
|
"""Classify terrain and compute confidence + speed scale."""
|
||||||
|
best_match = None
|
||||||
|
best_score = 0.0
|
||||||
|
best_speed = 1.0
|
||||||
|
|
||||||
|
for name, rough_max, refl_min, refl_max, speed in self._SURFACE_RULES:
|
||||||
|
rough_penalty = max(0.0, (roughness - rough_max) / rough_max) if roughness > rough_max else 0.0
|
||||||
|
refl_penalty = 0.0
|
||||||
|
|
||||||
|
if reflectance < refl_min:
|
||||||
|
refl_penalty = (refl_min - reflectance) / refl_min
|
||||||
|
elif reflectance > refl_max:
|
||||||
|
refl_penalty = (reflectance - refl_max) / (1.0 - refl_max)
|
||||||
|
|
||||||
|
score = 1.0 - (rough_penalty + refl_penalty) * 0.5
|
||||||
|
|
||||||
|
if score > best_score:
|
||||||
|
best_score = score
|
||||||
|
best_match = name
|
||||||
|
best_speed = speed
|
||||||
|
|
||||||
|
confidence = max(0.0, best_score)
|
||||||
|
return best_match or "unknown", confidence, best_speed
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def speed_scale_for_surface(surface_type: str) -> float:
|
||||||
|
"""Get speed scale for a surface type."""
|
||||||
|
for name, _, _, _, speed in TerrainClassifier._SURFACE_RULES:
|
||||||
|
if name == surface_type:
|
||||||
|
return speed
|
||||||
|
return 0.75
|
||||||
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_terrain_classification
|
||||||
|
[install]
|
||||||
|
script_dir=$base/lib/saltybot_terrain_classification
|
||||||
25
jetson/ros2_ws/src/saltybot_terrain_classification/setup.py
Normal file
25
jetson/ros2_ws/src/saltybot_terrain_classification/setup.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
from setuptools import setup, find_packages
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name="saltybot_terrain_classification",
|
||||||
|
version="0.1.0",
|
||||||
|
packages=find_packages(),
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages",
|
||||||
|
["resource/saltybot_terrain_classification"]),
|
||||||
|
("share/saltybot_terrain_classification", ["package.xml"]),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="SaltyLab",
|
||||||
|
maintainer_email="seb@example.com",
|
||||||
|
description="Terrain classification using RealSense depth and RPLIDAR (Issue #469)",
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"terrain_classification_node=saltybot_terrain_classification.terrain_classification_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
Loading…
x
Reference in New Issue
Block a user