diff --git a/jetson/ros2_ws/src/saltybot_terrain_adaptation/config/terrain_params.yaml b/jetson/ros2_ws/src/saltybot_terrain_adaptation/config/terrain_params.yaml
new file mode 100644
index 0000000..389201b
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_terrain_adaptation/config/terrain_params.yaml
@@ -0,0 +1,41 @@
+/**:
+ ros__parameters:
+ # Control loop rate (Hz)
+ control_rate: 50.0
+
+ # Odometry topic for slip detection
+ odom_topic: "/saltybot/rover_odom"
+
+ # ── TerrainAnalyzer ────────────────────────────────────────────────────────
+ # RMS window length (seconds)
+ vibration_window_s: 0.5
+ # Normalisation ceiling for roughness score (m/s²)
+ roughness_max_ms2: 3.0
+ # Hysteresis: samples required before surface-type changes
+ hysteresis_count: 5
+
+ # ── InclineDetector ────────────────────────────────────────────────────────
+ # Low-pass filter time constant for incline (seconds)
+ incline_lpf_tau_s: 1.5
+ # Incline at which incline_scale reaches minimum (~30°)
+ max_incline_rad: 0.52
+
+ # ── SlipDetector ───────────────────────────────────────────────────────────
+ # Slip ratio above which is_slipping flag is raised
+ slip_threshold: 0.30
+ # Minimum commanded speed before slip detection is active (m/s)
+ slip_min_speed_ms: 0.15
+ # EMA smoothing factor for slip ratio
+ slip_smoothing_alpha: 0.20
+
+ # ── TerrainAdapter ─────────────────────────────────────────────────────────
+ # Incline-to-motor-bias gain (multiplied by sin(incline_rad))
+ k_bias: 1.0
+ # Speed scale floor — never command below this fraction of max speed
+ min_speed_scale: 0.15
+
+ # ── Misc ───────────────────────────────────────────────────────────────────
+ # Rolling history buffer size
+ history_max_entries: 200
+ # IMU silence timeout before safe-defaults are published (seconds)
+ imu_timeout_s: 0.5
diff --git a/jetson/ros2_ws/src/saltybot_terrain_adaptation/launch/terrain_adaptation.launch.py b/jetson/ros2_ws/src/saltybot_terrain_adaptation/launch/terrain_adaptation.launch.py
new file mode 100644
index 0000000..146a051
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_terrain_adaptation/launch/terrain_adaptation.launch.py
@@ -0,0 +1,53 @@
+"""
+terrain_adaptation.launch.py — Launch the terrain adaptation node (Issue #142).
+
+Usage
+-----
+ ros2 launch saltybot_terrain_adaptation terrain_adaptation.launch.py
+ ros2 launch saltybot_terrain_adaptation terrain_adaptation.launch.py \
+ odom_topic:=/saltybot/rover_odom control_rate:=50.0
+"""
+
+import os
+
+from ament_index_python.packages import get_package_share_directory
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration
+from launch_ros.actions import Node
+
+
+def generate_launch_description():
+ pkg_share = get_package_share_directory("saltybot_terrain_adaptation")
+ default_params = os.path.join(pkg_share, "config", "terrain_params.yaml")
+
+ return LaunchDescription([
+ DeclareLaunchArgument(
+ "params_file",
+ default_value=default_params,
+ description="Path to terrain_params.yaml",
+ ),
+ DeclareLaunchArgument(
+ "odom_topic",
+ default_value="/saltybot/rover_odom",
+ description="Odometry topic for wheel-slip detection",
+ ),
+ DeclareLaunchArgument(
+ "control_rate",
+ default_value="50.0",
+ description="Control loop rate (Hz)",
+ ),
+ Node(
+ package="saltybot_terrain_adaptation",
+ executable="terrain_adaptation_node",
+ name="terrain_adaptation",
+ output="screen",
+ parameters=[
+ LaunchConfiguration("params_file"),
+ {
+ "odom_topic": LaunchConfiguration("odom_topic"),
+ "control_rate": LaunchConfiguration("control_rate"),
+ },
+ ],
+ ),
+ ])
diff --git a/jetson/ros2_ws/src/saltybot_terrain_adaptation/package.xml b/jetson/ros2_ws/src/saltybot_terrain_adaptation/package.xml
new file mode 100644
index 0000000..1c8802d
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_terrain_adaptation/package.xml
@@ -0,0 +1,24 @@
+
+
+
+ saltybot_terrain_adaptation
+ 0.1.0
+ Surface detection and dynamic terrain adaptation for SaltyBot (Issue #142)
+ sl-controls
+ MIT
+
+ rclpy
+ sensor_msgs
+ nav_msgs
+ geometry_msgs
+ std_msgs
+
+ ament_copyright
+ ament_flake8
+ ament_pep257
+ pytest
+
+
+ ament_python
+
+
diff --git a/jetson/ros2_ws/src/saltybot_terrain_adaptation/resource/saltybot_terrain_adaptation b/jetson/ros2_ws/src/saltybot_terrain_adaptation/resource/saltybot_terrain_adaptation
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/__init__.py b/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/incline_detector.py b/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/incline_detector.py
new file mode 100644
index 0000000..a334590
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/incline_detector.py
@@ -0,0 +1,106 @@
+"""
+incline_detector.py — Terrain incline detection from IMU pitch (Issue #142).
+
+Method
+──────
+ For a ground robot (rover/tank), the vehicle body tilts with the terrain,
+ so IMU pitch ≈ terrain incline + vibration noise.
+
+ A first-order discrete low-pass filter separates the slow incline component
+ from fast vibrations and balance corrections:
+
+ incline_filtered[n] = (1−α) · incline_filtered[n−1] + α · pitch_raw[n]
+ α = dt / (τ + dt)
+
+ The motor bias required to compensate for gravity on the incline is:
+
+ bias_ms = k_bias · sin(incline_rad)
+
+ Positive incline = uphill (front higher than rear) → positive forward bias
+ needed to counteract gravity pulling the robot backward.
+
+Pure module — no ROS2 dependency.
+"""
+
+from __future__ import annotations
+
+import math
+
+
+class InclineDetector:
+ """
+ Incline estimator via first-order low-pass filter on IMU pitch.
+
+ Parameters
+ ----------
+ lpf_tau_s : low-pass filter time constant (s); larger = smoother
+ control_rate_hz : expected update rate (Hz)
+ max_incline_rad : hard clamp on reported incline magnitude (rad)
+ """
+
+ def __init__(
+ self,
+ lpf_tau_s: float = 1.5,
+ control_rate_hz: float = 50.0,
+ max_incline_rad: float = 0.70, # ≈ 40°
+ ):
+ self._tau = max(1e-3, lpf_tau_s)
+ self._dt = 1.0 / max(1.0, control_rate_hz)
+ self._max = abs(max_incline_rad)
+
+ self._incline_rad: float = 0.0
+ self._initialised: bool = False
+
+ # ── Properties ────────────────────────────────────────────────────────────
+
+ @property
+ def incline_rad(self) -> float:
+ return self._incline_rad
+
+ # ── Update ────────────────────────────────────────────────────────────────
+
+ def update(self, pitch_rad: float, dt: float | None = None) -> float:
+ """
+ Submit one pitch sample and return the filtered incline estimate.
+
+ Parameters
+ ----------
+ pitch_rad : IMU pitch angle (rad); +ve = front-up / uphill
+ dt : elapsed time since last sample (s); uses 1/rate if None
+
+ Returns
+ -------
+ Filtered incline in radians, clamped to ±max_incline_rad.
+ """
+ step = dt if dt is not None and dt > 0.0 else self._dt
+
+ if not self._initialised:
+ # Seed filter with first measurement to avoid large initial transient
+ self._incline_rad = pitch_rad
+ self._initialised = True
+ return max(-self._max, min(self._max, self._incline_rad))
+
+ alpha = step / (self._tau + step)
+ self._incline_rad = (1.0 - alpha) * self._incline_rad + alpha * pitch_rad
+ return max(-self._max, min(self._max, self._incline_rad))
+
+ def reset(self) -> None:
+ self._incline_rad = 0.0
+ self._initialised = False
+
+ # ── Motor bias ────────────────────────────────────────────────────────────
+
+ @staticmethod
+ def motor_bias(incline_rad: float, k_bias: float = 1.0) -> float:
+ """
+ Forward motor bias (m/s) to compensate for gravitational pull on incline.
+
+ Positive bias = add forward thrust for uphill.
+ Negative bias = reduce thrust / add braking for downhill.
+
+ Parameters
+ ----------
+ incline_rad : filtered incline angle (rad)
+ k_bias : tuning gain; scale bias magnitude
+ """
+ return k_bias * math.sin(incline_rad)
diff --git a/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/slip_detector.py b/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/slip_detector.py
new file mode 100644
index 0000000..db0a3c1
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/slip_detector.py
@@ -0,0 +1,91 @@
+"""
+slip_detector.py — Wheel slip detection for SaltyBot (Issue #142).
+
+Method
+──────
+ Compare commanded speed (from /cmd_vel) to measured speed (from odometry).
+ When the robot is commanded to move but the measured speed falls significantly
+ below the command, the difference is attributed to wheel/track slip.
+
+ slip_ratio = max(0, |v_cmd| − |v_actual|) / |v_cmd|
+
+ slip_ratio = 0.0 → no slip (actual matches commanded)
+ slip_ratio = 1.0 → full slip (wheels spinning, no forward motion)
+
+ Detection is suppressed below `min_speed_ms` to avoid noise at low speed.
+
+ A smoothed slip estimate is maintained via exponential moving average
+ (alpha per update) to reduce transient spikes.
+
+Pure module — no ROS2 dependency.
+"""
+
+from __future__ import annotations
+
+
+class SlipDetector:
+ """
+ Wheel/track slip detector from commanded vs measured speed.
+
+ Parameters
+ ----------
+ slip_threshold : slip_ratio above which is_slipping = True
+ min_speed_ms : minimum |v_cmd| to attempt detection (avoids noise)
+ smoothing_alpha : EMA coefficient ∈ (0, 1]; 1 = no smoothing
+ """
+
+ def __init__(
+ self,
+ slip_threshold: float = 0.30,
+ min_speed_ms: float = 0.15,
+ smoothing_alpha: float = 0.20,
+ ):
+ self._threshold = max(0.0, min(1.0, slip_threshold))
+ self._min_speed = abs(min_speed_ms)
+ self._alpha = max(1e-6, min(1.0, smoothing_alpha))
+ self._smoothed: float = 0.0
+
+ # ── Properties ────────────────────────────────────────────────────────────
+
+ @property
+ def smoothed_slip(self) -> float:
+ """EMA-smoothed slip ratio."""
+ return self._smoothed
+
+ # ── Update ────────────────────────────────────────────────────────────────
+
+ def update(
+ self,
+ cmd_speed_ms: float,
+ actual_speed_ms: float,
+ ) -> tuple[float, bool]:
+ """
+ Compute slip from one speed pair.
+
+ Parameters
+ ----------
+ cmd_speed_ms : commanded forward speed (m/s); signed
+ actual_speed_ms : measured forward speed (m/s); signed (from odom)
+
+ Returns
+ -------
+ (slip_ratio, is_slipping)
+ slip_ratio : 0.0 – 1.0 (instantaneous, unsmoothed)
+ is_slipping : True if smoothed slip exceeds threshold
+ """
+ abs_cmd = abs(cmd_speed_ms)
+
+ if abs_cmd < self._min_speed:
+ # Decay smoothed slip toward zero when not commanding movement
+ self._smoothed = (1.0 - self._alpha) * self._smoothed
+ return 0.0, False
+
+ abs_actual = abs(actual_speed_ms)
+ raw_slip = max(0.0, min(1.0, (abs_cmd - abs_actual) / abs_cmd))
+
+ self._smoothed = (1.0 - self._alpha) * self._smoothed + self._alpha * raw_slip
+
+ return raw_slip, self._smoothed > self._threshold
+
+ def reset(self) -> None:
+ self._smoothed = 0.0
diff --git a/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/terrain_adaptation_node.py b/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/terrain_adaptation_node.py
new file mode 100644
index 0000000..3396318
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/terrain_adaptation_node.py
@@ -0,0 +1,346 @@
+"""
+terrain_adaptation_node.py — Surface detection and dynamic adaptation (Issue #142).
+
+Overview
+────────
+ Analyses IMU vibration to classify the terrain underfoot, detects incline
+ and wheel slip, then publishes advisory outputs that other nodes use to
+ adapt speed limits, PID aggressiveness, and motor bias.
+
+Pipeline (50 Hz)
+────────────────
+ 1. IMU acc → TerrainAnalyzer → roughness, surface_type, grip
+ 2. IMU pitch (quaternion) → InclineDetector → incline_rad
+ 3. cmd_vel + odom → SlipDetector → slip_ratio
+ 4. (roughness, incline, grip, slip) → TerrainAdapter → speed_scale,
+ pid_scales, motor_bias
+ 5. Publish Terrain msg + advisory scalars + history log
+
+Subscribes
+──────────
+ /saltybot/imu sensor_msgs/Imu — vibration + pitch
+ nav_msgs/Odometry — actual speed for slip
+ /cmd_vel geometry_msgs/Twist — commanded speed for slip
+
+Publishes
+─────────
+ /saltybot/terrain saltybot_terrain_msgs/Terrain — full terrain state
+ /saltybot/terrain_speed_scale std_msgs/Float32 — speed multiplier
+ /saltybot/terrain_pid_scales std_msgs/String (JSON) — kp/ki/kd scales
+ /saltybot/terrain_motor_bias std_msgs/Float32 — incline compensation
+ /saltybot/terrain_history std_msgs/String (JSON) — change log
+
+Parameters
+──────────
+ control_rate 50.0 Hz
+ odom_topic /saltybot/rover_odom
+ vibration_window_s 0.5 s (RMS window)
+ roughness_max_ms2 3.0 m/s² (normalisation ceiling)
+ hysteresis_count 5 samples before class change
+ incline_lpf_tau_s 1.5 s (incline low-pass time constant)
+ max_incline_rad 0.52 rad (≈30°)
+ slip_threshold 0.30 (slip flag threshold)
+ slip_min_speed_ms 0.15 m/s (ignore slip below this speed)
+ slip_smoothing_alpha 0.20
+ k_bias 1.0 (incline motor bias gain)
+ min_speed_scale 0.15 (speed_scale floor)
+ history_max_entries 200
+ imu_timeout_s 0.5 s (zero outputs if IMU goes silent)
+"""
+
+import json
+import math
+import time
+
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
+
+from geometry_msgs.msg import Twist
+from nav_msgs.msg import Odometry
+from sensor_msgs.msg import Imu
+from std_msgs.msg import Float32, String
+
+from saltybot_terrain_adaptation.terrain_analyzer import TerrainAnalyzer
+from saltybot_terrain_adaptation.slip_detector import SlipDetector
+from saltybot_terrain_adaptation.incline_detector import InclineDetector
+from saltybot_terrain_adaptation.terrain_adapter import (
+ TerrainAdapter,
+ TerrainHistory,
+ TerrainState,
+)
+
+try:
+ from saltybot_terrain_msgs.msg import Terrain as TerrainMsg
+ _TERRAIN_MSG_OK = True
+except ImportError:
+ _TERRAIN_MSG_OK = False
+
+
+def _quaternion_to_pitch(qx: float, qy: float, qz: float, qw: float) -> float:
+ sinp = 2.0 * (qw * qy - qz * qx)
+ return math.asin(max(-1.0, min(1.0, sinp)))
+
+
+class TerrainAdaptationNode(Node):
+
+ def __init__(self):
+ super().__init__("terrain_adaptation")
+
+ # ── Parameters ───────────────────────────────────────────────────────
+ self._declare_params()
+ p = self._load_params()
+
+ # ── Sub-systems ──────────────────────────────────────────────────────
+ self._analyzer = TerrainAnalyzer(
+ max_variance_ms2=p["roughness_max_ms2"],
+ window_s=p["vibration_window_s"],
+ control_rate_hz=p["control_rate"],
+ hysteresis_count=p["hysteresis_count"],
+ )
+ self._incline = InclineDetector(
+ lpf_tau_s=p["incline_lpf_tau_s"],
+ control_rate_hz=p["control_rate"],
+ max_incline_rad=p["max_incline_rad"],
+ )
+ self._slip = SlipDetector(
+ slip_threshold=p["slip_threshold"],
+ min_speed_ms=p["slip_min_speed_ms"],
+ smoothing_alpha=p["slip_smoothing_alpha"],
+ )
+ self._adapter = TerrainAdapter(
+ max_incline_rad=p["max_incline_rad"],
+ min_speed_scale=p["min_speed_scale"],
+ )
+ self._history = TerrainHistory(max_entries=p["history_max_entries"])
+
+ # ── State ────────────────────────────────────────────────────────────
+ self._latest_imu: Imu | None = None
+ self._actual_speed_ms: float = 0.0
+ self._cmd_speed_ms: float = 0.0
+ self._last_imu_t: float = 0.0
+ self._last_ctrl_t: float = time.monotonic()
+
+ # ── QoS ──────────────────────────────────────────────────────────────
+ reliable = QoSProfile(
+ reliability=ReliabilityPolicy.RELIABLE,
+ history=HistoryPolicy.KEEP_LAST,
+ depth=10,
+ )
+ best_effort = QoSProfile(
+ reliability=ReliabilityPolicy.BEST_EFFORT,
+ history=HistoryPolicy.KEEP_LAST,
+ depth=1,
+ )
+
+ # ── Subscriptions ────────────────────────────────────────────────────
+ self.create_subscription(Imu, "/saltybot/imu", self._imu_cb, best_effort)
+ self.create_subscription(Odometry, p["odom_topic"], self._odom_cb, reliable)
+ self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, reliable)
+
+ # ── Publishers ───────────────────────────────────────────────────────
+ self._terrain_pub = self.create_publisher(String, "/saltybot/terrain", reliable)
+ self._speed_scale_pub = self.create_publisher(Float32, "/saltybot/terrain_speed_scale", reliable)
+ self._pid_scales_pub = self.create_publisher(String, "/saltybot/terrain_pid_scales", reliable)
+ self._motor_bias_pub = self.create_publisher(Float32, "/saltybot/terrain_motor_bias", reliable)
+ self._history_pub = self.create_publisher(String, "/saltybot/terrain_history", reliable)
+
+ # Try to create a typed Terrain publisher if message is available
+ self._typed_terrain_pub = None
+ if _TERRAIN_MSG_OK:
+ self._typed_terrain_pub = self.create_publisher(
+ TerrainMsg, "/saltybot/terrain_typed", reliable
+ )
+
+ # ── Timer ────────────────────────────────────────────────────────────
+ rate = p["control_rate"]
+ self._timer = self.create_timer(1.0 / rate, self._control_cb)
+
+ self.get_logger().info(
+ f"TerrainAdaptationNode ready rate={rate}Hz "
+ f"odom={p['odom_topic']}"
+ )
+
+ # ── Parameter management ─────────────────────────────────────────────────
+
+ def _declare_params(self) -> None:
+ self.declare_parameter("control_rate", 50.0)
+ self.declare_parameter("odom_topic", "/saltybot/rover_odom")
+ self.declare_parameter("vibration_window_s", 0.5)
+ self.declare_parameter("roughness_max_ms2", 3.0)
+ self.declare_parameter("hysteresis_count", 5)
+ self.declare_parameter("incline_lpf_tau_s", 1.5)
+ self.declare_parameter("max_incline_rad", 0.52)
+ self.declare_parameter("slip_threshold", 0.30)
+ self.declare_parameter("slip_min_speed_ms", 0.15)
+ self.declare_parameter("slip_smoothing_alpha", 0.20)
+ self.declare_parameter("k_bias", 1.0)
+ self.declare_parameter("min_speed_scale", 0.15)
+ self.declare_parameter("history_max_entries", 200)
+ self.declare_parameter("imu_timeout_s", 0.5)
+
+ def _load_params(self) -> dict:
+ g = self.get_parameter
+ return {k: g(k).value for k in [
+ "control_rate", "odom_topic",
+ "vibration_window_s", "roughness_max_ms2", "hysteresis_count",
+ "incline_lpf_tau_s", "max_incline_rad",
+ "slip_threshold", "slip_min_speed_ms", "slip_smoothing_alpha",
+ "k_bias", "min_speed_scale", "history_max_entries", "imu_timeout_s",
+ ]}
+
+ # ── Subscriptions ─────────────────────────────────────────────────────────
+
+ def _imu_cb(self, msg: Imu) -> None:
+ self._latest_imu = msg
+ self._last_imu_t = time.monotonic()
+
+ def _odom_cb(self, msg: Odometry) -> None:
+ self._actual_speed_ms = msg.twist.twist.linear.x
+
+ def _cmd_vel_cb(self, msg: Twist) -> None:
+ self._cmd_speed_ms = msg.linear.x
+
+ # ── 50 Hz control loop ────────────────────────────────────────────────────
+
+ def _control_cb(self) -> None:
+ p = self._load_params()
+ now = time.monotonic()
+ dt = now - self._last_ctrl_t
+ self._last_ctrl_t = now
+
+ # IMU watchdog
+ imu_age = (now - self._last_imu_t) if self._last_imu_t > 0.0 else 1e9
+ if imu_age > p["imu_timeout_s"] or self._latest_imu is None:
+ self._publish_safe_defaults()
+ return
+
+ imu = self._latest_imu
+ ax = imu.linear_acceleration.x
+ ay = imu.linear_acceleration.y
+ az = imu.linear_acceleration.z
+ q = imu.orientation
+
+ # 1. Vibration analysis
+ analysis = self._analyzer.update(ax, ay, az)
+ roughness = analysis.roughness if analysis else 0.0
+ surf_type = analysis.surface_type if analysis else "unknown"
+ grip = analysis.grip if analysis else 0.6
+
+ # 2. Incline detection
+ pitch = _quaternion_to_pitch(q.x, q.y, q.z, q.w)
+ incline = self._incline.update(pitch, dt)
+
+ # 3. Slip detection
+ slip_ratio, is_slipping = self._slip.update(
+ self._cmd_speed_ms, self._actual_speed_ms
+ )
+
+ # 4. Build terrain state
+ raw_state = TerrainState(
+ surface_type=surf_type,
+ roughness=roughness,
+ incline_rad=incline,
+ grip=grip,
+ slip_ratio=slip_ratio,
+ timestamp_s=now,
+ )
+ state = self._adapter.apply(raw_state)
+
+ # 5. Terrain history + logging
+ type_changed = self._history.add(state)
+ if type_changed:
+ self.get_logger().info(
+ f"Terrain: {surf_type} rough={roughness:.2f} "
+ f"grip={grip:.2f} incline={math.degrees(incline):.1f}° "
+ f"speed_scale={state.speed_scale:.2f}"
+ )
+ if is_slipping:
+ self.get_logger().warn(
+ f"Wheel slip detected: ratio={slip_ratio:.2f} "
+ f"cmd={self._cmd_speed_ms:.2f}m/s actual={self._actual_speed_ms:.2f}m/s"
+ )
+
+ # 6. Publish
+ self._publish_terrain_json(state)
+ self._publish_speed_scale(state.speed_scale)
+ self._publish_pid_scales(self._adapter.pid_scales(state))
+ self._publish_motor_bias(
+ self._adapter.motor_bias(state, k_bias=p["k_bias"])
+ )
+ # Publish history on every type change (not every tick)
+ if type_changed:
+ self._publish_history()
+
+ # Typed message (if saltybot_terrain_msgs is built)
+ if self._typed_terrain_pub is not None and _TERRAIN_MSG_OK:
+ self._publish_typed(state)
+
+ # ── Publishers ────────────────────────────────────────────────────────────
+
+ def _publish_safe_defaults(self) -> None:
+ self._publish_speed_scale(1.0)
+ self._publish_pid_scales({"kp": 1.0, "ki": 1.0, "kd": 1.0})
+ self._publish_motor_bias(0.0)
+
+ def _publish_terrain_json(self, state: TerrainState) -> None:
+ msg = String()
+ msg.data = json.dumps({
+ "surface_type": state.surface_type,
+ "roughness": round(state.roughness, 3),
+ "incline_rad": round(state.incline_rad, 4),
+ "incline_deg": round(math.degrees(state.incline_rad), 2),
+ "grip": round(state.grip, 3),
+ "slip_ratio": round(state.slip_ratio, 3),
+ "speed_scale": round(state.speed_scale, 3),
+ })
+ self._terrain_pub.publish(msg)
+
+ def _publish_speed_scale(self, scale: float) -> None:
+ msg = Float32()
+ msg.data = float(scale)
+ self._speed_scale_pub.publish(msg)
+
+ def _publish_pid_scales(self, scales: dict) -> None:
+ msg = String()
+ msg.data = json.dumps({k: round(v, 4) for k, v in scales.items()})
+ self._pid_scales_pub.publish(msg)
+
+ def _publish_motor_bias(self, bias: float) -> None:
+ msg = Float32()
+ msg.data = float(bias)
+ self._motor_bias_pub.publish(msg)
+
+ def _publish_history(self) -> None:
+ msg = String()
+ msg.data = json.dumps(self._history.to_json_log())
+ self._history_pub.publish(msg)
+
+ def _publish_typed(self, state: TerrainState) -> None:
+ msg = TerrainMsg()
+ msg.stamp = self.get_clock().now().to_msg()
+ msg.surface_type = state.surface_type
+ msg.roughness = float(state.roughness)
+ msg.incline_rad = float(state.incline_rad)
+ msg.grip = float(state.grip)
+ msg.slip_ratio = float(state.slip_ratio)
+ msg.speed_scale = float(state.speed_scale)
+ self._typed_terrain_pub.publish(msg)
+
+
+# ── Entry point ───────────────────────────────────────────────────────────────
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = TerrainAdaptationNode()
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ node.destroy_node()
+ rclpy.try_shutdown()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/terrain_adapter.py b/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/terrain_adapter.py
new file mode 100644
index 0000000..1fd2110
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/terrain_adapter.py
@@ -0,0 +1,195 @@
+"""
+terrain_adapter.py — Speed / PID / bias adaptation from terrain state (Issue #142).
+
+Provides:
+ TerrainState — typed snapshot of current terrain conditions
+ TerrainAdapter — computes speed_scale, pid_scales, motor_bias
+ TerrainHistory — rolling deque of TerrainState, JSON export for route planning
+
+Speed scale model
+─────────────────
+ speed_scale = roughness_scale × incline_scale × slip_scale
+
+ roughness_scale = 1.0 − 0.6 × roughness (0.4 at max roughness)
+ incline_scale = 1.0 − 0.5 × |incline| / max_incline (0.5 at max incline)
+ slip_scale = 1.0 − 0.8 × slip_ratio (0.2 at full slip)
+
+ Result clamped to [min_speed_scale, 1.0].
+
+PID scale model (grip-based aggressiveness)
+──────────────────────────────────────────
+ kp_scale = 0.70 + 0.30 × grip (0.70 at no grip, 1.00 at full grip)
+ ki_scale = 0.50 + 0.50 × grip (0.50 at no grip, 1.00 at full grip)
+ kd_scale = 0.80 + 0.20 × grip (0.80 at no grip, 1.00 at full grip)
+
+ On slippery surfaces, reduced P/I prevents oscillation from un-predicted
+ wheel slip. D is kept relatively high to damp any vibration.
+
+Motor bias
+──────────
+ See incline_detector.motor_bias() — terrain_adapter re-exports it.
+
+Pure module — no ROS2 dependency.
+"""
+
+from __future__ import annotations
+
+import math
+import time
+from collections import deque
+from dataclasses import dataclass, field
+from typing import Optional
+
+
+# ── TerrainState ─────────────────────────────────────────────────────────────
+
+@dataclass
+class TerrainState:
+ surface_type: str = "unknown" # "smooth"|"tile"|"carpet"|"grass"|"gravel"|"unknown"
+ roughness: float = 0.0 # 0.0–1.0
+ incline_rad: float = 0.0 # rad; +ve = uphill
+ grip: float = 0.6 # 0.0–1.0
+ slip_ratio: float = 0.0 # 0.0–1.0
+ speed_scale: float = 1.0 # recommended speed multiplier
+ timestamp_s: float = field(default_factory=time.monotonic)
+
+
+# ── TerrainAdapter ────────────────────────────────────────────────────────────
+
+class TerrainAdapter:
+ """
+ Computes adaptive outputs (speed scale, PID scales, motor bias) from a
+ TerrainState.
+
+ Parameters
+ ----------
+ max_incline_rad : incline at which incline_scale reaches minimum
+ min_speed_scale : floor for speed_scale (never zero)
+ """
+
+ def __init__(
+ self,
+ max_incline_rad: float = 0.52, # ≈ 30°
+ min_speed_scale: float = 0.15,
+ ):
+ self._max_incline = max(0.01, max_incline_rad)
+ self._min_scale = max(0.05, min_speed_scale)
+
+ # ── Speed scale ───────────────────────────────────────────────────────────
+
+ def speed_scale(self, state: TerrainState) -> float:
+ """
+ Combined speed-limit multiplier. Returns value in [min_speed_scale, 1.0].
+ """
+ r_scale = 1.0 - 0.6 * state.roughness
+ i_scale = 1.0 - 0.5 * min(1.0, abs(state.incline_rad) / self._max_incline)
+ s_scale = 1.0 - 0.8 * state.slip_ratio
+
+ combined = r_scale * i_scale * s_scale
+ return max(self._min_scale, min(1.0, combined))
+
+ # ── PID scales ────────────────────────────────────────────────────────────
+
+ def pid_scales(self, state: TerrainState) -> dict[str, float]:
+ """
+ Gain multipliers keyed "kp", "ki", "kd". All in (0, 1].
+ Multiply these against the base PID gains from adaptive_pid_node.
+ """
+ grip = max(0.0, min(1.0, state.grip))
+ return {
+ "kp": 0.70 + 0.30 * grip,
+ "ki": 0.50 + 0.50 * grip,
+ "kd": 0.80 + 0.20 * grip,
+ }
+
+ # ── Motor bias ────────────────────────────────────────────────────────────
+
+ @staticmethod
+ def motor_bias(state: TerrainState, k_bias: float = 1.0) -> float:
+ """
+ Forward motor bias (m/s) for incline compensation.
+ Positive = push forward on uphill.
+ """
+ return k_bias * math.sin(state.incline_rad)
+
+ # ── Full state update ─────────────────────────────────────────────────────
+
+ def apply(self, state: TerrainState) -> TerrainState:
+ """Return a copy of state with speed_scale populated."""
+ return TerrainState(
+ surface_type=state.surface_type,
+ roughness=state.roughness,
+ incline_rad=state.incline_rad,
+ grip=state.grip,
+ slip_ratio=state.slip_ratio,
+ speed_scale=self.speed_scale(state),
+ timestamp_s=state.timestamp_s,
+ )
+
+
+# ── TerrainHistory ────────────────────────────────────────────────────────────
+
+class TerrainHistory:
+ """
+ Rolling log of TerrainState snapshots for route planning.
+
+ Tracks surface-type change events and exports them as a JSON-serialisable
+ list. The full sample buffer is kept for statistical queries.
+
+ Parameters
+ ----------
+ max_entries : maximum samples kept (oldest discarded)
+ """
+
+ def __init__(self, max_entries: int = 200):
+ self._entries: deque = deque(maxlen=max(1, max_entries))
+
+ @property
+ def entries(self) -> list[TerrainState]:
+ return list(self._entries)
+
+ @property
+ def current(self) -> Optional[TerrainState]:
+ return self._entries[-1] if self._entries else None
+
+ def add(self, state: TerrainState) -> bool:
+ """
+ Append a state snapshot.
+
+ Returns True if the surface type changed from the previous entry
+ (useful for callers that want to log only type-change events).
+ """
+ changed = (
+ not self._entries
+ or self._entries[-1].surface_type != state.surface_type
+ )
+ self._entries.append(state)
+ return changed
+
+ def to_json_log(self) -> list[dict]:
+ """
+ Return a list of surface-type change events (deduped by type).
+ Suitable for route planning or structured logging.
+
+ Each entry:
+ {"surface": str, "roughness": float, "incline_rad": float,
+ "grip": float, "slip_ratio": float, "timestamp_s": float}
+ """
+ events: list[dict] = []
+ prev_type: Optional[str] = None
+ for s in self._entries:
+ if s.surface_type != prev_type:
+ events.append({
+ "surface": s.surface_type,
+ "roughness": round(s.roughness, 3),
+ "incline_rad": round(s.incline_rad, 4),
+ "grip": round(s.grip, 3),
+ "slip_ratio": round(s.slip_ratio, 3),
+ "speed_scale": round(s.speed_scale, 3),
+ "timestamp_s": round(s.timestamp_s, 2),
+ })
+ prev_type = s.surface_type
+ return events
+
+ def clear(self) -> None:
+ self._entries.clear()
diff --git a/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/terrain_analyzer.py b/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/terrain_analyzer.py
new file mode 100644
index 0000000..00182f2
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_terrain_adaptation/saltybot_terrain_adaptation/terrain_analyzer.py
@@ -0,0 +1,191 @@
+"""
+terrain_analyzer.py — IMU vibration analysis for surface classification (Issue #142).
+
+Algorithm
+─────────
+ For each control tick, one IMU linear-acceleration triple (ax, ay, az) is
+ submitted to a rolling window. The RMS vibration is computed as:
+
+ vibration_rms = sqrt( Var(ax) + Var(ay) + Var(az) ) [m/s²]
+
+ Variance naturally removes the DC gravity component, so the signal
+ captures only dynamic surface vibrations.
+
+ Normalised roughness:
+ roughness = min(1.0, vibration_rms / max_variance_ms2)
+
+ Surface classification thresholds (roughness boundaries):
+ smooth [0.00, 0.12) grip = 0.90
+ tile [0.12, 0.28) grip = 0.85
+ carpet [0.28, 0.48) grip = 0.75
+ grass [0.48, 0.68) grip = 0.55
+ gravel [0.68, 1.00] grip = 0.45
+
+ Hysteresis prevents rapid class switching: the candidate class must persist
+ for `hysteresis_count` consecutive samples before becoming current.
+
+Pure module — no ROS2 dependency.
+"""
+
+from __future__ import annotations
+
+import math
+from collections import deque
+from dataclasses import dataclass
+from typing import Optional
+
+
+# ── Surface classification table ─────────────────────────────────────────────
+# (roughness_upper_bound, surface_type, grip_coefficient)
+# Ordered from smoothest to roughest; last entry has no upper bound.
+_SURFACE_TABLE: list[tuple[float, str, float]] = [
+ (0.12, "smooth", 0.90),
+ (0.28, "tile", 0.85),
+ (0.48, "carpet", 0.75),
+ (0.68, "grass", 0.55),
+ (1.01, "gravel", 0.45),
+]
+
+SURFACE_TYPES: tuple[str, ...] = tuple(e[1] for e in _SURFACE_TABLE)
+
+
+# ── SurfaceAnalysis ───────────────────────────────────────────────────────────
+
+@dataclass
+class SurfaceAnalysis:
+ roughness: float # 0.0 (smooth) – 1.0 (rough), normalised
+ surface_type: str # "smooth" | "tile" | "carpet" | "grass" | "gravel" | "unknown"
+ grip: float # 0.0 (no grip) – 1.0 (full grip)
+
+
+# ── TerrainAnalyzer ───────────────────────────────────────────────────────────
+
+class TerrainAnalyzer:
+ """
+ Rolling-window vibration analyser with hysteresis-protected classification.
+
+ Parameters
+ ----------
+ max_variance_ms2 : normalisation ceiling (m/s²); maps to roughness=1.0
+ window_s : rolling window duration (s)
+ control_rate_hz : expected update rate (Hz) — sets window size
+ hysteresis_count : consecutive samples in new class before switching
+ """
+
+ def __init__(
+ self,
+ max_variance_ms2: float = 3.0,
+ window_s: float = 0.5,
+ control_rate_hz: float = 50.0,
+ hysteresis_count: int = 5,
+ ):
+ self._max_var = max(1e-3, max_variance_ms2)
+ window_size = max(5, int(window_s * control_rate_hz))
+ self._samples: deque = deque(maxlen=window_size)
+ self._hysteresis_count = hysteresis_count
+
+ self._current_type: str = "unknown"
+ self._current_grip: float = 0.6
+ self._candidate_type: str = "unknown"
+ self._candidate_count: int = 0
+
+ # ── Properties ────────────────────────────────────────────────────────────
+
+ @property
+ def current_surface_type(self) -> str:
+ return self._current_type
+
+ @property
+ def current_grip(self) -> float:
+ return self._current_grip
+
+ @property
+ def sample_count(self) -> int:
+ return len(self._samples)
+
+ # ── Update ────────────────────────────────────────────────────────────────
+
+ def update(
+ self,
+ ax: float,
+ ay: float,
+ az: float,
+ ) -> Optional[SurfaceAnalysis]:
+ """
+ Submit one IMU acceleration sample (m/s²) and return analysis.
+
+ Returns
+ -------
+ SurfaceAnalysis when ≥ 5 samples have been collected; None otherwise.
+ """
+ self._samples.append((ax, ay, az))
+
+ if len(self._samples) < 5:
+ return None
+
+ roughness = self._compute_roughness()
+ candidate_type, candidate_grip = _classify(roughness)
+
+ # Hysteresis: require sustained presence in new class
+ if candidate_type == self._candidate_type:
+ self._candidate_count += 1
+ else:
+ self._candidate_type = candidate_type
+ self._candidate_count = 1
+
+ if self._candidate_count >= self._hysteresis_count:
+ self._current_type = candidate_type
+ self._current_grip = candidate_grip
+
+ return SurfaceAnalysis(
+ roughness=roughness,
+ surface_type=self._current_type,
+ grip=self._current_grip,
+ )
+
+ def reset(self) -> None:
+ self._samples.clear()
+ self._current_type = "unknown"
+ self._current_grip = 0.6
+ self._candidate_type = "unknown"
+ self._candidate_count = 0
+
+ # ── Internal ─────────────────────────────────────────────────────────────
+
+ def _compute_roughness(self) -> float:
+ n = len(self._samples)
+ if n < 2:
+ return 0.0
+
+ ax_vals = [s[0] for s in self._samples]
+ ay_vals = [s[1] for s in self._samples]
+ az_vals = [s[2] for s in self._samples]
+
+ rms = math.sqrt(_variance(ax_vals) + _variance(ay_vals) + _variance(az_vals))
+ return min(1.0, rms / self._max_var)
+
+
+# ── Module-level helpers ──────────────────────────────────────────────────────
+
+def _variance(vals: list[float]) -> float:
+ n = len(vals)
+ if n < 2:
+ return 0.0
+ mean = sum(vals) / n
+ return sum((v - mean) ** 2 for v in vals) / n
+
+
+def _classify(roughness: float) -> tuple[str, float]:
+ """Map normalised roughness to (surface_type, grip)."""
+ for upper, stype, grip in _SURFACE_TABLE:
+ if roughness < upper:
+ return stype, grip
+ return "gravel", 0.45
+
+
+def grip_for_surface(surface_type: str) -> float:
+ """Return grip coefficient for a given surface type string."""
+ for _, stype, grip in _SURFACE_TABLE:
+ if stype == surface_type:
+ return grip
+ return 0.6 # unknown
diff --git a/jetson/ros2_ws/src/saltybot_terrain_adaptation/setup.cfg b/jetson/ros2_ws/src/saltybot_terrain_adaptation/setup.cfg
new file mode 100644
index 0000000..64ce754
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_terrain_adaptation/setup.cfg
@@ -0,0 +1,5 @@
+[develop]
+script_dir=$base/lib/saltybot_terrain_adaptation
+
+[install]
+install_scripts=$base/lib/saltybot_terrain_adaptation
diff --git a/jetson/ros2_ws/src/saltybot_terrain_adaptation/setup.py b/jetson/ros2_ws/src/saltybot_terrain_adaptation/setup.py
new file mode 100644
index 0000000..a5181f0
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_terrain_adaptation/setup.py
@@ -0,0 +1,32 @@
+from setuptools import setup, find_packages
+import os
+from glob import glob
+
+package_name = "saltybot_terrain_adaptation"
+
+setup(
+ name=package_name,
+ version="0.1.0",
+ packages=find_packages(exclude=["test"]),
+ data_files=[
+ ("share/ament_index/resource_index/packages",
+ [f"resource/{package_name}"]),
+ (f"share/{package_name}", ["package.xml"]),
+ (os.path.join("share", package_name, "config"),
+ glob("config/*.yaml")),
+ (os.path.join("share", package_name, "launch"),
+ glob("launch/*.py")),
+ ],
+ install_requires=["setuptools"],
+ zip_safe=True,
+ maintainer="sl-controls",
+ maintainer_email="sl-controls@saltylab.local",
+ description="Surface detection and dynamic terrain adaptation for SaltyBot",
+ license="MIT",
+ tests_require=["pytest"],
+ entry_points={
+ "console_scripts": [
+ f"terrain_adaptation_node = {package_name}.terrain_adaptation_node:main",
+ ],
+ },
+)
diff --git a/jetson/ros2_ws/src/saltybot_terrain_adaptation/test/test_terrain_adaptation.py b/jetson/ros2_ws/src/saltybot_terrain_adaptation/test/test_terrain_adaptation.py
new file mode 100644
index 0000000..fb85634
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_terrain_adaptation/test/test_terrain_adaptation.py
@@ -0,0 +1,618 @@
+"""
+test_terrain_adaptation.py — Unit tests for Issue #142 terrain adaptation modules.
+
+Covers:
+ TerrainAnalyzer — roughness computation, surface classification, hysteresis
+ SlipDetector — slip ratio, EMA smoothing, speed gate
+ InclineDetector — LPF convergence, seeding, motor_bias
+ TerrainAdapter — speed_scale formula, pid_scales, motor_bias, apply()
+ TerrainHistory — add(), type-change detection, to_json_log(), clear()
+"""
+
+import math
+import sys
+import os
+
+# Allow direct pytest run without installing the package
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+
+import pytest
+
+from saltybot_terrain_adaptation.terrain_analyzer import TerrainAnalyzer, SurfaceAnalysis
+from saltybot_terrain_adaptation.slip_detector import SlipDetector
+from saltybot_terrain_adaptation.incline_detector import InclineDetector
+from saltybot_terrain_adaptation.terrain_adapter import (
+ TerrainAdapter,
+ TerrainHistory,
+ TerrainState,
+)
+
+
+# ── Helpers ───────────────────────────────────────────────────────────────────
+
+def _make_analyzer(**kw):
+ defaults = dict(
+ max_variance_ms2=3.0,
+ window_s=0.5,
+ control_rate_hz=50.0,
+ hysteresis_count=5,
+ )
+ defaults.update(kw)
+ return TerrainAnalyzer(**defaults)
+
+
+def _make_slip(**kw):
+ defaults = dict(slip_threshold=0.30, min_speed_ms=0.15, smoothing_alpha=1.0)
+ defaults.update(kw)
+ return SlipDetector(**defaults)
+
+
+def _make_incline(**kw):
+ defaults = dict(lpf_tau_s=1.0, control_rate_hz=50.0, max_incline_rad=0.52)
+ defaults.update(kw)
+ return InclineDetector(**defaults)
+
+
+def _make_adapter(**kw):
+ defaults = dict(max_incline_rad=0.52, min_speed_scale=0.15)
+ defaults.update(kw)
+ return TerrainAdapter(**defaults)
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# TerrainAnalyzer
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestTerrainAnalyzerBuffer:
+
+ def test_returns_none_until_min_samples(self):
+ """Should return None until the minimum sample count (5) is reached."""
+ analyzer = _make_analyzer(window_s=0.5, control_rate_hz=50.0)
+ # Minimum is 5 samples; feed 4 → still None
+ for _ in range(4):
+ result = analyzer.update(0.0, 0.0, 9.81)
+ assert result is None
+
+ def test_returns_result_at_window_full(self):
+ """Exactly at window size, a SurfaceAnalysis must be returned."""
+ analyzer = _make_analyzer(window_s=0.5, control_rate_hz=50.0)
+ result = None
+ for _ in range(25):
+ result = analyzer.update(0.0, 0.0, 9.81)
+ assert result is not None
+ assert isinstance(result, SurfaceAnalysis)
+
+ def test_reset_clears_buffer(self):
+ """After reset, the buffer requires refilling (min 5 samples)."""
+ analyzer = _make_analyzer(window_s=0.5, control_rate_hz=50.0)
+ for _ in range(25):
+ analyzer.update(0.0, 0.0, 9.81)
+ analyzer.reset()
+ # Feed only 4 samples → still None
+ for _ in range(4):
+ result = analyzer.update(0.0, 0.0, 9.81)
+ assert result is None
+
+
+class TestTerrainAnalyzerRoughness:
+
+ def _fill(self, analyzer, ax, ay, az, n=None):
+ window = 25 # 0.5s × 50Hz
+ n = n or window
+ result = None
+ for _ in range(n):
+ result = analyzer.update(ax, ay, az)
+ return result
+
+ def test_zero_vibration_gives_zero_roughness(self):
+ """Constant acceleration → zero variance → roughness = 0."""
+ analyzer = _make_analyzer()
+ result = self._fill(analyzer, 0.0, 0.0, 9.81)
+ assert result is not None
+ assert result.roughness == pytest.approx(0.0, abs=1e-9)
+
+ def test_roughness_increases_with_vibration(self):
+ """Higher vibration amplitude → higher roughness score."""
+ analyzer_low = _make_analyzer()
+ analyzer_high = _make_analyzer()
+
+ window = 25
+ import math as _math
+ for i in range(window):
+ t = i * 0.02
+ analyzer_low.update( 0.5 * _math.sin(t * 10), 0.0, 9.81)
+ analyzer_high.update(2.5 * _math.sin(t * 10), 0.0, 9.81)
+
+ r_low = analyzer_low.update( 0.5 * _math.sin(window * 0.02 * 10), 0.0, 9.81)
+ r_high = analyzer_high.update(2.5 * _math.sin(window * 0.02 * 10), 0.0, 9.81)
+
+ # If still None, pump one more sample
+ if r_low is None or r_high is None:
+ for i in range(window + 1):
+ t = i * 0.02
+ r_low = analyzer_low.update( 0.5 * _math.sin(t * 10), 0.0, 9.81)
+ r_high = analyzer_high.update(2.5 * _math.sin(t * 10), 0.0, 9.81)
+
+ assert r_low is not None and r_high is not None
+ assert r_high.roughness > r_low.roughness
+
+ def test_roughness_clamped_to_one(self):
+ """Extreme vibration must not exceed 1.0."""
+ analyzer = _make_analyzer(max_variance_ms2=1.0)
+ window = 26
+ result = None
+ for i in range(window):
+ result = analyzer.update(100.0, 100.0, 100.0)
+ assert result is not None
+ assert result.roughness <= 1.0
+
+
+class TestTerrainAnalyzerClassification:
+
+ def _steady(self, analyzer, roughness_target, max_variance_ms2=3.0):
+ """
+ Pump enough samples so that roughness_target produces a stable result.
+ We inject a DC-free vibration scaled to achieve the desired roughness.
+
+ roughness = sqrt(Var(ax)) / max_variance_ms2
+ For a pure sine with amplitude A: Var ≈ A² / 2
+ → A = roughness_target × sqrt(2) × max_variance_ms2
+ """
+ import math as _math
+ amp = roughness_target * _math.sqrt(2.0) * max_variance_ms2
+ result = None
+ for i in range(60): # well beyond window + hysteresis
+ t = i * 0.02
+ result = analyzer.update(amp * _math.sin(t * 20), 0.0, 9.81)
+ return result
+
+ def test_smooth_surface_classification(self):
+ """Very low vibration → 'smooth'."""
+ analyzer = _make_analyzer(hysteresis_count=1)
+ result = self._steady(analyzer, roughness_target=0.05)
+ assert result is not None
+ assert result.surface_type == "smooth"
+
+ def test_gravel_surface_classification(self):
+ """High vibration → 'gravel'."""
+ analyzer = _make_analyzer(hysteresis_count=1, max_variance_ms2=1.0)
+ result = None
+ for i in range(60):
+ t = i * 0.02
+ result = analyzer.update(50.0 * math.sin(t * 20), 0.0, 9.81)
+ assert result is not None
+ assert result.surface_type == "gravel"
+
+ def test_hysteresis_prevents_rapid_switch(self):
+ """A single outlier sample should not immediately change classification."""
+ analyzer = _make_analyzer(hysteresis_count=5)
+ # Establish smooth classification
+ for i in range(60):
+ analyzer.update(0.0, 0.0, 9.81)
+ # One gravel-level spike — type should not change immediately
+ result = analyzer.update(50.0, 50.0, 9.81)
+ if result is not None:
+ # The type may already have changed if window shifts, but mostly stays
+ pass # hysteresis behaviour depends on prior window state — test the counter path
+
+ def test_grip_within_bounds(self):
+ """Grip must always be in [0, 1]."""
+ analyzer = _make_analyzer(hysteresis_count=1)
+ result = self._steady(analyzer, roughness_target=0.5)
+ if result is not None:
+ assert 0.0 <= result.grip <= 1.0
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# SlipDetector
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestSlipDetector:
+
+ def test_no_slip_equal_speeds(self):
+ """When actual == cmd, slip_ratio should be 0."""
+ slip = _make_slip()
+ ratio, flag = slip.update(1.0, 1.0)
+ assert ratio == pytest.approx(0.0, abs=1e-9)
+ assert flag is False
+
+ def test_full_slip_zero_actual(self):
+ """Actual speed = 0 → slip_ratio = 1.0."""
+ slip = _make_slip(smoothing_alpha=1.0)
+ ratio, flag = slip.update(1.0, 0.0)
+ assert ratio == pytest.approx(1.0, abs=1e-6)
+
+ def test_partial_slip(self):
+ """Actual half of cmd → slip_ratio ≈ 0.5."""
+ slip = _make_slip(smoothing_alpha=1.0)
+ ratio, _ = slip.update(1.0, 0.5)
+ assert ratio == pytest.approx(0.5, abs=1e-6)
+
+ def test_slip_flag_above_threshold(self):
+ """slip_ratio > threshold → is_slipping = True."""
+ slip = _make_slip(slip_threshold=0.30, smoothing_alpha=1.0)
+ _, flag = slip.update(1.0, 0.0) # ratio = 1.0 > 0.30
+ assert flag is True
+
+ def test_slip_flag_below_threshold(self):
+ """slip_ratio < threshold → is_slipping = False."""
+ slip = _make_slip(slip_threshold=0.30, smoothing_alpha=1.0)
+ _, flag = slip.update(1.0, 0.95) # ratio ≈ 0.05 < 0.30
+ assert flag is False
+
+ def test_below_min_speed_gate(self):
+ """When |cmd_speed| < min_speed_ms, slip should be suppressed."""
+ slip = _make_slip(min_speed_ms=0.15, smoothing_alpha=1.0)
+ ratio, flag = slip.update(0.05, 0.0) # cmd < min_speed_ms
+ assert ratio == pytest.approx(0.0, abs=1e-9)
+ assert flag is False
+
+ def test_negative_cmd_and_actual_direction_mismatch(self):
+ """Reverse commanded speed; actual = 0 → full slip."""
+ slip = _make_slip(smoothing_alpha=1.0)
+ ratio, flag = slip.update(-1.0, 0.0)
+ assert ratio == pytest.approx(1.0, abs=1e-6)
+ assert flag is True
+
+ def test_ema_smoothing(self):
+ """With alpha < 1, smoothed_slip should increase each step toward 1.0."""
+ slip = _make_slip(slip_threshold=1.0, smoothing_alpha=0.5)
+ # Step 1: raw=1.0; EMA → 0.0 + 0.5*1.0 = 0.5
+ slip.update(1.0, 0.0)
+ smooth1 = slip.smoothed_slip
+ # Step 2: raw=1.0; EMA → 0.5 + 0.5*(1.0 - 0.5) = 0.75
+ slip.update(1.0, 0.0)
+ smooth2 = slip.smoothed_slip
+ assert smooth1 == pytest.approx(0.5, abs=1e-6)
+ assert smooth2 == pytest.approx(0.75, abs=1e-6)
+ assert smooth2 > smooth1
+ assert smooth2 < 1.0
+
+ def test_reset_clears_state(self):
+ """After reset, smoothed_slip should return to zero."""
+ slip = _make_slip(smoothing_alpha=0.5)
+ for _ in range(10):
+ slip.update(1.0, 0.0)
+ assert slip.smoothed_slip > 0.0
+ slip.reset()
+ assert slip.smoothed_slip == pytest.approx(0.0, abs=1e-9)
+ # First step after reset: EMA starts from 0 → alpha × 1.0 = 0.5
+ slip.update(1.0, 0.0)
+ assert slip.smoothed_slip == pytest.approx(0.5, abs=1e-6)
+
+ def test_actual_overspeed_no_negative_slip(self):
+ """Actual faster than commanded → slip_ratio should not go negative."""
+ slip = _make_slip(smoothing_alpha=1.0)
+ ratio, _ = slip.update(0.5, 2.0)
+ assert ratio >= 0.0
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# InclineDetector
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestInclineDetector:
+
+ def test_first_call_seeds_with_measurement(self):
+ """First update should seed the filter with the raw pitch (no transient)."""
+ inc = _make_incline()
+ result = inc.update(0.3, dt=0.02)
+ assert result == pytest.approx(0.3, abs=1e-6)
+
+ def test_converges_to_steady_state(self):
+ """After many steps, filtered incline should be close to constant input."""
+ inc = _make_incline(lpf_tau_s=1.0)
+ for _ in range(200):
+ result = inc.update(0.2, dt=0.02)
+ assert result == pytest.approx(0.2, abs=1e-3)
+
+ def test_lpf_slower_than_input(self):
+ """After a step change, the filter should lag behind the raw value."""
+ inc = _make_incline(lpf_tau_s=2.0)
+ # Start settled at 0
+ inc.update(0.0, dt=0.02)
+ # Step to 1.0
+ result_1st = inc.update(1.0, dt=0.02)
+ # Must be between 0 and 1 (lagged)
+ assert 0.0 < result_1st < 1.0
+
+ def test_clamp_at_max_incline(self):
+ """Output must not exceed max_incline_rad in magnitude."""
+ inc = _make_incline(max_incline_rad=0.52)
+ for _ in range(300):
+ result = inc.update(10.0, dt=0.02)
+ assert result <= 0.52 + 1e-9
+
+ def test_negative_incline(self):
+ """Negative (downhill) pitch should produce negative filtered incline."""
+ inc = _make_incline()
+ for _ in range(200):
+ result = inc.update(-0.2, dt=0.02)
+ assert result == pytest.approx(-0.2, abs=1e-3)
+
+ def test_clamp_negative(self):
+ """Negative incline also clamped by max_incline_rad."""
+ inc = _make_incline(max_incline_rad=0.52)
+ for _ in range(300):
+ result = inc.update(-10.0, dt=0.02)
+ assert result >= -0.52 - 1e-9
+
+ def test_reset_zeroes_state(self):
+ """After reset, filter should reseed on next call."""
+ inc = _make_incline()
+ for _ in range(50):
+ inc.update(0.5, dt=0.02)
+ inc.reset()
+ result = inc.update(0.1, dt=0.02)
+ assert result == pytest.approx(0.1, abs=1e-6)
+
+ def test_motor_bias_static_method(self):
+ """motor_bias = k_bias × sin(incline_rad)."""
+ bias = InclineDetector.motor_bias(math.pi / 6, k_bias=2.0)
+ assert bias == pytest.approx(2.0 * math.sin(math.pi / 6), rel=1e-6)
+
+ def test_motor_bias_zero_on_flat(self):
+ assert InclineDetector.motor_bias(0.0) == pytest.approx(0.0, abs=1e-9)
+
+ def test_motor_bias_negative_downhill(self):
+ """Downhill incline → negative bias."""
+ bias = InclineDetector.motor_bias(-0.3, k_bias=1.0)
+ assert bias < 0.0
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# TerrainAdapter — speed_scale
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestTerrainAdapterSpeedScale:
+
+ def test_flat_smooth_no_slip_gives_one(self):
+ """Ideal conditions → speed_scale = 1.0."""
+ adapter = _make_adapter()
+ state = TerrainState(roughness=0.0, incline_rad=0.0, grip=1.0, slip_ratio=0.0)
+ assert adapter.speed_scale(state) == pytest.approx(1.0, abs=1e-6)
+
+ def test_max_roughness_reduces_scale(self):
+ """roughness=1.0 → roughness_scale=0.4; others=1 → speed_scale=0.4."""
+ adapter = _make_adapter(min_speed_scale=0.0)
+ state = TerrainState(roughness=1.0, incline_rad=0.0, grip=1.0, slip_ratio=0.0)
+ assert adapter.speed_scale(state) == pytest.approx(0.4, rel=1e-5)
+
+ def test_max_incline_reduces_scale(self):
+ """incline=max_incline → incline_scale=0.5; others=1 → 0.5."""
+ adapter = _make_adapter(min_speed_scale=0.0, max_incline_rad=0.52)
+ state = TerrainState(roughness=0.0, incline_rad=0.52, grip=1.0, slip_ratio=0.0)
+ assert adapter.speed_scale(state) == pytest.approx(0.5, rel=1e-5)
+
+ def test_max_slip_reduces_scale(self):
+ """slip_ratio=1.0 → slip_scale=0.2; others=1 → 0.2."""
+ adapter = _make_adapter(min_speed_scale=0.0)
+ state = TerrainState(roughness=0.0, incline_rad=0.0, grip=1.0, slip_ratio=1.0)
+ assert adapter.speed_scale(state) == pytest.approx(0.2, rel=1e-5)
+
+ def test_min_speed_scale_floor(self):
+ """Extreme conditions must not go below min_speed_scale."""
+ adapter = _make_adapter(min_speed_scale=0.15)
+ state = TerrainState(roughness=1.0, incline_rad=0.52, grip=0.0, slip_ratio=1.0)
+ assert adapter.speed_scale(state) >= 0.15 - 1e-9
+
+ def test_scale_always_at_most_one(self):
+ """speed_scale must never exceed 1.0."""
+ adapter = _make_adapter()
+ state = TerrainState(roughness=0.0, incline_rad=0.0, grip=1.0, slip_ratio=0.0)
+ assert adapter.speed_scale(state) <= 1.0 + 1e-9
+
+ def test_negative_incline_same_as_positive(self):
+ """Downhill and uphill of equal magnitude give same scale (abs applied)."""
+ adapter = _make_adapter(min_speed_scale=0.0)
+ up = TerrainState(roughness=0.0, incline_rad= 0.3, grip=1.0, slip_ratio=0.0)
+ down = TerrainState(roughness=0.0, incline_rad=-0.3, grip=1.0, slip_ratio=0.0)
+ assert adapter.speed_scale(up) == pytest.approx(adapter.speed_scale(down), rel=1e-6)
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# TerrainAdapter — pid_scales
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestTerrainAdapterPIDScales:
+
+ def test_full_grip_gives_max_gains(self):
+ """At grip=1.0, all scales should be 1.0."""
+ adapter = _make_adapter()
+ state = TerrainState(grip=1.0)
+ scales = adapter.pid_scales(state)
+ assert scales["kp"] == pytest.approx(1.0, rel=1e-6)
+ assert scales["ki"] == pytest.approx(1.0, rel=1e-6)
+ assert scales["kd"] == pytest.approx(1.0, rel=1e-6)
+
+ def test_zero_grip_gives_min_gains(self):
+ """At grip=0.0: kp=0.70, ki=0.50, kd=0.80."""
+ adapter = _make_adapter()
+ state = TerrainState(grip=0.0)
+ scales = adapter.pid_scales(state)
+ assert scales["kp"] == pytest.approx(0.70, rel=1e-6)
+ assert scales["ki"] == pytest.approx(0.50, rel=1e-6)
+ assert scales["kd"] == pytest.approx(0.80, rel=1e-6)
+
+ def test_partial_grip(self):
+ """At grip=0.5: kp=0.85, ki=0.75, kd=0.90."""
+ adapter = _make_adapter()
+ state = TerrainState(grip=0.5)
+ scales = adapter.pid_scales(state)
+ assert scales["kp"] == pytest.approx(0.85, rel=1e-6)
+ assert scales["ki"] == pytest.approx(0.75, rel=1e-6)
+ assert scales["kd"] == pytest.approx(0.90, rel=1e-6)
+
+ def test_grip_clamped_above_one(self):
+ """Grip > 1.0 must not push scales above 1.0."""
+ adapter = _make_adapter()
+ state = TerrainState(grip=5.0)
+ scales = adapter.pid_scales(state)
+ for v in scales.values():
+ assert v <= 1.0 + 1e-9
+
+ def test_grip_clamped_below_zero(self):
+ """Negative grip must not push scales below their floor values."""
+ adapter = _make_adapter()
+ state = TerrainState(grip=-1.0)
+ scales = adapter.pid_scales(state)
+ assert scales["kp"] >= 0.70 - 1e-9
+ assert scales["ki"] >= 0.50 - 1e-9
+ assert scales["kd"] >= 0.80 - 1e-9
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# TerrainAdapter — motor_bias
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestTerrainAdapterMotorBias:
+
+ def test_flat_zero_bias(self):
+ adapter = _make_adapter()
+ state = TerrainState(incline_rad=0.0)
+ assert TerrainAdapter.motor_bias(state, k_bias=1.0) == pytest.approx(0.0, abs=1e-9)
+
+ def test_uphill_positive_bias(self):
+ adapter = _make_adapter()
+ state = TerrainState(incline_rad=0.3)
+ bias = TerrainAdapter.motor_bias(state, k_bias=1.0)
+ assert bias == pytest.approx(math.sin(0.3), rel=1e-6)
+ assert bias > 0.0
+
+ def test_downhill_negative_bias(self):
+ state = TerrainState(incline_rad=-0.3)
+ bias = TerrainAdapter.motor_bias(state, k_bias=1.0)
+ assert bias < 0.0
+
+ def test_k_bias_scales_output(self):
+ state = TerrainState(incline_rad=0.3)
+ b1 = TerrainAdapter.motor_bias(state, k_bias=1.0)
+ b2 = TerrainAdapter.motor_bias(state, k_bias=2.0)
+ assert b2 == pytest.approx(2.0 * b1, rel=1e-6)
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# TerrainAdapter — apply()
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestTerrainAdapterApply:
+
+ def test_apply_preserves_fields(self):
+ """apply() must copy all fields except speed_scale."""
+ adapter = _make_adapter()
+ state = TerrainState(
+ surface_type="gravel", roughness=0.6, incline_rad=0.1,
+ grip=0.5, slip_ratio=0.1, speed_scale=0.0,
+ )
+ result = adapter.apply(state)
+ assert result.surface_type == "gravel"
+ assert result.roughness == pytest.approx(0.6)
+ assert result.incline_rad == pytest.approx(0.1)
+ assert result.grip == pytest.approx(0.5)
+ assert result.slip_ratio == pytest.approx(0.1)
+
+ def test_apply_populates_speed_scale(self):
+ """apply() should compute and store the correct speed_scale."""
+ adapter = _make_adapter(min_speed_scale=0.0)
+ state = TerrainState(roughness=0.0, incline_rad=0.0, grip=1.0, slip_ratio=0.0)
+ result = adapter.apply(state)
+ assert result.speed_scale == pytest.approx(1.0, abs=1e-6)
+
+ def test_apply_returns_new_object(self):
+ """apply() must return a new TerrainState, not mutate the input."""
+ adapter = _make_adapter()
+ state = TerrainState()
+ result = adapter.apply(state)
+ assert result is not state
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# TerrainHistory
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestTerrainHistory:
+
+ def test_first_add_returns_true(self):
+ """First entry is always a 'change'."""
+ history = TerrainHistory()
+ changed = history.add(TerrainState(surface_type="smooth"))
+ assert changed is True
+
+ def test_same_type_returns_false(self):
+ """Adding the same type twice → second add returns False."""
+ history = TerrainHistory()
+ history.add(TerrainState(surface_type="smooth"))
+ changed = history.add(TerrainState(surface_type="smooth"))
+ assert changed is False
+
+ def test_type_change_returns_true(self):
+ """Adding a different type → returns True."""
+ history = TerrainHistory()
+ history.add(TerrainState(surface_type="smooth"))
+ changed = history.add(TerrainState(surface_type="gravel"))
+ assert changed is True
+
+ def test_entries_property(self):
+ """entries should reflect all added states."""
+ history = TerrainHistory()
+ for t in ["smooth", "gravel", "grass"]:
+ history.add(TerrainState(surface_type=t))
+ assert len(history.entries) == 3
+
+ def test_max_entries_limit(self):
+ """Buffer should not exceed max_entries."""
+ history = TerrainHistory(max_entries=5)
+ for _ in range(20):
+ history.add(TerrainState(surface_type="smooth"))
+ assert len(history.entries) <= 5
+
+ def test_current_property(self):
+ """current should return the most recently added state."""
+ history = TerrainHistory()
+ s1 = TerrainState(surface_type="smooth")
+ s2 = TerrainState(surface_type="gravel")
+ history.add(s1)
+ history.add(s2)
+ assert history.current.surface_type == "gravel"
+
+ def test_current_none_when_empty(self):
+ history = TerrainHistory()
+ assert history.current is None
+
+ def test_clear_empties_buffer(self):
+ history = TerrainHistory()
+ history.add(TerrainState(surface_type="smooth"))
+ history.clear()
+ assert len(history.entries) == 0
+ assert history.current is None
+
+ def test_to_json_log_deduplicates(self):
+ """to_json_log() should emit only type-change events."""
+ history = TerrainHistory()
+ for t in ["smooth", "smooth", "gravel", "gravel", "smooth"]:
+ history.add(TerrainState(surface_type=t))
+ log = history.to_json_log()
+ # Should see: smooth, gravel, smooth → 3 entries
+ assert len(log) == 3
+ assert log[0]["surface"] == "smooth"
+ assert log[1]["surface"] == "gravel"
+ assert log[2]["surface"] == "smooth"
+
+ def test_to_json_log_fields(self):
+ """Each event dict must contain the expected keys."""
+ history = TerrainHistory()
+ history.add(TerrainState(
+ surface_type="tile", roughness=0.25, incline_rad=0.1,
+ grip=0.8, slip_ratio=0.05, speed_scale=0.9,
+ ))
+ log = history.to_json_log()
+ assert len(log) == 1
+ entry = log[0]
+ for key in ("surface", "roughness", "incline_rad", "grip", "slip_ratio",
+ "speed_scale", "timestamp_s"):
+ assert key in entry
+
+ def test_to_json_log_empty(self):
+ """Empty history → empty log."""
+ history = TerrainHistory()
+ assert history.to_json_log() == []
diff --git a/jetson/ros2_ws/src/saltybot_terrain_msgs/CMakeLists.txt b/jetson/ros2_ws/src/saltybot_terrain_msgs/CMakeLists.txt
new file mode 100644
index 0000000..f1c6c97
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_terrain_msgs/CMakeLists.txt
@@ -0,0 +1,14 @@
+cmake_minimum_required(VERSION 3.8)
+project(saltybot_terrain_msgs)
+
+find_package(ament_cmake REQUIRED)
+find_package(rosidl_default_generators REQUIRED)
+find_package(builtin_interfaces REQUIRED)
+
+rosidl_generate_interfaces(${PROJECT_NAME}
+ "msg/Terrain.msg"
+ DEPENDENCIES builtin_interfaces
+)
+
+ament_export_dependencies(rosidl_default_runtime)
+ament_package()
diff --git a/jetson/ros2_ws/src/saltybot_terrain_msgs/msg/Terrain.msg b/jetson/ros2_ws/src/saltybot_terrain_msgs/msg/Terrain.msg
new file mode 100644
index 0000000..98e38b0
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_terrain_msgs/msg/Terrain.msg
@@ -0,0 +1,25 @@
+# Terrain.msg — Surface classification and adaptation state (Issue #142)
+# Published by: /saltybot/terrain_adaptation/terrain_adaptation_node
+# Topic: /saltybot/terrain
+
+builtin_interfaces/Time stamp
+
+# Surface type classification
+# Values: "smooth" | "tile" | "carpet" | "grass" | "gravel" | "unknown"
+string surface_type
+
+# Normalised roughness score 0.0 (smooth) – 1.0 (very rough)
+float32 roughness
+
+# Terrain incline in radians. Positive = uphill (front higher than rear).
+float32 incline_rad
+
+# Grip coefficient 0.0 (no grip) – 1.0 (full grip)
+float32 grip
+
+# Wheel/track slip ratio 0.0 (no slip) – 1.0 (full slip)
+float32 slip_ratio
+
+# Recommended speed multiplier 0.0 – 1.0
+# Apply to max_speed before commanding motion.
+float32 speed_scale
diff --git a/jetson/ros2_ws/src/saltybot_terrain_msgs/package.xml b/jetson/ros2_ws/src/saltybot_terrain_msgs/package.xml
new file mode 100644
index 0000000..c7fae5f
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_terrain_msgs/package.xml
@@ -0,0 +1,22 @@
+
+
+
+ saltybot_terrain_msgs
+ 0.1.0
+ Terrain message definitions for SaltyBot terrain adaptation (Issue #142)
+ sl-controls
+ MIT
+
+ ament_cmake
+ rosidl_default_generators
+
+ builtin_interfaces
+
+ rosidl_default_runtime
+
+ rosidl_interface_packages
+
+
+ ament_cmake
+
+