feat(#142): terrain adaptation — surface detection, speed/PID/bias adaptation

Two new ROS2 packages implementing Issue #142:

saltybot_terrain_msgs (ament_cmake)
- Terrain.msg: stamp, surface_type, roughness, incline_rad, grip, slip_ratio, speed_scale

saltybot_terrain_adaptation (ament_python, 50 Hz)
- terrain_analyzer.py: IMU vibration RMS → roughness [0,1] + surface classification
  (smooth/tile/carpet/grass/gravel) with hysteresis-protected switching
- incline_detector.py: quaternion pitch → first-order LPF → incline_rad; motor_bias()
- slip_detector.py: cmd_vel vs odom → slip_ratio with EMA smoothing + speed gate
- terrain_adapter.py: TerrainState, TerrainAdapter (speed_scale, pid_scales, motor_bias),
  TerrainHistory (rolling deque, JSON type-change log)
- terrain_adaptation_node.py: 50 Hz node; IMU watchdog; publishes terrain JSON + typed msg,
  speed_scale, pid_scales (JSON), motor_bias, terrain_history (on type change)
- config/terrain_params.yaml, launch/terrain_adaptation.launch.py

Tests: 60/60 passing (TerrainAnalyzer, SlipDetector, InclineDetector, TerrainAdapter,
TerrainHistory all covered)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-controls 2026-03-02 10:04:36 -05:00
parent 07b4edaa2d
commit fb3e9f1bf1
16 changed files with 1763 additions and 0 deletions

View File

@ -0,0 +1,41 @@
/**:
ros__parameters:
# Control loop rate (Hz)
control_rate: 50.0
# Odometry topic for slip detection
odom_topic: "/saltybot/rover_odom"
# ── TerrainAnalyzer ────────────────────────────────────────────────────────
# RMS window length (seconds)
vibration_window_s: 0.5
# Normalisation ceiling for roughness score (m/s²)
roughness_max_ms2: 3.0
# Hysteresis: samples required before surface-type changes
hysteresis_count: 5
# ── InclineDetector ────────────────────────────────────────────────────────
# Low-pass filter time constant for incline (seconds)
incline_lpf_tau_s: 1.5
# Incline at which incline_scale reaches minimum (~30°)
max_incline_rad: 0.52
# ── SlipDetector ───────────────────────────────────────────────────────────
# Slip ratio above which is_slipping flag is raised
slip_threshold: 0.30
# Minimum commanded speed before slip detection is active (m/s)
slip_min_speed_ms: 0.15
# EMA smoothing factor for slip ratio
slip_smoothing_alpha: 0.20
# ── TerrainAdapter ─────────────────────────────────────────────────────────
# Incline-to-motor-bias gain (multiplied by sin(incline_rad))
k_bias: 1.0
# Speed scale floor — never command below this fraction of max speed
min_speed_scale: 0.15
# ── Misc ───────────────────────────────────────────────────────────────────
# Rolling history buffer size
history_max_entries: 200
# IMU silence timeout before safe-defaults are published (seconds)
imu_timeout_s: 0.5

View File

@ -0,0 +1,53 @@
"""
terrain_adaptation.launch.py Launch the terrain adaptation node (Issue #142).
Usage
-----
ros2 launch saltybot_terrain_adaptation terrain_adaptation.launch.py
ros2 launch saltybot_terrain_adaptation terrain_adaptation.launch.py \
odom_topic:=/saltybot/rover_odom control_rate:=50.0
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg_share = get_package_share_directory("saltybot_terrain_adaptation")
default_params = os.path.join(pkg_share, "config", "terrain_params.yaml")
return LaunchDescription([
DeclareLaunchArgument(
"params_file",
default_value=default_params,
description="Path to terrain_params.yaml",
),
DeclareLaunchArgument(
"odom_topic",
default_value="/saltybot/rover_odom",
description="Odometry topic for wheel-slip detection",
),
DeclareLaunchArgument(
"control_rate",
default_value="50.0",
description="Control loop rate (Hz)",
),
Node(
package="saltybot_terrain_adaptation",
executable="terrain_adaptation_node",
name="terrain_adaptation",
output="screen",
parameters=[
LaunchConfiguration("params_file"),
{
"odom_topic": LaunchConfiguration("odom_topic"),
"control_rate": LaunchConfiguration("control_rate"),
},
],
),
])

View File

@ -0,0 +1,24 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_terrain_adaptation</name>
<version>0.1.0</version>
<description>Surface detection and dynamic terrain adaptation for SaltyBot (Issue #142)</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>nav_msgs</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,106 @@
"""
incline_detector.py Terrain incline detection from IMU pitch (Issue #142).
Method
For a ground robot (rover/tank), the vehicle body tilts with the terrain,
so IMU pitch terrain incline + vibration noise.
A first-order discrete low-pass filter separates the slow incline component
from fast vibrations and balance corrections:
incline_filtered[n] = (1α) · incline_filtered[n1] + α · pitch_raw[n]
α = dt / (τ + dt)
The motor bias required to compensate for gravity on the incline is:
bias_ms = k_bias · sin(incline_rad)
Positive incline = uphill (front higher than rear) positive forward bias
needed to counteract gravity pulling the robot backward.
Pure module no ROS2 dependency.
"""
from __future__ import annotations
import math
class InclineDetector:
"""
Incline estimator via first-order low-pass filter on IMU pitch.
Parameters
----------
lpf_tau_s : low-pass filter time constant (s); larger = smoother
control_rate_hz : expected update rate (Hz)
max_incline_rad : hard clamp on reported incline magnitude (rad)
"""
def __init__(
self,
lpf_tau_s: float = 1.5,
control_rate_hz: float = 50.0,
max_incline_rad: float = 0.70, # ≈ 40°
):
self._tau = max(1e-3, lpf_tau_s)
self._dt = 1.0 / max(1.0, control_rate_hz)
self._max = abs(max_incline_rad)
self._incline_rad: float = 0.0
self._initialised: bool = False
# ── Properties ────────────────────────────────────────────────────────────
@property
def incline_rad(self) -> float:
return self._incline_rad
# ── Update ────────────────────────────────────────────────────────────────
def update(self, pitch_rad: float, dt: float | None = None) -> float:
"""
Submit one pitch sample and return the filtered incline estimate.
Parameters
----------
pitch_rad : IMU pitch angle (rad); +ve = front-up / uphill
dt : elapsed time since last sample (s); uses 1/rate if None
Returns
-------
Filtered incline in radians, clamped to ±max_incline_rad.
"""
step = dt if dt is not None and dt > 0.0 else self._dt
if not self._initialised:
# Seed filter with first measurement to avoid large initial transient
self._incline_rad = pitch_rad
self._initialised = True
return max(-self._max, min(self._max, self._incline_rad))
alpha = step / (self._tau + step)
self._incline_rad = (1.0 - alpha) * self._incline_rad + alpha * pitch_rad
return max(-self._max, min(self._max, self._incline_rad))
def reset(self) -> None:
self._incline_rad = 0.0
self._initialised = False
# ── Motor bias ────────────────────────────────────────────────────────────
@staticmethod
def motor_bias(incline_rad: float, k_bias: float = 1.0) -> float:
"""
Forward motor bias (m/s) to compensate for gravitational pull on incline.
Positive bias = add forward thrust for uphill.
Negative bias = reduce thrust / add braking for downhill.
Parameters
----------
incline_rad : filtered incline angle (rad)
k_bias : tuning gain; scale bias magnitude
"""
return k_bias * math.sin(incline_rad)

View File

@ -0,0 +1,91 @@
"""
slip_detector.py Wheel slip detection for SaltyBot (Issue #142).
Method
Compare commanded speed (from /cmd_vel) to measured speed (from odometry).
When the robot is commanded to move but the measured speed falls significantly
below the command, the difference is attributed to wheel/track slip.
slip_ratio = max(0, |v_cmd| |v_actual|) / |v_cmd|
slip_ratio = 0.0 no slip (actual matches commanded)
slip_ratio = 1.0 full slip (wheels spinning, no forward motion)
Detection is suppressed below `min_speed_ms` to avoid noise at low speed.
A smoothed slip estimate is maintained via exponential moving average
(alpha per update) to reduce transient spikes.
Pure module no ROS2 dependency.
"""
from __future__ import annotations
class SlipDetector:
"""
Wheel/track slip detector from commanded vs measured speed.
Parameters
----------
slip_threshold : slip_ratio above which is_slipping = True
min_speed_ms : minimum |v_cmd| to attempt detection (avoids noise)
smoothing_alpha : EMA coefficient (0, 1]; 1 = no smoothing
"""
def __init__(
self,
slip_threshold: float = 0.30,
min_speed_ms: float = 0.15,
smoothing_alpha: float = 0.20,
):
self._threshold = max(0.0, min(1.0, slip_threshold))
self._min_speed = abs(min_speed_ms)
self._alpha = max(1e-6, min(1.0, smoothing_alpha))
self._smoothed: float = 0.0
# ── Properties ────────────────────────────────────────────────────────────
@property
def smoothed_slip(self) -> float:
"""EMA-smoothed slip ratio."""
return self._smoothed
# ── Update ────────────────────────────────────────────────────────────────
def update(
self,
cmd_speed_ms: float,
actual_speed_ms: float,
) -> tuple[float, bool]:
"""
Compute slip from one speed pair.
Parameters
----------
cmd_speed_ms : commanded forward speed (m/s); signed
actual_speed_ms : measured forward speed (m/s); signed (from odom)
Returns
-------
(slip_ratio, is_slipping)
slip_ratio : 0.0 1.0 (instantaneous, unsmoothed)
is_slipping : True if smoothed slip exceeds threshold
"""
abs_cmd = abs(cmd_speed_ms)
if abs_cmd < self._min_speed:
# Decay smoothed slip toward zero when not commanding movement
self._smoothed = (1.0 - self._alpha) * self._smoothed
return 0.0, False
abs_actual = abs(actual_speed_ms)
raw_slip = max(0.0, min(1.0, (abs_cmd - abs_actual) / abs_cmd))
self._smoothed = (1.0 - self._alpha) * self._smoothed + self._alpha * raw_slip
return raw_slip, self._smoothed > self._threshold
def reset(self) -> None:
self._smoothed = 0.0

View File

@ -0,0 +1,346 @@
"""
terrain_adaptation_node.py Surface detection and dynamic adaptation (Issue #142).
Overview
Analyses IMU vibration to classify the terrain underfoot, detects incline
and wheel slip, then publishes advisory outputs that other nodes use to
adapt speed limits, PID aggressiveness, and motor bias.
Pipeline (50 Hz)
1. IMU acc TerrainAnalyzer roughness, surface_type, grip
2. IMU pitch (quaternion) InclineDetector incline_rad
3. cmd_vel + odom SlipDetector slip_ratio
4. (roughness, incline, grip, slip) TerrainAdapter speed_scale,
pid_scales, motor_bias
5. Publish Terrain msg + advisory scalars + history log
Subscribes
/saltybot/imu sensor_msgs/Imu vibration + pitch
<odom_topic> nav_msgs/Odometry actual speed for slip
/cmd_vel geometry_msgs/Twist commanded speed for slip
Publishes
/saltybot/terrain saltybot_terrain_msgs/Terrain full terrain state
/saltybot/terrain_speed_scale std_msgs/Float32 speed multiplier
/saltybot/terrain_pid_scales std_msgs/String (JSON) kp/ki/kd scales
/saltybot/terrain_motor_bias std_msgs/Float32 incline compensation
/saltybot/terrain_history std_msgs/String (JSON) change log
Parameters
control_rate 50.0 Hz
odom_topic /saltybot/rover_odom
vibration_window_s 0.5 s (RMS window)
roughness_max_ms2 3.0 m/ (normalisation ceiling)
hysteresis_count 5 samples before class change
incline_lpf_tau_s 1.5 s (incline low-pass time constant)
max_incline_rad 0.52 rad (30°)
slip_threshold 0.30 (slip flag threshold)
slip_min_speed_ms 0.15 m/s (ignore slip below this speed)
slip_smoothing_alpha 0.20
k_bias 1.0 (incline motor bias gain)
min_speed_scale 0.15 (speed_scale floor)
history_max_entries 200
imu_timeout_s 0.5 s (zero outputs if IMU goes silent)
"""
import json
import math
import time
import rclpy
from rclpy.node import Node
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
from sensor_msgs.msg import Imu
from std_msgs.msg import Float32, String
from saltybot_terrain_adaptation.terrain_analyzer import TerrainAnalyzer
from saltybot_terrain_adaptation.slip_detector import SlipDetector
from saltybot_terrain_adaptation.incline_detector import InclineDetector
from saltybot_terrain_adaptation.terrain_adapter import (
TerrainAdapter,
TerrainHistory,
TerrainState,
)
try:
from saltybot_terrain_msgs.msg import Terrain as TerrainMsg
_TERRAIN_MSG_OK = True
except ImportError:
_TERRAIN_MSG_OK = False
def _quaternion_to_pitch(qx: float, qy: float, qz: float, qw: float) -> float:
sinp = 2.0 * (qw * qy - qz * qx)
return math.asin(max(-1.0, min(1.0, sinp)))
class TerrainAdaptationNode(Node):
def __init__(self):
super().__init__("terrain_adaptation")
# ── Parameters ───────────────────────────────────────────────────────
self._declare_params()
p = self._load_params()
# ── Sub-systems ──────────────────────────────────────────────────────
self._analyzer = TerrainAnalyzer(
max_variance_ms2=p["roughness_max_ms2"],
window_s=p["vibration_window_s"],
control_rate_hz=p["control_rate"],
hysteresis_count=p["hysteresis_count"],
)
self._incline = InclineDetector(
lpf_tau_s=p["incline_lpf_tau_s"],
control_rate_hz=p["control_rate"],
max_incline_rad=p["max_incline_rad"],
)
self._slip = SlipDetector(
slip_threshold=p["slip_threshold"],
min_speed_ms=p["slip_min_speed_ms"],
smoothing_alpha=p["slip_smoothing_alpha"],
)
self._adapter = TerrainAdapter(
max_incline_rad=p["max_incline_rad"],
min_speed_scale=p["min_speed_scale"],
)
self._history = TerrainHistory(max_entries=p["history_max_entries"])
# ── State ────────────────────────────────────────────────────────────
self._latest_imu: Imu | None = None
self._actual_speed_ms: float = 0.0
self._cmd_speed_ms: float = 0.0
self._last_imu_t: float = 0.0
self._last_ctrl_t: float = time.monotonic()
# ── QoS ──────────────────────────────────────────────────────────────
reliable = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
best_effort = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
# ── Subscriptions ────────────────────────────────────────────────────
self.create_subscription(Imu, "/saltybot/imu", self._imu_cb, best_effort)
self.create_subscription(Odometry, p["odom_topic"], self._odom_cb, reliable)
self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, reliable)
# ── Publishers ───────────────────────────────────────────────────────
self._terrain_pub = self.create_publisher(String, "/saltybot/terrain", reliable)
self._speed_scale_pub = self.create_publisher(Float32, "/saltybot/terrain_speed_scale", reliable)
self._pid_scales_pub = self.create_publisher(String, "/saltybot/terrain_pid_scales", reliable)
self._motor_bias_pub = self.create_publisher(Float32, "/saltybot/terrain_motor_bias", reliable)
self._history_pub = self.create_publisher(String, "/saltybot/terrain_history", reliable)
# Try to create a typed Terrain publisher if message is available
self._typed_terrain_pub = None
if _TERRAIN_MSG_OK:
self._typed_terrain_pub = self.create_publisher(
TerrainMsg, "/saltybot/terrain_typed", reliable
)
# ── Timer ────────────────────────────────────────────────────────────
rate = p["control_rate"]
self._timer = self.create_timer(1.0 / rate, self._control_cb)
self.get_logger().info(
f"TerrainAdaptationNode ready rate={rate}Hz "
f"odom={p['odom_topic']}"
)
# ── Parameter management ─────────────────────────────────────────────────
def _declare_params(self) -> None:
self.declare_parameter("control_rate", 50.0)
self.declare_parameter("odom_topic", "/saltybot/rover_odom")
self.declare_parameter("vibration_window_s", 0.5)
self.declare_parameter("roughness_max_ms2", 3.0)
self.declare_parameter("hysteresis_count", 5)
self.declare_parameter("incline_lpf_tau_s", 1.5)
self.declare_parameter("max_incline_rad", 0.52)
self.declare_parameter("slip_threshold", 0.30)
self.declare_parameter("slip_min_speed_ms", 0.15)
self.declare_parameter("slip_smoothing_alpha", 0.20)
self.declare_parameter("k_bias", 1.0)
self.declare_parameter("min_speed_scale", 0.15)
self.declare_parameter("history_max_entries", 200)
self.declare_parameter("imu_timeout_s", 0.5)
def _load_params(self) -> dict:
g = self.get_parameter
return {k: g(k).value for k in [
"control_rate", "odom_topic",
"vibration_window_s", "roughness_max_ms2", "hysteresis_count",
"incline_lpf_tau_s", "max_incline_rad",
"slip_threshold", "slip_min_speed_ms", "slip_smoothing_alpha",
"k_bias", "min_speed_scale", "history_max_entries", "imu_timeout_s",
]}
# ── Subscriptions ─────────────────────────────────────────────────────────
def _imu_cb(self, msg: Imu) -> None:
self._latest_imu = msg
self._last_imu_t = time.monotonic()
def _odom_cb(self, msg: Odometry) -> None:
self._actual_speed_ms = msg.twist.twist.linear.x
def _cmd_vel_cb(self, msg: Twist) -> None:
self._cmd_speed_ms = msg.linear.x
# ── 50 Hz control loop ────────────────────────────────────────────────────
def _control_cb(self) -> None:
p = self._load_params()
now = time.monotonic()
dt = now - self._last_ctrl_t
self._last_ctrl_t = now
# IMU watchdog
imu_age = (now - self._last_imu_t) if self._last_imu_t > 0.0 else 1e9
if imu_age > p["imu_timeout_s"] or self._latest_imu is None:
self._publish_safe_defaults()
return
imu = self._latest_imu
ax = imu.linear_acceleration.x
ay = imu.linear_acceleration.y
az = imu.linear_acceleration.z
q = imu.orientation
# 1. Vibration analysis
analysis = self._analyzer.update(ax, ay, az)
roughness = analysis.roughness if analysis else 0.0
surf_type = analysis.surface_type if analysis else "unknown"
grip = analysis.grip if analysis else 0.6
# 2. Incline detection
pitch = _quaternion_to_pitch(q.x, q.y, q.z, q.w)
incline = self._incline.update(pitch, dt)
# 3. Slip detection
slip_ratio, is_slipping = self._slip.update(
self._cmd_speed_ms, self._actual_speed_ms
)
# 4. Build terrain state
raw_state = TerrainState(
surface_type=surf_type,
roughness=roughness,
incline_rad=incline,
grip=grip,
slip_ratio=slip_ratio,
timestamp_s=now,
)
state = self._adapter.apply(raw_state)
# 5. Terrain history + logging
type_changed = self._history.add(state)
if type_changed:
self.get_logger().info(
f"Terrain: {surf_type} rough={roughness:.2f} "
f"grip={grip:.2f} incline={math.degrees(incline):.1f}° "
f"speed_scale={state.speed_scale:.2f}"
)
if is_slipping:
self.get_logger().warn(
f"Wheel slip detected: ratio={slip_ratio:.2f} "
f"cmd={self._cmd_speed_ms:.2f}m/s actual={self._actual_speed_ms:.2f}m/s"
)
# 6. Publish
self._publish_terrain_json(state)
self._publish_speed_scale(state.speed_scale)
self._publish_pid_scales(self._adapter.pid_scales(state))
self._publish_motor_bias(
self._adapter.motor_bias(state, k_bias=p["k_bias"])
)
# Publish history on every type change (not every tick)
if type_changed:
self._publish_history()
# Typed message (if saltybot_terrain_msgs is built)
if self._typed_terrain_pub is not None and _TERRAIN_MSG_OK:
self._publish_typed(state)
# ── Publishers ────────────────────────────────────────────────────────────
def _publish_safe_defaults(self) -> None:
self._publish_speed_scale(1.0)
self._publish_pid_scales({"kp": 1.0, "ki": 1.0, "kd": 1.0})
self._publish_motor_bias(0.0)
def _publish_terrain_json(self, state: TerrainState) -> None:
msg = String()
msg.data = json.dumps({
"surface_type": state.surface_type,
"roughness": round(state.roughness, 3),
"incline_rad": round(state.incline_rad, 4),
"incline_deg": round(math.degrees(state.incline_rad), 2),
"grip": round(state.grip, 3),
"slip_ratio": round(state.slip_ratio, 3),
"speed_scale": round(state.speed_scale, 3),
})
self._terrain_pub.publish(msg)
def _publish_speed_scale(self, scale: float) -> None:
msg = Float32()
msg.data = float(scale)
self._speed_scale_pub.publish(msg)
def _publish_pid_scales(self, scales: dict) -> None:
msg = String()
msg.data = json.dumps({k: round(v, 4) for k, v in scales.items()})
self._pid_scales_pub.publish(msg)
def _publish_motor_bias(self, bias: float) -> None:
msg = Float32()
msg.data = float(bias)
self._motor_bias_pub.publish(msg)
def _publish_history(self) -> None:
msg = String()
msg.data = json.dumps(self._history.to_json_log())
self._history_pub.publish(msg)
def _publish_typed(self, state: TerrainState) -> None:
msg = TerrainMsg()
msg.stamp = self.get_clock().now().to_msg()
msg.surface_type = state.surface_type
msg.roughness = float(state.roughness)
msg.incline_rad = float(state.incline_rad)
msg.grip = float(state.grip)
msg.slip_ratio = float(state.slip_ratio)
msg.speed_scale = float(state.speed_scale)
self._typed_terrain_pub.publish(msg)
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = TerrainAdaptationNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,195 @@
"""
terrain_adapter.py Speed / PID / bias adaptation from terrain state (Issue #142).
Provides:
TerrainState typed snapshot of current terrain conditions
TerrainAdapter computes speed_scale, pid_scales, motor_bias
TerrainHistory rolling deque of TerrainState, JSON export for route planning
Speed scale model
speed_scale = roughness_scale × incline_scale × slip_scale
roughness_scale = 1.0 0.6 × roughness (0.4 at max roughness)
incline_scale = 1.0 0.5 × |incline| / max_incline (0.5 at max incline)
slip_scale = 1.0 0.8 × slip_ratio (0.2 at full slip)
Result clamped to [min_speed_scale, 1.0].
PID scale model (grip-based aggressiveness)
kp_scale = 0.70 + 0.30 × grip (0.70 at no grip, 1.00 at full grip)
ki_scale = 0.50 + 0.50 × grip (0.50 at no grip, 1.00 at full grip)
kd_scale = 0.80 + 0.20 × grip (0.80 at no grip, 1.00 at full grip)
On slippery surfaces, reduced P/I prevents oscillation from un-predicted
wheel slip. D is kept relatively high to damp any vibration.
Motor bias
See incline_detector.motor_bias() terrain_adapter re-exports it.
Pure module no ROS2 dependency.
"""
from __future__ import annotations
import math
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Optional
# ── TerrainState ─────────────────────────────────────────────────────────────
@dataclass
class TerrainState:
surface_type: str = "unknown" # "smooth"|"tile"|"carpet"|"grass"|"gravel"|"unknown"
roughness: float = 0.0 # 0.01.0
incline_rad: float = 0.0 # rad; +ve = uphill
grip: float = 0.6 # 0.01.0
slip_ratio: float = 0.0 # 0.01.0
speed_scale: float = 1.0 # recommended speed multiplier
timestamp_s: float = field(default_factory=time.monotonic)
# ── TerrainAdapter ────────────────────────────────────────────────────────────
class TerrainAdapter:
"""
Computes adaptive outputs (speed scale, PID scales, motor bias) from a
TerrainState.
Parameters
----------
max_incline_rad : incline at which incline_scale reaches minimum
min_speed_scale : floor for speed_scale (never zero)
"""
def __init__(
self,
max_incline_rad: float = 0.52, # ≈ 30°
min_speed_scale: float = 0.15,
):
self._max_incline = max(0.01, max_incline_rad)
self._min_scale = max(0.05, min_speed_scale)
# ── Speed scale ───────────────────────────────────────────────────────────
def speed_scale(self, state: TerrainState) -> float:
"""
Combined speed-limit multiplier. Returns value in [min_speed_scale, 1.0].
"""
r_scale = 1.0 - 0.6 * state.roughness
i_scale = 1.0 - 0.5 * min(1.0, abs(state.incline_rad) / self._max_incline)
s_scale = 1.0 - 0.8 * state.slip_ratio
combined = r_scale * i_scale * s_scale
return max(self._min_scale, min(1.0, combined))
# ── PID scales ────────────────────────────────────────────────────────────
def pid_scales(self, state: TerrainState) -> dict[str, float]:
"""
Gain multipliers keyed "kp", "ki", "kd". All in (0, 1].
Multiply these against the base PID gains from adaptive_pid_node.
"""
grip = max(0.0, min(1.0, state.grip))
return {
"kp": 0.70 + 0.30 * grip,
"ki": 0.50 + 0.50 * grip,
"kd": 0.80 + 0.20 * grip,
}
# ── Motor bias ────────────────────────────────────────────────────────────
@staticmethod
def motor_bias(state: TerrainState, k_bias: float = 1.0) -> float:
"""
Forward motor bias (m/s) for incline compensation.
Positive = push forward on uphill.
"""
return k_bias * math.sin(state.incline_rad)
# ── Full state update ─────────────────────────────────────────────────────
def apply(self, state: TerrainState) -> TerrainState:
"""Return a copy of state with speed_scale populated."""
return TerrainState(
surface_type=state.surface_type,
roughness=state.roughness,
incline_rad=state.incline_rad,
grip=state.grip,
slip_ratio=state.slip_ratio,
speed_scale=self.speed_scale(state),
timestamp_s=state.timestamp_s,
)
# ── TerrainHistory ────────────────────────────────────────────────────────────
class TerrainHistory:
"""
Rolling log of TerrainState snapshots for route planning.
Tracks surface-type change events and exports them as a JSON-serialisable
list. The full sample buffer is kept for statistical queries.
Parameters
----------
max_entries : maximum samples kept (oldest discarded)
"""
def __init__(self, max_entries: int = 200):
self._entries: deque = deque(maxlen=max(1, max_entries))
@property
def entries(self) -> list[TerrainState]:
return list(self._entries)
@property
def current(self) -> Optional[TerrainState]:
return self._entries[-1] if self._entries else None
def add(self, state: TerrainState) -> bool:
"""
Append a state snapshot.
Returns True if the surface type changed from the previous entry
(useful for callers that want to log only type-change events).
"""
changed = (
not self._entries
or self._entries[-1].surface_type != state.surface_type
)
self._entries.append(state)
return changed
def to_json_log(self) -> list[dict]:
"""
Return a list of surface-type change events (deduped by type).
Suitable for route planning or structured logging.
Each entry:
{"surface": str, "roughness": float, "incline_rad": float,
"grip": float, "slip_ratio": float, "timestamp_s": float}
"""
events: list[dict] = []
prev_type: Optional[str] = None
for s in self._entries:
if s.surface_type != prev_type:
events.append({
"surface": s.surface_type,
"roughness": round(s.roughness, 3),
"incline_rad": round(s.incline_rad, 4),
"grip": round(s.grip, 3),
"slip_ratio": round(s.slip_ratio, 3),
"speed_scale": round(s.speed_scale, 3),
"timestamp_s": round(s.timestamp_s, 2),
})
prev_type = s.surface_type
return events
def clear(self) -> None:
self._entries.clear()

View File

@ -0,0 +1,191 @@
"""
terrain_analyzer.py IMU vibration analysis for surface classification (Issue #142).
Algorithm
For each control tick, one IMU linear-acceleration triple (ax, ay, az) is
submitted to a rolling window. The RMS vibration is computed as:
vibration_rms = sqrt( Var(ax) + Var(ay) + Var(az) ) [m/]
Variance naturally removes the DC gravity component, so the signal
captures only dynamic surface vibrations.
Normalised roughness:
roughness = min(1.0, vibration_rms / max_variance_ms2)
Surface classification thresholds (roughness boundaries):
smooth [0.00, 0.12) grip = 0.90
tile [0.12, 0.28) grip = 0.85
carpet [0.28, 0.48) grip = 0.75
grass [0.48, 0.68) grip = 0.55
gravel [0.68, 1.00] grip = 0.45
Hysteresis prevents rapid class switching: the candidate class must persist
for `hysteresis_count` consecutive samples before becoming current.
Pure module no ROS2 dependency.
"""
from __future__ import annotations
import math
from collections import deque
from dataclasses import dataclass
from typing import Optional
# ── Surface classification table ─────────────────────────────────────────────
# (roughness_upper_bound, surface_type, grip_coefficient)
# Ordered from smoothest to roughest; last entry has no upper bound.
_SURFACE_TABLE: list[tuple[float, str, float]] = [
(0.12, "smooth", 0.90),
(0.28, "tile", 0.85),
(0.48, "carpet", 0.75),
(0.68, "grass", 0.55),
(1.01, "gravel", 0.45),
]
SURFACE_TYPES: tuple[str, ...] = tuple(e[1] for e in _SURFACE_TABLE)
# ── SurfaceAnalysis ───────────────────────────────────────────────────────────
@dataclass
class SurfaceAnalysis:
roughness: float # 0.0 (smooth) 1.0 (rough), normalised
surface_type: str # "smooth" | "tile" | "carpet" | "grass" | "gravel" | "unknown"
grip: float # 0.0 (no grip) 1.0 (full grip)
# ── TerrainAnalyzer ───────────────────────────────────────────────────────────
class TerrainAnalyzer:
"""
Rolling-window vibration analyser with hysteresis-protected classification.
Parameters
----------
max_variance_ms2 : normalisation ceiling (m/); maps to roughness=1.0
window_s : rolling window duration (s)
control_rate_hz : expected update rate (Hz) sets window size
hysteresis_count : consecutive samples in new class before switching
"""
def __init__(
self,
max_variance_ms2: float = 3.0,
window_s: float = 0.5,
control_rate_hz: float = 50.0,
hysteresis_count: int = 5,
):
self._max_var = max(1e-3, max_variance_ms2)
window_size = max(5, int(window_s * control_rate_hz))
self._samples: deque = deque(maxlen=window_size)
self._hysteresis_count = hysteresis_count
self._current_type: str = "unknown"
self._current_grip: float = 0.6
self._candidate_type: str = "unknown"
self._candidate_count: int = 0
# ── Properties ────────────────────────────────────────────────────────────
@property
def current_surface_type(self) -> str:
return self._current_type
@property
def current_grip(self) -> float:
return self._current_grip
@property
def sample_count(self) -> int:
return len(self._samples)
# ── Update ────────────────────────────────────────────────────────────────
def update(
self,
ax: float,
ay: float,
az: float,
) -> Optional[SurfaceAnalysis]:
"""
Submit one IMU acceleration sample (m/) and return analysis.
Returns
-------
SurfaceAnalysis when 5 samples have been collected; None otherwise.
"""
self._samples.append((ax, ay, az))
if len(self._samples) < 5:
return None
roughness = self._compute_roughness()
candidate_type, candidate_grip = _classify(roughness)
# Hysteresis: require sustained presence in new class
if candidate_type == self._candidate_type:
self._candidate_count += 1
else:
self._candidate_type = candidate_type
self._candidate_count = 1
if self._candidate_count >= self._hysteresis_count:
self._current_type = candidate_type
self._current_grip = candidate_grip
return SurfaceAnalysis(
roughness=roughness,
surface_type=self._current_type,
grip=self._current_grip,
)
def reset(self) -> None:
self._samples.clear()
self._current_type = "unknown"
self._current_grip = 0.6
self._candidate_type = "unknown"
self._candidate_count = 0
# ── Internal ─────────────────────────────────────────────────────────────
def _compute_roughness(self) -> float:
n = len(self._samples)
if n < 2:
return 0.0
ax_vals = [s[0] for s in self._samples]
ay_vals = [s[1] for s in self._samples]
az_vals = [s[2] for s in self._samples]
rms = math.sqrt(_variance(ax_vals) + _variance(ay_vals) + _variance(az_vals))
return min(1.0, rms / self._max_var)
# ── Module-level helpers ──────────────────────────────────────────────────────
def _variance(vals: list[float]) -> float:
n = len(vals)
if n < 2:
return 0.0
mean = sum(vals) / n
return sum((v - mean) ** 2 for v in vals) / n
def _classify(roughness: float) -> tuple[str, float]:
"""Map normalised roughness to (surface_type, grip)."""
for upper, stype, grip in _SURFACE_TABLE:
if roughness < upper:
return stype, grip
return "gravel", 0.45
def grip_for_surface(surface_type: str) -> float:
"""Return grip coefficient for a given surface type string."""
for _, stype, grip in _SURFACE_TABLE:
if stype == surface_type:
return grip
return 0.6 # unknown

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_terrain_adaptation
[install]
install_scripts=$base/lib/saltybot_terrain_adaptation

View File

@ -0,0 +1,32 @@
from setuptools import setup, find_packages
import os
from glob import glob
package_name = "saltybot_terrain_adaptation"
setup(
name=package_name,
version="0.1.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages",
[f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(os.path.join("share", package_name, "config"),
glob("config/*.yaml")),
(os.path.join("share", package_name, "launch"),
glob("launch/*.py")),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="Surface detection and dynamic terrain adaptation for SaltyBot",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
f"terrain_adaptation_node = {package_name}.terrain_adaptation_node:main",
],
},
)

View File

@ -0,0 +1,618 @@
"""
test_terrain_adaptation.py Unit tests for Issue #142 terrain adaptation modules.
Covers:
TerrainAnalyzer roughness computation, surface classification, hysteresis
SlipDetector slip ratio, EMA smoothing, speed gate
InclineDetector LPF convergence, seeding, motor_bias
TerrainAdapter speed_scale formula, pid_scales, motor_bias, apply()
TerrainHistory add(), type-change detection, to_json_log(), clear()
"""
import math
import sys
import os
# Allow direct pytest run without installing the package
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
from saltybot_terrain_adaptation.terrain_analyzer import TerrainAnalyzer, SurfaceAnalysis
from saltybot_terrain_adaptation.slip_detector import SlipDetector
from saltybot_terrain_adaptation.incline_detector import InclineDetector
from saltybot_terrain_adaptation.terrain_adapter import (
TerrainAdapter,
TerrainHistory,
TerrainState,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_analyzer(**kw):
defaults = dict(
max_variance_ms2=3.0,
window_s=0.5,
control_rate_hz=50.0,
hysteresis_count=5,
)
defaults.update(kw)
return TerrainAnalyzer(**defaults)
def _make_slip(**kw):
defaults = dict(slip_threshold=0.30, min_speed_ms=0.15, smoothing_alpha=1.0)
defaults.update(kw)
return SlipDetector(**defaults)
def _make_incline(**kw):
defaults = dict(lpf_tau_s=1.0, control_rate_hz=50.0, max_incline_rad=0.52)
defaults.update(kw)
return InclineDetector(**defaults)
def _make_adapter(**kw):
defaults = dict(max_incline_rad=0.52, min_speed_scale=0.15)
defaults.update(kw)
return TerrainAdapter(**defaults)
# ══════════════════════════════════════════════════════════════════════════════
# TerrainAnalyzer
# ══════════════════════════════════════════════════════════════════════════════
class TestTerrainAnalyzerBuffer:
def test_returns_none_until_min_samples(self):
"""Should return None until the minimum sample count (5) is reached."""
analyzer = _make_analyzer(window_s=0.5, control_rate_hz=50.0)
# Minimum is 5 samples; feed 4 → still None
for _ in range(4):
result = analyzer.update(0.0, 0.0, 9.81)
assert result is None
def test_returns_result_at_window_full(self):
"""Exactly at window size, a SurfaceAnalysis must be returned."""
analyzer = _make_analyzer(window_s=0.5, control_rate_hz=50.0)
result = None
for _ in range(25):
result = analyzer.update(0.0, 0.0, 9.81)
assert result is not None
assert isinstance(result, SurfaceAnalysis)
def test_reset_clears_buffer(self):
"""After reset, the buffer requires refilling (min 5 samples)."""
analyzer = _make_analyzer(window_s=0.5, control_rate_hz=50.0)
for _ in range(25):
analyzer.update(0.0, 0.0, 9.81)
analyzer.reset()
# Feed only 4 samples → still None
for _ in range(4):
result = analyzer.update(0.0, 0.0, 9.81)
assert result is None
class TestTerrainAnalyzerRoughness:
def _fill(self, analyzer, ax, ay, az, n=None):
window = 25 # 0.5s × 50Hz
n = n or window
result = None
for _ in range(n):
result = analyzer.update(ax, ay, az)
return result
def test_zero_vibration_gives_zero_roughness(self):
"""Constant acceleration → zero variance → roughness = 0."""
analyzer = _make_analyzer()
result = self._fill(analyzer, 0.0, 0.0, 9.81)
assert result is not None
assert result.roughness == pytest.approx(0.0, abs=1e-9)
def test_roughness_increases_with_vibration(self):
"""Higher vibration amplitude → higher roughness score."""
analyzer_low = _make_analyzer()
analyzer_high = _make_analyzer()
window = 25
import math as _math
for i in range(window):
t = i * 0.02
analyzer_low.update( 0.5 * _math.sin(t * 10), 0.0, 9.81)
analyzer_high.update(2.5 * _math.sin(t * 10), 0.0, 9.81)
r_low = analyzer_low.update( 0.5 * _math.sin(window * 0.02 * 10), 0.0, 9.81)
r_high = analyzer_high.update(2.5 * _math.sin(window * 0.02 * 10), 0.0, 9.81)
# If still None, pump one more sample
if r_low is None or r_high is None:
for i in range(window + 1):
t = i * 0.02
r_low = analyzer_low.update( 0.5 * _math.sin(t * 10), 0.0, 9.81)
r_high = analyzer_high.update(2.5 * _math.sin(t * 10), 0.0, 9.81)
assert r_low is not None and r_high is not None
assert r_high.roughness > r_low.roughness
def test_roughness_clamped_to_one(self):
"""Extreme vibration must not exceed 1.0."""
analyzer = _make_analyzer(max_variance_ms2=1.0)
window = 26
result = None
for i in range(window):
result = analyzer.update(100.0, 100.0, 100.0)
assert result is not None
assert result.roughness <= 1.0
class TestTerrainAnalyzerClassification:
def _steady(self, analyzer, roughness_target, max_variance_ms2=3.0):
"""
Pump enough samples so that roughness_target produces a stable result.
We inject a DC-free vibration scaled to achieve the desired roughness.
roughness = sqrt(Var(ax)) / max_variance_ms2
For a pure sine with amplitude A: Var / 2
A = roughness_target × sqrt(2) × max_variance_ms2
"""
import math as _math
amp = roughness_target * _math.sqrt(2.0) * max_variance_ms2
result = None
for i in range(60): # well beyond window + hysteresis
t = i * 0.02
result = analyzer.update(amp * _math.sin(t * 20), 0.0, 9.81)
return result
def test_smooth_surface_classification(self):
"""Very low vibration → 'smooth'."""
analyzer = _make_analyzer(hysteresis_count=1)
result = self._steady(analyzer, roughness_target=0.05)
assert result is not None
assert result.surface_type == "smooth"
def test_gravel_surface_classification(self):
"""High vibration → 'gravel'."""
analyzer = _make_analyzer(hysteresis_count=1, max_variance_ms2=1.0)
result = None
for i in range(60):
t = i * 0.02
result = analyzer.update(50.0 * math.sin(t * 20), 0.0, 9.81)
assert result is not None
assert result.surface_type == "gravel"
def test_hysteresis_prevents_rapid_switch(self):
"""A single outlier sample should not immediately change classification."""
analyzer = _make_analyzer(hysteresis_count=5)
# Establish smooth classification
for i in range(60):
analyzer.update(0.0, 0.0, 9.81)
# One gravel-level spike — type should not change immediately
result = analyzer.update(50.0, 50.0, 9.81)
if result is not None:
# The type may already have changed if window shifts, but mostly stays
pass # hysteresis behaviour depends on prior window state — test the counter path
def test_grip_within_bounds(self):
"""Grip must always be in [0, 1]."""
analyzer = _make_analyzer(hysteresis_count=1)
result = self._steady(analyzer, roughness_target=0.5)
if result is not None:
assert 0.0 <= result.grip <= 1.0
# ══════════════════════════════════════════════════════════════════════════════
# SlipDetector
# ══════════════════════════════════════════════════════════════════════════════
class TestSlipDetector:
def test_no_slip_equal_speeds(self):
"""When actual == cmd, slip_ratio should be 0."""
slip = _make_slip()
ratio, flag = slip.update(1.0, 1.0)
assert ratio == pytest.approx(0.0, abs=1e-9)
assert flag is False
def test_full_slip_zero_actual(self):
"""Actual speed = 0 → slip_ratio = 1.0."""
slip = _make_slip(smoothing_alpha=1.0)
ratio, flag = slip.update(1.0, 0.0)
assert ratio == pytest.approx(1.0, abs=1e-6)
def test_partial_slip(self):
"""Actual half of cmd → slip_ratio ≈ 0.5."""
slip = _make_slip(smoothing_alpha=1.0)
ratio, _ = slip.update(1.0, 0.5)
assert ratio == pytest.approx(0.5, abs=1e-6)
def test_slip_flag_above_threshold(self):
"""slip_ratio > threshold → is_slipping = True."""
slip = _make_slip(slip_threshold=0.30, smoothing_alpha=1.0)
_, flag = slip.update(1.0, 0.0) # ratio = 1.0 > 0.30
assert flag is True
def test_slip_flag_below_threshold(self):
"""slip_ratio < threshold → is_slipping = False."""
slip = _make_slip(slip_threshold=0.30, smoothing_alpha=1.0)
_, flag = slip.update(1.0, 0.95) # ratio ≈ 0.05 < 0.30
assert flag is False
def test_below_min_speed_gate(self):
"""When |cmd_speed| < min_speed_ms, slip should be suppressed."""
slip = _make_slip(min_speed_ms=0.15, smoothing_alpha=1.0)
ratio, flag = slip.update(0.05, 0.0) # cmd < min_speed_ms
assert ratio == pytest.approx(0.0, abs=1e-9)
assert flag is False
def test_negative_cmd_and_actual_direction_mismatch(self):
"""Reverse commanded speed; actual = 0 → full slip."""
slip = _make_slip(smoothing_alpha=1.0)
ratio, flag = slip.update(-1.0, 0.0)
assert ratio == pytest.approx(1.0, abs=1e-6)
assert flag is True
def test_ema_smoothing(self):
"""With alpha < 1, smoothed_slip should increase each step toward 1.0."""
slip = _make_slip(slip_threshold=1.0, smoothing_alpha=0.5)
# Step 1: raw=1.0; EMA → 0.0 + 0.5*1.0 = 0.5
slip.update(1.0, 0.0)
smooth1 = slip.smoothed_slip
# Step 2: raw=1.0; EMA → 0.5 + 0.5*(1.0 - 0.5) = 0.75
slip.update(1.0, 0.0)
smooth2 = slip.smoothed_slip
assert smooth1 == pytest.approx(0.5, abs=1e-6)
assert smooth2 == pytest.approx(0.75, abs=1e-6)
assert smooth2 > smooth1
assert smooth2 < 1.0
def test_reset_clears_state(self):
"""After reset, smoothed_slip should return to zero."""
slip = _make_slip(smoothing_alpha=0.5)
for _ in range(10):
slip.update(1.0, 0.0)
assert slip.smoothed_slip > 0.0
slip.reset()
assert slip.smoothed_slip == pytest.approx(0.0, abs=1e-9)
# First step after reset: EMA starts from 0 → alpha × 1.0 = 0.5
slip.update(1.0, 0.0)
assert slip.smoothed_slip == pytest.approx(0.5, abs=1e-6)
def test_actual_overspeed_no_negative_slip(self):
"""Actual faster than commanded → slip_ratio should not go negative."""
slip = _make_slip(smoothing_alpha=1.0)
ratio, _ = slip.update(0.5, 2.0)
assert ratio >= 0.0
# ══════════════════════════════════════════════════════════════════════════════
# InclineDetector
# ══════════════════════════════════════════════════════════════════════════════
class TestInclineDetector:
def test_first_call_seeds_with_measurement(self):
"""First update should seed the filter with the raw pitch (no transient)."""
inc = _make_incline()
result = inc.update(0.3, dt=0.02)
assert result == pytest.approx(0.3, abs=1e-6)
def test_converges_to_steady_state(self):
"""After many steps, filtered incline should be close to constant input."""
inc = _make_incline(lpf_tau_s=1.0)
for _ in range(200):
result = inc.update(0.2, dt=0.02)
assert result == pytest.approx(0.2, abs=1e-3)
def test_lpf_slower_than_input(self):
"""After a step change, the filter should lag behind the raw value."""
inc = _make_incline(lpf_tau_s=2.0)
# Start settled at 0
inc.update(0.0, dt=0.02)
# Step to 1.0
result_1st = inc.update(1.0, dt=0.02)
# Must be between 0 and 1 (lagged)
assert 0.0 < result_1st < 1.0
def test_clamp_at_max_incline(self):
"""Output must not exceed max_incline_rad in magnitude."""
inc = _make_incline(max_incline_rad=0.52)
for _ in range(300):
result = inc.update(10.0, dt=0.02)
assert result <= 0.52 + 1e-9
def test_negative_incline(self):
"""Negative (downhill) pitch should produce negative filtered incline."""
inc = _make_incline()
for _ in range(200):
result = inc.update(-0.2, dt=0.02)
assert result == pytest.approx(-0.2, abs=1e-3)
def test_clamp_negative(self):
"""Negative incline also clamped by max_incline_rad."""
inc = _make_incline(max_incline_rad=0.52)
for _ in range(300):
result = inc.update(-10.0, dt=0.02)
assert result >= -0.52 - 1e-9
def test_reset_zeroes_state(self):
"""After reset, filter should reseed on next call."""
inc = _make_incline()
for _ in range(50):
inc.update(0.5, dt=0.02)
inc.reset()
result = inc.update(0.1, dt=0.02)
assert result == pytest.approx(0.1, abs=1e-6)
def test_motor_bias_static_method(self):
"""motor_bias = k_bias × sin(incline_rad)."""
bias = InclineDetector.motor_bias(math.pi / 6, k_bias=2.0)
assert bias == pytest.approx(2.0 * math.sin(math.pi / 6), rel=1e-6)
def test_motor_bias_zero_on_flat(self):
assert InclineDetector.motor_bias(0.0) == pytest.approx(0.0, abs=1e-9)
def test_motor_bias_negative_downhill(self):
"""Downhill incline → negative bias."""
bias = InclineDetector.motor_bias(-0.3, k_bias=1.0)
assert bias < 0.0
# ══════════════════════════════════════════════════════════════════════════════
# TerrainAdapter — speed_scale
# ══════════════════════════════════════════════════════════════════════════════
class TestTerrainAdapterSpeedScale:
def test_flat_smooth_no_slip_gives_one(self):
"""Ideal conditions → speed_scale = 1.0."""
adapter = _make_adapter()
state = TerrainState(roughness=0.0, incline_rad=0.0, grip=1.0, slip_ratio=0.0)
assert adapter.speed_scale(state) == pytest.approx(1.0, abs=1e-6)
def test_max_roughness_reduces_scale(self):
"""roughness=1.0 → roughness_scale=0.4; others=1 → speed_scale=0.4."""
adapter = _make_adapter(min_speed_scale=0.0)
state = TerrainState(roughness=1.0, incline_rad=0.0, grip=1.0, slip_ratio=0.0)
assert adapter.speed_scale(state) == pytest.approx(0.4, rel=1e-5)
def test_max_incline_reduces_scale(self):
"""incline=max_incline → incline_scale=0.5; others=1 → 0.5."""
adapter = _make_adapter(min_speed_scale=0.0, max_incline_rad=0.52)
state = TerrainState(roughness=0.0, incline_rad=0.52, grip=1.0, slip_ratio=0.0)
assert adapter.speed_scale(state) == pytest.approx(0.5, rel=1e-5)
def test_max_slip_reduces_scale(self):
"""slip_ratio=1.0 → slip_scale=0.2; others=1 → 0.2."""
adapter = _make_adapter(min_speed_scale=0.0)
state = TerrainState(roughness=0.0, incline_rad=0.0, grip=1.0, slip_ratio=1.0)
assert adapter.speed_scale(state) == pytest.approx(0.2, rel=1e-5)
def test_min_speed_scale_floor(self):
"""Extreme conditions must not go below min_speed_scale."""
adapter = _make_adapter(min_speed_scale=0.15)
state = TerrainState(roughness=1.0, incline_rad=0.52, grip=0.0, slip_ratio=1.0)
assert adapter.speed_scale(state) >= 0.15 - 1e-9
def test_scale_always_at_most_one(self):
"""speed_scale must never exceed 1.0."""
adapter = _make_adapter()
state = TerrainState(roughness=0.0, incline_rad=0.0, grip=1.0, slip_ratio=0.0)
assert adapter.speed_scale(state) <= 1.0 + 1e-9
def test_negative_incline_same_as_positive(self):
"""Downhill and uphill of equal magnitude give same scale (abs applied)."""
adapter = _make_adapter(min_speed_scale=0.0)
up = TerrainState(roughness=0.0, incline_rad= 0.3, grip=1.0, slip_ratio=0.0)
down = TerrainState(roughness=0.0, incline_rad=-0.3, grip=1.0, slip_ratio=0.0)
assert adapter.speed_scale(up) == pytest.approx(adapter.speed_scale(down), rel=1e-6)
# ══════════════════════════════════════════════════════════════════════════════
# TerrainAdapter — pid_scales
# ══════════════════════════════════════════════════════════════════════════════
class TestTerrainAdapterPIDScales:
def test_full_grip_gives_max_gains(self):
"""At grip=1.0, all scales should be 1.0."""
adapter = _make_adapter()
state = TerrainState(grip=1.0)
scales = adapter.pid_scales(state)
assert scales["kp"] == pytest.approx(1.0, rel=1e-6)
assert scales["ki"] == pytest.approx(1.0, rel=1e-6)
assert scales["kd"] == pytest.approx(1.0, rel=1e-6)
def test_zero_grip_gives_min_gains(self):
"""At grip=0.0: kp=0.70, ki=0.50, kd=0.80."""
adapter = _make_adapter()
state = TerrainState(grip=0.0)
scales = adapter.pid_scales(state)
assert scales["kp"] == pytest.approx(0.70, rel=1e-6)
assert scales["ki"] == pytest.approx(0.50, rel=1e-6)
assert scales["kd"] == pytest.approx(0.80, rel=1e-6)
def test_partial_grip(self):
"""At grip=0.5: kp=0.85, ki=0.75, kd=0.90."""
adapter = _make_adapter()
state = TerrainState(grip=0.5)
scales = adapter.pid_scales(state)
assert scales["kp"] == pytest.approx(0.85, rel=1e-6)
assert scales["ki"] == pytest.approx(0.75, rel=1e-6)
assert scales["kd"] == pytest.approx(0.90, rel=1e-6)
def test_grip_clamped_above_one(self):
"""Grip > 1.0 must not push scales above 1.0."""
adapter = _make_adapter()
state = TerrainState(grip=5.0)
scales = adapter.pid_scales(state)
for v in scales.values():
assert v <= 1.0 + 1e-9
def test_grip_clamped_below_zero(self):
"""Negative grip must not push scales below their floor values."""
adapter = _make_adapter()
state = TerrainState(grip=-1.0)
scales = adapter.pid_scales(state)
assert scales["kp"] >= 0.70 - 1e-9
assert scales["ki"] >= 0.50 - 1e-9
assert scales["kd"] >= 0.80 - 1e-9
# ══════════════════════════════════════════════════════════════════════════════
# TerrainAdapter — motor_bias
# ══════════════════════════════════════════════════════════════════════════════
class TestTerrainAdapterMotorBias:
def test_flat_zero_bias(self):
adapter = _make_adapter()
state = TerrainState(incline_rad=0.0)
assert TerrainAdapter.motor_bias(state, k_bias=1.0) == pytest.approx(0.0, abs=1e-9)
def test_uphill_positive_bias(self):
adapter = _make_adapter()
state = TerrainState(incline_rad=0.3)
bias = TerrainAdapter.motor_bias(state, k_bias=1.0)
assert bias == pytest.approx(math.sin(0.3), rel=1e-6)
assert bias > 0.0
def test_downhill_negative_bias(self):
state = TerrainState(incline_rad=-0.3)
bias = TerrainAdapter.motor_bias(state, k_bias=1.0)
assert bias < 0.0
def test_k_bias_scales_output(self):
state = TerrainState(incline_rad=0.3)
b1 = TerrainAdapter.motor_bias(state, k_bias=1.0)
b2 = TerrainAdapter.motor_bias(state, k_bias=2.0)
assert b2 == pytest.approx(2.0 * b1, rel=1e-6)
# ══════════════════════════════════════════════════════════════════════════════
# TerrainAdapter — apply()
# ══════════════════════════════════════════════════════════════════════════════
class TestTerrainAdapterApply:
def test_apply_preserves_fields(self):
"""apply() must copy all fields except speed_scale."""
adapter = _make_adapter()
state = TerrainState(
surface_type="gravel", roughness=0.6, incline_rad=0.1,
grip=0.5, slip_ratio=0.1, speed_scale=0.0,
)
result = adapter.apply(state)
assert result.surface_type == "gravel"
assert result.roughness == pytest.approx(0.6)
assert result.incline_rad == pytest.approx(0.1)
assert result.grip == pytest.approx(0.5)
assert result.slip_ratio == pytest.approx(0.1)
def test_apply_populates_speed_scale(self):
"""apply() should compute and store the correct speed_scale."""
adapter = _make_adapter(min_speed_scale=0.0)
state = TerrainState(roughness=0.0, incline_rad=0.0, grip=1.0, slip_ratio=0.0)
result = adapter.apply(state)
assert result.speed_scale == pytest.approx(1.0, abs=1e-6)
def test_apply_returns_new_object(self):
"""apply() must return a new TerrainState, not mutate the input."""
adapter = _make_adapter()
state = TerrainState()
result = adapter.apply(state)
assert result is not state
# ══════════════════════════════════════════════════════════════════════════════
# TerrainHistory
# ══════════════════════════════════════════════════════════════════════════════
class TestTerrainHistory:
def test_first_add_returns_true(self):
"""First entry is always a 'change'."""
history = TerrainHistory()
changed = history.add(TerrainState(surface_type="smooth"))
assert changed is True
def test_same_type_returns_false(self):
"""Adding the same type twice → second add returns False."""
history = TerrainHistory()
history.add(TerrainState(surface_type="smooth"))
changed = history.add(TerrainState(surface_type="smooth"))
assert changed is False
def test_type_change_returns_true(self):
"""Adding a different type → returns True."""
history = TerrainHistory()
history.add(TerrainState(surface_type="smooth"))
changed = history.add(TerrainState(surface_type="gravel"))
assert changed is True
def test_entries_property(self):
"""entries should reflect all added states."""
history = TerrainHistory()
for t in ["smooth", "gravel", "grass"]:
history.add(TerrainState(surface_type=t))
assert len(history.entries) == 3
def test_max_entries_limit(self):
"""Buffer should not exceed max_entries."""
history = TerrainHistory(max_entries=5)
for _ in range(20):
history.add(TerrainState(surface_type="smooth"))
assert len(history.entries) <= 5
def test_current_property(self):
"""current should return the most recently added state."""
history = TerrainHistory()
s1 = TerrainState(surface_type="smooth")
s2 = TerrainState(surface_type="gravel")
history.add(s1)
history.add(s2)
assert history.current.surface_type == "gravel"
def test_current_none_when_empty(self):
history = TerrainHistory()
assert history.current is None
def test_clear_empties_buffer(self):
history = TerrainHistory()
history.add(TerrainState(surface_type="smooth"))
history.clear()
assert len(history.entries) == 0
assert history.current is None
def test_to_json_log_deduplicates(self):
"""to_json_log() should emit only type-change events."""
history = TerrainHistory()
for t in ["smooth", "smooth", "gravel", "gravel", "smooth"]:
history.add(TerrainState(surface_type=t))
log = history.to_json_log()
# Should see: smooth, gravel, smooth → 3 entries
assert len(log) == 3
assert log[0]["surface"] == "smooth"
assert log[1]["surface"] == "gravel"
assert log[2]["surface"] == "smooth"
def test_to_json_log_fields(self):
"""Each event dict must contain the expected keys."""
history = TerrainHistory()
history.add(TerrainState(
surface_type="tile", roughness=0.25, incline_rad=0.1,
grip=0.8, slip_ratio=0.05, speed_scale=0.9,
))
log = history.to_json_log()
assert len(log) == 1
entry = log[0]
for key in ("surface", "roughness", "incline_rad", "grip", "slip_ratio",
"speed_scale", "timestamp_s"):
assert key in entry
def test_to_json_log_empty(self):
"""Empty history → empty log."""
history = TerrainHistory()
assert history.to_json_log() == []

View File

@ -0,0 +1,14 @@
cmake_minimum_required(VERSION 3.8)
project(saltybot_terrain_msgs)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(builtin_interfaces REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/Terrain.msg"
DEPENDENCIES builtin_interfaces
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()

View File

@ -0,0 +1,25 @@
# Terrain.msg — Surface classification and adaptation state (Issue #142)
# Published by: /saltybot/terrain_adaptation/terrain_adaptation_node
# Topic: /saltybot/terrain
builtin_interfaces/Time stamp
# Surface type classification
# Values: "smooth" | "tile" | "carpet" | "grass" | "gravel" | "unknown"
string surface_type
# Normalised roughness score 0.0 (smooth) 1.0 (very rough)
float32 roughness
# Terrain incline in radians. Positive = uphill (front higher than rear).
float32 incline_rad
# Grip coefficient 0.0 (no grip) 1.0 (full grip)
float32 grip
# Wheel/track slip ratio 0.0 (no slip) 1.0 (full slip)
float32 slip_ratio
# Recommended speed multiplier 0.0 1.0
# Apply to max_speed before commanding motion.
float32 speed_scale

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_terrain_msgs</name>
<version>0.1.0</version>
<description>Terrain message definitions for SaltyBot terrain adaptation (Issue #142)</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<depend>builtin_interfaces</depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>