Merge pull request 'feat(#142): terrain adaptation — surface detection + dynamic speed/PID/bias' (#154) from sl-controls/issue-142-terrain into main
This commit is contained in:
commit
33007fb5ed
@ -0,0 +1,41 @@
|
||||
/**:
|
||||
ros__parameters:
|
||||
# Control loop rate (Hz)
|
||||
control_rate: 50.0
|
||||
|
||||
# Odometry topic for slip detection
|
||||
odom_topic: "/saltybot/rover_odom"
|
||||
|
||||
# ── TerrainAnalyzer ────────────────────────────────────────────────────────
|
||||
# RMS window length (seconds)
|
||||
vibration_window_s: 0.5
|
||||
# Normalisation ceiling for roughness score (m/s²)
|
||||
roughness_max_ms2: 3.0
|
||||
# Hysteresis: samples required before surface-type changes
|
||||
hysteresis_count: 5
|
||||
|
||||
# ── InclineDetector ────────────────────────────────────────────────────────
|
||||
# Low-pass filter time constant for incline (seconds)
|
||||
incline_lpf_tau_s: 1.5
|
||||
# Incline at which incline_scale reaches minimum (~30°)
|
||||
max_incline_rad: 0.52
|
||||
|
||||
# ── SlipDetector ───────────────────────────────────────────────────────────
|
||||
# Slip ratio above which is_slipping flag is raised
|
||||
slip_threshold: 0.30
|
||||
# Minimum commanded speed before slip detection is active (m/s)
|
||||
slip_min_speed_ms: 0.15
|
||||
# EMA smoothing factor for slip ratio
|
||||
slip_smoothing_alpha: 0.20
|
||||
|
||||
# ── TerrainAdapter ─────────────────────────────────────────────────────────
|
||||
# Incline-to-motor-bias gain (multiplied by sin(incline_rad))
|
||||
k_bias: 1.0
|
||||
# Speed scale floor — never command below this fraction of max speed
|
||||
min_speed_scale: 0.15
|
||||
|
||||
# ── Misc ───────────────────────────────────────────────────────────────────
|
||||
# Rolling history buffer size
|
||||
history_max_entries: 200
|
||||
# IMU silence timeout before safe-defaults are published (seconds)
|
||||
imu_timeout_s: 0.5
|
||||
@ -0,0 +1,53 @@
|
||||
"""
|
||||
terrain_adaptation.launch.py — Launch the terrain adaptation node (Issue #142).
|
||||
|
||||
Usage
|
||||
-----
|
||||
ros2 launch saltybot_terrain_adaptation terrain_adaptation.launch.py
|
||||
ros2 launch saltybot_terrain_adaptation terrain_adaptation.launch.py \
|
||||
odom_topic:=/saltybot/rover_odom control_rate:=50.0
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
pkg_share = get_package_share_directory("saltybot_terrain_adaptation")
|
||||
default_params = os.path.join(pkg_share, "config", "terrain_params.yaml")
|
||||
|
||||
return LaunchDescription([
|
||||
DeclareLaunchArgument(
|
||||
"params_file",
|
||||
default_value=default_params,
|
||||
description="Path to terrain_params.yaml",
|
||||
),
|
||||
DeclareLaunchArgument(
|
||||
"odom_topic",
|
||||
default_value="/saltybot/rover_odom",
|
||||
description="Odometry topic for wheel-slip detection",
|
||||
),
|
||||
DeclareLaunchArgument(
|
||||
"control_rate",
|
||||
default_value="50.0",
|
||||
description="Control loop rate (Hz)",
|
||||
),
|
||||
Node(
|
||||
package="saltybot_terrain_adaptation",
|
||||
executable="terrain_adaptation_node",
|
||||
name="terrain_adaptation",
|
||||
output="screen",
|
||||
parameters=[
|
||||
LaunchConfiguration("params_file"),
|
||||
{
|
||||
"odom_topic": LaunchConfiguration("odom_topic"),
|
||||
"control_rate": LaunchConfiguration("control_rate"),
|
||||
},
|
||||
],
|
||||
),
|
||||
])
|
||||
24
jetson/ros2_ws/src/saltybot_terrain_adaptation/package.xml
Normal file
24
jetson/ros2_ws/src/saltybot_terrain_adaptation/package.xml
Normal file
@ -0,0 +1,24 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_terrain_adaptation</name>
|
||||
<version>0.1.0</version>
|
||||
<description>Surface detection and dynamic terrain adaptation for SaltyBot (Issue #142)</description>
|
||||
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>nav_msgs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,106 @@
|
||||
"""
|
||||
incline_detector.py — Terrain incline detection from IMU pitch (Issue #142).
|
||||
|
||||
Method
|
||||
──────
|
||||
For a ground robot (rover/tank), the vehicle body tilts with the terrain,
|
||||
so IMU pitch ≈ terrain incline + vibration noise.
|
||||
|
||||
A first-order discrete low-pass filter separates the slow incline component
|
||||
from fast vibrations and balance corrections:
|
||||
|
||||
incline_filtered[n] = (1−α) · incline_filtered[n−1] + α · pitch_raw[n]
|
||||
α = dt / (τ + dt)
|
||||
|
||||
The motor bias required to compensate for gravity on the incline is:
|
||||
|
||||
bias_ms = k_bias · sin(incline_rad)
|
||||
|
||||
Positive incline = uphill (front higher than rear) → positive forward bias
|
||||
needed to counteract gravity pulling the robot backward.
|
||||
|
||||
Pure module — no ROS2 dependency.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
|
||||
|
||||
class InclineDetector:
|
||||
"""
|
||||
Incline estimator via first-order low-pass filter on IMU pitch.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
lpf_tau_s : low-pass filter time constant (s); larger = smoother
|
||||
control_rate_hz : expected update rate (Hz)
|
||||
max_incline_rad : hard clamp on reported incline magnitude (rad)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
lpf_tau_s: float = 1.5,
|
||||
control_rate_hz: float = 50.0,
|
||||
max_incline_rad: float = 0.70, # ≈ 40°
|
||||
):
|
||||
self._tau = max(1e-3, lpf_tau_s)
|
||||
self._dt = 1.0 / max(1.0, control_rate_hz)
|
||||
self._max = abs(max_incline_rad)
|
||||
|
||||
self._incline_rad: float = 0.0
|
||||
self._initialised: bool = False
|
||||
|
||||
# ── Properties ────────────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def incline_rad(self) -> float:
|
||||
return self._incline_rad
|
||||
|
||||
# ── Update ────────────────────────────────────────────────────────────────
|
||||
|
||||
def update(self, pitch_rad: float, dt: float | None = None) -> float:
|
||||
"""
|
||||
Submit one pitch sample and return the filtered incline estimate.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
pitch_rad : IMU pitch angle (rad); +ve = front-up / uphill
|
||||
dt : elapsed time since last sample (s); uses 1/rate if None
|
||||
|
||||
Returns
|
||||
-------
|
||||
Filtered incline in radians, clamped to ±max_incline_rad.
|
||||
"""
|
||||
step = dt if dt is not None and dt > 0.0 else self._dt
|
||||
|
||||
if not self._initialised:
|
||||
# Seed filter with first measurement to avoid large initial transient
|
||||
self._incline_rad = pitch_rad
|
||||
self._initialised = True
|
||||
return max(-self._max, min(self._max, self._incline_rad))
|
||||
|
||||
alpha = step / (self._tau + step)
|
||||
self._incline_rad = (1.0 - alpha) * self._incline_rad + alpha * pitch_rad
|
||||
return max(-self._max, min(self._max, self._incline_rad))
|
||||
|
||||
def reset(self) -> None:
|
||||
self._incline_rad = 0.0
|
||||
self._initialised = False
|
||||
|
||||
# ── Motor bias ────────────────────────────────────────────────────────────
|
||||
|
||||
@staticmethod
|
||||
def motor_bias(incline_rad: float, k_bias: float = 1.0) -> float:
|
||||
"""
|
||||
Forward motor bias (m/s) to compensate for gravitational pull on incline.
|
||||
|
||||
Positive bias = add forward thrust for uphill.
|
||||
Negative bias = reduce thrust / add braking for downhill.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
incline_rad : filtered incline angle (rad)
|
||||
k_bias : tuning gain; scale bias magnitude
|
||||
"""
|
||||
return k_bias * math.sin(incline_rad)
|
||||
@ -0,0 +1,91 @@
|
||||
"""
|
||||
slip_detector.py — Wheel slip detection for SaltyBot (Issue #142).
|
||||
|
||||
Method
|
||||
──────
|
||||
Compare commanded speed (from /cmd_vel) to measured speed (from odometry).
|
||||
When the robot is commanded to move but the measured speed falls significantly
|
||||
below the command, the difference is attributed to wheel/track slip.
|
||||
|
||||
slip_ratio = max(0, |v_cmd| − |v_actual|) / |v_cmd|
|
||||
|
||||
slip_ratio = 0.0 → no slip (actual matches commanded)
|
||||
slip_ratio = 1.0 → full slip (wheels spinning, no forward motion)
|
||||
|
||||
Detection is suppressed below `min_speed_ms` to avoid noise at low speed.
|
||||
|
||||
A smoothed slip estimate is maintained via exponential moving average
|
||||
(alpha per update) to reduce transient spikes.
|
||||
|
||||
Pure module — no ROS2 dependency.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
class SlipDetector:
|
||||
"""
|
||||
Wheel/track slip detector from commanded vs measured speed.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
slip_threshold : slip_ratio above which is_slipping = True
|
||||
min_speed_ms : minimum |v_cmd| to attempt detection (avoids noise)
|
||||
smoothing_alpha : EMA coefficient ∈ (0, 1]; 1 = no smoothing
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
slip_threshold: float = 0.30,
|
||||
min_speed_ms: float = 0.15,
|
||||
smoothing_alpha: float = 0.20,
|
||||
):
|
||||
self._threshold = max(0.0, min(1.0, slip_threshold))
|
||||
self._min_speed = abs(min_speed_ms)
|
||||
self._alpha = max(1e-6, min(1.0, smoothing_alpha))
|
||||
self._smoothed: float = 0.0
|
||||
|
||||
# ── Properties ────────────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def smoothed_slip(self) -> float:
|
||||
"""EMA-smoothed slip ratio."""
|
||||
return self._smoothed
|
||||
|
||||
# ── Update ────────────────────────────────────────────────────────────────
|
||||
|
||||
def update(
|
||||
self,
|
||||
cmd_speed_ms: float,
|
||||
actual_speed_ms: float,
|
||||
) -> tuple[float, bool]:
|
||||
"""
|
||||
Compute slip from one speed pair.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
cmd_speed_ms : commanded forward speed (m/s); signed
|
||||
actual_speed_ms : measured forward speed (m/s); signed (from odom)
|
||||
|
||||
Returns
|
||||
-------
|
||||
(slip_ratio, is_slipping)
|
||||
slip_ratio : 0.0 – 1.0 (instantaneous, unsmoothed)
|
||||
is_slipping : True if smoothed slip exceeds threshold
|
||||
"""
|
||||
abs_cmd = abs(cmd_speed_ms)
|
||||
|
||||
if abs_cmd < self._min_speed:
|
||||
# Decay smoothed slip toward zero when not commanding movement
|
||||
self._smoothed = (1.0 - self._alpha) * self._smoothed
|
||||
return 0.0, False
|
||||
|
||||
abs_actual = abs(actual_speed_ms)
|
||||
raw_slip = max(0.0, min(1.0, (abs_cmd - abs_actual) / abs_cmd))
|
||||
|
||||
self._smoothed = (1.0 - self._alpha) * self._smoothed + self._alpha * raw_slip
|
||||
|
||||
return raw_slip, self._smoothed > self._threshold
|
||||
|
||||
def reset(self) -> None:
|
||||
self._smoothed = 0.0
|
||||
@ -0,0 +1,346 @@
|
||||
"""
|
||||
terrain_adaptation_node.py — Surface detection and dynamic adaptation (Issue #142).
|
||||
|
||||
Overview
|
||||
────────
|
||||
Analyses IMU vibration to classify the terrain underfoot, detects incline
|
||||
and wheel slip, then publishes advisory outputs that other nodes use to
|
||||
adapt speed limits, PID aggressiveness, and motor bias.
|
||||
|
||||
Pipeline (50 Hz)
|
||||
────────────────
|
||||
1. IMU acc → TerrainAnalyzer → roughness, surface_type, grip
|
||||
2. IMU pitch (quaternion) → InclineDetector → incline_rad
|
||||
3. cmd_vel + odom → SlipDetector → slip_ratio
|
||||
4. (roughness, incline, grip, slip) → TerrainAdapter → speed_scale,
|
||||
pid_scales, motor_bias
|
||||
5. Publish Terrain msg + advisory scalars + history log
|
||||
|
||||
Subscribes
|
||||
──────────
|
||||
/saltybot/imu sensor_msgs/Imu — vibration + pitch
|
||||
<odom_topic> nav_msgs/Odometry — actual speed for slip
|
||||
/cmd_vel geometry_msgs/Twist — commanded speed for slip
|
||||
|
||||
Publishes
|
||||
─────────
|
||||
/saltybot/terrain saltybot_terrain_msgs/Terrain — full terrain state
|
||||
/saltybot/terrain_speed_scale std_msgs/Float32 — speed multiplier
|
||||
/saltybot/terrain_pid_scales std_msgs/String (JSON) — kp/ki/kd scales
|
||||
/saltybot/terrain_motor_bias std_msgs/Float32 — incline compensation
|
||||
/saltybot/terrain_history std_msgs/String (JSON) — change log
|
||||
|
||||
Parameters
|
||||
──────────
|
||||
control_rate 50.0 Hz
|
||||
odom_topic /saltybot/rover_odom
|
||||
vibration_window_s 0.5 s (RMS window)
|
||||
roughness_max_ms2 3.0 m/s² (normalisation ceiling)
|
||||
hysteresis_count 5 samples before class change
|
||||
incline_lpf_tau_s 1.5 s (incline low-pass time constant)
|
||||
max_incline_rad 0.52 rad (≈30°)
|
||||
slip_threshold 0.30 (slip flag threshold)
|
||||
slip_min_speed_ms 0.15 m/s (ignore slip below this speed)
|
||||
slip_smoothing_alpha 0.20
|
||||
k_bias 1.0 (incline motor bias gain)
|
||||
min_speed_scale 0.15 (speed_scale floor)
|
||||
history_max_entries 200
|
||||
imu_timeout_s 0.5 s (zero outputs if IMU goes silent)
|
||||
"""
|
||||
|
||||
import json
|
||||
import math
|
||||
import time
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
|
||||
|
||||
from geometry_msgs.msg import Twist
|
||||
from nav_msgs.msg import Odometry
|
||||
from sensor_msgs.msg import Imu
|
||||
from std_msgs.msg import Float32, String
|
||||
|
||||
from saltybot_terrain_adaptation.terrain_analyzer import TerrainAnalyzer
|
||||
from saltybot_terrain_adaptation.slip_detector import SlipDetector
|
||||
from saltybot_terrain_adaptation.incline_detector import InclineDetector
|
||||
from saltybot_terrain_adaptation.terrain_adapter import (
|
||||
TerrainAdapter,
|
||||
TerrainHistory,
|
||||
TerrainState,
|
||||
)
|
||||
|
||||
try:
|
||||
from saltybot_terrain_msgs.msg import Terrain as TerrainMsg
|
||||
_TERRAIN_MSG_OK = True
|
||||
except ImportError:
|
||||
_TERRAIN_MSG_OK = False
|
||||
|
||||
|
||||
def _quaternion_to_pitch(qx: float, qy: float, qz: float, qw: float) -> float:
|
||||
sinp = 2.0 * (qw * qy - qz * qx)
|
||||
return math.asin(max(-1.0, min(1.0, sinp)))
|
||||
|
||||
|
||||
class TerrainAdaptationNode(Node):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("terrain_adaptation")
|
||||
|
||||
# ── Parameters ───────────────────────────────────────────────────────
|
||||
self._declare_params()
|
||||
p = self._load_params()
|
||||
|
||||
# ── Sub-systems ──────────────────────────────────────────────────────
|
||||
self._analyzer = TerrainAnalyzer(
|
||||
max_variance_ms2=p["roughness_max_ms2"],
|
||||
window_s=p["vibration_window_s"],
|
||||
control_rate_hz=p["control_rate"],
|
||||
hysteresis_count=p["hysteresis_count"],
|
||||
)
|
||||
self._incline = InclineDetector(
|
||||
lpf_tau_s=p["incline_lpf_tau_s"],
|
||||
control_rate_hz=p["control_rate"],
|
||||
max_incline_rad=p["max_incline_rad"],
|
||||
)
|
||||
self._slip = SlipDetector(
|
||||
slip_threshold=p["slip_threshold"],
|
||||
min_speed_ms=p["slip_min_speed_ms"],
|
||||
smoothing_alpha=p["slip_smoothing_alpha"],
|
||||
)
|
||||
self._adapter = TerrainAdapter(
|
||||
max_incline_rad=p["max_incline_rad"],
|
||||
min_speed_scale=p["min_speed_scale"],
|
||||
)
|
||||
self._history = TerrainHistory(max_entries=p["history_max_entries"])
|
||||
|
||||
# ── State ────────────────────────────────────────────────────────────
|
||||
self._latest_imu: Imu | None = None
|
||||
self._actual_speed_ms: float = 0.0
|
||||
self._cmd_speed_ms: float = 0.0
|
||||
self._last_imu_t: float = 0.0
|
||||
self._last_ctrl_t: float = time.monotonic()
|
||||
|
||||
# ── QoS ──────────────────────────────────────────────────────────────
|
||||
reliable = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=10,
|
||||
)
|
||||
best_effort = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=1,
|
||||
)
|
||||
|
||||
# ── Subscriptions ────────────────────────────────────────────────────
|
||||
self.create_subscription(Imu, "/saltybot/imu", self._imu_cb, best_effort)
|
||||
self.create_subscription(Odometry, p["odom_topic"], self._odom_cb, reliable)
|
||||
self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, reliable)
|
||||
|
||||
# ── Publishers ───────────────────────────────────────────────────────
|
||||
self._terrain_pub = self.create_publisher(String, "/saltybot/terrain", reliable)
|
||||
self._speed_scale_pub = self.create_publisher(Float32, "/saltybot/terrain_speed_scale", reliable)
|
||||
self._pid_scales_pub = self.create_publisher(String, "/saltybot/terrain_pid_scales", reliable)
|
||||
self._motor_bias_pub = self.create_publisher(Float32, "/saltybot/terrain_motor_bias", reliable)
|
||||
self._history_pub = self.create_publisher(String, "/saltybot/terrain_history", reliable)
|
||||
|
||||
# Try to create a typed Terrain publisher if message is available
|
||||
self._typed_terrain_pub = None
|
||||
if _TERRAIN_MSG_OK:
|
||||
self._typed_terrain_pub = self.create_publisher(
|
||||
TerrainMsg, "/saltybot/terrain_typed", reliable
|
||||
)
|
||||
|
||||
# ── Timer ────────────────────────────────────────────────────────────
|
||||
rate = p["control_rate"]
|
||||
self._timer = self.create_timer(1.0 / rate, self._control_cb)
|
||||
|
||||
self.get_logger().info(
|
||||
f"TerrainAdaptationNode ready rate={rate}Hz "
|
||||
f"odom={p['odom_topic']}"
|
||||
)
|
||||
|
||||
# ── Parameter management ─────────────────────────────────────────────────
|
||||
|
||||
def _declare_params(self) -> None:
|
||||
self.declare_parameter("control_rate", 50.0)
|
||||
self.declare_parameter("odom_topic", "/saltybot/rover_odom")
|
||||
self.declare_parameter("vibration_window_s", 0.5)
|
||||
self.declare_parameter("roughness_max_ms2", 3.0)
|
||||
self.declare_parameter("hysteresis_count", 5)
|
||||
self.declare_parameter("incline_lpf_tau_s", 1.5)
|
||||
self.declare_parameter("max_incline_rad", 0.52)
|
||||
self.declare_parameter("slip_threshold", 0.30)
|
||||
self.declare_parameter("slip_min_speed_ms", 0.15)
|
||||
self.declare_parameter("slip_smoothing_alpha", 0.20)
|
||||
self.declare_parameter("k_bias", 1.0)
|
||||
self.declare_parameter("min_speed_scale", 0.15)
|
||||
self.declare_parameter("history_max_entries", 200)
|
||||
self.declare_parameter("imu_timeout_s", 0.5)
|
||||
|
||||
def _load_params(self) -> dict:
|
||||
g = self.get_parameter
|
||||
return {k: g(k).value for k in [
|
||||
"control_rate", "odom_topic",
|
||||
"vibration_window_s", "roughness_max_ms2", "hysteresis_count",
|
||||
"incline_lpf_tau_s", "max_incline_rad",
|
||||
"slip_threshold", "slip_min_speed_ms", "slip_smoothing_alpha",
|
||||
"k_bias", "min_speed_scale", "history_max_entries", "imu_timeout_s",
|
||||
]}
|
||||
|
||||
# ── Subscriptions ─────────────────────────────────────────────────────────
|
||||
|
||||
def _imu_cb(self, msg: Imu) -> None:
|
||||
self._latest_imu = msg
|
||||
self._last_imu_t = time.monotonic()
|
||||
|
||||
def _odom_cb(self, msg: Odometry) -> None:
|
||||
self._actual_speed_ms = msg.twist.twist.linear.x
|
||||
|
||||
def _cmd_vel_cb(self, msg: Twist) -> None:
|
||||
self._cmd_speed_ms = msg.linear.x
|
||||
|
||||
# ── 50 Hz control loop ────────────────────────────────────────────────────
|
||||
|
||||
def _control_cb(self) -> None:
|
||||
p = self._load_params()
|
||||
now = time.monotonic()
|
||||
dt = now - self._last_ctrl_t
|
||||
self._last_ctrl_t = now
|
||||
|
||||
# IMU watchdog
|
||||
imu_age = (now - self._last_imu_t) if self._last_imu_t > 0.0 else 1e9
|
||||
if imu_age > p["imu_timeout_s"] or self._latest_imu is None:
|
||||
self._publish_safe_defaults()
|
||||
return
|
||||
|
||||
imu = self._latest_imu
|
||||
ax = imu.linear_acceleration.x
|
||||
ay = imu.linear_acceleration.y
|
||||
az = imu.linear_acceleration.z
|
||||
q = imu.orientation
|
||||
|
||||
# 1. Vibration analysis
|
||||
analysis = self._analyzer.update(ax, ay, az)
|
||||
roughness = analysis.roughness if analysis else 0.0
|
||||
surf_type = analysis.surface_type if analysis else "unknown"
|
||||
grip = analysis.grip if analysis else 0.6
|
||||
|
||||
# 2. Incline detection
|
||||
pitch = _quaternion_to_pitch(q.x, q.y, q.z, q.w)
|
||||
incline = self._incline.update(pitch, dt)
|
||||
|
||||
# 3. Slip detection
|
||||
slip_ratio, is_slipping = self._slip.update(
|
||||
self._cmd_speed_ms, self._actual_speed_ms
|
||||
)
|
||||
|
||||
# 4. Build terrain state
|
||||
raw_state = TerrainState(
|
||||
surface_type=surf_type,
|
||||
roughness=roughness,
|
||||
incline_rad=incline,
|
||||
grip=grip,
|
||||
slip_ratio=slip_ratio,
|
||||
timestamp_s=now,
|
||||
)
|
||||
state = self._adapter.apply(raw_state)
|
||||
|
||||
# 5. Terrain history + logging
|
||||
type_changed = self._history.add(state)
|
||||
if type_changed:
|
||||
self.get_logger().info(
|
||||
f"Terrain: {surf_type} rough={roughness:.2f} "
|
||||
f"grip={grip:.2f} incline={math.degrees(incline):.1f}° "
|
||||
f"speed_scale={state.speed_scale:.2f}"
|
||||
)
|
||||
if is_slipping:
|
||||
self.get_logger().warn(
|
||||
f"Wheel slip detected: ratio={slip_ratio:.2f} "
|
||||
f"cmd={self._cmd_speed_ms:.2f}m/s actual={self._actual_speed_ms:.2f}m/s"
|
||||
)
|
||||
|
||||
# 6. Publish
|
||||
self._publish_terrain_json(state)
|
||||
self._publish_speed_scale(state.speed_scale)
|
||||
self._publish_pid_scales(self._adapter.pid_scales(state))
|
||||
self._publish_motor_bias(
|
||||
self._adapter.motor_bias(state, k_bias=p["k_bias"])
|
||||
)
|
||||
# Publish history on every type change (not every tick)
|
||||
if type_changed:
|
||||
self._publish_history()
|
||||
|
||||
# Typed message (if saltybot_terrain_msgs is built)
|
||||
if self._typed_terrain_pub is not None and _TERRAIN_MSG_OK:
|
||||
self._publish_typed(state)
|
||||
|
||||
# ── Publishers ────────────────────────────────────────────────────────────
|
||||
|
||||
def _publish_safe_defaults(self) -> None:
|
||||
self._publish_speed_scale(1.0)
|
||||
self._publish_pid_scales({"kp": 1.0, "ki": 1.0, "kd": 1.0})
|
||||
self._publish_motor_bias(0.0)
|
||||
|
||||
def _publish_terrain_json(self, state: TerrainState) -> None:
|
||||
msg = String()
|
||||
msg.data = json.dumps({
|
||||
"surface_type": state.surface_type,
|
||||
"roughness": round(state.roughness, 3),
|
||||
"incline_rad": round(state.incline_rad, 4),
|
||||
"incline_deg": round(math.degrees(state.incline_rad), 2),
|
||||
"grip": round(state.grip, 3),
|
||||
"slip_ratio": round(state.slip_ratio, 3),
|
||||
"speed_scale": round(state.speed_scale, 3),
|
||||
})
|
||||
self._terrain_pub.publish(msg)
|
||||
|
||||
def _publish_speed_scale(self, scale: float) -> None:
|
||||
msg = Float32()
|
||||
msg.data = float(scale)
|
||||
self._speed_scale_pub.publish(msg)
|
||||
|
||||
def _publish_pid_scales(self, scales: dict) -> None:
|
||||
msg = String()
|
||||
msg.data = json.dumps({k: round(v, 4) for k, v in scales.items()})
|
||||
self._pid_scales_pub.publish(msg)
|
||||
|
||||
def _publish_motor_bias(self, bias: float) -> None:
|
||||
msg = Float32()
|
||||
msg.data = float(bias)
|
||||
self._motor_bias_pub.publish(msg)
|
||||
|
||||
def _publish_history(self) -> None:
|
||||
msg = String()
|
||||
msg.data = json.dumps(self._history.to_json_log())
|
||||
self._history_pub.publish(msg)
|
||||
|
||||
def _publish_typed(self, state: TerrainState) -> None:
|
||||
msg = TerrainMsg()
|
||||
msg.stamp = self.get_clock().now().to_msg()
|
||||
msg.surface_type = state.surface_type
|
||||
msg.roughness = float(state.roughness)
|
||||
msg.incline_rad = float(state.incline_rad)
|
||||
msg.grip = float(state.grip)
|
||||
msg.slip_ratio = float(state.slip_ratio)
|
||||
msg.speed_scale = float(state.speed_scale)
|
||||
self._typed_terrain_pub.publish(msg)
|
||||
|
||||
|
||||
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = TerrainAdaptationNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.try_shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@ -0,0 +1,195 @@
|
||||
"""
|
||||
terrain_adapter.py — Speed / PID / bias adaptation from terrain state (Issue #142).
|
||||
|
||||
Provides:
|
||||
TerrainState — typed snapshot of current terrain conditions
|
||||
TerrainAdapter — computes speed_scale, pid_scales, motor_bias
|
||||
TerrainHistory — rolling deque of TerrainState, JSON export for route planning
|
||||
|
||||
Speed scale model
|
||||
─────────────────
|
||||
speed_scale = roughness_scale × incline_scale × slip_scale
|
||||
|
||||
roughness_scale = 1.0 − 0.6 × roughness (0.4 at max roughness)
|
||||
incline_scale = 1.0 − 0.5 × |incline| / max_incline (0.5 at max incline)
|
||||
slip_scale = 1.0 − 0.8 × slip_ratio (0.2 at full slip)
|
||||
|
||||
Result clamped to [min_speed_scale, 1.0].
|
||||
|
||||
PID scale model (grip-based aggressiveness)
|
||||
──────────────────────────────────────────
|
||||
kp_scale = 0.70 + 0.30 × grip (0.70 at no grip, 1.00 at full grip)
|
||||
ki_scale = 0.50 + 0.50 × grip (0.50 at no grip, 1.00 at full grip)
|
||||
kd_scale = 0.80 + 0.20 × grip (0.80 at no grip, 1.00 at full grip)
|
||||
|
||||
On slippery surfaces, reduced P/I prevents oscillation from un-predicted
|
||||
wheel slip. D is kept relatively high to damp any vibration.
|
||||
|
||||
Motor bias
|
||||
──────────
|
||||
See incline_detector.motor_bias() — terrain_adapter re-exports it.
|
||||
|
||||
Pure module — no ROS2 dependency.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import time
|
||||
from collections import deque
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
|
||||
|
||||
# ── TerrainState ─────────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class TerrainState:
|
||||
surface_type: str = "unknown" # "smooth"|"tile"|"carpet"|"grass"|"gravel"|"unknown"
|
||||
roughness: float = 0.0 # 0.0–1.0
|
||||
incline_rad: float = 0.0 # rad; +ve = uphill
|
||||
grip: float = 0.6 # 0.0–1.0
|
||||
slip_ratio: float = 0.0 # 0.0–1.0
|
||||
speed_scale: float = 1.0 # recommended speed multiplier
|
||||
timestamp_s: float = field(default_factory=time.monotonic)
|
||||
|
||||
|
||||
# ── TerrainAdapter ────────────────────────────────────────────────────────────
|
||||
|
||||
class TerrainAdapter:
|
||||
"""
|
||||
Computes adaptive outputs (speed scale, PID scales, motor bias) from a
|
||||
TerrainState.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
max_incline_rad : incline at which incline_scale reaches minimum
|
||||
min_speed_scale : floor for speed_scale (never zero)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
max_incline_rad: float = 0.52, # ≈ 30°
|
||||
min_speed_scale: float = 0.15,
|
||||
):
|
||||
self._max_incline = max(0.01, max_incline_rad)
|
||||
self._min_scale = max(0.05, min_speed_scale)
|
||||
|
||||
# ── Speed scale ───────────────────────────────────────────────────────────
|
||||
|
||||
def speed_scale(self, state: TerrainState) -> float:
|
||||
"""
|
||||
Combined speed-limit multiplier. Returns value in [min_speed_scale, 1.0].
|
||||
"""
|
||||
r_scale = 1.0 - 0.6 * state.roughness
|
||||
i_scale = 1.0 - 0.5 * min(1.0, abs(state.incline_rad) / self._max_incline)
|
||||
s_scale = 1.0 - 0.8 * state.slip_ratio
|
||||
|
||||
combined = r_scale * i_scale * s_scale
|
||||
return max(self._min_scale, min(1.0, combined))
|
||||
|
||||
# ── PID scales ────────────────────────────────────────────────────────────
|
||||
|
||||
def pid_scales(self, state: TerrainState) -> dict[str, float]:
|
||||
"""
|
||||
Gain multipliers keyed "kp", "ki", "kd". All in (0, 1].
|
||||
Multiply these against the base PID gains from adaptive_pid_node.
|
||||
"""
|
||||
grip = max(0.0, min(1.0, state.grip))
|
||||
return {
|
||||
"kp": 0.70 + 0.30 * grip,
|
||||
"ki": 0.50 + 0.50 * grip,
|
||||
"kd": 0.80 + 0.20 * grip,
|
||||
}
|
||||
|
||||
# ── Motor bias ────────────────────────────────────────────────────────────
|
||||
|
||||
@staticmethod
|
||||
def motor_bias(state: TerrainState, k_bias: float = 1.0) -> float:
|
||||
"""
|
||||
Forward motor bias (m/s) for incline compensation.
|
||||
Positive = push forward on uphill.
|
||||
"""
|
||||
return k_bias * math.sin(state.incline_rad)
|
||||
|
||||
# ── Full state update ─────────────────────────────────────────────────────
|
||||
|
||||
def apply(self, state: TerrainState) -> TerrainState:
|
||||
"""Return a copy of state with speed_scale populated."""
|
||||
return TerrainState(
|
||||
surface_type=state.surface_type,
|
||||
roughness=state.roughness,
|
||||
incline_rad=state.incline_rad,
|
||||
grip=state.grip,
|
||||
slip_ratio=state.slip_ratio,
|
||||
speed_scale=self.speed_scale(state),
|
||||
timestamp_s=state.timestamp_s,
|
||||
)
|
||||
|
||||
|
||||
# ── TerrainHistory ────────────────────────────────────────────────────────────
|
||||
|
||||
class TerrainHistory:
|
||||
"""
|
||||
Rolling log of TerrainState snapshots for route planning.
|
||||
|
||||
Tracks surface-type change events and exports them as a JSON-serialisable
|
||||
list. The full sample buffer is kept for statistical queries.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
max_entries : maximum samples kept (oldest discarded)
|
||||
"""
|
||||
|
||||
def __init__(self, max_entries: int = 200):
|
||||
self._entries: deque = deque(maxlen=max(1, max_entries))
|
||||
|
||||
@property
|
||||
def entries(self) -> list[TerrainState]:
|
||||
return list(self._entries)
|
||||
|
||||
@property
|
||||
def current(self) -> Optional[TerrainState]:
|
||||
return self._entries[-1] if self._entries else None
|
||||
|
||||
def add(self, state: TerrainState) -> bool:
|
||||
"""
|
||||
Append a state snapshot.
|
||||
|
||||
Returns True if the surface type changed from the previous entry
|
||||
(useful for callers that want to log only type-change events).
|
||||
"""
|
||||
changed = (
|
||||
not self._entries
|
||||
or self._entries[-1].surface_type != state.surface_type
|
||||
)
|
||||
self._entries.append(state)
|
||||
return changed
|
||||
|
||||
def to_json_log(self) -> list[dict]:
|
||||
"""
|
||||
Return a list of surface-type change events (deduped by type).
|
||||
Suitable for route planning or structured logging.
|
||||
|
||||
Each entry:
|
||||
{"surface": str, "roughness": float, "incline_rad": float,
|
||||
"grip": float, "slip_ratio": float, "timestamp_s": float}
|
||||
"""
|
||||
events: list[dict] = []
|
||||
prev_type: Optional[str] = None
|
||||
for s in self._entries:
|
||||
if s.surface_type != prev_type:
|
||||
events.append({
|
||||
"surface": s.surface_type,
|
||||
"roughness": round(s.roughness, 3),
|
||||
"incline_rad": round(s.incline_rad, 4),
|
||||
"grip": round(s.grip, 3),
|
||||
"slip_ratio": round(s.slip_ratio, 3),
|
||||
"speed_scale": round(s.speed_scale, 3),
|
||||
"timestamp_s": round(s.timestamp_s, 2),
|
||||
})
|
||||
prev_type = s.surface_type
|
||||
return events
|
||||
|
||||
def clear(self) -> None:
|
||||
self._entries.clear()
|
||||
@ -0,0 +1,191 @@
|
||||
"""
|
||||
terrain_analyzer.py — IMU vibration analysis for surface classification (Issue #142).
|
||||
|
||||
Algorithm
|
||||
─────────
|
||||
For each control tick, one IMU linear-acceleration triple (ax, ay, az) is
|
||||
submitted to a rolling window. The RMS vibration is computed as:
|
||||
|
||||
vibration_rms = sqrt( Var(ax) + Var(ay) + Var(az) ) [m/s²]
|
||||
|
||||
Variance naturally removes the DC gravity component, so the signal
|
||||
captures only dynamic surface vibrations.
|
||||
|
||||
Normalised roughness:
|
||||
roughness = min(1.0, vibration_rms / max_variance_ms2)
|
||||
|
||||
Surface classification thresholds (roughness boundaries):
|
||||
smooth [0.00, 0.12) grip = 0.90
|
||||
tile [0.12, 0.28) grip = 0.85
|
||||
carpet [0.28, 0.48) grip = 0.75
|
||||
grass [0.48, 0.68) grip = 0.55
|
||||
gravel [0.68, 1.00] grip = 0.45
|
||||
|
||||
Hysteresis prevents rapid class switching: the candidate class must persist
|
||||
for `hysteresis_count` consecutive samples before becoming current.
|
||||
|
||||
Pure module — no ROS2 dependency.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from collections import deque
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
|
||||
# ── Surface classification table ─────────────────────────────────────────────
|
||||
# (roughness_upper_bound, surface_type, grip_coefficient)
|
||||
# Ordered from smoothest to roughest; last entry has no upper bound.
|
||||
_SURFACE_TABLE: list[tuple[float, str, float]] = [
|
||||
(0.12, "smooth", 0.90),
|
||||
(0.28, "tile", 0.85),
|
||||
(0.48, "carpet", 0.75),
|
||||
(0.68, "grass", 0.55),
|
||||
(1.01, "gravel", 0.45),
|
||||
]
|
||||
|
||||
SURFACE_TYPES: tuple[str, ...] = tuple(e[1] for e in _SURFACE_TABLE)
|
||||
|
||||
|
||||
# ── SurfaceAnalysis ───────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class SurfaceAnalysis:
|
||||
roughness: float # 0.0 (smooth) – 1.0 (rough), normalised
|
||||
surface_type: str # "smooth" | "tile" | "carpet" | "grass" | "gravel" | "unknown"
|
||||
grip: float # 0.0 (no grip) – 1.0 (full grip)
|
||||
|
||||
|
||||
# ── TerrainAnalyzer ───────────────────────────────────────────────────────────
|
||||
|
||||
class TerrainAnalyzer:
|
||||
"""
|
||||
Rolling-window vibration analyser with hysteresis-protected classification.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
max_variance_ms2 : normalisation ceiling (m/s²); maps to roughness=1.0
|
||||
window_s : rolling window duration (s)
|
||||
control_rate_hz : expected update rate (Hz) — sets window size
|
||||
hysteresis_count : consecutive samples in new class before switching
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
max_variance_ms2: float = 3.0,
|
||||
window_s: float = 0.5,
|
||||
control_rate_hz: float = 50.0,
|
||||
hysteresis_count: int = 5,
|
||||
):
|
||||
self._max_var = max(1e-3, max_variance_ms2)
|
||||
window_size = max(5, int(window_s * control_rate_hz))
|
||||
self._samples: deque = deque(maxlen=window_size)
|
||||
self._hysteresis_count = hysteresis_count
|
||||
|
||||
self._current_type: str = "unknown"
|
||||
self._current_grip: float = 0.6
|
||||
self._candidate_type: str = "unknown"
|
||||
self._candidate_count: int = 0
|
||||
|
||||
# ── Properties ────────────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def current_surface_type(self) -> str:
|
||||
return self._current_type
|
||||
|
||||
@property
|
||||
def current_grip(self) -> float:
|
||||
return self._current_grip
|
||||
|
||||
@property
|
||||
def sample_count(self) -> int:
|
||||
return len(self._samples)
|
||||
|
||||
# ── Update ────────────────────────────────────────────────────────────────
|
||||
|
||||
def update(
|
||||
self,
|
||||
ax: float,
|
||||
ay: float,
|
||||
az: float,
|
||||
) -> Optional[SurfaceAnalysis]:
|
||||
"""
|
||||
Submit one IMU acceleration sample (m/s²) and return analysis.
|
||||
|
||||
Returns
|
||||
-------
|
||||
SurfaceAnalysis when ≥ 5 samples have been collected; None otherwise.
|
||||
"""
|
||||
self._samples.append((ax, ay, az))
|
||||
|
||||
if len(self._samples) < 5:
|
||||
return None
|
||||
|
||||
roughness = self._compute_roughness()
|
||||
candidate_type, candidate_grip = _classify(roughness)
|
||||
|
||||
# Hysteresis: require sustained presence in new class
|
||||
if candidate_type == self._candidate_type:
|
||||
self._candidate_count += 1
|
||||
else:
|
||||
self._candidate_type = candidate_type
|
||||
self._candidate_count = 1
|
||||
|
||||
if self._candidate_count >= self._hysteresis_count:
|
||||
self._current_type = candidate_type
|
||||
self._current_grip = candidate_grip
|
||||
|
||||
return SurfaceAnalysis(
|
||||
roughness=roughness,
|
||||
surface_type=self._current_type,
|
||||
grip=self._current_grip,
|
||||
)
|
||||
|
||||
def reset(self) -> None:
|
||||
self._samples.clear()
|
||||
self._current_type = "unknown"
|
||||
self._current_grip = 0.6
|
||||
self._candidate_type = "unknown"
|
||||
self._candidate_count = 0
|
||||
|
||||
# ── Internal ─────────────────────────────────────────────────────────────
|
||||
|
||||
def _compute_roughness(self) -> float:
|
||||
n = len(self._samples)
|
||||
if n < 2:
|
||||
return 0.0
|
||||
|
||||
ax_vals = [s[0] for s in self._samples]
|
||||
ay_vals = [s[1] for s in self._samples]
|
||||
az_vals = [s[2] for s in self._samples]
|
||||
|
||||
rms = math.sqrt(_variance(ax_vals) + _variance(ay_vals) + _variance(az_vals))
|
||||
return min(1.0, rms / self._max_var)
|
||||
|
||||
|
||||
# ── Module-level helpers ──────────────────────────────────────────────────────
|
||||
|
||||
def _variance(vals: list[float]) -> float:
|
||||
n = len(vals)
|
||||
if n < 2:
|
||||
return 0.0
|
||||
mean = sum(vals) / n
|
||||
return sum((v - mean) ** 2 for v in vals) / n
|
||||
|
||||
|
||||
def _classify(roughness: float) -> tuple[str, float]:
|
||||
"""Map normalised roughness to (surface_type, grip)."""
|
||||
for upper, stype, grip in _SURFACE_TABLE:
|
||||
if roughness < upper:
|
||||
return stype, grip
|
||||
return "gravel", 0.45
|
||||
|
||||
|
||||
def grip_for_surface(surface_type: str) -> float:
|
||||
"""Return grip coefficient for a given surface type string."""
|
||||
for _, stype, grip in _SURFACE_TABLE:
|
||||
if stype == surface_type:
|
||||
return grip
|
||||
return 0.6 # unknown
|
||||
5
jetson/ros2_ws/src/saltybot_terrain_adaptation/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_terrain_adaptation/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_terrain_adaptation
|
||||
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_terrain_adaptation
|
||||
32
jetson/ros2_ws/src/saltybot_terrain_adaptation/setup.py
Normal file
32
jetson/ros2_ws/src/saltybot_terrain_adaptation/setup.py
Normal file
@ -0,0 +1,32 @@
|
||||
from setuptools import setup, find_packages
|
||||
import os
|
||||
from glob import glob
|
||||
|
||||
package_name = "saltybot_terrain_adaptation"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version="0.1.0",
|
||||
packages=find_packages(exclude=["test"]),
|
||||
data_files=[
|
||||
("share/ament_index/resource_index/packages",
|
||||
[f"resource/{package_name}"]),
|
||||
(f"share/{package_name}", ["package.xml"]),
|
||||
(os.path.join("share", package_name, "config"),
|
||||
glob("config/*.yaml")),
|
||||
(os.path.join("share", package_name, "launch"),
|
||||
glob("launch/*.py")),
|
||||
],
|
||||
install_requires=["setuptools"],
|
||||
zip_safe=True,
|
||||
maintainer="sl-controls",
|
||||
maintainer_email="sl-controls@saltylab.local",
|
||||
description="Surface detection and dynamic terrain adaptation for SaltyBot",
|
||||
license="MIT",
|
||||
tests_require=["pytest"],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
f"terrain_adaptation_node = {package_name}.terrain_adaptation_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
@ -0,0 +1,618 @@
|
||||
"""
|
||||
test_terrain_adaptation.py — Unit tests for Issue #142 terrain adaptation modules.
|
||||
|
||||
Covers:
|
||||
TerrainAnalyzer — roughness computation, surface classification, hysteresis
|
||||
SlipDetector — slip ratio, EMA smoothing, speed gate
|
||||
InclineDetector — LPF convergence, seeding, motor_bias
|
||||
TerrainAdapter — speed_scale formula, pid_scales, motor_bias, apply()
|
||||
TerrainHistory — add(), type-change detection, to_json_log(), clear()
|
||||
"""
|
||||
|
||||
import math
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Allow direct pytest run without installing the package
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
import pytest
|
||||
|
||||
from saltybot_terrain_adaptation.terrain_analyzer import TerrainAnalyzer, SurfaceAnalysis
|
||||
from saltybot_terrain_adaptation.slip_detector import SlipDetector
|
||||
from saltybot_terrain_adaptation.incline_detector import InclineDetector
|
||||
from saltybot_terrain_adaptation.terrain_adapter import (
|
||||
TerrainAdapter,
|
||||
TerrainHistory,
|
||||
TerrainState,
|
||||
)
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _make_analyzer(**kw):
|
||||
defaults = dict(
|
||||
max_variance_ms2=3.0,
|
||||
window_s=0.5,
|
||||
control_rate_hz=50.0,
|
||||
hysteresis_count=5,
|
||||
)
|
||||
defaults.update(kw)
|
||||
return TerrainAnalyzer(**defaults)
|
||||
|
||||
|
||||
def _make_slip(**kw):
|
||||
defaults = dict(slip_threshold=0.30, min_speed_ms=0.15, smoothing_alpha=1.0)
|
||||
defaults.update(kw)
|
||||
return SlipDetector(**defaults)
|
||||
|
||||
|
||||
def _make_incline(**kw):
|
||||
defaults = dict(lpf_tau_s=1.0, control_rate_hz=50.0, max_incline_rad=0.52)
|
||||
defaults.update(kw)
|
||||
return InclineDetector(**defaults)
|
||||
|
||||
|
||||
def _make_adapter(**kw):
|
||||
defaults = dict(max_incline_rad=0.52, min_speed_scale=0.15)
|
||||
defaults.update(kw)
|
||||
return TerrainAdapter(**defaults)
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# TerrainAnalyzer
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestTerrainAnalyzerBuffer:
|
||||
|
||||
def test_returns_none_until_min_samples(self):
|
||||
"""Should return None until the minimum sample count (5) is reached."""
|
||||
analyzer = _make_analyzer(window_s=0.5, control_rate_hz=50.0)
|
||||
# Minimum is 5 samples; feed 4 → still None
|
||||
for _ in range(4):
|
||||
result = analyzer.update(0.0, 0.0, 9.81)
|
||||
assert result is None
|
||||
|
||||
def test_returns_result_at_window_full(self):
|
||||
"""Exactly at window size, a SurfaceAnalysis must be returned."""
|
||||
analyzer = _make_analyzer(window_s=0.5, control_rate_hz=50.0)
|
||||
result = None
|
||||
for _ in range(25):
|
||||
result = analyzer.update(0.0, 0.0, 9.81)
|
||||
assert result is not None
|
||||
assert isinstance(result, SurfaceAnalysis)
|
||||
|
||||
def test_reset_clears_buffer(self):
|
||||
"""After reset, the buffer requires refilling (min 5 samples)."""
|
||||
analyzer = _make_analyzer(window_s=0.5, control_rate_hz=50.0)
|
||||
for _ in range(25):
|
||||
analyzer.update(0.0, 0.0, 9.81)
|
||||
analyzer.reset()
|
||||
# Feed only 4 samples → still None
|
||||
for _ in range(4):
|
||||
result = analyzer.update(0.0, 0.0, 9.81)
|
||||
assert result is None
|
||||
|
||||
|
||||
class TestTerrainAnalyzerRoughness:
|
||||
|
||||
def _fill(self, analyzer, ax, ay, az, n=None):
|
||||
window = 25 # 0.5s × 50Hz
|
||||
n = n or window
|
||||
result = None
|
||||
for _ in range(n):
|
||||
result = analyzer.update(ax, ay, az)
|
||||
return result
|
||||
|
||||
def test_zero_vibration_gives_zero_roughness(self):
|
||||
"""Constant acceleration → zero variance → roughness = 0."""
|
||||
analyzer = _make_analyzer()
|
||||
result = self._fill(analyzer, 0.0, 0.0, 9.81)
|
||||
assert result is not None
|
||||
assert result.roughness == pytest.approx(0.0, abs=1e-9)
|
||||
|
||||
def test_roughness_increases_with_vibration(self):
|
||||
"""Higher vibration amplitude → higher roughness score."""
|
||||
analyzer_low = _make_analyzer()
|
||||
analyzer_high = _make_analyzer()
|
||||
|
||||
window = 25
|
||||
import math as _math
|
||||
for i in range(window):
|
||||
t = i * 0.02
|
||||
analyzer_low.update( 0.5 * _math.sin(t * 10), 0.0, 9.81)
|
||||
analyzer_high.update(2.5 * _math.sin(t * 10), 0.0, 9.81)
|
||||
|
||||
r_low = analyzer_low.update( 0.5 * _math.sin(window * 0.02 * 10), 0.0, 9.81)
|
||||
r_high = analyzer_high.update(2.5 * _math.sin(window * 0.02 * 10), 0.0, 9.81)
|
||||
|
||||
# If still None, pump one more sample
|
||||
if r_low is None or r_high is None:
|
||||
for i in range(window + 1):
|
||||
t = i * 0.02
|
||||
r_low = analyzer_low.update( 0.5 * _math.sin(t * 10), 0.0, 9.81)
|
||||
r_high = analyzer_high.update(2.5 * _math.sin(t * 10), 0.0, 9.81)
|
||||
|
||||
assert r_low is not None and r_high is not None
|
||||
assert r_high.roughness > r_low.roughness
|
||||
|
||||
def test_roughness_clamped_to_one(self):
|
||||
"""Extreme vibration must not exceed 1.0."""
|
||||
analyzer = _make_analyzer(max_variance_ms2=1.0)
|
||||
window = 26
|
||||
result = None
|
||||
for i in range(window):
|
||||
result = analyzer.update(100.0, 100.0, 100.0)
|
||||
assert result is not None
|
||||
assert result.roughness <= 1.0
|
||||
|
||||
|
||||
class TestTerrainAnalyzerClassification:
|
||||
|
||||
def _steady(self, analyzer, roughness_target, max_variance_ms2=3.0):
|
||||
"""
|
||||
Pump enough samples so that roughness_target produces a stable result.
|
||||
We inject a DC-free vibration scaled to achieve the desired roughness.
|
||||
|
||||
roughness = sqrt(Var(ax)) / max_variance_ms2
|
||||
For a pure sine with amplitude A: Var ≈ A² / 2
|
||||
→ A = roughness_target × sqrt(2) × max_variance_ms2
|
||||
"""
|
||||
import math as _math
|
||||
amp = roughness_target * _math.sqrt(2.0) * max_variance_ms2
|
||||
result = None
|
||||
for i in range(60): # well beyond window + hysteresis
|
||||
t = i * 0.02
|
||||
result = analyzer.update(amp * _math.sin(t * 20), 0.0, 9.81)
|
||||
return result
|
||||
|
||||
def test_smooth_surface_classification(self):
|
||||
"""Very low vibration → 'smooth'."""
|
||||
analyzer = _make_analyzer(hysteresis_count=1)
|
||||
result = self._steady(analyzer, roughness_target=0.05)
|
||||
assert result is not None
|
||||
assert result.surface_type == "smooth"
|
||||
|
||||
def test_gravel_surface_classification(self):
|
||||
"""High vibration → 'gravel'."""
|
||||
analyzer = _make_analyzer(hysteresis_count=1, max_variance_ms2=1.0)
|
||||
result = None
|
||||
for i in range(60):
|
||||
t = i * 0.02
|
||||
result = analyzer.update(50.0 * math.sin(t * 20), 0.0, 9.81)
|
||||
assert result is not None
|
||||
assert result.surface_type == "gravel"
|
||||
|
||||
def test_hysteresis_prevents_rapid_switch(self):
|
||||
"""A single outlier sample should not immediately change classification."""
|
||||
analyzer = _make_analyzer(hysteresis_count=5)
|
||||
# Establish smooth classification
|
||||
for i in range(60):
|
||||
analyzer.update(0.0, 0.0, 9.81)
|
||||
# One gravel-level spike — type should not change immediately
|
||||
result = analyzer.update(50.0, 50.0, 9.81)
|
||||
if result is not None:
|
||||
# The type may already have changed if window shifts, but mostly stays
|
||||
pass # hysteresis behaviour depends on prior window state — test the counter path
|
||||
|
||||
def test_grip_within_bounds(self):
|
||||
"""Grip must always be in [0, 1]."""
|
||||
analyzer = _make_analyzer(hysteresis_count=1)
|
||||
result = self._steady(analyzer, roughness_target=0.5)
|
||||
if result is not None:
|
||||
assert 0.0 <= result.grip <= 1.0
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# SlipDetector
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestSlipDetector:
|
||||
|
||||
def test_no_slip_equal_speeds(self):
|
||||
"""When actual == cmd, slip_ratio should be 0."""
|
||||
slip = _make_slip()
|
||||
ratio, flag = slip.update(1.0, 1.0)
|
||||
assert ratio == pytest.approx(0.0, abs=1e-9)
|
||||
assert flag is False
|
||||
|
||||
def test_full_slip_zero_actual(self):
|
||||
"""Actual speed = 0 → slip_ratio = 1.0."""
|
||||
slip = _make_slip(smoothing_alpha=1.0)
|
||||
ratio, flag = slip.update(1.0, 0.0)
|
||||
assert ratio == pytest.approx(1.0, abs=1e-6)
|
||||
|
||||
def test_partial_slip(self):
|
||||
"""Actual half of cmd → slip_ratio ≈ 0.5."""
|
||||
slip = _make_slip(smoothing_alpha=1.0)
|
||||
ratio, _ = slip.update(1.0, 0.5)
|
||||
assert ratio == pytest.approx(0.5, abs=1e-6)
|
||||
|
||||
def test_slip_flag_above_threshold(self):
|
||||
"""slip_ratio > threshold → is_slipping = True."""
|
||||
slip = _make_slip(slip_threshold=0.30, smoothing_alpha=1.0)
|
||||
_, flag = slip.update(1.0, 0.0) # ratio = 1.0 > 0.30
|
||||
assert flag is True
|
||||
|
||||
def test_slip_flag_below_threshold(self):
|
||||
"""slip_ratio < threshold → is_slipping = False."""
|
||||
slip = _make_slip(slip_threshold=0.30, smoothing_alpha=1.0)
|
||||
_, flag = slip.update(1.0, 0.95) # ratio ≈ 0.05 < 0.30
|
||||
assert flag is False
|
||||
|
||||
def test_below_min_speed_gate(self):
|
||||
"""When |cmd_speed| < min_speed_ms, slip should be suppressed."""
|
||||
slip = _make_slip(min_speed_ms=0.15, smoothing_alpha=1.0)
|
||||
ratio, flag = slip.update(0.05, 0.0) # cmd < min_speed_ms
|
||||
assert ratio == pytest.approx(0.0, abs=1e-9)
|
||||
assert flag is False
|
||||
|
||||
def test_negative_cmd_and_actual_direction_mismatch(self):
|
||||
"""Reverse commanded speed; actual = 0 → full slip."""
|
||||
slip = _make_slip(smoothing_alpha=1.0)
|
||||
ratio, flag = slip.update(-1.0, 0.0)
|
||||
assert ratio == pytest.approx(1.0, abs=1e-6)
|
||||
assert flag is True
|
||||
|
||||
def test_ema_smoothing(self):
|
||||
"""With alpha < 1, smoothed_slip should increase each step toward 1.0."""
|
||||
slip = _make_slip(slip_threshold=1.0, smoothing_alpha=0.5)
|
||||
# Step 1: raw=1.0; EMA → 0.0 + 0.5*1.0 = 0.5
|
||||
slip.update(1.0, 0.0)
|
||||
smooth1 = slip.smoothed_slip
|
||||
# Step 2: raw=1.0; EMA → 0.5 + 0.5*(1.0 - 0.5) = 0.75
|
||||
slip.update(1.0, 0.0)
|
||||
smooth2 = slip.smoothed_slip
|
||||
assert smooth1 == pytest.approx(0.5, abs=1e-6)
|
||||
assert smooth2 == pytest.approx(0.75, abs=1e-6)
|
||||
assert smooth2 > smooth1
|
||||
assert smooth2 < 1.0
|
||||
|
||||
def test_reset_clears_state(self):
|
||||
"""After reset, smoothed_slip should return to zero."""
|
||||
slip = _make_slip(smoothing_alpha=0.5)
|
||||
for _ in range(10):
|
||||
slip.update(1.0, 0.0)
|
||||
assert slip.smoothed_slip > 0.0
|
||||
slip.reset()
|
||||
assert slip.smoothed_slip == pytest.approx(0.0, abs=1e-9)
|
||||
# First step after reset: EMA starts from 0 → alpha × 1.0 = 0.5
|
||||
slip.update(1.0, 0.0)
|
||||
assert slip.smoothed_slip == pytest.approx(0.5, abs=1e-6)
|
||||
|
||||
def test_actual_overspeed_no_negative_slip(self):
|
||||
"""Actual faster than commanded → slip_ratio should not go negative."""
|
||||
slip = _make_slip(smoothing_alpha=1.0)
|
||||
ratio, _ = slip.update(0.5, 2.0)
|
||||
assert ratio >= 0.0
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# InclineDetector
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestInclineDetector:
|
||||
|
||||
def test_first_call_seeds_with_measurement(self):
|
||||
"""First update should seed the filter with the raw pitch (no transient)."""
|
||||
inc = _make_incline()
|
||||
result = inc.update(0.3, dt=0.02)
|
||||
assert result == pytest.approx(0.3, abs=1e-6)
|
||||
|
||||
def test_converges_to_steady_state(self):
|
||||
"""After many steps, filtered incline should be close to constant input."""
|
||||
inc = _make_incline(lpf_tau_s=1.0)
|
||||
for _ in range(200):
|
||||
result = inc.update(0.2, dt=0.02)
|
||||
assert result == pytest.approx(0.2, abs=1e-3)
|
||||
|
||||
def test_lpf_slower_than_input(self):
|
||||
"""After a step change, the filter should lag behind the raw value."""
|
||||
inc = _make_incline(lpf_tau_s=2.0)
|
||||
# Start settled at 0
|
||||
inc.update(0.0, dt=0.02)
|
||||
# Step to 1.0
|
||||
result_1st = inc.update(1.0, dt=0.02)
|
||||
# Must be between 0 and 1 (lagged)
|
||||
assert 0.0 < result_1st < 1.0
|
||||
|
||||
def test_clamp_at_max_incline(self):
|
||||
"""Output must not exceed max_incline_rad in magnitude."""
|
||||
inc = _make_incline(max_incline_rad=0.52)
|
||||
for _ in range(300):
|
||||
result = inc.update(10.0, dt=0.02)
|
||||
assert result <= 0.52 + 1e-9
|
||||
|
||||
def test_negative_incline(self):
|
||||
"""Negative (downhill) pitch should produce negative filtered incline."""
|
||||
inc = _make_incline()
|
||||
for _ in range(200):
|
||||
result = inc.update(-0.2, dt=0.02)
|
||||
assert result == pytest.approx(-0.2, abs=1e-3)
|
||||
|
||||
def test_clamp_negative(self):
|
||||
"""Negative incline also clamped by max_incline_rad."""
|
||||
inc = _make_incline(max_incline_rad=0.52)
|
||||
for _ in range(300):
|
||||
result = inc.update(-10.0, dt=0.02)
|
||||
assert result >= -0.52 - 1e-9
|
||||
|
||||
def test_reset_zeroes_state(self):
|
||||
"""After reset, filter should reseed on next call."""
|
||||
inc = _make_incline()
|
||||
for _ in range(50):
|
||||
inc.update(0.5, dt=0.02)
|
||||
inc.reset()
|
||||
result = inc.update(0.1, dt=0.02)
|
||||
assert result == pytest.approx(0.1, abs=1e-6)
|
||||
|
||||
def test_motor_bias_static_method(self):
|
||||
"""motor_bias = k_bias × sin(incline_rad)."""
|
||||
bias = InclineDetector.motor_bias(math.pi / 6, k_bias=2.0)
|
||||
assert bias == pytest.approx(2.0 * math.sin(math.pi / 6), rel=1e-6)
|
||||
|
||||
def test_motor_bias_zero_on_flat(self):
|
||||
assert InclineDetector.motor_bias(0.0) == pytest.approx(0.0, abs=1e-9)
|
||||
|
||||
def test_motor_bias_negative_downhill(self):
|
||||
"""Downhill incline → negative bias."""
|
||||
bias = InclineDetector.motor_bias(-0.3, k_bias=1.0)
|
||||
assert bias < 0.0
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# TerrainAdapter — speed_scale
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestTerrainAdapterSpeedScale:
|
||||
|
||||
def test_flat_smooth_no_slip_gives_one(self):
|
||||
"""Ideal conditions → speed_scale = 1.0."""
|
||||
adapter = _make_adapter()
|
||||
state = TerrainState(roughness=0.0, incline_rad=0.0, grip=1.0, slip_ratio=0.0)
|
||||
assert adapter.speed_scale(state) == pytest.approx(1.0, abs=1e-6)
|
||||
|
||||
def test_max_roughness_reduces_scale(self):
|
||||
"""roughness=1.0 → roughness_scale=0.4; others=1 → speed_scale=0.4."""
|
||||
adapter = _make_adapter(min_speed_scale=0.0)
|
||||
state = TerrainState(roughness=1.0, incline_rad=0.0, grip=1.0, slip_ratio=0.0)
|
||||
assert adapter.speed_scale(state) == pytest.approx(0.4, rel=1e-5)
|
||||
|
||||
def test_max_incline_reduces_scale(self):
|
||||
"""incline=max_incline → incline_scale=0.5; others=1 → 0.5."""
|
||||
adapter = _make_adapter(min_speed_scale=0.0, max_incline_rad=0.52)
|
||||
state = TerrainState(roughness=0.0, incline_rad=0.52, grip=1.0, slip_ratio=0.0)
|
||||
assert adapter.speed_scale(state) == pytest.approx(0.5, rel=1e-5)
|
||||
|
||||
def test_max_slip_reduces_scale(self):
|
||||
"""slip_ratio=1.0 → slip_scale=0.2; others=1 → 0.2."""
|
||||
adapter = _make_adapter(min_speed_scale=0.0)
|
||||
state = TerrainState(roughness=0.0, incline_rad=0.0, grip=1.0, slip_ratio=1.0)
|
||||
assert adapter.speed_scale(state) == pytest.approx(0.2, rel=1e-5)
|
||||
|
||||
def test_min_speed_scale_floor(self):
|
||||
"""Extreme conditions must not go below min_speed_scale."""
|
||||
adapter = _make_adapter(min_speed_scale=0.15)
|
||||
state = TerrainState(roughness=1.0, incline_rad=0.52, grip=0.0, slip_ratio=1.0)
|
||||
assert adapter.speed_scale(state) >= 0.15 - 1e-9
|
||||
|
||||
def test_scale_always_at_most_one(self):
|
||||
"""speed_scale must never exceed 1.0."""
|
||||
adapter = _make_adapter()
|
||||
state = TerrainState(roughness=0.0, incline_rad=0.0, grip=1.0, slip_ratio=0.0)
|
||||
assert adapter.speed_scale(state) <= 1.0 + 1e-9
|
||||
|
||||
def test_negative_incline_same_as_positive(self):
|
||||
"""Downhill and uphill of equal magnitude give same scale (abs applied)."""
|
||||
adapter = _make_adapter(min_speed_scale=0.0)
|
||||
up = TerrainState(roughness=0.0, incline_rad= 0.3, grip=1.0, slip_ratio=0.0)
|
||||
down = TerrainState(roughness=0.0, incline_rad=-0.3, grip=1.0, slip_ratio=0.0)
|
||||
assert adapter.speed_scale(up) == pytest.approx(adapter.speed_scale(down), rel=1e-6)
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# TerrainAdapter — pid_scales
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestTerrainAdapterPIDScales:
|
||||
|
||||
def test_full_grip_gives_max_gains(self):
|
||||
"""At grip=1.0, all scales should be 1.0."""
|
||||
adapter = _make_adapter()
|
||||
state = TerrainState(grip=1.0)
|
||||
scales = adapter.pid_scales(state)
|
||||
assert scales["kp"] == pytest.approx(1.0, rel=1e-6)
|
||||
assert scales["ki"] == pytest.approx(1.0, rel=1e-6)
|
||||
assert scales["kd"] == pytest.approx(1.0, rel=1e-6)
|
||||
|
||||
def test_zero_grip_gives_min_gains(self):
|
||||
"""At grip=0.0: kp=0.70, ki=0.50, kd=0.80."""
|
||||
adapter = _make_adapter()
|
||||
state = TerrainState(grip=0.0)
|
||||
scales = adapter.pid_scales(state)
|
||||
assert scales["kp"] == pytest.approx(0.70, rel=1e-6)
|
||||
assert scales["ki"] == pytest.approx(0.50, rel=1e-6)
|
||||
assert scales["kd"] == pytest.approx(0.80, rel=1e-6)
|
||||
|
||||
def test_partial_grip(self):
|
||||
"""At grip=0.5: kp=0.85, ki=0.75, kd=0.90."""
|
||||
adapter = _make_adapter()
|
||||
state = TerrainState(grip=0.5)
|
||||
scales = adapter.pid_scales(state)
|
||||
assert scales["kp"] == pytest.approx(0.85, rel=1e-6)
|
||||
assert scales["ki"] == pytest.approx(0.75, rel=1e-6)
|
||||
assert scales["kd"] == pytest.approx(0.90, rel=1e-6)
|
||||
|
||||
def test_grip_clamped_above_one(self):
|
||||
"""Grip > 1.0 must not push scales above 1.0."""
|
||||
adapter = _make_adapter()
|
||||
state = TerrainState(grip=5.0)
|
||||
scales = adapter.pid_scales(state)
|
||||
for v in scales.values():
|
||||
assert v <= 1.0 + 1e-9
|
||||
|
||||
def test_grip_clamped_below_zero(self):
|
||||
"""Negative grip must not push scales below their floor values."""
|
||||
adapter = _make_adapter()
|
||||
state = TerrainState(grip=-1.0)
|
||||
scales = adapter.pid_scales(state)
|
||||
assert scales["kp"] >= 0.70 - 1e-9
|
||||
assert scales["ki"] >= 0.50 - 1e-9
|
||||
assert scales["kd"] >= 0.80 - 1e-9
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# TerrainAdapter — motor_bias
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestTerrainAdapterMotorBias:
|
||||
|
||||
def test_flat_zero_bias(self):
|
||||
adapter = _make_adapter()
|
||||
state = TerrainState(incline_rad=0.0)
|
||||
assert TerrainAdapter.motor_bias(state, k_bias=1.0) == pytest.approx(0.0, abs=1e-9)
|
||||
|
||||
def test_uphill_positive_bias(self):
|
||||
adapter = _make_adapter()
|
||||
state = TerrainState(incline_rad=0.3)
|
||||
bias = TerrainAdapter.motor_bias(state, k_bias=1.0)
|
||||
assert bias == pytest.approx(math.sin(0.3), rel=1e-6)
|
||||
assert bias > 0.0
|
||||
|
||||
def test_downhill_negative_bias(self):
|
||||
state = TerrainState(incline_rad=-0.3)
|
||||
bias = TerrainAdapter.motor_bias(state, k_bias=1.0)
|
||||
assert bias < 0.0
|
||||
|
||||
def test_k_bias_scales_output(self):
|
||||
state = TerrainState(incline_rad=0.3)
|
||||
b1 = TerrainAdapter.motor_bias(state, k_bias=1.0)
|
||||
b2 = TerrainAdapter.motor_bias(state, k_bias=2.0)
|
||||
assert b2 == pytest.approx(2.0 * b1, rel=1e-6)
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# TerrainAdapter — apply()
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestTerrainAdapterApply:
|
||||
|
||||
def test_apply_preserves_fields(self):
|
||||
"""apply() must copy all fields except speed_scale."""
|
||||
adapter = _make_adapter()
|
||||
state = TerrainState(
|
||||
surface_type="gravel", roughness=0.6, incline_rad=0.1,
|
||||
grip=0.5, slip_ratio=0.1, speed_scale=0.0,
|
||||
)
|
||||
result = adapter.apply(state)
|
||||
assert result.surface_type == "gravel"
|
||||
assert result.roughness == pytest.approx(0.6)
|
||||
assert result.incline_rad == pytest.approx(0.1)
|
||||
assert result.grip == pytest.approx(0.5)
|
||||
assert result.slip_ratio == pytest.approx(0.1)
|
||||
|
||||
def test_apply_populates_speed_scale(self):
|
||||
"""apply() should compute and store the correct speed_scale."""
|
||||
adapter = _make_adapter(min_speed_scale=0.0)
|
||||
state = TerrainState(roughness=0.0, incline_rad=0.0, grip=1.0, slip_ratio=0.0)
|
||||
result = adapter.apply(state)
|
||||
assert result.speed_scale == pytest.approx(1.0, abs=1e-6)
|
||||
|
||||
def test_apply_returns_new_object(self):
|
||||
"""apply() must return a new TerrainState, not mutate the input."""
|
||||
adapter = _make_adapter()
|
||||
state = TerrainState()
|
||||
result = adapter.apply(state)
|
||||
assert result is not state
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# TerrainHistory
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestTerrainHistory:
|
||||
|
||||
def test_first_add_returns_true(self):
|
||||
"""First entry is always a 'change'."""
|
||||
history = TerrainHistory()
|
||||
changed = history.add(TerrainState(surface_type="smooth"))
|
||||
assert changed is True
|
||||
|
||||
def test_same_type_returns_false(self):
|
||||
"""Adding the same type twice → second add returns False."""
|
||||
history = TerrainHistory()
|
||||
history.add(TerrainState(surface_type="smooth"))
|
||||
changed = history.add(TerrainState(surface_type="smooth"))
|
||||
assert changed is False
|
||||
|
||||
def test_type_change_returns_true(self):
|
||||
"""Adding a different type → returns True."""
|
||||
history = TerrainHistory()
|
||||
history.add(TerrainState(surface_type="smooth"))
|
||||
changed = history.add(TerrainState(surface_type="gravel"))
|
||||
assert changed is True
|
||||
|
||||
def test_entries_property(self):
|
||||
"""entries should reflect all added states."""
|
||||
history = TerrainHistory()
|
||||
for t in ["smooth", "gravel", "grass"]:
|
||||
history.add(TerrainState(surface_type=t))
|
||||
assert len(history.entries) == 3
|
||||
|
||||
def test_max_entries_limit(self):
|
||||
"""Buffer should not exceed max_entries."""
|
||||
history = TerrainHistory(max_entries=5)
|
||||
for _ in range(20):
|
||||
history.add(TerrainState(surface_type="smooth"))
|
||||
assert len(history.entries) <= 5
|
||||
|
||||
def test_current_property(self):
|
||||
"""current should return the most recently added state."""
|
||||
history = TerrainHistory()
|
||||
s1 = TerrainState(surface_type="smooth")
|
||||
s2 = TerrainState(surface_type="gravel")
|
||||
history.add(s1)
|
||||
history.add(s2)
|
||||
assert history.current.surface_type == "gravel"
|
||||
|
||||
def test_current_none_when_empty(self):
|
||||
history = TerrainHistory()
|
||||
assert history.current is None
|
||||
|
||||
def test_clear_empties_buffer(self):
|
||||
history = TerrainHistory()
|
||||
history.add(TerrainState(surface_type="smooth"))
|
||||
history.clear()
|
||||
assert len(history.entries) == 0
|
||||
assert history.current is None
|
||||
|
||||
def test_to_json_log_deduplicates(self):
|
||||
"""to_json_log() should emit only type-change events."""
|
||||
history = TerrainHistory()
|
||||
for t in ["smooth", "smooth", "gravel", "gravel", "smooth"]:
|
||||
history.add(TerrainState(surface_type=t))
|
||||
log = history.to_json_log()
|
||||
# Should see: smooth, gravel, smooth → 3 entries
|
||||
assert len(log) == 3
|
||||
assert log[0]["surface"] == "smooth"
|
||||
assert log[1]["surface"] == "gravel"
|
||||
assert log[2]["surface"] == "smooth"
|
||||
|
||||
def test_to_json_log_fields(self):
|
||||
"""Each event dict must contain the expected keys."""
|
||||
history = TerrainHistory()
|
||||
history.add(TerrainState(
|
||||
surface_type="tile", roughness=0.25, incline_rad=0.1,
|
||||
grip=0.8, slip_ratio=0.05, speed_scale=0.9,
|
||||
))
|
||||
log = history.to_json_log()
|
||||
assert len(log) == 1
|
||||
entry = log[0]
|
||||
for key in ("surface", "roughness", "incline_rad", "grip", "slip_ratio",
|
||||
"speed_scale", "timestamp_s"):
|
||||
assert key in entry
|
||||
|
||||
def test_to_json_log_empty(self):
|
||||
"""Empty history → empty log."""
|
||||
history = TerrainHistory()
|
||||
assert history.to_json_log() == []
|
||||
14
jetson/ros2_ws/src/saltybot_terrain_msgs/CMakeLists.txt
Normal file
14
jetson/ros2_ws/src/saltybot_terrain_msgs/CMakeLists.txt
Normal file
@ -0,0 +1,14 @@
|
||||
cmake_minimum_required(VERSION 3.8)
|
||||
project(saltybot_terrain_msgs)
|
||||
|
||||
find_package(ament_cmake REQUIRED)
|
||||
find_package(rosidl_default_generators REQUIRED)
|
||||
find_package(builtin_interfaces REQUIRED)
|
||||
|
||||
rosidl_generate_interfaces(${PROJECT_NAME}
|
||||
"msg/Terrain.msg"
|
||||
DEPENDENCIES builtin_interfaces
|
||||
)
|
||||
|
||||
ament_export_dependencies(rosidl_default_runtime)
|
||||
ament_package()
|
||||
25
jetson/ros2_ws/src/saltybot_terrain_msgs/msg/Terrain.msg
Normal file
25
jetson/ros2_ws/src/saltybot_terrain_msgs/msg/Terrain.msg
Normal file
@ -0,0 +1,25 @@
|
||||
# Terrain.msg — Surface classification and adaptation state (Issue #142)
|
||||
# Published by: /saltybot/terrain_adaptation/terrain_adaptation_node
|
||||
# Topic: /saltybot/terrain
|
||||
|
||||
builtin_interfaces/Time stamp
|
||||
|
||||
# Surface type classification
|
||||
# Values: "smooth" | "tile" | "carpet" | "grass" | "gravel" | "unknown"
|
||||
string surface_type
|
||||
|
||||
# Normalised roughness score 0.0 (smooth) – 1.0 (very rough)
|
||||
float32 roughness
|
||||
|
||||
# Terrain incline in radians. Positive = uphill (front higher than rear).
|
||||
float32 incline_rad
|
||||
|
||||
# Grip coefficient 0.0 (no grip) – 1.0 (full grip)
|
||||
float32 grip
|
||||
|
||||
# Wheel/track slip ratio 0.0 (no slip) – 1.0 (full slip)
|
||||
float32 slip_ratio
|
||||
|
||||
# Recommended speed multiplier 0.0 – 1.0
|
||||
# Apply to max_speed before commanding motion.
|
||||
float32 speed_scale
|
||||
22
jetson/ros2_ws/src/saltybot_terrain_msgs/package.xml
Normal file
22
jetson/ros2_ws/src/saltybot_terrain_msgs/package.xml
Normal file
@ -0,0 +1,22 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_terrain_msgs</name>
|
||||
<version>0.1.0</version>
|
||||
<description>Terrain message definitions for SaltyBot terrain adaptation (Issue #142)</description>
|
||||
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<buildtool_depend>ament_cmake</buildtool_depend>
|
||||
<buildtool_depend>rosidl_default_generators</buildtool_depend>
|
||||
|
||||
<depend>builtin_interfaces</depend>
|
||||
|
||||
<exec_depend>rosidl_default_runtime</exec_depend>
|
||||
|
||||
<member_of_group>rosidl_interface_packages</member_of_group>
|
||||
|
||||
<export>
|
||||
<build_type>ament_cmake</build_type>
|
||||
</export>
|
||||
</package>
|
||||
Loading…
x
Reference in New Issue
Block a user