From 4d2b008c48cfe83cad94534bce366d9c160e211b Mon Sep 17 00:00:00 2001 From: sl-controls Date: Mon, 2 Mar 2026 09:38:46 -0500 Subject: [PATCH] feat(controls): adaptive PID balance controller with gain scheduling (Issue #136) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pure modules (no ROS2 dep, fully unit-tested): - pid_controller.py: GainSet — (kp,ki,kd) with safety clamp helper PIDController — anti-windup integral, D-on-error, output clamping GainScheduler — 3-class weight table (empty/light/heavy), exponential gain blending (alpha per tick), safety bounds clamping, manual override, immediate revert-to-defaults on instability InstabilityDetector — dual criteria: tilt threshold (>50% of window) + sign-reversal count (oscillation) - weight_estimator.py: WeightEstimator — rolling-window current→weight, steady-state gating (|tilt|≤threshold), change detection (threshold_kg) CalibrationRoutine — IDLE→ROCKING→SETTLING→DONE/FAILED state machine; sinusoidal rocking output, settling current sampling, weight estimate from avg current; abort() / restart supported - adaptive_pid_node.py: 100 Hz ROS2 node Sub: /saltybot/imu (Imu, pitch from quaternion), /saltybot/motor_current Pub: /saltybot/balance_effort (Float32), /saltybot/weight_estimate, /saltybot/pid_state (JSON: gains, class, weight_kg, flags) Srv: /saltybot/calibrate_balance (std_srvs/Trigger) IMU watchdog (0.5 s), dynamic reconfigure via override_enabled param, instability → revert + PID reset, structured INFO/WARN logging - config/adaptive_pid_params.yaml, launch/adaptive_pid.launch.py, package.xml, setup.py, setup.cfg - test/test_adaptive_pid.py: 68/68 unit tests passing Co-Authored-By: Claude Sonnet 4.6 --- .../config/adaptive_pid_params.yaml | 66 ++ .../launch/adaptive_pid.launch.py | 54 ++ .../src/saltybot_adaptive_pid/package.xml | 23 + .../resource/saltybot_adaptive_pid | 0 .../saltybot_adaptive_pid/__init__.py | 0 .../adaptive_pid_node.py | 459 ++++++++++++++ .../saltybot_adaptive_pid/pid_controller.py | 333 ++++++++++ .../saltybot_adaptive_pid/weight_estimator.py | 246 ++++++++ .../src/saltybot_adaptive_pid/setup.cfg | 5 + .../src/saltybot_adaptive_pid/setup.py | 32 + .../test/test_adaptive_pid.py | 591 ++++++++++++++++++ 11 files changed, 1809 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_adaptive_pid/config/adaptive_pid_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_adaptive_pid/launch/adaptive_pid.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_adaptive_pid/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_adaptive_pid/resource/saltybot_adaptive_pid create mode 100644 jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/adaptive_pid_node.py create mode 100644 jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/pid_controller.py create mode 100644 jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/weight_estimator.py create mode 100644 jetson/ros2_ws/src/saltybot_adaptive_pid/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_adaptive_pid/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_adaptive_pid/test/test_adaptive_pid.py diff --git a/jetson/ros2_ws/src/saltybot_adaptive_pid/config/adaptive_pid_params.yaml b/jetson/ros2_ws/src/saltybot_adaptive_pid/config/adaptive_pid_params.yaml new file mode 100644 index 0000000..3af32d0 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_adaptive_pid/config/adaptive_pid_params.yaml @@ -0,0 +1,66 @@ +adaptive_pid: + ros__parameters: + # Control loop + control_rate: 100.0 # Hz + balance_setpoint_rad: 0.0 # target tilt (rad); 0 = upright + + # PID gains per weight class + # Tune: heavier loads need more proportional gain to overcome inertia + kp_empty: 15.0 + ki_empty: 0.5 + kd_empty: 1.5 + + kp_light: 18.0 + ki_light: 0.6 + kd_light: 2.0 + + kp_heavy: 22.0 + ki_heavy: 0.8 + kd_heavy: 2.5 + + # Safety bounds — ALL gains are hard-clamped to these regardless of source + kp_min: 5.0 + kp_max: 40.0 + ki_min: 0.0 + ki_max: 5.0 + kd_min: 0.0 + kd_max: 10.0 + output_min: -50.0 # balance effort clamp + output_max: 50.0 + i_clamp: 10.0 # anti-windup integrator clamp + + # Weight classification thresholds (kg) + weight_empty_max_kg: 0.5 # ≤ 0.5 kg → empty + weight_light_max_kg: 2.0 # ≤ 2.0 kg → light; > 2.0 → heavy + + # Weight estimation from motor current + idle_current_a: 0.5 # motor current at rest with no payload (A) + current_per_kg_a: 0.3 # current increase per kg payload (A/kg) + weight_window_s: 3.0 # rolling-average window (s) + weight_change_threshold_kg: 0.3 # triggers gain reschedule when exceeded + steady_tilt_threshold_rad: 0.05 # max tilt to accept current sample + + # Gain blending + gain_blend_alpha: 0.05 # convergence rate per tick (0=no change, 1=instant) + # at 100 Hz: ~0.4 s to 50 % of target, ~0.9 s to 90 % + + # Instability detection + instability_tilt_threshold_rad: 0.4 # |tilt| exceeds this → suspect + instability_oscillation_count: 6 # sign reversals in window → unstable + instability_window_size: 100 # samples (~1 s at 100 Hz) + + # IMU watchdog + imu_timeout_s: 0.5 # zero effort if IMU goes silent + + # Dynamic reconfigure / manual override + # Set override_enabled=true + override_kp/ki/kd to bypass scheduling + override_enabled: false + override_kp: 15.0 + override_ki: 0.5 + override_kd: 1.5 + + # Calibration routine (rock-and-settle) + calibration_amplitude_rad: 0.05 # rocking amplitude + calibration_frequency_hz: 0.5 # rocking frequency + calibration_cycles: 3 # number of rock cycles (duration = cycles/freq) + calibration_settle_s: 2.0 # settling window after rocking diff --git a/jetson/ros2_ws/src/saltybot_adaptive_pid/launch/adaptive_pid.launch.py b/jetson/ros2_ws/src/saltybot_adaptive_pid/launch/adaptive_pid.launch.py new file mode 100644 index 0000000..759f756 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_adaptive_pid/launch/adaptive_pid.launch.py @@ -0,0 +1,54 @@ +""" +adaptive_pid.launch.py — Launch the adaptive balance PID controller. + +Usage: + ros2 launch saltybot_adaptive_pid adaptive_pid.launch.py + ros2 launch saltybot_adaptive_pid adaptive_pid.launch.py balance_setpoint_rad:=0.02 + +Dynamic reconfigure (no restart needed): + ros2 param set /adaptive_pid gain_blend_alpha 0.1 + ros2 param set /adaptive_pid override_enabled true + ros2 param set /adaptive_pid override_kp 12.0 + ros2 param set /adaptive_pid override_ki 0.4 + ros2 param set /adaptive_pid override_kd 1.2 + ros2 param set /adaptive_pid override_enabled false + +Calibration: + ros2 service call /saltybot/calibrate_balance std_srvs/srv/Trigger +""" + +import os + +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + pkg_share = get_package_share_directory("saltybot_adaptive_pid") + params_file = os.path.join(pkg_share, "config", "adaptive_pid_params.yaml") + + setpoint_arg = DeclareLaunchArgument( + "balance_setpoint_rad", + default_value="0.0", + description="Target tilt angle in rad (0 = upright)", + ) + + adaptive_pid = Node( + package="saltybot_adaptive_pid", + executable="adaptive_pid_node", + name="adaptive_pid", + parameters=[ + params_file, + {"balance_setpoint_rad": LaunchConfiguration("balance_setpoint_rad")}, + ], + output="screen", + emulate_tty=True, + ) + + return LaunchDescription([ + setpoint_arg, + adaptive_pid, + ]) diff --git a/jetson/ros2_ws/src/saltybot_adaptive_pid/package.xml b/jetson/ros2_ws/src/saltybot_adaptive_pid/package.xml new file mode 100644 index 0000000..ce6bb93 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_adaptive_pid/package.xml @@ -0,0 +1,23 @@ + + + + saltybot_adaptive_pid + 0.1.0 + Adaptive balance PID with weight estimation and gain scheduling (Issue #136) + sl-controls + MIT + + rclpy + sensor_msgs + std_msgs + std_srvs + + ament_copyright + ament_flake8 + ament_pep257 + pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_adaptive_pid/resource/saltybot_adaptive_pid b/jetson/ros2_ws/src/saltybot_adaptive_pid/resource/saltybot_adaptive_pid new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/__init__.py b/jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/adaptive_pid_node.py b/jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/adaptive_pid_node.py new file mode 100644 index 0000000..750653b --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/adaptive_pid_node.py @@ -0,0 +1,459 @@ +""" +adaptive_pid_node.py — Adaptive balance PID controller (Issue #136). + +Overview +──────── + Subscribes to IMU and motor-current topics, runs a PID balance loop at + `control_rate` Hz, and automatically adjusts gains based on estimated + payload weight. + +Weight estimation +───────────────── + Motor current at steady state (low tilt, no cmd_vel) is sampled into a + rolling window. When enough samples accumulate: + + weight_kg = max(0, (I_avg − I_idle) / k_per_kg) + + If the estimate shifts by ≥ weight_change_threshold_kg, gains are + re-scheduled immediately (subject to blending). + +Gain scheduling +─────────────── + Three weight classes (empty / light / heavy) each have a (kp, ki, kd) + table entry. Active gains blend exponentially toward the target class at + rate gain_blend_alpha per tick. + +Safety +────── + • All gains clamped to [kp_min..kp_max], [ki_min..ki_max], [kd_min..kd_max] + • InstabilityDetector monitors tilt amplitude and sign reversals; on trigger: + – gains reverted to table defaults (no blending) + – PID integrator reset + – WARN logged + • Watchdog: if /saltybot/imu goes silent > imu_timeout_s → effort = 0 + +Dynamic reconfigure +─────────────────── + Set `override_enabled=true` plus override_kp / override_ki / override_kd + via `ros2 param set` to bypass scheduling. Set `override_enabled=false` + to return to auto-scheduling. + +Calibration service +─────────────────── + Call `/saltybot/calibrate_balance` (std_srvs/Trigger) to start the + calibration routine. The robot rocks gently for `calibration_cycles / + calibration_frequency_hz` seconds, then settles. The weight estimate is + updated and logged on completion. + +Subscribes +────────── + /saltybot/imu sensor_msgs/Imu — tilt angle (pitch from quaternion) + /saltybot/motor_current std_msgs/Float32 — motor current (A) + +Publishes +───────── + /saltybot/balance_effort std_msgs/Float32 — PID output (balance correction) + /saltybot/weight_estimate std_msgs/Float32 — current weight estimate (kg) + /saltybot/pid_state std_msgs/String — JSON: gains, class, mode, flags + +Services +──────── + /saltybot/calibrate_balance std_srvs/Trigger + +Parameters (see config/adaptive_pid_params.yaml for defaults) +────────────────────────────────────────────────────────────── + control_rate, balance_setpoint_rad, + kp_empty/light/heavy, ki_*, kd_*, + kp_min/max, ki_min/max, kd_min/max, + weight_empty_max_kg, weight_light_max_kg, + idle_current_a, current_per_kg_a, + weight_window_s, weight_change_threshold_kg, + steady_tilt_threshold_rad, gain_blend_alpha, + instability_tilt_threshold_rad, instability_oscillation_count, + instability_window_size, imu_timeout_s, + override_enabled, override_kp, override_ki, override_kd, + calibration_amplitude_rad, calibration_frequency_hz, + calibration_cycles, calibration_settle_s, + output_min, output_max, i_clamp +""" + +import json +import math +import time + +import rclpy +from rclpy.node import Node +from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy + +from sensor_msgs.msg import Imu +from std_msgs.msg import Float32, String +from std_srvs.srv import Trigger + +from saltybot_adaptive_pid.pid_controller import ( + GainScheduler, + GainSet, + InstabilityDetector, + PIDController, +) +from saltybot_adaptive_pid.weight_estimator import ( + CalibrationRoutine, + CalibrationState, + WeightEstimator, +) + + +def _quaternion_to_pitch(qx: float, qy: float, qz: float, qw: float) -> float: + """Extract pitch (rotation about Y) from a unit quaternion.""" + sinp = 2.0 * (qw * qy - qz * qx) + return math.asin(max(-1.0, min(1.0, sinp))) + + +class AdaptivePIDNode(Node): + + def __init__(self): + super().__init__("adaptive_pid") + + # ── Parameters ─────────────────────────────────────────────────────── + self._declare_all_parameters() + p = self._load_params() + + # ── Sub-systems ────────────────────────────────────────────────────── + self._pid = PIDController( + kp=p["kp_empty"], ki=p["ki_empty"], kd=p["kd_empty"], + output_min=p["output_min"], output_max=p["output_max"], + i_clamp=p["i_clamp"], + ) + + self._scheduler = GainScheduler( + gain_table={ + "empty": GainSet(p["kp_empty"], p["ki_empty"], p["kd_empty"]), + "light": GainSet(p["kp_light"], p["ki_light"], p["kd_light"]), + "heavy": GainSet(p["kp_heavy"], p["ki_heavy"], p["kd_heavy"]), + }, + weight_thresholds={ + "empty_max": p["weight_empty_max_kg"], + "light_max": p["weight_light_max_kg"], + }, + blend_alpha=p["gain_blend_alpha"], + kp_min=p["kp_min"], kp_max=p["kp_max"], + ki_min=p["ki_min"], ki_max=p["ki_max"], + kd_min=p["kd_min"], kd_max=p["kd_max"], + ) + + self._weight_estimator = WeightEstimator( + idle_current_a=p["idle_current_a"], + current_per_kg_a=p["current_per_kg_a"], + window_s=p["weight_window_s"], + control_rate_hz=p["control_rate"], + steady_tilt_threshold_rad=p["steady_tilt_threshold_rad"], + change_threshold_kg=p["weight_change_threshold_kg"], + ) + + self._instability_detector = InstabilityDetector( + tilt_threshold_rad=p["instability_tilt_threshold_rad"], + oscillation_count=p["instability_oscillation_count"], + window_size=p["instability_window_size"], + ) + + self._calibration = CalibrationRoutine( + amplitude_rad=p["calibration_amplitude_rad"], + frequency_hz=p["calibration_frequency_hz"], + rock_cycles=p["calibration_cycles"], + settle_s=p["calibration_settle_s"], + settle_tilt_threshold_rad=p["steady_tilt_threshold_rad"], + idle_current_a=p["idle_current_a"], + current_per_kg_a=p["current_per_kg_a"], + ) + + # ── State ──────────────────────────────────────────────────────────── + self._tilt_rad: float = 0.0 + self._current_a: float = 0.0 + self._last_imu_t: float = 0.0 + self._weight_kg: float = 0.0 + self._unstable: bool = False + self._last_ctrl_t: float = time.monotonic() + + # ── QoS ────────────────────────────────────────────────────────────── + reliable = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=10, + ) + best_effort = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=1, + ) + + # ── Subscriptions ──────────────────────────────────────────────────── + self.create_subscription(Imu, "/saltybot/imu", self._imu_cb, best_effort) + self.create_subscription(Float32, "/saltybot/motor_current", self._current_cb, reliable) + + # ── Publishers ─────────────────────────────────────────────────────── + self._effort_pub = self.create_publisher(Float32, "/saltybot/balance_effort", reliable) + self._weight_pub = self.create_publisher(Float32, "/saltybot/weight_estimate", reliable) + self._state_pub = self.create_publisher(String, "/saltybot/pid_state", reliable) + + # ── Service ────────────────────────────────────────────────────────── + self.create_service(Trigger, "/saltybot/calibrate_balance", self._calibrate_srv) + + # ── Control timer ──────────────────────────────────────────────────── + rate = p["control_rate"] + self._ctrl_timer = self.create_timer(1.0 / rate, self._control_cb) + + self.get_logger().info( + f"AdaptivePIDNode ready " + f"rate={rate}Hz " + f"kp_empty={p['kp_empty']} ki_empty={p['ki_empty']} kd_empty={p['kd_empty']}" + ) + + # ── Parameter declaration / load ───────────────────────────────────────── + + def _declare_all_parameters(self) -> None: + self.declare_parameter("control_rate", 100.0) + self.declare_parameter("balance_setpoint_rad", 0.0) + # Gains per class + self.declare_parameter("kp_empty", 15.0) + self.declare_parameter("ki_empty", 0.5) + self.declare_parameter("kd_empty", 1.5) + self.declare_parameter("kp_light", 18.0) + self.declare_parameter("ki_light", 0.6) + self.declare_parameter("kd_light", 2.0) + self.declare_parameter("kp_heavy", 22.0) + self.declare_parameter("ki_heavy", 0.8) + self.declare_parameter("kd_heavy", 2.5) + # Safety bounds + self.declare_parameter("kp_min", 5.0) + self.declare_parameter("kp_max", 40.0) + self.declare_parameter("ki_min", 0.0) + self.declare_parameter("ki_max", 5.0) + self.declare_parameter("kd_min", 0.0) + self.declare_parameter("kd_max", 10.0) + self.declare_parameter("output_min", -50.0) + self.declare_parameter("output_max", 50.0) + self.declare_parameter("i_clamp", 10.0) + # Weight classification + self.declare_parameter("weight_empty_max_kg", 0.5) + self.declare_parameter("weight_light_max_kg", 2.0) + # Weight estimation + self.declare_parameter("idle_current_a", 0.5) + self.declare_parameter("current_per_kg_a", 0.3) + self.declare_parameter("weight_window_s", 3.0) + self.declare_parameter("weight_change_threshold_kg", 0.3) + self.declare_parameter("steady_tilt_threshold_rad", 0.05) + # Gain blending + self.declare_parameter("gain_blend_alpha", 0.05) + # Instability detection + self.declare_parameter("instability_tilt_threshold_rad", 0.4) + self.declare_parameter("instability_oscillation_count", 6) + self.declare_parameter("instability_window_size", 100) + # Watchdog + self.declare_parameter("imu_timeout_s", 0.5) + # Dynamic reconfigure / manual override + self.declare_parameter("override_enabled", False) + self.declare_parameter("override_kp", 15.0) + self.declare_parameter("override_ki", 0.5) + self.declare_parameter("override_kd", 1.5) + # Calibration + self.declare_parameter("calibration_amplitude_rad", 0.05) + self.declare_parameter("calibration_frequency_hz", 0.5) + self.declare_parameter("calibration_cycles", 3) + self.declare_parameter("calibration_settle_s", 2.0) + + def _load_params(self) -> dict: + g = self.get_parameter + return {k: g(k).value for k in [ + "control_rate", "balance_setpoint_rad", + "kp_empty", "ki_empty", "kd_empty", + "kp_light", "ki_light", "kd_light", + "kp_heavy", "ki_heavy", "kd_heavy", + "kp_min", "kp_max", "ki_min", "ki_max", "kd_min", "kd_max", + "output_min", "output_max", "i_clamp", + "weight_empty_max_kg", "weight_light_max_kg", + "idle_current_a", "current_per_kg_a", + "weight_window_s", "weight_change_threshold_kg", + "steady_tilt_threshold_rad", "gain_blend_alpha", + "instability_tilt_threshold_rad", "instability_oscillation_count", + "instability_window_size", "imu_timeout_s", + "override_enabled", "override_kp", "override_ki", "override_kd", + "calibration_amplitude_rad", "calibration_frequency_hz", + "calibration_cycles", "calibration_settle_s", + ]} + + # ── Callbacks ──────────────────────────────────────────────────────────── + + def _imu_cb(self, msg: Imu) -> None: + q = msg.orientation + self._tilt_rad = _quaternion_to_pitch(q.x, q.y, q.z, q.w) + self._last_imu_t = time.monotonic() + + def _current_cb(self, msg: Float32) -> None: + self._current_a = msg.data + + def _calibrate_srv(self, request, response) -> Trigger.Response: + if self._calibration.is_active: + response.success = False + response.message = "Calibration already in progress" + else: + self._calibration.start() + self._pid.reset() + self._weight_estimator.reset() + response.success = True + response.message = "Calibration started" + self.get_logger().info("Calibration started") + return response + + # ── 100 Hz control loop ────────────────────────────────────────────────── + + def _control_cb(self) -> None: + now = time.monotonic() + dt = now - self._last_ctrl_t + self._last_ctrl_t = now + + p = self._load_params() + + # ── IMU watchdog ───────────────────────────────────────────────────── + imu_age = (now - self._last_imu_t) if self._last_imu_t > 0.0 else 1e9 + if imu_age > p["imu_timeout_s"]: + self._publish_effort(0.0, p, reason="imu_timeout") + return + + tilt = self._tilt_rad + current = self._current_a + + # ── Calibration ────────────────────────────────────────────────────── + cal_offset = self._calibration.step(current, tilt, dt) + if self._calibration.state == CalibrationState.DONE and \ + self._calibration.weight_estimate is not None: + cal_wt = self._calibration.weight_estimate + self._weight_kg = cal_wt + self._weight_estimator.reset() + self.get_logger().info( + f"Calibration done: weight={cal_wt:.2f} kg " + f"class={self._scheduler.classify(cal_wt)}" + ) + + if self._calibration.state == CalibrationState.FAILED: + self.get_logger().warn("Calibration failed: insufficient settled samples") + + # ── Dynamic reconfigure / override ──────────────────────────────────── + if p["override_enabled"]: + if not self._scheduler.override_active: + self._scheduler.override_gains( + p["override_kp"], p["override_ki"], p["override_kd"] + ) + self.get_logger().info( + f"Manual override: kp={p['override_kp']} " + f"ki={p['override_ki']} kd={p['override_kd']}" + ) + else: + if self._scheduler.override_active: + self._scheduler.clear_override() + self.get_logger().info("Manual override cleared, resuming scheduling") + + # ── Weight estimation ───────────────────────────────────────────────── + if not self._calibration.is_active: + weight_est, weight_changed = self._weight_estimator.update(current, tilt) + if weight_est is not None: + if weight_changed: + old_class = self._scheduler.active_class + self._weight_kg = weight_est + new_class = self._scheduler.classify(weight_est) + if old_class != new_class: + self.get_logger().info( + f"Weight changed: {weight_est:.2f} kg " + f"({old_class} → {new_class})" + ) + else: + self.get_logger().info( + f"Weight updated: {weight_est:.2f} kg class={new_class}" + ) + + # ── Gain scheduling ────────────────────────────────────────────────── + gains, wt_class, class_changed = self._scheduler.update(self._weight_kg) + + if class_changed: + self.get_logger().info( + f"Gains rescheduled → class={wt_class} " + f"kp={gains.kp:.3f} ki={gains.ki:.3f} kd={gains.kd:.3f}" + ) + + self._pid.update_gains(gains.kp, gains.ki, gains.kd) + + # ── Instability detection ───────────────────────────────────────────── + unstable = self._instability_detector.update(tilt) + if unstable and not self._unstable: + self._unstable = True + self.get_logger().warn( + f"Instability detected (tilt={tilt:.3f} rad) — " + f"reverting gains to defaults" + ) + self._scheduler.revert_to_defaults(self._weight_kg) + reverted = self._scheduler.active_gains + self._pid.update_gains(reverted.kp, reverted.ki, reverted.kd) + self._pid.reset() + elif not unstable: + self._unstable = False + + # ── PID step ───────────────────────────────────────────────────────── + setpoint = p["balance_setpoint_rad"] + cal_offset + error = setpoint - tilt + effort = self._pid.step(error, dt) + + self._publish_effort(effort, p) + + # ── Weight + state publish ──────────────────────────────────────────── + wt_msg = Float32() + wt_msg.data = float(self._weight_kg) + self._weight_pub.publish(wt_msg) + + state_msg = String() + state_msg.data = json.dumps({ + "kp": round(gains.kp, 4), + "ki": round(gains.ki, 4), + "kd": round(gains.kd, 4), + "weight_class": wt_class, + "weight_kg": round(self._weight_kg, 3), + "override_active": self._scheduler.override_active, + "unstable": self._unstable, + "calibrating": self._calibration.is_active, + "cal_state": self._calibration.state.value, + "tilt_rad": round(tilt, 4), + "effort": round(effort, 4), + }) + self._state_pub.publish(state_msg) + + def _publish_effort(self, effort: float, p: dict, reason: str = "") -> None: + msg = Float32() + msg.data = float(effort) + self._effort_pub.publish(msg) + + # ── Lifecycle ───────────────────────────────────────────────────────────── + + def destroy_node(self) -> None: + # Zero effort on shutdown + try: + msg = Float32() + msg.data = 0.0 + self._effort_pub.publish(msg) + except Exception: + pass + super().destroy_node() + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = AdaptivePIDNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/pid_controller.py b/jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/pid_controller.py new file mode 100644 index 0000000..9ba19d6 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/pid_controller.py @@ -0,0 +1,333 @@ +""" +pid_controller.py — PID controller, gain scheduler, and instability detector +for the SaltyBot adaptive balance controller (Issue #136). + +Pure module — no ROS2 dependency; fully unit-testable. + +Classes +─────── + GainSet — immutable (kp, ki, kd) triple with safety-clamp helper + PIDController — discrete PID with anti-windup and output clamping + GainScheduler — weight-class → gain table, exponential blending, + safety bounds, manual override, default revert + InstabilityDetector — monitors tilt for threshold violations and oscillations +""" + +from __future__ import annotations + +from collections import deque +from dataclasses import dataclass + + +# ── GainSet ────────────────────────────────────────────────────────────────── + +@dataclass +class GainSet: + kp: float + ki: float + kd: float + + def clamp( + self, + kp_min: float, kp_max: float, + ki_min: float, ki_max: float, + kd_min: float, kd_max: float, + ) -> "GainSet": + """Return a new GainSet with all gains clamped to the given bounds.""" + return GainSet( + kp=max(kp_min, min(kp_max, self.kp)), + ki=max(ki_min, min(ki_max, self.ki)), + kd=max(kd_min, min(kd_max, self.kd)), + ) + + +# ── PIDController ───────────────────────────────────────────────────────────── + +class PIDController: + """ + Discrete PID controller with: + - Anti-windup: integral clamped to ±i_clamp + - Output clamping: result clamped to [output_min, output_max] + - Derivative on error (D-on-error, not D-on-measurement) + - Zero derivative on first step (no derivative kick) + """ + + def __init__( + self, + kp: float, + ki: float, + kd: float, + output_min: float = -1e9, + output_max: float = 1e9, + i_clamp: float = 1e9, + ): + self._kp = kp + self._ki = ki + self._kd = kd + self._output_min = output_min + self._output_max = output_max + self._i_clamp = abs(i_clamp) + + self._integral: float = 0.0 + self._prev_error: float | None = None + + # ── Properties ──────────────────────────────────────────────────────────── + + @property + def kp(self) -> float: + return self._kp + + @property + def ki(self) -> float: + return self._ki + + @property + def kd(self) -> float: + return self._kd + + @property + def integral(self) -> float: + return self._integral + + # ── Mutators ────────────────────────────────────────────────────────────── + + def update_gains(self, kp: float, ki: float, kd: float) -> None: + """Update gains without resetting integrator or derivative state.""" + self._kp = kp + self._ki = ki + self._kd = kd + + def reset(self) -> None: + """Clear integrator and derivative memory (e.g., after mode change).""" + self._integral = 0.0 + self._prev_error = None + + # ── Step ────────────────────────────────────────────────────────────────── + + def step(self, error: float, dt: float) -> float: + """ + Compute one PID step. + + Parameters + ---------- + error : setpoint − measurement + dt : elapsed time since last step (s); must be > 0 + + Returns + ------- + Control output clamped to [output_min, output_max]. + Returns 0.0 if dt ≤ 0. + """ + if dt <= 0.0: + return 0.0 + + # Proportional + p = self._kp * error + + # Integral with anti-windup + self._integral += error * dt + self._integral = max(-self._i_clamp, min(self._i_clamp, self._integral)) + i = self._ki * self._integral + + # Derivative (zero on first call to avoid derivative kick) + if self._prev_error is None: + d = 0.0 + else: + d = self._kd * (error - self._prev_error) / dt + self._prev_error = error + + output = p + i + d + return max(self._output_min, min(self._output_max, output)) + + +# ── GainScheduler ──────────────────────────────────────────────────────────── + +class GainScheduler: + """ + Maps estimated robot weight to PID gains via a 3-class lookup table. + Transitions between classes are blended exponentially so the active gains + track the target smoothly. + + Weight classes (ascending): + "empty" — up to weight_thresholds["empty_max"] kg + "light" — up to weight_thresholds["light_max"] kg + "heavy" — above light_max + + Safety clamping is applied to every blended gain before returning. + """ + + CLASSES = ("empty", "light", "heavy") + + def __init__( + self, + gain_table: dict[str, GainSet], + weight_thresholds: dict[str, float], + blend_alpha: float = 0.05, + kp_min: float = 0.0, kp_max: float = 100.0, + ki_min: float = 0.0, ki_max: float = 20.0, + kd_min: float = 0.0, kd_max: float = 20.0, + ): + """ + Parameters + ---------- + gain_table : {"empty": GainSet, "light": GainSet, "heavy": GainSet} + weight_thresholds : {"empty_max": float, "light_max": float} + blend_alpha : convergence rate ∈ (0, 1] per control tick + """ + self._table = gain_table + self._thresholds = weight_thresholds + self._alpha = max(1e-6, min(1.0, blend_alpha)) + self._bounds = { + "kp_min": kp_min, "kp_max": kp_max, + "ki_min": ki_min, "ki_max": ki_max, + "kd_min": kd_min, "kd_max": kd_max, + } + + # Active (blended) gains start at "empty" table values + self._active_class: str = "empty" + self._override_active: bool = False + raw = self._table["empty"] + self._active = self._apply_bounds(GainSet(raw.kp, raw.ki, raw.kd)) + + # ── Properties ──────────────────────────────────────────────────────────── + + @property + def active_class(self) -> str: + return self._active_class + + @property + def active_gains(self) -> GainSet: + return self._active + + @property + def override_active(self) -> bool: + return self._override_active + + # ── Classification ──────────────────────────────────────────────────────── + + def classify(self, weight_kg: float) -> str: + """Map weight (kg) to class name.""" + if weight_kg <= self._thresholds.get("empty_max", 0.5): + return "empty" + if weight_kg <= self._thresholds.get("light_max", 2.0): + return "light" + return "heavy" + + # ── Update ──────────────────────────────────────────────────────────────── + + def update(self, weight_kg: float) -> tuple[GainSet, str, bool]: + """ + Blend active gains toward the target for the given weight. + Manual override bypasses scheduling. + + Returns + ------- + (active_gains, active_class, class_changed) + """ + if self._override_active: + return self._active, self._active_class, False + + target_class = self.classify(weight_kg) + class_changed = target_class != self._active_class + self._active_class = target_class + + target = self._table[target_class] + a = self._alpha + blended = GainSet( + kp=(1.0 - a) * self._active.kp + a * target.kp, + ki=(1.0 - a) * self._active.ki + a * target.ki, + kd=(1.0 - a) * self._active.kd + a * target.kd, + ) + self._active = self._apply_bounds(blended) + return self._active, self._active_class, class_changed + + # ── Override / revert ──────────────────────────────────────────────────── + + def override_gains(self, kp: float, ki: float, kd: float) -> None: + """ + Apply manual gain override (dynamic reconfigure / operator tuning). + Gains are clamped to safety bounds. + """ + self._override_active = True + self._active = self._apply_bounds(GainSet(kp, ki, kd)) + + def clear_override(self) -> None: + """Release manual override; scheduling resumes on next update().""" + self._override_active = False + + def revert_to_defaults(self, weight_kg: float) -> None: + """ + Immediately snap active gains to table values for the current weight + class (no blending). Used after instability is detected. + """ + self._override_active = False + cls = self.classify(weight_kg) + self._active_class = cls + raw = self._table[cls] + self._active = self._apply_bounds(GainSet(raw.kp, raw.ki, raw.kd)) + + # ── Internal ───────────────────────────────────────────────────────────── + + def _apply_bounds(self, gains: GainSet) -> GainSet: + b = self._bounds + return gains.clamp( + b["kp_min"], b["kp_max"], + b["ki_min"], b["ki_max"], + b["kd_min"], b["kd_max"], + ) + + +# ── InstabilityDetector ─────────────────────────────────────────────────────── + +class InstabilityDetector: + """ + Detects controller instability via two independent criteria: + + 1. Tilt threshold — more than 50 % of recent samples exceed + tilt_threshold_rad → unstable + 2. Oscillations — sign-reversal count in window exceeds + oscillation_count → unstable (growing oscillations) + """ + + def __init__( + self, + tilt_threshold_rad: float = 0.4, + oscillation_count: int = 6, + window_size: int = 100, + ): + self._tilt_threshold = tilt_threshold_rad + self._osc_count = oscillation_count + self._tilts: deque = deque(maxlen=window_size) + + def update(self, tilt_rad: float) -> bool: + """ + Record tilt sample and return True if instability is detected. + False is always returned until the window has at least 4 samples. + """ + self._tilts.append(tilt_rad) + + if len(self._tilts) < 4: + return False + + tilts = list(self._tilts) + + # Criterion 1: tilt amplitude + exceed = sum(1 for t in tilts if abs(t) > self._tilt_threshold) + if exceed > len(tilts) * 0.5: + return True + + # Criterion 2: sign reversals (oscillation detection) + reversals = 0 + prev_sign: int | None = None + for t in tilts: + if abs(t) < 0.01: + continue + sign = 1 if t > 0 else -1 + if prev_sign is not None and sign != prev_sign: + reversals += 1 + prev_sign = sign + + return reversals >= self._osc_count + + def reset(self) -> None: + self._tilts.clear() diff --git a/jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/weight_estimator.py b/jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/weight_estimator.py new file mode 100644 index 0000000..52c7411 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_adaptive_pid/saltybot_adaptive_pid/weight_estimator.py @@ -0,0 +1,246 @@ +""" +weight_estimator.py — Motor-current weight estimator and calibration routine +for the SaltyBot adaptive balance controller (Issue #136). + +Pure module — no ROS2 dependency; fully unit-testable. + +Classes +─────── + WeightEstimator — rolling-window current → weight, change detection + CalibrationRoutine — state-machine: IDLE → ROCKING → SETTLING → DONE/FAILED +""" + +from __future__ import annotations + +import math +from collections import deque +from enum import Enum + + +# ── WeightEstimator ─────────────────────────────────────────────────────────── + +class WeightEstimator: + """ + Estimates payload weight from motor current at steady state. + + Model + ───── + weight_kg = max(0, (I_avg − I_idle) / k_per_kg) + + where I_avg is averaged over a rolling window of steady-state samples. + A sample is accepted only when |tilt| ≤ steady_tilt_threshold_rad. + + Change detection + ──────────────── + Returns `changed=True` when the new estimate differs from the previous + by more than change_threshold_kg. The caller uses this to trigger gain + rescheduling. + """ + + def __init__( + self, + idle_current_a: float = 0.5, + current_per_kg_a: float = 0.3, + window_s: float = 3.0, + control_rate_hz: float = 100.0, + steady_tilt_threshold_rad: float = 0.05, + change_threshold_kg: float = 0.3, + ): + self._idle = idle_current_a + self._per_kg = max(1e-6, current_per_kg_a) # avoid div-by-zero + self._steady_threshold = steady_tilt_threshold_rad + self._change_threshold = change_threshold_kg + + window_size = max(10, int(window_s * control_rate_hz)) + self._samples: deque = deque(maxlen=window_size) + self._last_estimate: float | None = None + + # ── Properties ──────────────────────────────────────────────────────────── + + @property + def last_estimate(self) -> float | None: + return self._last_estimate + + @property + def sample_count(self) -> int: + return len(self._samples) + + # ── Update ──────────────────────────────────────────────────────────────── + + def update( + self, + current_a: float, + tilt_rad: float, + ) -> tuple[float | None, bool]: + """ + Submit one sample (only accepted if in steady state). + + Returns + ------- + (weight_kg, changed) + weight_kg : None if fewer than min_samples collected yet + changed : True if estimate moved by ≥ change_threshold_kg + """ + if abs(tilt_rad) <= self._steady_threshold: + self._samples.append(current_a) + + min_samples = max(5, self._samples.maxlen // 4) + if len(self._samples) < min_samples: + return None, False + + avg_i = sum(self._samples) / len(self._samples) + weight = max(0.0, (avg_i - self._idle) / self._per_kg) + + changed = ( + self._last_estimate is None + or abs(weight - self._last_estimate) >= self._change_threshold + ) + self._last_estimate = weight + return weight, changed + + def reset(self) -> None: + """Clear sample buffer and last estimate (e.g., after calibration).""" + self._samples.clear() + self._last_estimate = None + + +# ── CalibrationRoutine ──────────────────────────────────────────────────────── + +class CalibrationState(Enum): + IDLE = "idle" + ROCKING = "rocking" + SETTLING = "settling" + DONE = "done" + FAILED = "failed" + + +class CalibrationRoutine: + """ + On-demand calibration: rocks the robot back and forth to disturb the + balance, lets it settle, then measures steady-state current to produce + a weight estimate. + + State machine + ───────────── + IDLE ──(start)──► ROCKING ──(cycles done)──► SETTLING ──(samples ok)──► DONE + └──(timeout)────► FAILED + + abort() returns to IDLE from any active state. + + Usage + ───── + Each control tick, call step(current_a, tilt_rad, dt). + The return value is a tilt offset (rad) to add to the balance + set-point. Returns 0.0 when not active. + When state == DONE, read weight_estimate. + """ + + def __init__( + self, + amplitude_rad: float = 0.05, + frequency_hz: float = 0.5, + rock_cycles: int = 3, + settle_s: float = 2.0, + settle_tilt_threshold_rad: float = 0.03, + idle_current_a: float = 0.5, + current_per_kg_a: float = 0.3, + ): + self._amplitude = amplitude_rad + self._frequency = max(1e-3, frequency_hz) + self._rock_cycles = max(1, rock_cycles) + self._settle_s = settle_s + self._settle_threshold = settle_tilt_threshold_rad + self._idle = idle_current_a + self._per_kg = max(1e-6, current_per_kg_a) + + self._state = CalibrationState.IDLE + self._elapsed: float = 0.0 + self._settle_elapsed: float = 0.0 + self._settle_samples: list[float] = [] + self._weight_estimate: float | None = None + + # ── Properties ──────────────────────────────────────────────────────────── + + @property + def state(self) -> CalibrationState: + return self._state + + @property + def weight_estimate(self) -> float | None: + return self._weight_estimate + + @property + def is_active(self) -> bool: + return self._state in (CalibrationState.ROCKING, CalibrationState.SETTLING) + + # ── Control ─────────────────────────────────────────────────────────────── + + def start(self) -> bool: + """ + Start calibration routine. + Returns False if already running (ROCKING or SETTLING). + """ + if self.is_active: + return False + self._state = CalibrationState.ROCKING + self._elapsed = 0.0 + self._settle_elapsed = 0.0 + self._settle_samples = [] + self._weight_estimate = None + return True + + def abort(self) -> None: + """Immediately abort and return to IDLE.""" + self._state = CalibrationState.IDLE + self._elapsed = 0.0 + self._settle_elapsed = 0.0 + + # ── Step ────────────────────────────────────────────────────────────────── + + def step(self, current_a: float, tilt_rad: float, dt: float) -> float: + """ + Advance one control tick. + + Parameters + ---------- + current_a : motor current reading (A) + tilt_rad : current tilt angle (rad) + dt : elapsed time since last step (s) + + Returns + ------- + Tilt offset (rad) to superimpose on the balance set-point. + 0.0 when IDLE / DONE / FAILED. + """ + if self._state == CalibrationState.ROCKING: + self._elapsed += dt + rock_duration = self._rock_cycles / self._frequency + + if self._elapsed >= rock_duration: + self._state = CalibrationState.SETTLING + self._settle_elapsed = 0.0 + return 0.0 + + return self._amplitude * math.sin( + 2.0 * math.pi * self._frequency * self._elapsed + ) + + if self._state == CalibrationState.SETTLING: + self._settle_elapsed += dt + + if abs(tilt_rad) <= self._settle_threshold: + self._settle_samples.append(current_a) + + if self._settle_elapsed >= self._settle_s: + if len(self._settle_samples) >= 5: + avg_i = sum(self._settle_samples) / len(self._settle_samples) + self._weight_estimate = max( + 0.0, (avg_i - self._idle) / self._per_kg + ) + self._state = CalibrationState.DONE + else: + self._state = CalibrationState.FAILED + + return 0.0 + + return 0.0 diff --git a/jetson/ros2_ws/src/saltybot_adaptive_pid/setup.cfg b/jetson/ros2_ws/src/saltybot_adaptive_pid/setup.cfg new file mode 100644 index 0000000..f8a2c1f --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_adaptive_pid/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/saltybot_adaptive_pid + +[install] +install_scripts=$base/lib/saltybot_adaptive_pid diff --git a/jetson/ros2_ws/src/saltybot_adaptive_pid/setup.py b/jetson/ros2_ws/src/saltybot_adaptive_pid/setup.py new file mode 100644 index 0000000..e1f3193 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_adaptive_pid/setup.py @@ -0,0 +1,32 @@ +from setuptools import setup, find_packages +import os +from glob import glob + +package_name = "saltybot_adaptive_pid" + +setup( + name=package_name, + version="0.1.0", + packages=find_packages(exclude=["test"]), + data_files=[ + ("share/ament_index/resource_index/packages", + [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (os.path.join("share", package_name, "config"), + glob("config/*.yaml")), + (os.path.join("share", package_name, "launch"), + glob("launch/*.py")), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-controls", + maintainer_email="sl-controls@saltylab.local", + description="Adaptive balance PID with weight estimation and gain scheduling", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + f"adaptive_pid_node = {package_name}.adaptive_pid_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_adaptive_pid/test/test_adaptive_pid.py b/jetson/ros2_ws/src/saltybot_adaptive_pid/test/test_adaptive_pid.py new file mode 100644 index 0000000..99e6db6 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_adaptive_pid/test/test_adaptive_pid.py @@ -0,0 +1,591 @@ +""" +test_adaptive_pid.py — Unit tests for pid_controller.py and weight_estimator.py +(Issue #136). No ROS2 runtime required — pure Python. +""" + +import math +import pytest + +from saltybot_adaptive_pid.pid_controller import ( + GainSet, + GainScheduler, + InstabilityDetector, + PIDController, +) +from saltybot_adaptive_pid.weight_estimator import ( + CalibrationRoutine, + CalibrationState, + WeightEstimator, +) + + +# ─── GainSet ───────────────────────────────────────────────────────────────── + +class TestGainSet: + + def test_fields(self): + g = GainSet(kp=1.0, ki=0.1, kd=0.5) + assert g.kp == pytest.approx(1.0) + assert g.ki == pytest.approx(0.1) + assert g.kd == pytest.approx(0.5) + + def test_clamp_within_bounds_unchanged(self): + g = GainSet(5.0, 1.0, 2.0).clamp(0, 10, 0, 5, 0, 5) + assert g.kp == pytest.approx(5.0) + assert g.ki == pytest.approx(1.0) + assert g.kd == pytest.approx(2.0) + + def test_clamp_below_min(self): + g = GainSet(-1.0, -0.5, -2.0).clamp(0, 10, 0, 5, 0, 5) + assert g.kp == pytest.approx(0.0) + assert g.ki == pytest.approx(0.0) + assert g.kd == pytest.approx(0.0) + + def test_clamp_above_max(self): + g = GainSet(200.0, 50.0, 100.0).clamp(0, 10, 0, 5, 0, 5) + assert g.kp == pytest.approx(10.0) + assert g.ki == pytest.approx(5.0) + assert g.kd == pytest.approx(5.0) + + def test_clamp_returns_new_instance(self): + g = GainSet(5.0, 1.0, 2.0) + g2 = g.clamp(0, 10, 0, 5, 0, 5) + assert g2 is not g + + +# ─── PIDController ─────────────────────────────────────────────────────────── + +class TestPIDController: + + def _make(self, kp=1.0, ki=0.0, kd=0.0, **kwargs): + return PIDController(kp=kp, ki=ki, kd=kd, **kwargs) + + def test_p_only(self): + pid = self._make(kp=2.0) + assert pid.step(1.0, 0.01) == pytest.approx(2.0) + + def test_p_negative_error(self): + pid = self._make(kp=3.0) + assert pid.step(-1.0, 0.01) == pytest.approx(-3.0) + + def test_i_accumulates(self): + pid = self._make(kp=0.0, ki=1.0) # isolate I term + # After 3 steps of error=1, dt=0.1: integral = 0.3, output = ki*integral = 0.3 + pid.step(1.0, 0.1) + pid.step(1.0, 0.1) + out = pid.step(1.0, 0.1) + assert out == pytest.approx(0.3, abs=1e-9) + + def test_d_zero_on_first_step(self): + pid = self._make(kp=0.0, kd=10.0) # isolate D term + out = pid.step(1.0, 0.01) + assert out == pytest.approx(0.0) # D is zero on first step + + def test_d_on_rate_change(self): + pid = self._make(kp=0.0, kd=1.0) # isolate D term + pid.step(0.0, 0.1) # first step, D=0 + out = pid.step(1.0, 0.1) # error changed by 1 over dt=0.1 → D=1/0.1=10 + assert out == pytest.approx(10.0) + + def test_output_min_clamp(self): + # Negative error → large negative output → clamped at output_min + pid = self._make(kp=100.0, output_min=-5.0) + out = pid.step(-1.0, 0.01) + assert out == pytest.approx(-5.0) + + def test_output_max_clamp(self): + pid = self._make(kp=100.0, output_max=5.0) + out = pid.step(1.0, 0.01) + assert out == pytest.approx(5.0) + + def test_anti_windup_clamps_integral(self): + pid = self._make(ki=1.0, i_clamp=0.5) + for _ in range(100): + pid.step(1.0, 0.1) # integrate a lot + assert abs(pid.integral) <= 0.5 + 1e-9 + + def test_reset_clears_integral(self): + pid = self._make(ki=1.0) + for _ in range(10): + pid.step(1.0, 0.1) + assert pid.integral > 0.0 + pid.reset() + assert pid.integral == pytest.approx(0.0) + + def test_reset_clears_derivative_state(self): + pid = self._make(kp=0.0, kd=5.0) # isolate D term + pid.step(1.0, 0.1) # sets prev_error + pid.reset() + out = pid.step(1.0, 0.1) # prev_error=None after reset → D=0 + assert out == pytest.approx(0.0) + + def test_update_gains_takes_effect(self): + pid = self._make(kp=1.0) + pid.update_gains(kp=3.0, ki=0.0, kd=0.0) + assert pid.step(1.0, 0.01) == pytest.approx(3.0) + + def test_zero_dt_returns_zero(self): + pid = self._make(kp=5.0) + assert pid.step(1.0, 0.0) == pytest.approx(0.0) + + def test_negative_dt_returns_zero(self): + pid = self._make(kp=5.0) + assert pid.step(1.0, -0.1) == pytest.approx(0.0) + + def test_zero_error_no_pid_output(self): + pid = self._make(kp=5.0, ki=1.0, kd=1.0) + pid.step(0.0, 0.1) + out = pid.step(0.0, 0.1) + assert out == pytest.approx(0.0) + + def test_gain_properties(self): + pid = self._make(kp=2.0, ki=0.5, kd=0.3) + assert pid.kp == pytest.approx(2.0) + assert pid.ki == pytest.approx(0.5) + assert pid.kd == pytest.approx(0.3) + + +# ─── GainScheduler ─────────────────────────────────────────────────────────── + +def _make_scheduler(**kwargs) -> GainScheduler: + table = { + "empty": GainSet(10.0, 0.5, 1.0), + "light": GainSet(15.0, 0.7, 1.5), + "heavy": GainSet(20.0, 1.0, 2.0), + } + thresholds = {"empty_max": 0.5, "light_max": 2.0} + defaults = dict(blend_alpha=0.05, kp_min=0.0, kp_max=100.0, + ki_min=0.0, ki_max=10.0, kd_min=0.0, kd_max=10.0) + defaults.update(kwargs) + return GainScheduler(table, thresholds, **defaults) + + +class TestGainScheduler: + + def test_classify_empty(self): + s = _make_scheduler() + assert s.classify(0.0) == "empty" + assert s.classify(0.5) == "empty" + + def test_classify_light(self): + s = _make_scheduler() + assert s.classify(0.51) == "light" + assert s.classify(2.0) == "light" + + def test_classify_heavy(self): + s = _make_scheduler() + assert s.classify(2.01) == "heavy" + assert s.classify(10.0) == "heavy" + + def test_classify_boundary_empty_light(self): + s = _make_scheduler() + assert s.classify(0.5) == "empty" + assert s.classify(0.500001) == "light" + + def test_initial_class_is_empty(self): + s = _make_scheduler() + assert s.active_class == "empty" + + def test_update_blends_toward_target(self): + s = _make_scheduler(blend_alpha=0.1) + initial_kp = s.active_gains.kp + # Push toward "heavy" + for _ in range(50): + gains, _, _ = s.update(5.0) + # Should have moved significantly toward heavy (kp=20) + assert gains.kp > initial_kp + 1.0 + + def test_blend_does_not_overshoot(self): + s = _make_scheduler(blend_alpha=0.5) + target_kp = 20.0 # heavy + for _ in range(200): + gains, _, _ = s.update(5.0) + assert gains.kp <= target_kp + 1e-6 + + def test_class_changed_flag_true_on_change(self): + s = _make_scheduler() + s.update(0.1) # empty + _, _, changed = s.update(5.0) # → heavy + assert changed is True + + def test_class_changed_flag_false_same_class(self): + s = _make_scheduler() + s.update(0.1) + _, _, changed = s.update(0.2) # still empty + assert changed is False + + def test_override_gains_applied(self): + s = _make_scheduler() + s.override_gains(kp=7.0, ki=0.3, kd=0.8) + assert s.active_gains.kp == pytest.approx(7.0) + assert s.active_gains.ki == pytest.approx(0.3) + assert s.override_active is True + + def test_override_bypasses_scheduling(self): + s = _make_scheduler() + s.override_gains(kp=7.0, ki=0.3, kd=0.8) + for _ in range(20): + gains, _, _ = s.update(5.0) # heavy weight, should not change + assert gains.kp == pytest.approx(7.0) + + def test_override_gains_clamped(self): + s = _make_scheduler(kp_max=10.0) + s.override_gains(kp=999.0, ki=0.5, kd=1.0) + assert s.active_gains.kp == pytest.approx(10.0) + + def test_clear_override_resumes_scheduling(self): + s = _make_scheduler() + s.override_gains(kp=7.0, ki=0.3, kd=0.8) + s.clear_override() + assert s.override_active is False + + def test_revert_to_defaults_snaps_immediately(self): + s = _make_scheduler(blend_alpha=0.001) + for _ in range(5): + s.update(5.0) # start blending toward heavy (kp=20) + s.revert_to_defaults(0.0) # snap back to empty (kp=10) + assert s.active_gains.kp == pytest.approx(10.0) + assert s.active_class == "empty" + + def test_revert_clears_override(self): + s = _make_scheduler() + s.override_gains(kp=7.0, ki=0.3, kd=0.8) + s.revert_to_defaults(0.0) + assert s.override_active is False + + def test_safety_bounds_clamp_table_gains(self): + s = _make_scheduler(kp_max=12.0) # cap below heavy table value (20) + for _ in range(200): + gains, _, _ = s.update(5.0) # heavy + assert gains.kp <= 12.0 + 1e-9 + + +# ─── InstabilityDetector ───────────────────────────────────────────────────── + +class TestInstabilityDetector: + + def _make(self, **kwargs): + defaults = dict(tilt_threshold_rad=0.4, oscillation_count=6, window_size=50) + defaults.update(kwargs) + return InstabilityDetector(**defaults) + + def test_stable_small_tilts(self): + det = self._make() + for _ in range(50): + assert det.update(0.1) is False + + def test_too_few_samples_always_stable(self): + det = self._make() + assert det.update(1.0) is False # only 1 sample + assert det.update(1.0) is False + assert det.update(1.0) is False + + def test_unstable_tilt_exceeds_threshold(self): + det = self._make(tilt_threshold_rad=0.3, window_size=10) + for _ in range(6): + det.update(0.5) # > 0.3, majority + assert det.update(0.5) is True + + def test_stable_below_threshold(self): + det = self._make(tilt_threshold_rad=0.5) + for _ in range(20): + result = det.update(0.3) + assert result is False + + def test_oscillation_triggers_unstable(self): + det = self._make(oscillation_count=4, window_size=50) + # Inject 8 alternating signs → 7 reversals ≥ 4 + tilts = [0.1, -0.1] * 8 + results = [det.update(t) for t in tilts] + assert any(results) + + def test_reset_clears_window(self): + det = self._make(tilt_threshold_rad=0.3, window_size=10) + for _ in range(10): + det.update(0.5) + det.reset() + # After reset, too few samples → stable + assert det.update(0.5) is False + + +# ─── WeightEstimator ───────────────────────────────────────────────────────── + +def _make_estimator(**kwargs) -> WeightEstimator: + defaults = dict( + idle_current_a=0.5, current_per_kg_a=0.3, + window_s=1.0, control_rate_hz=10.0, # window_size=10 + steady_tilt_threshold_rad=0.05, change_threshold_kg=0.3, + ) + defaults.update(kwargs) + return WeightEstimator(**defaults) + + +class TestWeightEstimator: + + def test_insufficient_samples_returns_none(self): + est = _make_estimator() + result, _ = est.update(0.5, tilt_rad=0.0) + assert result is None + + def test_sufficient_samples_returns_estimate(self): + est = _make_estimator() + for _ in range(10): + est.update(0.8, tilt_rad=0.0) # 0.8A → (0.8-0.5)/0.3 = 1.0 kg + wt, _ = est.update(0.8, tilt_rad=0.0) + assert wt == pytest.approx(1.0, abs=0.05) + + def test_non_steady_samples_rejected(self): + est = _make_estimator() + for _ in range(20): + est.update(0.8, tilt_rad=0.1) # |0.1| > 0.05 → rejected + result, _ = est.update(0.8, tilt_rad=0.1) + assert result is None + + def test_known_current_gives_correct_weight(self): + # 1.1A, idle=0.5, per_kg=0.3 → (1.1-0.5)/0.3 = 2.0 kg + est = _make_estimator() + for _ in range(15): + est.update(1.1, tilt_rad=0.0) + wt, _ = est.update(1.1, tilt_rad=0.0) + assert wt == pytest.approx(2.0, abs=0.05) + + def test_negative_estimate_clamped_to_zero(self): + # current below idle → weight should be 0 + est = _make_estimator() + for _ in range(15): + est.update(0.1, tilt_rad=0.0) + wt, _ = est.update(0.1, tilt_rad=0.0) + assert wt == pytest.approx(0.0) + + def test_weight_changed_flag_on_first_estimate(self): + est = _make_estimator() + # Find the first non-None estimate — it must have changed=True + for _ in range(15): + wt, changed = est.update(0.8, tilt_rad=0.0) + if wt is not None: + assert changed is True + break + + def test_weight_unchanged_below_threshold(self): + est = _make_estimator(change_threshold_kg=0.5) + for _ in range(15): + est.update(0.8, tilt_rad=0.0) + # Now feed very similar current (tiny perturbation) + for _ in range(15): + wt, changed = est.update(0.81, tilt_rad=0.0) + # delta is ~0.03 kg << threshold 0.5 + assert changed is False + + def test_reset_clears_samples_and_estimate(self): + est = _make_estimator() + for _ in range(15): + est.update(0.8, tilt_rad=0.0) + est.reset() + assert est.last_estimate is None + result, _ = est.update(0.8, tilt_rad=0.0) + assert result is None # need to re-fill window + + +# ─── CalibrationRoutine ─────────────────────────────────────────────────────── + +def _make_cal(**kwargs) -> CalibrationRoutine: + defaults = dict( + amplitude_rad=0.05, frequency_hz=1.0, + rock_cycles=2, settle_s=1.0, + settle_tilt_threshold_rad=0.03, + idle_current_a=0.5, current_per_kg_a=0.3, + ) + defaults.update(kwargs) + return CalibrationRoutine(**defaults) + + +class TestCalibrationRoutine: + + def test_initial_state_idle(self): + cal = _make_cal() + assert cal.state == CalibrationState.IDLE + assert cal.is_active is False + + def test_start_transitions_to_rocking(self): + cal = _make_cal() + assert cal.start() is True + assert cal.state == CalibrationState.ROCKING + assert cal.is_active is True + + def test_start_returns_false_when_active(self): + cal = _make_cal() + cal.start() + assert cal.start() is False + + def test_start_succeeds_from_done(self): + cal = _make_cal(rock_cycles=1, settle_s=0.01) + cal.start() + dt = 0.01 + for _ in range(200): + cal.step(0.8, tilt_rad=0.0, dt=dt) + # Should be DONE or FAILED at this point + assert cal.state in (CalibrationState.DONE, CalibrationState.FAILED) + assert cal.start() is True # can restart + + def test_abort_returns_to_idle(self): + cal = _make_cal() + cal.start() + cal.abort() + assert cal.state == CalibrationState.IDLE + assert cal.is_active is False + + def test_rocking_output_sinusoidal(self): + # freq=1Hz, cycles=2 → duration=2s; use 110 steps at dt=0.01 to cover >0.5 period + cal = _make_cal(amplitude_rad=0.1, frequency_hz=1.0) + cal.start() + outputs = [cal.step(0.5, 0.0, 0.01) for _ in range(110)] + # Sinusoidal over >1 s: must cross zero and include both signs + assert any(o > 0.0 for o in outputs) + assert any(o < 0.0 for o in outputs) + + def test_rocking_output_bounded_by_amplitude(self): + cal = _make_cal(amplitude_rad=0.05) + cal.start() + for _ in range(200): + out = cal.step(0.5, 0.0, 0.005) + assert abs(out) <= 0.05 + 1e-9 + + def test_idle_output_is_zero(self): + cal = _make_cal() + assert cal.step(0.5, 0.0, 0.01) == pytest.approx(0.0) + + def test_rocking_transitions_to_settling(self): + # rock_cycles=2, freq=1Hz → rock_duration=2s; dt=0.1 → 20 steps + cal = _make_cal(rock_cycles=2, frequency_hz=1.0) + cal.start() + for _ in range(22): + cal.step(0.5, 0.0, 0.1) + assert cal.state == CalibrationState.SETTLING + + def test_settling_output_is_zero(self): + cal = _make_cal(rock_cycles=1, frequency_hz=2.0) + cal.start() + # Skip through rocking + for _ in range(15): + cal.step(0.5, 0.0, 0.1) + # Now in settling + assert cal.state == CalibrationState.SETTLING + out = cal.step(0.5, 0.0, 0.1) + assert out == pytest.approx(0.0) + + def test_done_with_enough_settled_samples(self): + # freq=2Hz, cycles=1 → rock_duration=0.5s; dt=0.05 → 10 steps + # settle_s=0.5s → 10 more steps + cal = _make_cal(rock_cycles=1, frequency_hz=2.0, settle_s=0.5) + cal.start() + for _ in range(100): + cal.step(0.8, tilt_rad=0.0, dt=0.05) + assert cal.state == CalibrationState.DONE + assert cal.weight_estimate is not None + assert cal.weight_estimate >= 0.0 + + def test_weight_estimate_known_current(self): + # 0.8A, idle=0.5, per_kg=0.3 → 1.0 kg + cal = _make_cal(rock_cycles=1, frequency_hz=2.0, settle_s=0.5) + cal.start() + for _ in range(100): + cal.step(0.8, tilt_rad=0.0, dt=0.05) + assert cal.state == CalibrationState.DONE + assert cal.weight_estimate == pytest.approx(1.0, abs=0.1) + + def test_failed_when_no_settled_samples(self): + # Robot never settles (large tilt) + cal = _make_cal( + rock_cycles=1, frequency_hz=2.0, settle_s=0.5, + settle_tilt_threshold_rad=0.01 + ) + cal.start() + for _ in range(100): + cal.step(0.8, tilt_rad=0.1, dt=0.05) # tilt > threshold → no samples + assert cal.state == CalibrationState.FAILED + assert cal.weight_estimate is None + + def test_rocking_duration(self): + """Rocking lasts exactly rock_cycles / frequency_hz seconds.""" + freq = 1.0 + cycles = 3 + cal = _make_cal(rock_cycles=cycles, frequency_hz=freq, settle_s=100.0) + cal.start() + rock_dur = cycles / freq # 3 s + dt = 0.01 + t = 0.0 + while cal.state == CalibrationState.ROCKING and t < rock_dur + 0.1: + cal.step(0.5, 0.0, dt) + t += dt + assert t == pytest.approx(rock_dur, abs=0.02) + + def test_weight_clamped_to_zero_below_idle(self): + # Current below idle → should clamp to 0, not negative + cal = _make_cal(rock_cycles=1, frequency_hz=2.0, settle_s=0.5) + cal.start() + for _ in range(100): + cal.step(0.2, tilt_rad=0.0, dt=0.05) # 0.2 < 0.5 idle + if cal.state == CalibrationState.DONE: + assert cal.weight_estimate == pytest.approx(0.0) + + +# ─── Integration ───────────────────────────────────────────────────────────── + +class TestIntegration: + + def test_pid_with_gain_scheduler_round_trip(self): + """GainScheduler feeds gains into PID; both operate consistently.""" + sched = _make_scheduler() + pid = PIDController(kp=10.0, ki=0.5, kd=1.0, + output_min=-50.0, output_max=50.0) + + # Simulate 1 s at 100 Hz, weight=0.2 kg (empty class) + dt = 0.01 + total_effort = 0.0 + for _ in range(100): + gains, _, _ = sched.update(0.2) + pid.update_gains(gains.kp, gains.ki, gains.kd) + error = 0.1 # constant tilt error + total_effort += pid.step(error, dt) + + assert total_effort > 0.0 # robot should be correcting + + def test_instability_reverts_pid_gains(self): + sched = _make_scheduler() + det = InstabilityDetector(tilt_threshold_rad=0.3, window_size=10) + pid = PIDController(kp=10.0, ki=0.0, kd=0.0) + + # Inject large oscillations + tilts = [0.5, -0.5] * 10 + for t in tilts: + det.update(t) + if det.update(t): + sched.revert_to_defaults(0.0) + gains = sched.active_gains + pid.update_gains(gains.kp, gains.ki, gains.kd) + pid.reset() + break + + assert sched.active_class == "empty" + assert pid.integral == pytest.approx(0.0) + + def test_calibration_updates_weight_estimator_chain(self): + """Full: calibration → weight estimate → gain scheduling.""" + cal = _make_cal(rock_cycles=1, frequency_hz=2.0, settle_s=0.5) + sched = _make_scheduler() + + # Run calibration + cal.start() + for _ in range(100): + cal.step(0.8, tilt_rad=0.0, dt=0.05) + + assert cal.state == CalibrationState.DONE + wt = cal.weight_estimate + assert wt is not None + + # Feed weight into scheduler + for _ in range(5): + gains, cls, _ = sched.update(wt) + + # ~1 kg → "light" class + assert cls == "light" -- 2.47.2