Merge pull request 'feat(controls): adaptive PID balance controller with gain scheduling (Issue #136)' (#149) from sl-controls/issue-136-adaptive-pid into main

This commit is contained in:
sl-jetson 2026-03-02 09:51:19 -05:00
commit 732045a086
11 changed files with 1809 additions and 0 deletions

View File

@ -0,0 +1,66 @@
adaptive_pid:
ros__parameters:
# Control loop
control_rate: 100.0 # Hz
balance_setpoint_rad: 0.0 # target tilt (rad); 0 = upright
# PID gains per weight class
# Tune: heavier loads need more proportional gain to overcome inertia
kp_empty: 15.0
ki_empty: 0.5
kd_empty: 1.5
kp_light: 18.0
ki_light: 0.6
kd_light: 2.0
kp_heavy: 22.0
ki_heavy: 0.8
kd_heavy: 2.5
# Safety bounds — ALL gains are hard-clamped to these regardless of source
kp_min: 5.0
kp_max: 40.0
ki_min: 0.0
ki_max: 5.0
kd_min: 0.0
kd_max: 10.0
output_min: -50.0 # balance effort clamp
output_max: 50.0
i_clamp: 10.0 # anti-windup integrator clamp
# Weight classification thresholds (kg)
weight_empty_max_kg: 0.5 # ≤ 0.5 kg → empty
weight_light_max_kg: 2.0 # ≤ 2.0 kg → light; > 2.0 → heavy
# Weight estimation from motor current
idle_current_a: 0.5 # motor current at rest with no payload (A)
current_per_kg_a: 0.3 # current increase per kg payload (A/kg)
weight_window_s: 3.0 # rolling-average window (s)
weight_change_threshold_kg: 0.3 # triggers gain reschedule when exceeded
steady_tilt_threshold_rad: 0.05 # max tilt to accept current sample
# Gain blending
gain_blend_alpha: 0.05 # convergence rate per tick (0=no change, 1=instant)
# at 100 Hz: ~0.4 s to 50 % of target, ~0.9 s to 90 %
# Instability detection
instability_tilt_threshold_rad: 0.4 # |tilt| exceeds this → suspect
instability_oscillation_count: 6 # sign reversals in window → unstable
instability_window_size: 100 # samples (~1 s at 100 Hz)
# IMU watchdog
imu_timeout_s: 0.5 # zero effort if IMU goes silent
# Dynamic reconfigure / manual override
# Set override_enabled=true + override_kp/ki/kd to bypass scheduling
override_enabled: false
override_kp: 15.0
override_ki: 0.5
override_kd: 1.5
# Calibration routine (rock-and-settle)
calibration_amplitude_rad: 0.05 # rocking amplitude
calibration_frequency_hz: 0.5 # rocking frequency
calibration_cycles: 3 # number of rock cycles (duration = cycles/freq)
calibration_settle_s: 2.0 # settling window after rocking

View File

@ -0,0 +1,54 @@
"""
adaptive_pid.launch.py Launch the adaptive balance PID controller.
Usage:
ros2 launch saltybot_adaptive_pid adaptive_pid.launch.py
ros2 launch saltybot_adaptive_pid adaptive_pid.launch.py balance_setpoint_rad:=0.02
Dynamic reconfigure (no restart needed):
ros2 param set /adaptive_pid gain_blend_alpha 0.1
ros2 param set /adaptive_pid override_enabled true
ros2 param set /adaptive_pid override_kp 12.0
ros2 param set /adaptive_pid override_ki 0.4
ros2 param set /adaptive_pid override_kd 1.2
ros2 param set /adaptive_pid override_enabled false
Calibration:
ros2 service call /saltybot/calibrate_balance std_srvs/srv/Trigger
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg_share = get_package_share_directory("saltybot_adaptive_pid")
params_file = os.path.join(pkg_share, "config", "adaptive_pid_params.yaml")
setpoint_arg = DeclareLaunchArgument(
"balance_setpoint_rad",
default_value="0.0",
description="Target tilt angle in rad (0 = upright)",
)
adaptive_pid = Node(
package="saltybot_adaptive_pid",
executable="adaptive_pid_node",
name="adaptive_pid",
parameters=[
params_file,
{"balance_setpoint_rad": LaunchConfiguration("balance_setpoint_rad")},
],
output="screen",
emulate_tty=True,
)
return LaunchDescription([
setpoint_arg,
adaptive_pid,
])

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_adaptive_pid</name>
<version>0.1.0</version>
<description>Adaptive balance PID with weight estimation and gain scheduling (Issue #136)</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>
<depend>std_srvs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,459 @@
"""
adaptive_pid_node.py Adaptive balance PID controller (Issue #136).
Overview
Subscribes to IMU and motor-current topics, runs a PID balance loop at
`control_rate` Hz, and automatically adjusts gains based on estimated
payload weight.
Weight estimation
Motor current at steady state (low tilt, no cmd_vel) is sampled into a
rolling window. When enough samples accumulate:
weight_kg = max(0, (I_avg I_idle) / k_per_kg)
If the estimate shifts by weight_change_threshold_kg, gains are
re-scheduled immediately (subject to blending).
Gain scheduling
Three weight classes (empty / light / heavy) each have a (kp, ki, kd)
table entry. Active gains blend exponentially toward the target class at
rate gain_blend_alpha per tick.
Safety
All gains clamped to [kp_min..kp_max], [ki_min..ki_max], [kd_min..kd_max]
InstabilityDetector monitors tilt amplitude and sign reversals; on trigger:
gains reverted to table defaults (no blending)
PID integrator reset
WARN logged
Watchdog: if /saltybot/imu goes silent > imu_timeout_s effort = 0
Dynamic reconfigure
Set `override_enabled=true` plus override_kp / override_ki / override_kd
via `ros2 param set` to bypass scheduling. Set `override_enabled=false`
to return to auto-scheduling.
Calibration service
Call `/saltybot/calibrate_balance` (std_srvs/Trigger) to start the
calibration routine. The robot rocks gently for `calibration_cycles /
calibration_frequency_hz` seconds, then settles. The weight estimate is
updated and logged on completion.
Subscribes
/saltybot/imu sensor_msgs/Imu tilt angle (pitch from quaternion)
/saltybot/motor_current std_msgs/Float32 motor current (A)
Publishes
/saltybot/balance_effort std_msgs/Float32 PID output (balance correction)
/saltybot/weight_estimate std_msgs/Float32 current weight estimate (kg)
/saltybot/pid_state std_msgs/String JSON: gains, class, mode, flags
Services
/saltybot/calibrate_balance std_srvs/Trigger
Parameters (see config/adaptive_pid_params.yaml for defaults)
control_rate, balance_setpoint_rad,
kp_empty/light/heavy, ki_*, kd_*,
kp_min/max, ki_min/max, kd_min/max,
weight_empty_max_kg, weight_light_max_kg,
idle_current_a, current_per_kg_a,
weight_window_s, weight_change_threshold_kg,
steady_tilt_threshold_rad, gain_blend_alpha,
instability_tilt_threshold_rad, instability_oscillation_count,
instability_window_size, imu_timeout_s,
override_enabled, override_kp, override_ki, override_kd,
calibration_amplitude_rad, calibration_frequency_hz,
calibration_cycles, calibration_settle_s,
output_min, output_max, i_clamp
"""
import json
import math
import time
import rclpy
from rclpy.node import Node
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
from sensor_msgs.msg import Imu
from std_msgs.msg import Float32, String
from std_srvs.srv import Trigger
from saltybot_adaptive_pid.pid_controller import (
GainScheduler,
GainSet,
InstabilityDetector,
PIDController,
)
from saltybot_adaptive_pid.weight_estimator import (
CalibrationRoutine,
CalibrationState,
WeightEstimator,
)
def _quaternion_to_pitch(qx: float, qy: float, qz: float, qw: float) -> float:
"""Extract pitch (rotation about Y) from a unit quaternion."""
sinp = 2.0 * (qw * qy - qz * qx)
return math.asin(max(-1.0, min(1.0, sinp)))
class AdaptivePIDNode(Node):
def __init__(self):
super().__init__("adaptive_pid")
# ── Parameters ───────────────────────────────────────────────────────
self._declare_all_parameters()
p = self._load_params()
# ── Sub-systems ──────────────────────────────────────────────────────
self._pid = PIDController(
kp=p["kp_empty"], ki=p["ki_empty"], kd=p["kd_empty"],
output_min=p["output_min"], output_max=p["output_max"],
i_clamp=p["i_clamp"],
)
self._scheduler = GainScheduler(
gain_table={
"empty": GainSet(p["kp_empty"], p["ki_empty"], p["kd_empty"]),
"light": GainSet(p["kp_light"], p["ki_light"], p["kd_light"]),
"heavy": GainSet(p["kp_heavy"], p["ki_heavy"], p["kd_heavy"]),
},
weight_thresholds={
"empty_max": p["weight_empty_max_kg"],
"light_max": p["weight_light_max_kg"],
},
blend_alpha=p["gain_blend_alpha"],
kp_min=p["kp_min"], kp_max=p["kp_max"],
ki_min=p["ki_min"], ki_max=p["ki_max"],
kd_min=p["kd_min"], kd_max=p["kd_max"],
)
self._weight_estimator = WeightEstimator(
idle_current_a=p["idle_current_a"],
current_per_kg_a=p["current_per_kg_a"],
window_s=p["weight_window_s"],
control_rate_hz=p["control_rate"],
steady_tilt_threshold_rad=p["steady_tilt_threshold_rad"],
change_threshold_kg=p["weight_change_threshold_kg"],
)
self._instability_detector = InstabilityDetector(
tilt_threshold_rad=p["instability_tilt_threshold_rad"],
oscillation_count=p["instability_oscillation_count"],
window_size=p["instability_window_size"],
)
self._calibration = CalibrationRoutine(
amplitude_rad=p["calibration_amplitude_rad"],
frequency_hz=p["calibration_frequency_hz"],
rock_cycles=p["calibration_cycles"],
settle_s=p["calibration_settle_s"],
settle_tilt_threshold_rad=p["steady_tilt_threshold_rad"],
idle_current_a=p["idle_current_a"],
current_per_kg_a=p["current_per_kg_a"],
)
# ── State ────────────────────────────────────────────────────────────
self._tilt_rad: float = 0.0
self._current_a: float = 0.0
self._last_imu_t: float = 0.0
self._weight_kg: float = 0.0
self._unstable: bool = False
self._last_ctrl_t: float = time.monotonic()
# ── QoS ──────────────────────────────────────────────────────────────
reliable = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
best_effort = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
# ── Subscriptions ────────────────────────────────────────────────────
self.create_subscription(Imu, "/saltybot/imu", self._imu_cb, best_effort)
self.create_subscription(Float32, "/saltybot/motor_current", self._current_cb, reliable)
# ── Publishers ───────────────────────────────────────────────────────
self._effort_pub = self.create_publisher(Float32, "/saltybot/balance_effort", reliable)
self._weight_pub = self.create_publisher(Float32, "/saltybot/weight_estimate", reliable)
self._state_pub = self.create_publisher(String, "/saltybot/pid_state", reliable)
# ── Service ──────────────────────────────────────────────────────────
self.create_service(Trigger, "/saltybot/calibrate_balance", self._calibrate_srv)
# ── Control timer ────────────────────────────────────────────────────
rate = p["control_rate"]
self._ctrl_timer = self.create_timer(1.0 / rate, self._control_cb)
self.get_logger().info(
f"AdaptivePIDNode ready "
f"rate={rate}Hz "
f"kp_empty={p['kp_empty']} ki_empty={p['ki_empty']} kd_empty={p['kd_empty']}"
)
# ── Parameter declaration / load ─────────────────────────────────────────
def _declare_all_parameters(self) -> None:
self.declare_parameter("control_rate", 100.0)
self.declare_parameter("balance_setpoint_rad", 0.0)
# Gains per class
self.declare_parameter("kp_empty", 15.0)
self.declare_parameter("ki_empty", 0.5)
self.declare_parameter("kd_empty", 1.5)
self.declare_parameter("kp_light", 18.0)
self.declare_parameter("ki_light", 0.6)
self.declare_parameter("kd_light", 2.0)
self.declare_parameter("kp_heavy", 22.0)
self.declare_parameter("ki_heavy", 0.8)
self.declare_parameter("kd_heavy", 2.5)
# Safety bounds
self.declare_parameter("kp_min", 5.0)
self.declare_parameter("kp_max", 40.0)
self.declare_parameter("ki_min", 0.0)
self.declare_parameter("ki_max", 5.0)
self.declare_parameter("kd_min", 0.0)
self.declare_parameter("kd_max", 10.0)
self.declare_parameter("output_min", -50.0)
self.declare_parameter("output_max", 50.0)
self.declare_parameter("i_clamp", 10.0)
# Weight classification
self.declare_parameter("weight_empty_max_kg", 0.5)
self.declare_parameter("weight_light_max_kg", 2.0)
# Weight estimation
self.declare_parameter("idle_current_a", 0.5)
self.declare_parameter("current_per_kg_a", 0.3)
self.declare_parameter("weight_window_s", 3.0)
self.declare_parameter("weight_change_threshold_kg", 0.3)
self.declare_parameter("steady_tilt_threshold_rad", 0.05)
# Gain blending
self.declare_parameter("gain_blend_alpha", 0.05)
# Instability detection
self.declare_parameter("instability_tilt_threshold_rad", 0.4)
self.declare_parameter("instability_oscillation_count", 6)
self.declare_parameter("instability_window_size", 100)
# Watchdog
self.declare_parameter("imu_timeout_s", 0.5)
# Dynamic reconfigure / manual override
self.declare_parameter("override_enabled", False)
self.declare_parameter("override_kp", 15.0)
self.declare_parameter("override_ki", 0.5)
self.declare_parameter("override_kd", 1.5)
# Calibration
self.declare_parameter("calibration_amplitude_rad", 0.05)
self.declare_parameter("calibration_frequency_hz", 0.5)
self.declare_parameter("calibration_cycles", 3)
self.declare_parameter("calibration_settle_s", 2.0)
def _load_params(self) -> dict:
g = self.get_parameter
return {k: g(k).value for k in [
"control_rate", "balance_setpoint_rad",
"kp_empty", "ki_empty", "kd_empty",
"kp_light", "ki_light", "kd_light",
"kp_heavy", "ki_heavy", "kd_heavy",
"kp_min", "kp_max", "ki_min", "ki_max", "kd_min", "kd_max",
"output_min", "output_max", "i_clamp",
"weight_empty_max_kg", "weight_light_max_kg",
"idle_current_a", "current_per_kg_a",
"weight_window_s", "weight_change_threshold_kg",
"steady_tilt_threshold_rad", "gain_blend_alpha",
"instability_tilt_threshold_rad", "instability_oscillation_count",
"instability_window_size", "imu_timeout_s",
"override_enabled", "override_kp", "override_ki", "override_kd",
"calibration_amplitude_rad", "calibration_frequency_hz",
"calibration_cycles", "calibration_settle_s",
]}
# ── Callbacks ────────────────────────────────────────────────────────────
def _imu_cb(self, msg: Imu) -> None:
q = msg.orientation
self._tilt_rad = _quaternion_to_pitch(q.x, q.y, q.z, q.w)
self._last_imu_t = time.monotonic()
def _current_cb(self, msg: Float32) -> None:
self._current_a = msg.data
def _calibrate_srv(self, request, response) -> Trigger.Response:
if self._calibration.is_active:
response.success = False
response.message = "Calibration already in progress"
else:
self._calibration.start()
self._pid.reset()
self._weight_estimator.reset()
response.success = True
response.message = "Calibration started"
self.get_logger().info("Calibration started")
return response
# ── 100 Hz control loop ──────────────────────────────────────────────────
def _control_cb(self) -> None:
now = time.monotonic()
dt = now - self._last_ctrl_t
self._last_ctrl_t = now
p = self._load_params()
# ── IMU watchdog ─────────────────────────────────────────────────────
imu_age = (now - self._last_imu_t) if self._last_imu_t > 0.0 else 1e9
if imu_age > p["imu_timeout_s"]:
self._publish_effort(0.0, p, reason="imu_timeout")
return
tilt = self._tilt_rad
current = self._current_a
# ── Calibration ──────────────────────────────────────────────────────
cal_offset = self._calibration.step(current, tilt, dt)
if self._calibration.state == CalibrationState.DONE and \
self._calibration.weight_estimate is not None:
cal_wt = self._calibration.weight_estimate
self._weight_kg = cal_wt
self._weight_estimator.reset()
self.get_logger().info(
f"Calibration done: weight={cal_wt:.2f} kg "
f"class={self._scheduler.classify(cal_wt)}"
)
if self._calibration.state == CalibrationState.FAILED:
self.get_logger().warn("Calibration failed: insufficient settled samples")
# ── Dynamic reconfigure / override ────────────────────────────────────
if p["override_enabled"]:
if not self._scheduler.override_active:
self._scheduler.override_gains(
p["override_kp"], p["override_ki"], p["override_kd"]
)
self.get_logger().info(
f"Manual override: kp={p['override_kp']} "
f"ki={p['override_ki']} kd={p['override_kd']}"
)
else:
if self._scheduler.override_active:
self._scheduler.clear_override()
self.get_logger().info("Manual override cleared, resuming scheduling")
# ── Weight estimation ─────────────────────────────────────────────────
if not self._calibration.is_active:
weight_est, weight_changed = self._weight_estimator.update(current, tilt)
if weight_est is not None:
if weight_changed:
old_class = self._scheduler.active_class
self._weight_kg = weight_est
new_class = self._scheduler.classify(weight_est)
if old_class != new_class:
self.get_logger().info(
f"Weight changed: {weight_est:.2f} kg "
f"({old_class}{new_class})"
)
else:
self.get_logger().info(
f"Weight updated: {weight_est:.2f} kg class={new_class}"
)
# ── Gain scheduling ──────────────────────────────────────────────────
gains, wt_class, class_changed = self._scheduler.update(self._weight_kg)
if class_changed:
self.get_logger().info(
f"Gains rescheduled → class={wt_class} "
f"kp={gains.kp:.3f} ki={gains.ki:.3f} kd={gains.kd:.3f}"
)
self._pid.update_gains(gains.kp, gains.ki, gains.kd)
# ── Instability detection ─────────────────────────────────────────────
unstable = self._instability_detector.update(tilt)
if unstable and not self._unstable:
self._unstable = True
self.get_logger().warn(
f"Instability detected (tilt={tilt:.3f} rad) — "
f"reverting gains to defaults"
)
self._scheduler.revert_to_defaults(self._weight_kg)
reverted = self._scheduler.active_gains
self._pid.update_gains(reverted.kp, reverted.ki, reverted.kd)
self._pid.reset()
elif not unstable:
self._unstable = False
# ── PID step ─────────────────────────────────────────────────────────
setpoint = p["balance_setpoint_rad"] + cal_offset
error = setpoint - tilt
effort = self._pid.step(error, dt)
self._publish_effort(effort, p)
# ── Weight + state publish ────────────────────────────────────────────
wt_msg = Float32()
wt_msg.data = float(self._weight_kg)
self._weight_pub.publish(wt_msg)
state_msg = String()
state_msg.data = json.dumps({
"kp": round(gains.kp, 4),
"ki": round(gains.ki, 4),
"kd": round(gains.kd, 4),
"weight_class": wt_class,
"weight_kg": round(self._weight_kg, 3),
"override_active": self._scheduler.override_active,
"unstable": self._unstable,
"calibrating": self._calibration.is_active,
"cal_state": self._calibration.state.value,
"tilt_rad": round(tilt, 4),
"effort": round(effort, 4),
})
self._state_pub.publish(state_msg)
def _publish_effort(self, effort: float, p: dict, reason: str = "") -> None:
msg = Float32()
msg.data = float(effort)
self._effort_pub.publish(msg)
# ── Lifecycle ─────────────────────────────────────────────────────────────
def destroy_node(self) -> None:
# Zero effort on shutdown
try:
msg = Float32()
msg.data = 0.0
self._effort_pub.publish(msg)
except Exception:
pass
super().destroy_node()
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = AdaptivePIDNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,333 @@
"""
pid_controller.py PID controller, gain scheduler, and instability detector
for the SaltyBot adaptive balance controller (Issue #136).
Pure module no ROS2 dependency; fully unit-testable.
Classes
GainSet immutable (kp, ki, kd) triple with safety-clamp helper
PIDController discrete PID with anti-windup and output clamping
GainScheduler weight-class gain table, exponential blending,
safety bounds, manual override, default revert
InstabilityDetector monitors tilt for threshold violations and oscillations
"""
from __future__ import annotations
from collections import deque
from dataclasses import dataclass
# ── GainSet ──────────────────────────────────────────────────────────────────
@dataclass
class GainSet:
kp: float
ki: float
kd: float
def clamp(
self,
kp_min: float, kp_max: float,
ki_min: float, ki_max: float,
kd_min: float, kd_max: float,
) -> "GainSet":
"""Return a new GainSet with all gains clamped to the given bounds."""
return GainSet(
kp=max(kp_min, min(kp_max, self.kp)),
ki=max(ki_min, min(ki_max, self.ki)),
kd=max(kd_min, min(kd_max, self.kd)),
)
# ── PIDController ─────────────────────────────────────────────────────────────
class PIDController:
"""
Discrete PID controller with:
- Anti-windup: integral clamped to ±i_clamp
- Output clamping: result clamped to [output_min, output_max]
- Derivative on error (D-on-error, not D-on-measurement)
- Zero derivative on first step (no derivative kick)
"""
def __init__(
self,
kp: float,
ki: float,
kd: float,
output_min: float = -1e9,
output_max: float = 1e9,
i_clamp: float = 1e9,
):
self._kp = kp
self._ki = ki
self._kd = kd
self._output_min = output_min
self._output_max = output_max
self._i_clamp = abs(i_clamp)
self._integral: float = 0.0
self._prev_error: float | None = None
# ── Properties ────────────────────────────────────────────────────────────
@property
def kp(self) -> float:
return self._kp
@property
def ki(self) -> float:
return self._ki
@property
def kd(self) -> float:
return self._kd
@property
def integral(self) -> float:
return self._integral
# ── Mutators ──────────────────────────────────────────────────────────────
def update_gains(self, kp: float, ki: float, kd: float) -> None:
"""Update gains without resetting integrator or derivative state."""
self._kp = kp
self._ki = ki
self._kd = kd
def reset(self) -> None:
"""Clear integrator and derivative memory (e.g., after mode change)."""
self._integral = 0.0
self._prev_error = None
# ── Step ──────────────────────────────────────────────────────────────────
def step(self, error: float, dt: float) -> float:
"""
Compute one PID step.
Parameters
----------
error : setpoint measurement
dt : elapsed time since last step (s); must be > 0
Returns
-------
Control output clamped to [output_min, output_max].
Returns 0.0 if dt 0.
"""
if dt <= 0.0:
return 0.0
# Proportional
p = self._kp * error
# Integral with anti-windup
self._integral += error * dt
self._integral = max(-self._i_clamp, min(self._i_clamp, self._integral))
i = self._ki * self._integral
# Derivative (zero on first call to avoid derivative kick)
if self._prev_error is None:
d = 0.0
else:
d = self._kd * (error - self._prev_error) / dt
self._prev_error = error
output = p + i + d
return max(self._output_min, min(self._output_max, output))
# ── GainScheduler ────────────────────────────────────────────────────────────
class GainScheduler:
"""
Maps estimated robot weight to PID gains via a 3-class lookup table.
Transitions between classes are blended exponentially so the active gains
track the target smoothly.
Weight classes (ascending):
"empty" up to weight_thresholds["empty_max"] kg
"light" up to weight_thresholds["light_max"] kg
"heavy" above light_max
Safety clamping is applied to every blended gain before returning.
"""
CLASSES = ("empty", "light", "heavy")
def __init__(
self,
gain_table: dict[str, GainSet],
weight_thresholds: dict[str, float],
blend_alpha: float = 0.05,
kp_min: float = 0.0, kp_max: float = 100.0,
ki_min: float = 0.0, ki_max: float = 20.0,
kd_min: float = 0.0, kd_max: float = 20.0,
):
"""
Parameters
----------
gain_table : {"empty": GainSet, "light": GainSet, "heavy": GainSet}
weight_thresholds : {"empty_max": float, "light_max": float}
blend_alpha : convergence rate (0, 1] per control tick
"""
self._table = gain_table
self._thresholds = weight_thresholds
self._alpha = max(1e-6, min(1.0, blend_alpha))
self._bounds = {
"kp_min": kp_min, "kp_max": kp_max,
"ki_min": ki_min, "ki_max": ki_max,
"kd_min": kd_min, "kd_max": kd_max,
}
# Active (blended) gains start at "empty" table values
self._active_class: str = "empty"
self._override_active: bool = False
raw = self._table["empty"]
self._active = self._apply_bounds(GainSet(raw.kp, raw.ki, raw.kd))
# ── Properties ────────────────────────────────────────────────────────────
@property
def active_class(self) -> str:
return self._active_class
@property
def active_gains(self) -> GainSet:
return self._active
@property
def override_active(self) -> bool:
return self._override_active
# ── Classification ────────────────────────────────────────────────────────
def classify(self, weight_kg: float) -> str:
"""Map weight (kg) to class name."""
if weight_kg <= self._thresholds.get("empty_max", 0.5):
return "empty"
if weight_kg <= self._thresholds.get("light_max", 2.0):
return "light"
return "heavy"
# ── Update ────────────────────────────────────────────────────────────────
def update(self, weight_kg: float) -> tuple[GainSet, str, bool]:
"""
Blend active gains toward the target for the given weight.
Manual override bypasses scheduling.
Returns
-------
(active_gains, active_class, class_changed)
"""
if self._override_active:
return self._active, self._active_class, False
target_class = self.classify(weight_kg)
class_changed = target_class != self._active_class
self._active_class = target_class
target = self._table[target_class]
a = self._alpha
blended = GainSet(
kp=(1.0 - a) * self._active.kp + a * target.kp,
ki=(1.0 - a) * self._active.ki + a * target.ki,
kd=(1.0 - a) * self._active.kd + a * target.kd,
)
self._active = self._apply_bounds(blended)
return self._active, self._active_class, class_changed
# ── Override / revert ────────────────────────────────────────────────────
def override_gains(self, kp: float, ki: float, kd: float) -> None:
"""
Apply manual gain override (dynamic reconfigure / operator tuning).
Gains are clamped to safety bounds.
"""
self._override_active = True
self._active = self._apply_bounds(GainSet(kp, ki, kd))
def clear_override(self) -> None:
"""Release manual override; scheduling resumes on next update()."""
self._override_active = False
def revert_to_defaults(self, weight_kg: float) -> None:
"""
Immediately snap active gains to table values for the current weight
class (no blending). Used after instability is detected.
"""
self._override_active = False
cls = self.classify(weight_kg)
self._active_class = cls
raw = self._table[cls]
self._active = self._apply_bounds(GainSet(raw.kp, raw.ki, raw.kd))
# ── Internal ─────────────────────────────────────────────────────────────
def _apply_bounds(self, gains: GainSet) -> GainSet:
b = self._bounds
return gains.clamp(
b["kp_min"], b["kp_max"],
b["ki_min"], b["ki_max"],
b["kd_min"], b["kd_max"],
)
# ── InstabilityDetector ───────────────────────────────────────────────────────
class InstabilityDetector:
"""
Detects controller instability via two independent criteria:
1. Tilt threshold more than 50 % of recent samples exceed
tilt_threshold_rad unstable
2. Oscillations sign-reversal count in window exceeds
oscillation_count unstable (growing oscillations)
"""
def __init__(
self,
tilt_threshold_rad: float = 0.4,
oscillation_count: int = 6,
window_size: int = 100,
):
self._tilt_threshold = tilt_threshold_rad
self._osc_count = oscillation_count
self._tilts: deque = deque(maxlen=window_size)
def update(self, tilt_rad: float) -> bool:
"""
Record tilt sample and return True if instability is detected.
False is always returned until the window has at least 4 samples.
"""
self._tilts.append(tilt_rad)
if len(self._tilts) < 4:
return False
tilts = list(self._tilts)
# Criterion 1: tilt amplitude
exceed = sum(1 for t in tilts if abs(t) > self._tilt_threshold)
if exceed > len(tilts) * 0.5:
return True
# Criterion 2: sign reversals (oscillation detection)
reversals = 0
prev_sign: int | None = None
for t in tilts:
if abs(t) < 0.01:
continue
sign = 1 if t > 0 else -1
if prev_sign is not None and sign != prev_sign:
reversals += 1
prev_sign = sign
return reversals >= self._osc_count
def reset(self) -> None:
self._tilts.clear()

View File

@ -0,0 +1,246 @@
"""
weight_estimator.py Motor-current weight estimator and calibration routine
for the SaltyBot adaptive balance controller (Issue #136).
Pure module no ROS2 dependency; fully unit-testable.
Classes
WeightEstimator rolling-window current weight, change detection
CalibrationRoutine state-machine: IDLE ROCKING SETTLING DONE/FAILED
"""
from __future__ import annotations
import math
from collections import deque
from enum import Enum
# ── WeightEstimator ───────────────────────────────────────────────────────────
class WeightEstimator:
"""
Estimates payload weight from motor current at steady state.
Model
weight_kg = max(0, (I_avg I_idle) / k_per_kg)
where I_avg is averaged over a rolling window of steady-state samples.
A sample is accepted only when |tilt| steady_tilt_threshold_rad.
Change detection
Returns `changed=True` when the new estimate differs from the previous
by more than change_threshold_kg. The caller uses this to trigger gain
rescheduling.
"""
def __init__(
self,
idle_current_a: float = 0.5,
current_per_kg_a: float = 0.3,
window_s: float = 3.0,
control_rate_hz: float = 100.0,
steady_tilt_threshold_rad: float = 0.05,
change_threshold_kg: float = 0.3,
):
self._idle = idle_current_a
self._per_kg = max(1e-6, current_per_kg_a) # avoid div-by-zero
self._steady_threshold = steady_tilt_threshold_rad
self._change_threshold = change_threshold_kg
window_size = max(10, int(window_s * control_rate_hz))
self._samples: deque = deque(maxlen=window_size)
self._last_estimate: float | None = None
# ── Properties ────────────────────────────────────────────────────────────
@property
def last_estimate(self) -> float | None:
return self._last_estimate
@property
def sample_count(self) -> int:
return len(self._samples)
# ── Update ────────────────────────────────────────────────────────────────
def update(
self,
current_a: float,
tilt_rad: float,
) -> tuple[float | None, bool]:
"""
Submit one sample (only accepted if in steady state).
Returns
-------
(weight_kg, changed)
weight_kg : None if fewer than min_samples collected yet
changed : True if estimate moved by change_threshold_kg
"""
if abs(tilt_rad) <= self._steady_threshold:
self._samples.append(current_a)
min_samples = max(5, self._samples.maxlen // 4)
if len(self._samples) < min_samples:
return None, False
avg_i = sum(self._samples) / len(self._samples)
weight = max(0.0, (avg_i - self._idle) / self._per_kg)
changed = (
self._last_estimate is None
or abs(weight - self._last_estimate) >= self._change_threshold
)
self._last_estimate = weight
return weight, changed
def reset(self) -> None:
"""Clear sample buffer and last estimate (e.g., after calibration)."""
self._samples.clear()
self._last_estimate = None
# ── CalibrationRoutine ────────────────────────────────────────────────────────
class CalibrationState(Enum):
IDLE = "idle"
ROCKING = "rocking"
SETTLING = "settling"
DONE = "done"
FAILED = "failed"
class CalibrationRoutine:
"""
On-demand calibration: rocks the robot back and forth to disturb the
balance, lets it settle, then measures steady-state current to produce
a weight estimate.
State machine
IDLE (start) ROCKING (cycles done) SETTLING (samples ok) DONE
(timeout) FAILED
abort() returns to IDLE from any active state.
Usage
Each control tick, call step(current_a, tilt_rad, dt).
The return value is a tilt offset (rad) to add to the balance
set-point. Returns 0.0 when not active.
When state == DONE, read weight_estimate.
"""
def __init__(
self,
amplitude_rad: float = 0.05,
frequency_hz: float = 0.5,
rock_cycles: int = 3,
settle_s: float = 2.0,
settle_tilt_threshold_rad: float = 0.03,
idle_current_a: float = 0.5,
current_per_kg_a: float = 0.3,
):
self._amplitude = amplitude_rad
self._frequency = max(1e-3, frequency_hz)
self._rock_cycles = max(1, rock_cycles)
self._settle_s = settle_s
self._settle_threshold = settle_tilt_threshold_rad
self._idle = idle_current_a
self._per_kg = max(1e-6, current_per_kg_a)
self._state = CalibrationState.IDLE
self._elapsed: float = 0.0
self._settle_elapsed: float = 0.0
self._settle_samples: list[float] = []
self._weight_estimate: float | None = None
# ── Properties ────────────────────────────────────────────────────────────
@property
def state(self) -> CalibrationState:
return self._state
@property
def weight_estimate(self) -> float | None:
return self._weight_estimate
@property
def is_active(self) -> bool:
return self._state in (CalibrationState.ROCKING, CalibrationState.SETTLING)
# ── Control ───────────────────────────────────────────────────────────────
def start(self) -> bool:
"""
Start calibration routine.
Returns False if already running (ROCKING or SETTLING).
"""
if self.is_active:
return False
self._state = CalibrationState.ROCKING
self._elapsed = 0.0
self._settle_elapsed = 0.0
self._settle_samples = []
self._weight_estimate = None
return True
def abort(self) -> None:
"""Immediately abort and return to IDLE."""
self._state = CalibrationState.IDLE
self._elapsed = 0.0
self._settle_elapsed = 0.0
# ── Step ──────────────────────────────────────────────────────────────────
def step(self, current_a: float, tilt_rad: float, dt: float) -> float:
"""
Advance one control tick.
Parameters
----------
current_a : motor current reading (A)
tilt_rad : current tilt angle (rad)
dt : elapsed time since last step (s)
Returns
-------
Tilt offset (rad) to superimpose on the balance set-point.
0.0 when IDLE / DONE / FAILED.
"""
if self._state == CalibrationState.ROCKING:
self._elapsed += dt
rock_duration = self._rock_cycles / self._frequency
if self._elapsed >= rock_duration:
self._state = CalibrationState.SETTLING
self._settle_elapsed = 0.0
return 0.0
return self._amplitude * math.sin(
2.0 * math.pi * self._frequency * self._elapsed
)
if self._state == CalibrationState.SETTLING:
self._settle_elapsed += dt
if abs(tilt_rad) <= self._settle_threshold:
self._settle_samples.append(current_a)
if self._settle_elapsed >= self._settle_s:
if len(self._settle_samples) >= 5:
avg_i = sum(self._settle_samples) / len(self._settle_samples)
self._weight_estimate = max(
0.0, (avg_i - self._idle) / self._per_kg
)
self._state = CalibrationState.DONE
else:
self._state = CalibrationState.FAILED
return 0.0
return 0.0

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_adaptive_pid
[install]
install_scripts=$base/lib/saltybot_adaptive_pid

View File

@ -0,0 +1,32 @@
from setuptools import setup, find_packages
import os
from glob import glob
package_name = "saltybot_adaptive_pid"
setup(
name=package_name,
version="0.1.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages",
[f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(os.path.join("share", package_name, "config"),
glob("config/*.yaml")),
(os.path.join("share", package_name, "launch"),
glob("launch/*.py")),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="Adaptive balance PID with weight estimation and gain scheduling",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
f"adaptive_pid_node = {package_name}.adaptive_pid_node:main",
],
},
)

View File

@ -0,0 +1,591 @@
"""
test_adaptive_pid.py Unit tests for pid_controller.py and weight_estimator.py
(Issue #136). No ROS2 runtime required — pure Python.
"""
import math
import pytest
from saltybot_adaptive_pid.pid_controller import (
GainSet,
GainScheduler,
InstabilityDetector,
PIDController,
)
from saltybot_adaptive_pid.weight_estimator import (
CalibrationRoutine,
CalibrationState,
WeightEstimator,
)
# ─── GainSet ─────────────────────────────────────────────────────────────────
class TestGainSet:
def test_fields(self):
g = GainSet(kp=1.0, ki=0.1, kd=0.5)
assert g.kp == pytest.approx(1.0)
assert g.ki == pytest.approx(0.1)
assert g.kd == pytest.approx(0.5)
def test_clamp_within_bounds_unchanged(self):
g = GainSet(5.0, 1.0, 2.0).clamp(0, 10, 0, 5, 0, 5)
assert g.kp == pytest.approx(5.0)
assert g.ki == pytest.approx(1.0)
assert g.kd == pytest.approx(2.0)
def test_clamp_below_min(self):
g = GainSet(-1.0, -0.5, -2.0).clamp(0, 10, 0, 5, 0, 5)
assert g.kp == pytest.approx(0.0)
assert g.ki == pytest.approx(0.0)
assert g.kd == pytest.approx(0.0)
def test_clamp_above_max(self):
g = GainSet(200.0, 50.0, 100.0).clamp(0, 10, 0, 5, 0, 5)
assert g.kp == pytest.approx(10.0)
assert g.ki == pytest.approx(5.0)
assert g.kd == pytest.approx(5.0)
def test_clamp_returns_new_instance(self):
g = GainSet(5.0, 1.0, 2.0)
g2 = g.clamp(0, 10, 0, 5, 0, 5)
assert g2 is not g
# ─── PIDController ───────────────────────────────────────────────────────────
class TestPIDController:
def _make(self, kp=1.0, ki=0.0, kd=0.0, **kwargs):
return PIDController(kp=kp, ki=ki, kd=kd, **kwargs)
def test_p_only(self):
pid = self._make(kp=2.0)
assert pid.step(1.0, 0.01) == pytest.approx(2.0)
def test_p_negative_error(self):
pid = self._make(kp=3.0)
assert pid.step(-1.0, 0.01) == pytest.approx(-3.0)
def test_i_accumulates(self):
pid = self._make(kp=0.0, ki=1.0) # isolate I term
# After 3 steps of error=1, dt=0.1: integral = 0.3, output = ki*integral = 0.3
pid.step(1.0, 0.1)
pid.step(1.0, 0.1)
out = pid.step(1.0, 0.1)
assert out == pytest.approx(0.3, abs=1e-9)
def test_d_zero_on_first_step(self):
pid = self._make(kp=0.0, kd=10.0) # isolate D term
out = pid.step(1.0, 0.01)
assert out == pytest.approx(0.0) # D is zero on first step
def test_d_on_rate_change(self):
pid = self._make(kp=0.0, kd=1.0) # isolate D term
pid.step(0.0, 0.1) # first step, D=0
out = pid.step(1.0, 0.1) # error changed by 1 over dt=0.1 → D=1/0.1=10
assert out == pytest.approx(10.0)
def test_output_min_clamp(self):
# Negative error → large negative output → clamped at output_min
pid = self._make(kp=100.0, output_min=-5.0)
out = pid.step(-1.0, 0.01)
assert out == pytest.approx(-5.0)
def test_output_max_clamp(self):
pid = self._make(kp=100.0, output_max=5.0)
out = pid.step(1.0, 0.01)
assert out == pytest.approx(5.0)
def test_anti_windup_clamps_integral(self):
pid = self._make(ki=1.0, i_clamp=0.5)
for _ in range(100):
pid.step(1.0, 0.1) # integrate a lot
assert abs(pid.integral) <= 0.5 + 1e-9
def test_reset_clears_integral(self):
pid = self._make(ki=1.0)
for _ in range(10):
pid.step(1.0, 0.1)
assert pid.integral > 0.0
pid.reset()
assert pid.integral == pytest.approx(0.0)
def test_reset_clears_derivative_state(self):
pid = self._make(kp=0.0, kd=5.0) # isolate D term
pid.step(1.0, 0.1) # sets prev_error
pid.reset()
out = pid.step(1.0, 0.1) # prev_error=None after reset → D=0
assert out == pytest.approx(0.0)
def test_update_gains_takes_effect(self):
pid = self._make(kp=1.0)
pid.update_gains(kp=3.0, ki=0.0, kd=0.0)
assert pid.step(1.0, 0.01) == pytest.approx(3.0)
def test_zero_dt_returns_zero(self):
pid = self._make(kp=5.0)
assert pid.step(1.0, 0.0) == pytest.approx(0.0)
def test_negative_dt_returns_zero(self):
pid = self._make(kp=5.0)
assert pid.step(1.0, -0.1) == pytest.approx(0.0)
def test_zero_error_no_pid_output(self):
pid = self._make(kp=5.0, ki=1.0, kd=1.0)
pid.step(0.0, 0.1)
out = pid.step(0.0, 0.1)
assert out == pytest.approx(0.0)
def test_gain_properties(self):
pid = self._make(kp=2.0, ki=0.5, kd=0.3)
assert pid.kp == pytest.approx(2.0)
assert pid.ki == pytest.approx(0.5)
assert pid.kd == pytest.approx(0.3)
# ─── GainScheduler ───────────────────────────────────────────────────────────
def _make_scheduler(**kwargs) -> GainScheduler:
table = {
"empty": GainSet(10.0, 0.5, 1.0),
"light": GainSet(15.0, 0.7, 1.5),
"heavy": GainSet(20.0, 1.0, 2.0),
}
thresholds = {"empty_max": 0.5, "light_max": 2.0}
defaults = dict(blend_alpha=0.05, kp_min=0.0, kp_max=100.0,
ki_min=0.0, ki_max=10.0, kd_min=0.0, kd_max=10.0)
defaults.update(kwargs)
return GainScheduler(table, thresholds, **defaults)
class TestGainScheduler:
def test_classify_empty(self):
s = _make_scheduler()
assert s.classify(0.0) == "empty"
assert s.classify(0.5) == "empty"
def test_classify_light(self):
s = _make_scheduler()
assert s.classify(0.51) == "light"
assert s.classify(2.0) == "light"
def test_classify_heavy(self):
s = _make_scheduler()
assert s.classify(2.01) == "heavy"
assert s.classify(10.0) == "heavy"
def test_classify_boundary_empty_light(self):
s = _make_scheduler()
assert s.classify(0.5) == "empty"
assert s.classify(0.500001) == "light"
def test_initial_class_is_empty(self):
s = _make_scheduler()
assert s.active_class == "empty"
def test_update_blends_toward_target(self):
s = _make_scheduler(blend_alpha=0.1)
initial_kp = s.active_gains.kp
# Push toward "heavy"
for _ in range(50):
gains, _, _ = s.update(5.0)
# Should have moved significantly toward heavy (kp=20)
assert gains.kp > initial_kp + 1.0
def test_blend_does_not_overshoot(self):
s = _make_scheduler(blend_alpha=0.5)
target_kp = 20.0 # heavy
for _ in range(200):
gains, _, _ = s.update(5.0)
assert gains.kp <= target_kp + 1e-6
def test_class_changed_flag_true_on_change(self):
s = _make_scheduler()
s.update(0.1) # empty
_, _, changed = s.update(5.0) # → heavy
assert changed is True
def test_class_changed_flag_false_same_class(self):
s = _make_scheduler()
s.update(0.1)
_, _, changed = s.update(0.2) # still empty
assert changed is False
def test_override_gains_applied(self):
s = _make_scheduler()
s.override_gains(kp=7.0, ki=0.3, kd=0.8)
assert s.active_gains.kp == pytest.approx(7.0)
assert s.active_gains.ki == pytest.approx(0.3)
assert s.override_active is True
def test_override_bypasses_scheduling(self):
s = _make_scheduler()
s.override_gains(kp=7.0, ki=0.3, kd=0.8)
for _ in range(20):
gains, _, _ = s.update(5.0) # heavy weight, should not change
assert gains.kp == pytest.approx(7.0)
def test_override_gains_clamped(self):
s = _make_scheduler(kp_max=10.0)
s.override_gains(kp=999.0, ki=0.5, kd=1.0)
assert s.active_gains.kp == pytest.approx(10.0)
def test_clear_override_resumes_scheduling(self):
s = _make_scheduler()
s.override_gains(kp=7.0, ki=0.3, kd=0.8)
s.clear_override()
assert s.override_active is False
def test_revert_to_defaults_snaps_immediately(self):
s = _make_scheduler(blend_alpha=0.001)
for _ in range(5):
s.update(5.0) # start blending toward heavy (kp=20)
s.revert_to_defaults(0.0) # snap back to empty (kp=10)
assert s.active_gains.kp == pytest.approx(10.0)
assert s.active_class == "empty"
def test_revert_clears_override(self):
s = _make_scheduler()
s.override_gains(kp=7.0, ki=0.3, kd=0.8)
s.revert_to_defaults(0.0)
assert s.override_active is False
def test_safety_bounds_clamp_table_gains(self):
s = _make_scheduler(kp_max=12.0) # cap below heavy table value (20)
for _ in range(200):
gains, _, _ = s.update(5.0) # heavy
assert gains.kp <= 12.0 + 1e-9
# ─── InstabilityDetector ─────────────────────────────────────────────────────
class TestInstabilityDetector:
def _make(self, **kwargs):
defaults = dict(tilt_threshold_rad=0.4, oscillation_count=6, window_size=50)
defaults.update(kwargs)
return InstabilityDetector(**defaults)
def test_stable_small_tilts(self):
det = self._make()
for _ in range(50):
assert det.update(0.1) is False
def test_too_few_samples_always_stable(self):
det = self._make()
assert det.update(1.0) is False # only 1 sample
assert det.update(1.0) is False
assert det.update(1.0) is False
def test_unstable_tilt_exceeds_threshold(self):
det = self._make(tilt_threshold_rad=0.3, window_size=10)
for _ in range(6):
det.update(0.5) # > 0.3, majority
assert det.update(0.5) is True
def test_stable_below_threshold(self):
det = self._make(tilt_threshold_rad=0.5)
for _ in range(20):
result = det.update(0.3)
assert result is False
def test_oscillation_triggers_unstable(self):
det = self._make(oscillation_count=4, window_size=50)
# Inject 8 alternating signs → 7 reversals ≥ 4
tilts = [0.1, -0.1] * 8
results = [det.update(t) for t in tilts]
assert any(results)
def test_reset_clears_window(self):
det = self._make(tilt_threshold_rad=0.3, window_size=10)
for _ in range(10):
det.update(0.5)
det.reset()
# After reset, too few samples → stable
assert det.update(0.5) is False
# ─── WeightEstimator ─────────────────────────────────────────────────────────
def _make_estimator(**kwargs) -> WeightEstimator:
defaults = dict(
idle_current_a=0.5, current_per_kg_a=0.3,
window_s=1.0, control_rate_hz=10.0, # window_size=10
steady_tilt_threshold_rad=0.05, change_threshold_kg=0.3,
)
defaults.update(kwargs)
return WeightEstimator(**defaults)
class TestWeightEstimator:
def test_insufficient_samples_returns_none(self):
est = _make_estimator()
result, _ = est.update(0.5, tilt_rad=0.0)
assert result is None
def test_sufficient_samples_returns_estimate(self):
est = _make_estimator()
for _ in range(10):
est.update(0.8, tilt_rad=0.0) # 0.8A → (0.8-0.5)/0.3 = 1.0 kg
wt, _ = est.update(0.8, tilt_rad=0.0)
assert wt == pytest.approx(1.0, abs=0.05)
def test_non_steady_samples_rejected(self):
est = _make_estimator()
for _ in range(20):
est.update(0.8, tilt_rad=0.1) # |0.1| > 0.05 → rejected
result, _ = est.update(0.8, tilt_rad=0.1)
assert result is None
def test_known_current_gives_correct_weight(self):
# 1.1A, idle=0.5, per_kg=0.3 → (1.1-0.5)/0.3 = 2.0 kg
est = _make_estimator()
for _ in range(15):
est.update(1.1, tilt_rad=0.0)
wt, _ = est.update(1.1, tilt_rad=0.0)
assert wt == pytest.approx(2.0, abs=0.05)
def test_negative_estimate_clamped_to_zero(self):
# current below idle → weight should be 0
est = _make_estimator()
for _ in range(15):
est.update(0.1, tilt_rad=0.0)
wt, _ = est.update(0.1, tilt_rad=0.0)
assert wt == pytest.approx(0.0)
def test_weight_changed_flag_on_first_estimate(self):
est = _make_estimator()
# Find the first non-None estimate — it must have changed=True
for _ in range(15):
wt, changed = est.update(0.8, tilt_rad=0.0)
if wt is not None:
assert changed is True
break
def test_weight_unchanged_below_threshold(self):
est = _make_estimator(change_threshold_kg=0.5)
for _ in range(15):
est.update(0.8, tilt_rad=0.0)
# Now feed very similar current (tiny perturbation)
for _ in range(15):
wt, changed = est.update(0.81, tilt_rad=0.0)
# delta is ~0.03 kg << threshold 0.5
assert changed is False
def test_reset_clears_samples_and_estimate(self):
est = _make_estimator()
for _ in range(15):
est.update(0.8, tilt_rad=0.0)
est.reset()
assert est.last_estimate is None
result, _ = est.update(0.8, tilt_rad=0.0)
assert result is None # need to re-fill window
# ─── CalibrationRoutine ───────────────────────────────────────────────────────
def _make_cal(**kwargs) -> CalibrationRoutine:
defaults = dict(
amplitude_rad=0.05, frequency_hz=1.0,
rock_cycles=2, settle_s=1.0,
settle_tilt_threshold_rad=0.03,
idle_current_a=0.5, current_per_kg_a=0.3,
)
defaults.update(kwargs)
return CalibrationRoutine(**defaults)
class TestCalibrationRoutine:
def test_initial_state_idle(self):
cal = _make_cal()
assert cal.state == CalibrationState.IDLE
assert cal.is_active is False
def test_start_transitions_to_rocking(self):
cal = _make_cal()
assert cal.start() is True
assert cal.state == CalibrationState.ROCKING
assert cal.is_active is True
def test_start_returns_false_when_active(self):
cal = _make_cal()
cal.start()
assert cal.start() is False
def test_start_succeeds_from_done(self):
cal = _make_cal(rock_cycles=1, settle_s=0.01)
cal.start()
dt = 0.01
for _ in range(200):
cal.step(0.8, tilt_rad=0.0, dt=dt)
# Should be DONE or FAILED at this point
assert cal.state in (CalibrationState.DONE, CalibrationState.FAILED)
assert cal.start() is True # can restart
def test_abort_returns_to_idle(self):
cal = _make_cal()
cal.start()
cal.abort()
assert cal.state == CalibrationState.IDLE
assert cal.is_active is False
def test_rocking_output_sinusoidal(self):
# freq=1Hz, cycles=2 → duration=2s; use 110 steps at dt=0.01 to cover >0.5 period
cal = _make_cal(amplitude_rad=0.1, frequency_hz=1.0)
cal.start()
outputs = [cal.step(0.5, 0.0, 0.01) for _ in range(110)]
# Sinusoidal over >1 s: must cross zero and include both signs
assert any(o > 0.0 for o in outputs)
assert any(o < 0.0 for o in outputs)
def test_rocking_output_bounded_by_amplitude(self):
cal = _make_cal(amplitude_rad=0.05)
cal.start()
for _ in range(200):
out = cal.step(0.5, 0.0, 0.005)
assert abs(out) <= 0.05 + 1e-9
def test_idle_output_is_zero(self):
cal = _make_cal()
assert cal.step(0.5, 0.0, 0.01) == pytest.approx(0.0)
def test_rocking_transitions_to_settling(self):
# rock_cycles=2, freq=1Hz → rock_duration=2s; dt=0.1 → 20 steps
cal = _make_cal(rock_cycles=2, frequency_hz=1.0)
cal.start()
for _ in range(22):
cal.step(0.5, 0.0, 0.1)
assert cal.state == CalibrationState.SETTLING
def test_settling_output_is_zero(self):
cal = _make_cal(rock_cycles=1, frequency_hz=2.0)
cal.start()
# Skip through rocking
for _ in range(15):
cal.step(0.5, 0.0, 0.1)
# Now in settling
assert cal.state == CalibrationState.SETTLING
out = cal.step(0.5, 0.0, 0.1)
assert out == pytest.approx(0.0)
def test_done_with_enough_settled_samples(self):
# freq=2Hz, cycles=1 → rock_duration=0.5s; dt=0.05 → 10 steps
# settle_s=0.5s → 10 more steps
cal = _make_cal(rock_cycles=1, frequency_hz=2.0, settle_s=0.5)
cal.start()
for _ in range(100):
cal.step(0.8, tilt_rad=0.0, dt=0.05)
assert cal.state == CalibrationState.DONE
assert cal.weight_estimate is not None
assert cal.weight_estimate >= 0.0
def test_weight_estimate_known_current(self):
# 0.8A, idle=0.5, per_kg=0.3 → 1.0 kg
cal = _make_cal(rock_cycles=1, frequency_hz=2.0, settle_s=0.5)
cal.start()
for _ in range(100):
cal.step(0.8, tilt_rad=0.0, dt=0.05)
assert cal.state == CalibrationState.DONE
assert cal.weight_estimate == pytest.approx(1.0, abs=0.1)
def test_failed_when_no_settled_samples(self):
# Robot never settles (large tilt)
cal = _make_cal(
rock_cycles=1, frequency_hz=2.0, settle_s=0.5,
settle_tilt_threshold_rad=0.01
)
cal.start()
for _ in range(100):
cal.step(0.8, tilt_rad=0.1, dt=0.05) # tilt > threshold → no samples
assert cal.state == CalibrationState.FAILED
assert cal.weight_estimate is None
def test_rocking_duration(self):
"""Rocking lasts exactly rock_cycles / frequency_hz seconds."""
freq = 1.0
cycles = 3
cal = _make_cal(rock_cycles=cycles, frequency_hz=freq, settle_s=100.0)
cal.start()
rock_dur = cycles / freq # 3 s
dt = 0.01
t = 0.0
while cal.state == CalibrationState.ROCKING and t < rock_dur + 0.1:
cal.step(0.5, 0.0, dt)
t += dt
assert t == pytest.approx(rock_dur, abs=0.02)
def test_weight_clamped_to_zero_below_idle(self):
# Current below idle → should clamp to 0, not negative
cal = _make_cal(rock_cycles=1, frequency_hz=2.0, settle_s=0.5)
cal.start()
for _ in range(100):
cal.step(0.2, tilt_rad=0.0, dt=0.05) # 0.2 < 0.5 idle
if cal.state == CalibrationState.DONE:
assert cal.weight_estimate == pytest.approx(0.0)
# ─── Integration ─────────────────────────────────────────────────────────────
class TestIntegration:
def test_pid_with_gain_scheduler_round_trip(self):
"""GainScheduler feeds gains into PID; both operate consistently."""
sched = _make_scheduler()
pid = PIDController(kp=10.0, ki=0.5, kd=1.0,
output_min=-50.0, output_max=50.0)
# Simulate 1 s at 100 Hz, weight=0.2 kg (empty class)
dt = 0.01
total_effort = 0.0
for _ in range(100):
gains, _, _ = sched.update(0.2)
pid.update_gains(gains.kp, gains.ki, gains.kd)
error = 0.1 # constant tilt error
total_effort += pid.step(error, dt)
assert total_effort > 0.0 # robot should be correcting
def test_instability_reverts_pid_gains(self):
sched = _make_scheduler()
det = InstabilityDetector(tilt_threshold_rad=0.3, window_size=10)
pid = PIDController(kp=10.0, ki=0.0, kd=0.0)
# Inject large oscillations
tilts = [0.5, -0.5] * 10
for t in tilts:
det.update(t)
if det.update(t):
sched.revert_to_defaults(0.0)
gains = sched.active_gains
pid.update_gains(gains.kp, gains.ki, gains.kd)
pid.reset()
break
assert sched.active_class == "empty"
assert pid.integral == pytest.approx(0.0)
def test_calibration_updates_weight_estimator_chain(self):
"""Full: calibration → weight estimate → gain scheduling."""
cal = _make_cal(rock_cycles=1, frequency_hz=2.0, settle_s=0.5)
sched = _make_scheduler()
# Run calibration
cal.start()
for _ in range(100):
cal.step(0.8, tilt_rad=0.0, dt=0.05)
assert cal.state == CalibrationState.DONE
wt = cal.weight_estimate
assert wt is not None
# Feed weight into scheduler
for _ in range(5):
gains, cls, _ = sched.update(wt)
# ~1 kg → "light" class
assert cls == "light"