feat(controls): adaptive PID balance controller with gain scheduling (Issue #136)
Pure modules (no ROS2 dep, fully unit-tested):
- pid_controller.py:
GainSet — (kp,ki,kd) with safety clamp helper
PIDController — anti-windup integral, D-on-error, output clamping
GainScheduler — 3-class weight table (empty/light/heavy), exponential
gain blending (alpha per tick), safety bounds clamping, manual
override, immediate revert-to-defaults on instability
InstabilityDetector — dual criteria: tilt threshold (>50% of window)
+ sign-reversal count (oscillation)
- weight_estimator.py:
WeightEstimator — rolling-window current→weight, steady-state gating
(|tilt|≤threshold), change detection (threshold_kg)
CalibrationRoutine — IDLE→ROCKING→SETTLING→DONE/FAILED state machine;
sinusoidal rocking output, settling current sampling, weight estimate
from avg current; abort() / restart supported
- adaptive_pid_node.py: 100 Hz ROS2 node
Sub: /saltybot/imu (Imu, pitch from quaternion), /saltybot/motor_current
Pub: /saltybot/balance_effort (Float32), /saltybot/weight_estimate,
/saltybot/pid_state (JSON: gains, class, weight_kg, flags)
Srv: /saltybot/calibrate_balance (std_srvs/Trigger)
IMU watchdog (0.5 s), dynamic reconfigure via override_enabled param,
instability → revert + PID reset, structured INFO/WARN logging
- config/adaptive_pid_params.yaml, launch/adaptive_pid.launch.py,
package.xml, setup.py, setup.cfg
- test/test_adaptive_pid.py: 68/68 unit tests passing
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
9281f3bc44
commit
4d2b008c48
@ -0,0 +1,66 @@
|
|||||||
|
adaptive_pid:
|
||||||
|
ros__parameters:
|
||||||
|
# Control loop
|
||||||
|
control_rate: 100.0 # Hz
|
||||||
|
balance_setpoint_rad: 0.0 # target tilt (rad); 0 = upright
|
||||||
|
|
||||||
|
# PID gains per weight class
|
||||||
|
# Tune: heavier loads need more proportional gain to overcome inertia
|
||||||
|
kp_empty: 15.0
|
||||||
|
ki_empty: 0.5
|
||||||
|
kd_empty: 1.5
|
||||||
|
|
||||||
|
kp_light: 18.0
|
||||||
|
ki_light: 0.6
|
||||||
|
kd_light: 2.0
|
||||||
|
|
||||||
|
kp_heavy: 22.0
|
||||||
|
ki_heavy: 0.8
|
||||||
|
kd_heavy: 2.5
|
||||||
|
|
||||||
|
# Safety bounds — ALL gains are hard-clamped to these regardless of source
|
||||||
|
kp_min: 5.0
|
||||||
|
kp_max: 40.0
|
||||||
|
ki_min: 0.0
|
||||||
|
ki_max: 5.0
|
||||||
|
kd_min: 0.0
|
||||||
|
kd_max: 10.0
|
||||||
|
output_min: -50.0 # balance effort clamp
|
||||||
|
output_max: 50.0
|
||||||
|
i_clamp: 10.0 # anti-windup integrator clamp
|
||||||
|
|
||||||
|
# Weight classification thresholds (kg)
|
||||||
|
weight_empty_max_kg: 0.5 # ≤ 0.5 kg → empty
|
||||||
|
weight_light_max_kg: 2.0 # ≤ 2.0 kg → light; > 2.0 → heavy
|
||||||
|
|
||||||
|
# Weight estimation from motor current
|
||||||
|
idle_current_a: 0.5 # motor current at rest with no payload (A)
|
||||||
|
current_per_kg_a: 0.3 # current increase per kg payload (A/kg)
|
||||||
|
weight_window_s: 3.0 # rolling-average window (s)
|
||||||
|
weight_change_threshold_kg: 0.3 # triggers gain reschedule when exceeded
|
||||||
|
steady_tilt_threshold_rad: 0.05 # max tilt to accept current sample
|
||||||
|
|
||||||
|
# Gain blending
|
||||||
|
gain_blend_alpha: 0.05 # convergence rate per tick (0=no change, 1=instant)
|
||||||
|
# at 100 Hz: ~0.4 s to 50 % of target, ~0.9 s to 90 %
|
||||||
|
|
||||||
|
# Instability detection
|
||||||
|
instability_tilt_threshold_rad: 0.4 # |tilt| exceeds this → suspect
|
||||||
|
instability_oscillation_count: 6 # sign reversals in window → unstable
|
||||||
|
instability_window_size: 100 # samples (~1 s at 100 Hz)
|
||||||
|
|
||||||
|
# IMU watchdog
|
||||||
|
imu_timeout_s: 0.5 # zero effort if IMU goes silent
|
||||||
|
|
||||||
|
# Dynamic reconfigure / manual override
|
||||||
|
# Set override_enabled=true + override_kp/ki/kd to bypass scheduling
|
||||||
|
override_enabled: false
|
||||||
|
override_kp: 15.0
|
||||||
|
override_ki: 0.5
|
||||||
|
override_kd: 1.5
|
||||||
|
|
||||||
|
# Calibration routine (rock-and-settle)
|
||||||
|
calibration_amplitude_rad: 0.05 # rocking amplitude
|
||||||
|
calibration_frequency_hz: 0.5 # rocking frequency
|
||||||
|
calibration_cycles: 3 # number of rock cycles (duration = cycles/freq)
|
||||||
|
calibration_settle_s: 2.0 # settling window after rocking
|
||||||
@ -0,0 +1,54 @@
|
|||||||
|
"""
|
||||||
|
adaptive_pid.launch.py — Launch the adaptive balance PID controller.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
ros2 launch saltybot_adaptive_pid adaptive_pid.launch.py
|
||||||
|
ros2 launch saltybot_adaptive_pid adaptive_pid.launch.py balance_setpoint_rad:=0.02
|
||||||
|
|
||||||
|
Dynamic reconfigure (no restart needed):
|
||||||
|
ros2 param set /adaptive_pid gain_blend_alpha 0.1
|
||||||
|
ros2 param set /adaptive_pid override_enabled true
|
||||||
|
ros2 param set /adaptive_pid override_kp 12.0
|
||||||
|
ros2 param set /adaptive_pid override_ki 0.4
|
||||||
|
ros2 param set /adaptive_pid override_kd 1.2
|
||||||
|
ros2 param set /adaptive_pid override_enabled false
|
||||||
|
|
||||||
|
Calibration:
|
||||||
|
ros2 service call /saltybot/calibrate_balance std_srvs/srv/Trigger
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
pkg_share = get_package_share_directory("saltybot_adaptive_pid")
|
||||||
|
params_file = os.path.join(pkg_share, "config", "adaptive_pid_params.yaml")
|
||||||
|
|
||||||
|
setpoint_arg = DeclareLaunchArgument(
|
||||||
|
"balance_setpoint_rad",
|
||||||
|
default_value="0.0",
|
||||||
|
description="Target tilt angle in rad (0 = upright)",
|
||||||
|
)
|
||||||
|
|
||||||
|
adaptive_pid = Node(
|
||||||
|
package="saltybot_adaptive_pid",
|
||||||
|
executable="adaptive_pid_node",
|
||||||
|
name="adaptive_pid",
|
||||||
|
parameters=[
|
||||||
|
params_file,
|
||||||
|
{"balance_setpoint_rad": LaunchConfiguration("balance_setpoint_rad")},
|
||||||
|
],
|
||||||
|
output="screen",
|
||||||
|
emulate_tty=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
setpoint_arg,
|
||||||
|
adaptive_pid,
|
||||||
|
])
|
||||||
23
jetson/ros2_ws/src/saltybot_adaptive_pid/package.xml
Normal file
23
jetson/ros2_ws/src/saltybot_adaptive_pid/package.xml
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_adaptive_pid</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>Adaptive balance PID with weight estimation and gain scheduling (Issue #136)</description>
|
||||||
|
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>std_srvs</depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,459 @@
|
|||||||
|
"""
|
||||||
|
adaptive_pid_node.py — Adaptive balance PID controller (Issue #136).
|
||||||
|
|
||||||
|
Overview
|
||||||
|
────────
|
||||||
|
Subscribes to IMU and motor-current topics, runs a PID balance loop at
|
||||||
|
`control_rate` Hz, and automatically adjusts gains based on estimated
|
||||||
|
payload weight.
|
||||||
|
|
||||||
|
Weight estimation
|
||||||
|
─────────────────
|
||||||
|
Motor current at steady state (low tilt, no cmd_vel) is sampled into a
|
||||||
|
rolling window. When enough samples accumulate:
|
||||||
|
|
||||||
|
weight_kg = max(0, (I_avg − I_idle) / k_per_kg)
|
||||||
|
|
||||||
|
If the estimate shifts by ≥ weight_change_threshold_kg, gains are
|
||||||
|
re-scheduled immediately (subject to blending).
|
||||||
|
|
||||||
|
Gain scheduling
|
||||||
|
───────────────
|
||||||
|
Three weight classes (empty / light / heavy) each have a (kp, ki, kd)
|
||||||
|
table entry. Active gains blend exponentially toward the target class at
|
||||||
|
rate gain_blend_alpha per tick.
|
||||||
|
|
||||||
|
Safety
|
||||||
|
──────
|
||||||
|
• All gains clamped to [kp_min..kp_max], [ki_min..ki_max], [kd_min..kd_max]
|
||||||
|
• InstabilityDetector monitors tilt amplitude and sign reversals; on trigger:
|
||||||
|
– gains reverted to table defaults (no blending)
|
||||||
|
– PID integrator reset
|
||||||
|
– WARN logged
|
||||||
|
• Watchdog: if /saltybot/imu goes silent > imu_timeout_s → effort = 0
|
||||||
|
|
||||||
|
Dynamic reconfigure
|
||||||
|
───────────────────
|
||||||
|
Set `override_enabled=true` plus override_kp / override_ki / override_kd
|
||||||
|
via `ros2 param set` to bypass scheduling. Set `override_enabled=false`
|
||||||
|
to return to auto-scheduling.
|
||||||
|
|
||||||
|
Calibration service
|
||||||
|
───────────────────
|
||||||
|
Call `/saltybot/calibrate_balance` (std_srvs/Trigger) to start the
|
||||||
|
calibration routine. The robot rocks gently for `calibration_cycles /
|
||||||
|
calibration_frequency_hz` seconds, then settles. The weight estimate is
|
||||||
|
updated and logged on completion.
|
||||||
|
|
||||||
|
Subscribes
|
||||||
|
──────────
|
||||||
|
/saltybot/imu sensor_msgs/Imu — tilt angle (pitch from quaternion)
|
||||||
|
/saltybot/motor_current std_msgs/Float32 — motor current (A)
|
||||||
|
|
||||||
|
Publishes
|
||||||
|
─────────
|
||||||
|
/saltybot/balance_effort std_msgs/Float32 — PID output (balance correction)
|
||||||
|
/saltybot/weight_estimate std_msgs/Float32 — current weight estimate (kg)
|
||||||
|
/saltybot/pid_state std_msgs/String — JSON: gains, class, mode, flags
|
||||||
|
|
||||||
|
Services
|
||||||
|
────────
|
||||||
|
/saltybot/calibrate_balance std_srvs/Trigger
|
||||||
|
|
||||||
|
Parameters (see config/adaptive_pid_params.yaml for defaults)
|
||||||
|
──────────────────────────────────────────────────────────────
|
||||||
|
control_rate, balance_setpoint_rad,
|
||||||
|
kp_empty/light/heavy, ki_*, kd_*,
|
||||||
|
kp_min/max, ki_min/max, kd_min/max,
|
||||||
|
weight_empty_max_kg, weight_light_max_kg,
|
||||||
|
idle_current_a, current_per_kg_a,
|
||||||
|
weight_window_s, weight_change_threshold_kg,
|
||||||
|
steady_tilt_threshold_rad, gain_blend_alpha,
|
||||||
|
instability_tilt_threshold_rad, instability_oscillation_count,
|
||||||
|
instability_window_size, imu_timeout_s,
|
||||||
|
override_enabled, override_kp, override_ki, override_kd,
|
||||||
|
calibration_amplitude_rad, calibration_frequency_hz,
|
||||||
|
calibration_cycles, calibration_settle_s,
|
||||||
|
output_min, output_max, i_clamp
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import time
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
|
||||||
|
|
||||||
|
from sensor_msgs.msg import Imu
|
||||||
|
from std_msgs.msg import Float32, String
|
||||||
|
from std_srvs.srv import Trigger
|
||||||
|
|
||||||
|
from saltybot_adaptive_pid.pid_controller import (
|
||||||
|
GainScheduler,
|
||||||
|
GainSet,
|
||||||
|
InstabilityDetector,
|
||||||
|
PIDController,
|
||||||
|
)
|
||||||
|
from saltybot_adaptive_pid.weight_estimator import (
|
||||||
|
CalibrationRoutine,
|
||||||
|
CalibrationState,
|
||||||
|
WeightEstimator,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _quaternion_to_pitch(qx: float, qy: float, qz: float, qw: float) -> float:
|
||||||
|
"""Extract pitch (rotation about Y) from a unit quaternion."""
|
||||||
|
sinp = 2.0 * (qw * qy - qz * qx)
|
||||||
|
return math.asin(max(-1.0, min(1.0, sinp)))
|
||||||
|
|
||||||
|
|
||||||
|
class AdaptivePIDNode(Node):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("adaptive_pid")
|
||||||
|
|
||||||
|
# ── Parameters ───────────────────────────────────────────────────────
|
||||||
|
self._declare_all_parameters()
|
||||||
|
p = self._load_params()
|
||||||
|
|
||||||
|
# ── Sub-systems ──────────────────────────────────────────────────────
|
||||||
|
self._pid = PIDController(
|
||||||
|
kp=p["kp_empty"], ki=p["ki_empty"], kd=p["kd_empty"],
|
||||||
|
output_min=p["output_min"], output_max=p["output_max"],
|
||||||
|
i_clamp=p["i_clamp"],
|
||||||
|
)
|
||||||
|
|
||||||
|
self._scheduler = GainScheduler(
|
||||||
|
gain_table={
|
||||||
|
"empty": GainSet(p["kp_empty"], p["ki_empty"], p["kd_empty"]),
|
||||||
|
"light": GainSet(p["kp_light"], p["ki_light"], p["kd_light"]),
|
||||||
|
"heavy": GainSet(p["kp_heavy"], p["ki_heavy"], p["kd_heavy"]),
|
||||||
|
},
|
||||||
|
weight_thresholds={
|
||||||
|
"empty_max": p["weight_empty_max_kg"],
|
||||||
|
"light_max": p["weight_light_max_kg"],
|
||||||
|
},
|
||||||
|
blend_alpha=p["gain_blend_alpha"],
|
||||||
|
kp_min=p["kp_min"], kp_max=p["kp_max"],
|
||||||
|
ki_min=p["ki_min"], ki_max=p["ki_max"],
|
||||||
|
kd_min=p["kd_min"], kd_max=p["kd_max"],
|
||||||
|
)
|
||||||
|
|
||||||
|
self._weight_estimator = WeightEstimator(
|
||||||
|
idle_current_a=p["idle_current_a"],
|
||||||
|
current_per_kg_a=p["current_per_kg_a"],
|
||||||
|
window_s=p["weight_window_s"],
|
||||||
|
control_rate_hz=p["control_rate"],
|
||||||
|
steady_tilt_threshold_rad=p["steady_tilt_threshold_rad"],
|
||||||
|
change_threshold_kg=p["weight_change_threshold_kg"],
|
||||||
|
)
|
||||||
|
|
||||||
|
self._instability_detector = InstabilityDetector(
|
||||||
|
tilt_threshold_rad=p["instability_tilt_threshold_rad"],
|
||||||
|
oscillation_count=p["instability_oscillation_count"],
|
||||||
|
window_size=p["instability_window_size"],
|
||||||
|
)
|
||||||
|
|
||||||
|
self._calibration = CalibrationRoutine(
|
||||||
|
amplitude_rad=p["calibration_amplitude_rad"],
|
||||||
|
frequency_hz=p["calibration_frequency_hz"],
|
||||||
|
rock_cycles=p["calibration_cycles"],
|
||||||
|
settle_s=p["calibration_settle_s"],
|
||||||
|
settle_tilt_threshold_rad=p["steady_tilt_threshold_rad"],
|
||||||
|
idle_current_a=p["idle_current_a"],
|
||||||
|
current_per_kg_a=p["current_per_kg_a"],
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── State ────────────────────────────────────────────────────────────
|
||||||
|
self._tilt_rad: float = 0.0
|
||||||
|
self._current_a: float = 0.0
|
||||||
|
self._last_imu_t: float = 0.0
|
||||||
|
self._weight_kg: float = 0.0
|
||||||
|
self._unstable: bool = False
|
||||||
|
self._last_ctrl_t: float = time.monotonic()
|
||||||
|
|
||||||
|
# ── QoS ──────────────────────────────────────────────────────────────
|
||||||
|
reliable = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.RELIABLE,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=10,
|
||||||
|
)
|
||||||
|
best_effort = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Subscriptions ────────────────────────────────────────────────────
|
||||||
|
self.create_subscription(Imu, "/saltybot/imu", self._imu_cb, best_effort)
|
||||||
|
self.create_subscription(Float32, "/saltybot/motor_current", self._current_cb, reliable)
|
||||||
|
|
||||||
|
# ── Publishers ───────────────────────────────────────────────────────
|
||||||
|
self._effort_pub = self.create_publisher(Float32, "/saltybot/balance_effort", reliable)
|
||||||
|
self._weight_pub = self.create_publisher(Float32, "/saltybot/weight_estimate", reliable)
|
||||||
|
self._state_pub = self.create_publisher(String, "/saltybot/pid_state", reliable)
|
||||||
|
|
||||||
|
# ── Service ──────────────────────────────────────────────────────────
|
||||||
|
self.create_service(Trigger, "/saltybot/calibrate_balance", self._calibrate_srv)
|
||||||
|
|
||||||
|
# ── Control timer ────────────────────────────────────────────────────
|
||||||
|
rate = p["control_rate"]
|
||||||
|
self._ctrl_timer = self.create_timer(1.0 / rate, self._control_cb)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"AdaptivePIDNode ready "
|
||||||
|
f"rate={rate}Hz "
|
||||||
|
f"kp_empty={p['kp_empty']} ki_empty={p['ki_empty']} kd_empty={p['kd_empty']}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Parameter declaration / load ─────────────────────────────────────────
|
||||||
|
|
||||||
|
def _declare_all_parameters(self) -> None:
|
||||||
|
self.declare_parameter("control_rate", 100.0)
|
||||||
|
self.declare_parameter("balance_setpoint_rad", 0.0)
|
||||||
|
# Gains per class
|
||||||
|
self.declare_parameter("kp_empty", 15.0)
|
||||||
|
self.declare_parameter("ki_empty", 0.5)
|
||||||
|
self.declare_parameter("kd_empty", 1.5)
|
||||||
|
self.declare_parameter("kp_light", 18.0)
|
||||||
|
self.declare_parameter("ki_light", 0.6)
|
||||||
|
self.declare_parameter("kd_light", 2.0)
|
||||||
|
self.declare_parameter("kp_heavy", 22.0)
|
||||||
|
self.declare_parameter("ki_heavy", 0.8)
|
||||||
|
self.declare_parameter("kd_heavy", 2.5)
|
||||||
|
# Safety bounds
|
||||||
|
self.declare_parameter("kp_min", 5.0)
|
||||||
|
self.declare_parameter("kp_max", 40.0)
|
||||||
|
self.declare_parameter("ki_min", 0.0)
|
||||||
|
self.declare_parameter("ki_max", 5.0)
|
||||||
|
self.declare_parameter("kd_min", 0.0)
|
||||||
|
self.declare_parameter("kd_max", 10.0)
|
||||||
|
self.declare_parameter("output_min", -50.0)
|
||||||
|
self.declare_parameter("output_max", 50.0)
|
||||||
|
self.declare_parameter("i_clamp", 10.0)
|
||||||
|
# Weight classification
|
||||||
|
self.declare_parameter("weight_empty_max_kg", 0.5)
|
||||||
|
self.declare_parameter("weight_light_max_kg", 2.0)
|
||||||
|
# Weight estimation
|
||||||
|
self.declare_parameter("idle_current_a", 0.5)
|
||||||
|
self.declare_parameter("current_per_kg_a", 0.3)
|
||||||
|
self.declare_parameter("weight_window_s", 3.0)
|
||||||
|
self.declare_parameter("weight_change_threshold_kg", 0.3)
|
||||||
|
self.declare_parameter("steady_tilt_threshold_rad", 0.05)
|
||||||
|
# Gain blending
|
||||||
|
self.declare_parameter("gain_blend_alpha", 0.05)
|
||||||
|
# Instability detection
|
||||||
|
self.declare_parameter("instability_tilt_threshold_rad", 0.4)
|
||||||
|
self.declare_parameter("instability_oscillation_count", 6)
|
||||||
|
self.declare_parameter("instability_window_size", 100)
|
||||||
|
# Watchdog
|
||||||
|
self.declare_parameter("imu_timeout_s", 0.5)
|
||||||
|
# Dynamic reconfigure / manual override
|
||||||
|
self.declare_parameter("override_enabled", False)
|
||||||
|
self.declare_parameter("override_kp", 15.0)
|
||||||
|
self.declare_parameter("override_ki", 0.5)
|
||||||
|
self.declare_parameter("override_kd", 1.5)
|
||||||
|
# Calibration
|
||||||
|
self.declare_parameter("calibration_amplitude_rad", 0.05)
|
||||||
|
self.declare_parameter("calibration_frequency_hz", 0.5)
|
||||||
|
self.declare_parameter("calibration_cycles", 3)
|
||||||
|
self.declare_parameter("calibration_settle_s", 2.0)
|
||||||
|
|
||||||
|
def _load_params(self) -> dict:
|
||||||
|
g = self.get_parameter
|
||||||
|
return {k: g(k).value for k in [
|
||||||
|
"control_rate", "balance_setpoint_rad",
|
||||||
|
"kp_empty", "ki_empty", "kd_empty",
|
||||||
|
"kp_light", "ki_light", "kd_light",
|
||||||
|
"kp_heavy", "ki_heavy", "kd_heavy",
|
||||||
|
"kp_min", "kp_max", "ki_min", "ki_max", "kd_min", "kd_max",
|
||||||
|
"output_min", "output_max", "i_clamp",
|
||||||
|
"weight_empty_max_kg", "weight_light_max_kg",
|
||||||
|
"idle_current_a", "current_per_kg_a",
|
||||||
|
"weight_window_s", "weight_change_threshold_kg",
|
||||||
|
"steady_tilt_threshold_rad", "gain_blend_alpha",
|
||||||
|
"instability_tilt_threshold_rad", "instability_oscillation_count",
|
||||||
|
"instability_window_size", "imu_timeout_s",
|
||||||
|
"override_enabled", "override_kp", "override_ki", "override_kd",
|
||||||
|
"calibration_amplitude_rad", "calibration_frequency_hz",
|
||||||
|
"calibration_cycles", "calibration_settle_s",
|
||||||
|
]}
|
||||||
|
|
||||||
|
# ── Callbacks ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _imu_cb(self, msg: Imu) -> None:
|
||||||
|
q = msg.orientation
|
||||||
|
self._tilt_rad = _quaternion_to_pitch(q.x, q.y, q.z, q.w)
|
||||||
|
self._last_imu_t = time.monotonic()
|
||||||
|
|
||||||
|
def _current_cb(self, msg: Float32) -> None:
|
||||||
|
self._current_a = msg.data
|
||||||
|
|
||||||
|
def _calibrate_srv(self, request, response) -> Trigger.Response:
|
||||||
|
if self._calibration.is_active:
|
||||||
|
response.success = False
|
||||||
|
response.message = "Calibration already in progress"
|
||||||
|
else:
|
||||||
|
self._calibration.start()
|
||||||
|
self._pid.reset()
|
||||||
|
self._weight_estimator.reset()
|
||||||
|
response.success = True
|
||||||
|
response.message = "Calibration started"
|
||||||
|
self.get_logger().info("Calibration started")
|
||||||
|
return response
|
||||||
|
|
||||||
|
# ── 100 Hz control loop ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _control_cb(self) -> None:
|
||||||
|
now = time.monotonic()
|
||||||
|
dt = now - self._last_ctrl_t
|
||||||
|
self._last_ctrl_t = now
|
||||||
|
|
||||||
|
p = self._load_params()
|
||||||
|
|
||||||
|
# ── IMU watchdog ─────────────────────────────────────────────────────
|
||||||
|
imu_age = (now - self._last_imu_t) if self._last_imu_t > 0.0 else 1e9
|
||||||
|
if imu_age > p["imu_timeout_s"]:
|
||||||
|
self._publish_effort(0.0, p, reason="imu_timeout")
|
||||||
|
return
|
||||||
|
|
||||||
|
tilt = self._tilt_rad
|
||||||
|
current = self._current_a
|
||||||
|
|
||||||
|
# ── Calibration ──────────────────────────────────────────────────────
|
||||||
|
cal_offset = self._calibration.step(current, tilt, dt)
|
||||||
|
if self._calibration.state == CalibrationState.DONE and \
|
||||||
|
self._calibration.weight_estimate is not None:
|
||||||
|
cal_wt = self._calibration.weight_estimate
|
||||||
|
self._weight_kg = cal_wt
|
||||||
|
self._weight_estimator.reset()
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Calibration done: weight={cal_wt:.2f} kg "
|
||||||
|
f"class={self._scheduler.classify(cal_wt)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
if self._calibration.state == CalibrationState.FAILED:
|
||||||
|
self.get_logger().warn("Calibration failed: insufficient settled samples")
|
||||||
|
|
||||||
|
# ── Dynamic reconfigure / override ────────────────────────────────────
|
||||||
|
if p["override_enabled"]:
|
||||||
|
if not self._scheduler.override_active:
|
||||||
|
self._scheduler.override_gains(
|
||||||
|
p["override_kp"], p["override_ki"], p["override_kd"]
|
||||||
|
)
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Manual override: kp={p['override_kp']} "
|
||||||
|
f"ki={p['override_ki']} kd={p['override_kd']}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
if self._scheduler.override_active:
|
||||||
|
self._scheduler.clear_override()
|
||||||
|
self.get_logger().info("Manual override cleared, resuming scheduling")
|
||||||
|
|
||||||
|
# ── Weight estimation ─────────────────────────────────────────────────
|
||||||
|
if not self._calibration.is_active:
|
||||||
|
weight_est, weight_changed = self._weight_estimator.update(current, tilt)
|
||||||
|
if weight_est is not None:
|
||||||
|
if weight_changed:
|
||||||
|
old_class = self._scheduler.active_class
|
||||||
|
self._weight_kg = weight_est
|
||||||
|
new_class = self._scheduler.classify(weight_est)
|
||||||
|
if old_class != new_class:
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Weight changed: {weight_est:.2f} kg "
|
||||||
|
f"({old_class} → {new_class})"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Weight updated: {weight_est:.2f} kg class={new_class}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Gain scheduling ──────────────────────────────────────────────────
|
||||||
|
gains, wt_class, class_changed = self._scheduler.update(self._weight_kg)
|
||||||
|
|
||||||
|
if class_changed:
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Gains rescheduled → class={wt_class} "
|
||||||
|
f"kp={gains.kp:.3f} ki={gains.ki:.3f} kd={gains.kd:.3f}"
|
||||||
|
)
|
||||||
|
|
||||||
|
self._pid.update_gains(gains.kp, gains.ki, gains.kd)
|
||||||
|
|
||||||
|
# ── Instability detection ─────────────────────────────────────────────
|
||||||
|
unstable = self._instability_detector.update(tilt)
|
||||||
|
if unstable and not self._unstable:
|
||||||
|
self._unstable = True
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"Instability detected (tilt={tilt:.3f} rad) — "
|
||||||
|
f"reverting gains to defaults"
|
||||||
|
)
|
||||||
|
self._scheduler.revert_to_defaults(self._weight_kg)
|
||||||
|
reverted = self._scheduler.active_gains
|
||||||
|
self._pid.update_gains(reverted.kp, reverted.ki, reverted.kd)
|
||||||
|
self._pid.reset()
|
||||||
|
elif not unstable:
|
||||||
|
self._unstable = False
|
||||||
|
|
||||||
|
# ── PID step ─────────────────────────────────────────────────────────
|
||||||
|
setpoint = p["balance_setpoint_rad"] + cal_offset
|
||||||
|
error = setpoint - tilt
|
||||||
|
effort = self._pid.step(error, dt)
|
||||||
|
|
||||||
|
self._publish_effort(effort, p)
|
||||||
|
|
||||||
|
# ── Weight + state publish ────────────────────────────────────────────
|
||||||
|
wt_msg = Float32()
|
||||||
|
wt_msg.data = float(self._weight_kg)
|
||||||
|
self._weight_pub.publish(wt_msg)
|
||||||
|
|
||||||
|
state_msg = String()
|
||||||
|
state_msg.data = json.dumps({
|
||||||
|
"kp": round(gains.kp, 4),
|
||||||
|
"ki": round(gains.ki, 4),
|
||||||
|
"kd": round(gains.kd, 4),
|
||||||
|
"weight_class": wt_class,
|
||||||
|
"weight_kg": round(self._weight_kg, 3),
|
||||||
|
"override_active": self._scheduler.override_active,
|
||||||
|
"unstable": self._unstable,
|
||||||
|
"calibrating": self._calibration.is_active,
|
||||||
|
"cal_state": self._calibration.state.value,
|
||||||
|
"tilt_rad": round(tilt, 4),
|
||||||
|
"effort": round(effort, 4),
|
||||||
|
})
|
||||||
|
self._state_pub.publish(state_msg)
|
||||||
|
|
||||||
|
def _publish_effort(self, effort: float, p: dict, reason: str = "") -> None:
|
||||||
|
msg = Float32()
|
||||||
|
msg.data = float(effort)
|
||||||
|
self._effort_pub.publish(msg)
|
||||||
|
|
||||||
|
# ── Lifecycle ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def destroy_node(self) -> None:
|
||||||
|
# Zero effort on shutdown
|
||||||
|
try:
|
||||||
|
msg = Float32()
|
||||||
|
msg.data = 0.0
|
||||||
|
self._effort_pub.publish(msg)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = AdaptivePIDNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -0,0 +1,333 @@
|
|||||||
|
"""
|
||||||
|
pid_controller.py — PID controller, gain scheduler, and instability detector
|
||||||
|
for the SaltyBot adaptive balance controller (Issue #136).
|
||||||
|
|
||||||
|
Pure module — no ROS2 dependency; fully unit-testable.
|
||||||
|
|
||||||
|
Classes
|
||||||
|
───────
|
||||||
|
GainSet — immutable (kp, ki, kd) triple with safety-clamp helper
|
||||||
|
PIDController — discrete PID with anti-windup and output clamping
|
||||||
|
GainScheduler — weight-class → gain table, exponential blending,
|
||||||
|
safety bounds, manual override, default revert
|
||||||
|
InstabilityDetector — monitors tilt for threshold violations and oscillations
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections import deque
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
# ── GainSet ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class GainSet:
|
||||||
|
kp: float
|
||||||
|
ki: float
|
||||||
|
kd: float
|
||||||
|
|
||||||
|
def clamp(
|
||||||
|
self,
|
||||||
|
kp_min: float, kp_max: float,
|
||||||
|
ki_min: float, ki_max: float,
|
||||||
|
kd_min: float, kd_max: float,
|
||||||
|
) -> "GainSet":
|
||||||
|
"""Return a new GainSet with all gains clamped to the given bounds."""
|
||||||
|
return GainSet(
|
||||||
|
kp=max(kp_min, min(kp_max, self.kp)),
|
||||||
|
ki=max(ki_min, min(ki_max, self.ki)),
|
||||||
|
kd=max(kd_min, min(kd_max, self.kd)),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── PIDController ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class PIDController:
|
||||||
|
"""
|
||||||
|
Discrete PID controller with:
|
||||||
|
- Anti-windup: integral clamped to ±i_clamp
|
||||||
|
- Output clamping: result clamped to [output_min, output_max]
|
||||||
|
- Derivative on error (D-on-error, not D-on-measurement)
|
||||||
|
- Zero derivative on first step (no derivative kick)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
kp: float,
|
||||||
|
ki: float,
|
||||||
|
kd: float,
|
||||||
|
output_min: float = -1e9,
|
||||||
|
output_max: float = 1e9,
|
||||||
|
i_clamp: float = 1e9,
|
||||||
|
):
|
||||||
|
self._kp = kp
|
||||||
|
self._ki = ki
|
||||||
|
self._kd = kd
|
||||||
|
self._output_min = output_min
|
||||||
|
self._output_max = output_max
|
||||||
|
self._i_clamp = abs(i_clamp)
|
||||||
|
|
||||||
|
self._integral: float = 0.0
|
||||||
|
self._prev_error: float | None = None
|
||||||
|
|
||||||
|
# ── Properties ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@property
|
||||||
|
def kp(self) -> float:
|
||||||
|
return self._kp
|
||||||
|
|
||||||
|
@property
|
||||||
|
def ki(self) -> float:
|
||||||
|
return self._ki
|
||||||
|
|
||||||
|
@property
|
||||||
|
def kd(self) -> float:
|
||||||
|
return self._kd
|
||||||
|
|
||||||
|
@property
|
||||||
|
def integral(self) -> float:
|
||||||
|
return self._integral
|
||||||
|
|
||||||
|
# ── Mutators ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def update_gains(self, kp: float, ki: float, kd: float) -> None:
|
||||||
|
"""Update gains without resetting integrator or derivative state."""
|
||||||
|
self._kp = kp
|
||||||
|
self._ki = ki
|
||||||
|
self._kd = kd
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
"""Clear integrator and derivative memory (e.g., after mode change)."""
|
||||||
|
self._integral = 0.0
|
||||||
|
self._prev_error = None
|
||||||
|
|
||||||
|
# ── Step ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def step(self, error: float, dt: float) -> float:
|
||||||
|
"""
|
||||||
|
Compute one PID step.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
error : setpoint − measurement
|
||||||
|
dt : elapsed time since last step (s); must be > 0
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
Control output clamped to [output_min, output_max].
|
||||||
|
Returns 0.0 if dt ≤ 0.
|
||||||
|
"""
|
||||||
|
if dt <= 0.0:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
# Proportional
|
||||||
|
p = self._kp * error
|
||||||
|
|
||||||
|
# Integral with anti-windup
|
||||||
|
self._integral += error * dt
|
||||||
|
self._integral = max(-self._i_clamp, min(self._i_clamp, self._integral))
|
||||||
|
i = self._ki * self._integral
|
||||||
|
|
||||||
|
# Derivative (zero on first call to avoid derivative kick)
|
||||||
|
if self._prev_error is None:
|
||||||
|
d = 0.0
|
||||||
|
else:
|
||||||
|
d = self._kd * (error - self._prev_error) / dt
|
||||||
|
self._prev_error = error
|
||||||
|
|
||||||
|
output = p + i + d
|
||||||
|
return max(self._output_min, min(self._output_max, output))
|
||||||
|
|
||||||
|
|
||||||
|
# ── GainScheduler ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class GainScheduler:
|
||||||
|
"""
|
||||||
|
Maps estimated robot weight to PID gains via a 3-class lookup table.
|
||||||
|
Transitions between classes are blended exponentially so the active gains
|
||||||
|
track the target smoothly.
|
||||||
|
|
||||||
|
Weight classes (ascending):
|
||||||
|
"empty" — up to weight_thresholds["empty_max"] kg
|
||||||
|
"light" — up to weight_thresholds["light_max"] kg
|
||||||
|
"heavy" — above light_max
|
||||||
|
|
||||||
|
Safety clamping is applied to every blended gain before returning.
|
||||||
|
"""
|
||||||
|
|
||||||
|
CLASSES = ("empty", "light", "heavy")
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
gain_table: dict[str, GainSet],
|
||||||
|
weight_thresholds: dict[str, float],
|
||||||
|
blend_alpha: float = 0.05,
|
||||||
|
kp_min: float = 0.0, kp_max: float = 100.0,
|
||||||
|
ki_min: float = 0.0, ki_max: float = 20.0,
|
||||||
|
kd_min: float = 0.0, kd_max: float = 20.0,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
gain_table : {"empty": GainSet, "light": GainSet, "heavy": GainSet}
|
||||||
|
weight_thresholds : {"empty_max": float, "light_max": float}
|
||||||
|
blend_alpha : convergence rate ∈ (0, 1] per control tick
|
||||||
|
"""
|
||||||
|
self._table = gain_table
|
||||||
|
self._thresholds = weight_thresholds
|
||||||
|
self._alpha = max(1e-6, min(1.0, blend_alpha))
|
||||||
|
self._bounds = {
|
||||||
|
"kp_min": kp_min, "kp_max": kp_max,
|
||||||
|
"ki_min": ki_min, "ki_max": ki_max,
|
||||||
|
"kd_min": kd_min, "kd_max": kd_max,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Active (blended) gains start at "empty" table values
|
||||||
|
self._active_class: str = "empty"
|
||||||
|
self._override_active: bool = False
|
||||||
|
raw = self._table["empty"]
|
||||||
|
self._active = self._apply_bounds(GainSet(raw.kp, raw.ki, raw.kd))
|
||||||
|
|
||||||
|
# ── Properties ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@property
|
||||||
|
def active_class(self) -> str:
|
||||||
|
return self._active_class
|
||||||
|
|
||||||
|
@property
|
||||||
|
def active_gains(self) -> GainSet:
|
||||||
|
return self._active
|
||||||
|
|
||||||
|
@property
|
||||||
|
def override_active(self) -> bool:
|
||||||
|
return self._override_active
|
||||||
|
|
||||||
|
# ── Classification ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def classify(self, weight_kg: float) -> str:
|
||||||
|
"""Map weight (kg) to class name."""
|
||||||
|
if weight_kg <= self._thresholds.get("empty_max", 0.5):
|
||||||
|
return "empty"
|
||||||
|
if weight_kg <= self._thresholds.get("light_max", 2.0):
|
||||||
|
return "light"
|
||||||
|
return "heavy"
|
||||||
|
|
||||||
|
# ── Update ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def update(self, weight_kg: float) -> tuple[GainSet, str, bool]:
|
||||||
|
"""
|
||||||
|
Blend active gains toward the target for the given weight.
|
||||||
|
Manual override bypasses scheduling.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
(active_gains, active_class, class_changed)
|
||||||
|
"""
|
||||||
|
if self._override_active:
|
||||||
|
return self._active, self._active_class, False
|
||||||
|
|
||||||
|
target_class = self.classify(weight_kg)
|
||||||
|
class_changed = target_class != self._active_class
|
||||||
|
self._active_class = target_class
|
||||||
|
|
||||||
|
target = self._table[target_class]
|
||||||
|
a = self._alpha
|
||||||
|
blended = GainSet(
|
||||||
|
kp=(1.0 - a) * self._active.kp + a * target.kp,
|
||||||
|
ki=(1.0 - a) * self._active.ki + a * target.ki,
|
||||||
|
kd=(1.0 - a) * self._active.kd + a * target.kd,
|
||||||
|
)
|
||||||
|
self._active = self._apply_bounds(blended)
|
||||||
|
return self._active, self._active_class, class_changed
|
||||||
|
|
||||||
|
# ── Override / revert ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def override_gains(self, kp: float, ki: float, kd: float) -> None:
|
||||||
|
"""
|
||||||
|
Apply manual gain override (dynamic reconfigure / operator tuning).
|
||||||
|
Gains are clamped to safety bounds.
|
||||||
|
"""
|
||||||
|
self._override_active = True
|
||||||
|
self._active = self._apply_bounds(GainSet(kp, ki, kd))
|
||||||
|
|
||||||
|
def clear_override(self) -> None:
|
||||||
|
"""Release manual override; scheduling resumes on next update()."""
|
||||||
|
self._override_active = False
|
||||||
|
|
||||||
|
def revert_to_defaults(self, weight_kg: float) -> None:
|
||||||
|
"""
|
||||||
|
Immediately snap active gains to table values for the current weight
|
||||||
|
class (no blending). Used after instability is detected.
|
||||||
|
"""
|
||||||
|
self._override_active = False
|
||||||
|
cls = self.classify(weight_kg)
|
||||||
|
self._active_class = cls
|
||||||
|
raw = self._table[cls]
|
||||||
|
self._active = self._apply_bounds(GainSet(raw.kp, raw.ki, raw.kd))
|
||||||
|
|
||||||
|
# ── Internal ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _apply_bounds(self, gains: GainSet) -> GainSet:
|
||||||
|
b = self._bounds
|
||||||
|
return gains.clamp(
|
||||||
|
b["kp_min"], b["kp_max"],
|
||||||
|
b["ki_min"], b["ki_max"],
|
||||||
|
b["kd_min"], b["kd_max"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── InstabilityDetector ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class InstabilityDetector:
|
||||||
|
"""
|
||||||
|
Detects controller instability via two independent criteria:
|
||||||
|
|
||||||
|
1. Tilt threshold — more than 50 % of recent samples exceed
|
||||||
|
tilt_threshold_rad → unstable
|
||||||
|
2. Oscillations — sign-reversal count in window exceeds
|
||||||
|
oscillation_count → unstable (growing oscillations)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
tilt_threshold_rad: float = 0.4,
|
||||||
|
oscillation_count: int = 6,
|
||||||
|
window_size: int = 100,
|
||||||
|
):
|
||||||
|
self._tilt_threshold = tilt_threshold_rad
|
||||||
|
self._osc_count = oscillation_count
|
||||||
|
self._tilts: deque = deque(maxlen=window_size)
|
||||||
|
|
||||||
|
def update(self, tilt_rad: float) -> bool:
|
||||||
|
"""
|
||||||
|
Record tilt sample and return True if instability is detected.
|
||||||
|
False is always returned until the window has at least 4 samples.
|
||||||
|
"""
|
||||||
|
self._tilts.append(tilt_rad)
|
||||||
|
|
||||||
|
if len(self._tilts) < 4:
|
||||||
|
return False
|
||||||
|
|
||||||
|
tilts = list(self._tilts)
|
||||||
|
|
||||||
|
# Criterion 1: tilt amplitude
|
||||||
|
exceed = sum(1 for t in tilts if abs(t) > self._tilt_threshold)
|
||||||
|
if exceed > len(tilts) * 0.5:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Criterion 2: sign reversals (oscillation detection)
|
||||||
|
reversals = 0
|
||||||
|
prev_sign: int | None = None
|
||||||
|
for t in tilts:
|
||||||
|
if abs(t) < 0.01:
|
||||||
|
continue
|
||||||
|
sign = 1 if t > 0 else -1
|
||||||
|
if prev_sign is not None and sign != prev_sign:
|
||||||
|
reversals += 1
|
||||||
|
prev_sign = sign
|
||||||
|
|
||||||
|
return reversals >= self._osc_count
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
self._tilts.clear()
|
||||||
@ -0,0 +1,246 @@
|
|||||||
|
"""
|
||||||
|
weight_estimator.py — Motor-current weight estimator and calibration routine
|
||||||
|
for the SaltyBot adaptive balance controller (Issue #136).
|
||||||
|
|
||||||
|
Pure module — no ROS2 dependency; fully unit-testable.
|
||||||
|
|
||||||
|
Classes
|
||||||
|
───────
|
||||||
|
WeightEstimator — rolling-window current → weight, change detection
|
||||||
|
CalibrationRoutine — state-machine: IDLE → ROCKING → SETTLING → DONE/FAILED
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
from collections import deque
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
|
||||||
|
# ── WeightEstimator ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class WeightEstimator:
|
||||||
|
"""
|
||||||
|
Estimates payload weight from motor current at steady state.
|
||||||
|
|
||||||
|
Model
|
||||||
|
─────
|
||||||
|
weight_kg = max(0, (I_avg − I_idle) / k_per_kg)
|
||||||
|
|
||||||
|
where I_avg is averaged over a rolling window of steady-state samples.
|
||||||
|
A sample is accepted only when |tilt| ≤ steady_tilt_threshold_rad.
|
||||||
|
|
||||||
|
Change detection
|
||||||
|
────────────────
|
||||||
|
Returns `changed=True` when the new estimate differs from the previous
|
||||||
|
by more than change_threshold_kg. The caller uses this to trigger gain
|
||||||
|
rescheduling.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
idle_current_a: float = 0.5,
|
||||||
|
current_per_kg_a: float = 0.3,
|
||||||
|
window_s: float = 3.0,
|
||||||
|
control_rate_hz: float = 100.0,
|
||||||
|
steady_tilt_threshold_rad: float = 0.05,
|
||||||
|
change_threshold_kg: float = 0.3,
|
||||||
|
):
|
||||||
|
self._idle = idle_current_a
|
||||||
|
self._per_kg = max(1e-6, current_per_kg_a) # avoid div-by-zero
|
||||||
|
self._steady_threshold = steady_tilt_threshold_rad
|
||||||
|
self._change_threshold = change_threshold_kg
|
||||||
|
|
||||||
|
window_size = max(10, int(window_s * control_rate_hz))
|
||||||
|
self._samples: deque = deque(maxlen=window_size)
|
||||||
|
self._last_estimate: float | None = None
|
||||||
|
|
||||||
|
# ── Properties ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@property
|
||||||
|
def last_estimate(self) -> float | None:
|
||||||
|
return self._last_estimate
|
||||||
|
|
||||||
|
@property
|
||||||
|
def sample_count(self) -> int:
|
||||||
|
return len(self._samples)
|
||||||
|
|
||||||
|
# ── Update ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def update(
|
||||||
|
self,
|
||||||
|
current_a: float,
|
||||||
|
tilt_rad: float,
|
||||||
|
) -> tuple[float | None, bool]:
|
||||||
|
"""
|
||||||
|
Submit one sample (only accepted if in steady state).
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
(weight_kg, changed)
|
||||||
|
weight_kg : None if fewer than min_samples collected yet
|
||||||
|
changed : True if estimate moved by ≥ change_threshold_kg
|
||||||
|
"""
|
||||||
|
if abs(tilt_rad) <= self._steady_threshold:
|
||||||
|
self._samples.append(current_a)
|
||||||
|
|
||||||
|
min_samples = max(5, self._samples.maxlen // 4)
|
||||||
|
if len(self._samples) < min_samples:
|
||||||
|
return None, False
|
||||||
|
|
||||||
|
avg_i = sum(self._samples) / len(self._samples)
|
||||||
|
weight = max(0.0, (avg_i - self._idle) / self._per_kg)
|
||||||
|
|
||||||
|
changed = (
|
||||||
|
self._last_estimate is None
|
||||||
|
or abs(weight - self._last_estimate) >= self._change_threshold
|
||||||
|
)
|
||||||
|
self._last_estimate = weight
|
||||||
|
return weight, changed
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
"""Clear sample buffer and last estimate (e.g., after calibration)."""
|
||||||
|
self._samples.clear()
|
||||||
|
self._last_estimate = None
|
||||||
|
|
||||||
|
|
||||||
|
# ── CalibrationRoutine ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class CalibrationState(Enum):
|
||||||
|
IDLE = "idle"
|
||||||
|
ROCKING = "rocking"
|
||||||
|
SETTLING = "settling"
|
||||||
|
DONE = "done"
|
||||||
|
FAILED = "failed"
|
||||||
|
|
||||||
|
|
||||||
|
class CalibrationRoutine:
|
||||||
|
"""
|
||||||
|
On-demand calibration: rocks the robot back and forth to disturb the
|
||||||
|
balance, lets it settle, then measures steady-state current to produce
|
||||||
|
a weight estimate.
|
||||||
|
|
||||||
|
State machine
|
||||||
|
─────────────
|
||||||
|
IDLE ──(start)──► ROCKING ──(cycles done)──► SETTLING ──(samples ok)──► DONE
|
||||||
|
└──(timeout)────► FAILED
|
||||||
|
|
||||||
|
abort() returns to IDLE from any active state.
|
||||||
|
|
||||||
|
Usage
|
||||||
|
─────
|
||||||
|
Each control tick, call step(current_a, tilt_rad, dt).
|
||||||
|
The return value is a tilt offset (rad) to add to the balance
|
||||||
|
set-point. Returns 0.0 when not active.
|
||||||
|
When state == DONE, read weight_estimate.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
amplitude_rad: float = 0.05,
|
||||||
|
frequency_hz: float = 0.5,
|
||||||
|
rock_cycles: int = 3,
|
||||||
|
settle_s: float = 2.0,
|
||||||
|
settle_tilt_threshold_rad: float = 0.03,
|
||||||
|
idle_current_a: float = 0.5,
|
||||||
|
current_per_kg_a: float = 0.3,
|
||||||
|
):
|
||||||
|
self._amplitude = amplitude_rad
|
||||||
|
self._frequency = max(1e-3, frequency_hz)
|
||||||
|
self._rock_cycles = max(1, rock_cycles)
|
||||||
|
self._settle_s = settle_s
|
||||||
|
self._settle_threshold = settle_tilt_threshold_rad
|
||||||
|
self._idle = idle_current_a
|
||||||
|
self._per_kg = max(1e-6, current_per_kg_a)
|
||||||
|
|
||||||
|
self._state = CalibrationState.IDLE
|
||||||
|
self._elapsed: float = 0.0
|
||||||
|
self._settle_elapsed: float = 0.0
|
||||||
|
self._settle_samples: list[float] = []
|
||||||
|
self._weight_estimate: float | None = None
|
||||||
|
|
||||||
|
# ── Properties ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@property
|
||||||
|
def state(self) -> CalibrationState:
|
||||||
|
return self._state
|
||||||
|
|
||||||
|
@property
|
||||||
|
def weight_estimate(self) -> float | None:
|
||||||
|
return self._weight_estimate
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_active(self) -> bool:
|
||||||
|
return self._state in (CalibrationState.ROCKING, CalibrationState.SETTLING)
|
||||||
|
|
||||||
|
# ── Control ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def start(self) -> bool:
|
||||||
|
"""
|
||||||
|
Start calibration routine.
|
||||||
|
Returns False if already running (ROCKING or SETTLING).
|
||||||
|
"""
|
||||||
|
if self.is_active:
|
||||||
|
return False
|
||||||
|
self._state = CalibrationState.ROCKING
|
||||||
|
self._elapsed = 0.0
|
||||||
|
self._settle_elapsed = 0.0
|
||||||
|
self._settle_samples = []
|
||||||
|
self._weight_estimate = None
|
||||||
|
return True
|
||||||
|
|
||||||
|
def abort(self) -> None:
|
||||||
|
"""Immediately abort and return to IDLE."""
|
||||||
|
self._state = CalibrationState.IDLE
|
||||||
|
self._elapsed = 0.0
|
||||||
|
self._settle_elapsed = 0.0
|
||||||
|
|
||||||
|
# ── Step ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def step(self, current_a: float, tilt_rad: float, dt: float) -> float:
|
||||||
|
"""
|
||||||
|
Advance one control tick.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
current_a : motor current reading (A)
|
||||||
|
tilt_rad : current tilt angle (rad)
|
||||||
|
dt : elapsed time since last step (s)
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
Tilt offset (rad) to superimpose on the balance set-point.
|
||||||
|
0.0 when IDLE / DONE / FAILED.
|
||||||
|
"""
|
||||||
|
if self._state == CalibrationState.ROCKING:
|
||||||
|
self._elapsed += dt
|
||||||
|
rock_duration = self._rock_cycles / self._frequency
|
||||||
|
|
||||||
|
if self._elapsed >= rock_duration:
|
||||||
|
self._state = CalibrationState.SETTLING
|
||||||
|
self._settle_elapsed = 0.0
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
return self._amplitude * math.sin(
|
||||||
|
2.0 * math.pi * self._frequency * self._elapsed
|
||||||
|
)
|
||||||
|
|
||||||
|
if self._state == CalibrationState.SETTLING:
|
||||||
|
self._settle_elapsed += dt
|
||||||
|
|
||||||
|
if abs(tilt_rad) <= self._settle_threshold:
|
||||||
|
self._settle_samples.append(current_a)
|
||||||
|
|
||||||
|
if self._settle_elapsed >= self._settle_s:
|
||||||
|
if len(self._settle_samples) >= 5:
|
||||||
|
avg_i = sum(self._settle_samples) / len(self._settle_samples)
|
||||||
|
self._weight_estimate = max(
|
||||||
|
0.0, (avg_i - self._idle) / self._per_kg
|
||||||
|
)
|
||||||
|
self._state = CalibrationState.DONE
|
||||||
|
else:
|
||||||
|
self._state = CalibrationState.FAILED
|
||||||
|
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
return 0.0
|
||||||
5
jetson/ros2_ws/src/saltybot_adaptive_pid/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_adaptive_pid/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_adaptive_pid
|
||||||
|
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_adaptive_pid
|
||||||
32
jetson/ros2_ws/src/saltybot_adaptive_pid/setup.py
Normal file
32
jetson/ros2_ws/src/saltybot_adaptive_pid/setup.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
from setuptools import setup, find_packages
|
||||||
|
import os
|
||||||
|
from glob import glob
|
||||||
|
|
||||||
|
package_name = "saltybot_adaptive_pid"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=find_packages(exclude=["test"]),
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages",
|
||||||
|
[f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(os.path.join("share", package_name, "config"),
|
||||||
|
glob("config/*.yaml")),
|
||||||
|
(os.path.join("share", package_name, "launch"),
|
||||||
|
glob("launch/*.py")),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-controls",
|
||||||
|
maintainer_email="sl-controls@saltylab.local",
|
||||||
|
description="Adaptive balance PID with weight estimation and gain scheduling",
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
f"adaptive_pid_node = {package_name}.adaptive_pid_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
@ -0,0 +1,591 @@
|
|||||||
|
"""
|
||||||
|
test_adaptive_pid.py — Unit tests for pid_controller.py and weight_estimator.py
|
||||||
|
(Issue #136). No ROS2 runtime required — pure Python.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from saltybot_adaptive_pid.pid_controller import (
|
||||||
|
GainSet,
|
||||||
|
GainScheduler,
|
||||||
|
InstabilityDetector,
|
||||||
|
PIDController,
|
||||||
|
)
|
||||||
|
from saltybot_adaptive_pid.weight_estimator import (
|
||||||
|
CalibrationRoutine,
|
||||||
|
CalibrationState,
|
||||||
|
WeightEstimator,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── GainSet ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestGainSet:
|
||||||
|
|
||||||
|
def test_fields(self):
|
||||||
|
g = GainSet(kp=1.0, ki=0.1, kd=0.5)
|
||||||
|
assert g.kp == pytest.approx(1.0)
|
||||||
|
assert g.ki == pytest.approx(0.1)
|
||||||
|
assert g.kd == pytest.approx(0.5)
|
||||||
|
|
||||||
|
def test_clamp_within_bounds_unchanged(self):
|
||||||
|
g = GainSet(5.0, 1.0, 2.0).clamp(0, 10, 0, 5, 0, 5)
|
||||||
|
assert g.kp == pytest.approx(5.0)
|
||||||
|
assert g.ki == pytest.approx(1.0)
|
||||||
|
assert g.kd == pytest.approx(2.0)
|
||||||
|
|
||||||
|
def test_clamp_below_min(self):
|
||||||
|
g = GainSet(-1.0, -0.5, -2.0).clamp(0, 10, 0, 5, 0, 5)
|
||||||
|
assert g.kp == pytest.approx(0.0)
|
||||||
|
assert g.ki == pytest.approx(0.0)
|
||||||
|
assert g.kd == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_clamp_above_max(self):
|
||||||
|
g = GainSet(200.0, 50.0, 100.0).clamp(0, 10, 0, 5, 0, 5)
|
||||||
|
assert g.kp == pytest.approx(10.0)
|
||||||
|
assert g.ki == pytest.approx(5.0)
|
||||||
|
assert g.kd == pytest.approx(5.0)
|
||||||
|
|
||||||
|
def test_clamp_returns_new_instance(self):
|
||||||
|
g = GainSet(5.0, 1.0, 2.0)
|
||||||
|
g2 = g.clamp(0, 10, 0, 5, 0, 5)
|
||||||
|
assert g2 is not g
|
||||||
|
|
||||||
|
|
||||||
|
# ─── PIDController ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestPIDController:
|
||||||
|
|
||||||
|
def _make(self, kp=1.0, ki=0.0, kd=0.0, **kwargs):
|
||||||
|
return PIDController(kp=kp, ki=ki, kd=kd, **kwargs)
|
||||||
|
|
||||||
|
def test_p_only(self):
|
||||||
|
pid = self._make(kp=2.0)
|
||||||
|
assert pid.step(1.0, 0.01) == pytest.approx(2.0)
|
||||||
|
|
||||||
|
def test_p_negative_error(self):
|
||||||
|
pid = self._make(kp=3.0)
|
||||||
|
assert pid.step(-1.0, 0.01) == pytest.approx(-3.0)
|
||||||
|
|
||||||
|
def test_i_accumulates(self):
|
||||||
|
pid = self._make(kp=0.0, ki=1.0) # isolate I term
|
||||||
|
# After 3 steps of error=1, dt=0.1: integral = 0.3, output = ki*integral = 0.3
|
||||||
|
pid.step(1.0, 0.1)
|
||||||
|
pid.step(1.0, 0.1)
|
||||||
|
out = pid.step(1.0, 0.1)
|
||||||
|
assert out == pytest.approx(0.3, abs=1e-9)
|
||||||
|
|
||||||
|
def test_d_zero_on_first_step(self):
|
||||||
|
pid = self._make(kp=0.0, kd=10.0) # isolate D term
|
||||||
|
out = pid.step(1.0, 0.01)
|
||||||
|
assert out == pytest.approx(0.0) # D is zero on first step
|
||||||
|
|
||||||
|
def test_d_on_rate_change(self):
|
||||||
|
pid = self._make(kp=0.0, kd=1.0) # isolate D term
|
||||||
|
pid.step(0.0, 0.1) # first step, D=0
|
||||||
|
out = pid.step(1.0, 0.1) # error changed by 1 over dt=0.1 → D=1/0.1=10
|
||||||
|
assert out == pytest.approx(10.0)
|
||||||
|
|
||||||
|
def test_output_min_clamp(self):
|
||||||
|
# Negative error → large negative output → clamped at output_min
|
||||||
|
pid = self._make(kp=100.0, output_min=-5.0)
|
||||||
|
out = pid.step(-1.0, 0.01)
|
||||||
|
assert out == pytest.approx(-5.0)
|
||||||
|
|
||||||
|
def test_output_max_clamp(self):
|
||||||
|
pid = self._make(kp=100.0, output_max=5.0)
|
||||||
|
out = pid.step(1.0, 0.01)
|
||||||
|
assert out == pytest.approx(5.0)
|
||||||
|
|
||||||
|
def test_anti_windup_clamps_integral(self):
|
||||||
|
pid = self._make(ki=1.0, i_clamp=0.5)
|
||||||
|
for _ in range(100):
|
||||||
|
pid.step(1.0, 0.1) # integrate a lot
|
||||||
|
assert abs(pid.integral) <= 0.5 + 1e-9
|
||||||
|
|
||||||
|
def test_reset_clears_integral(self):
|
||||||
|
pid = self._make(ki=1.0)
|
||||||
|
for _ in range(10):
|
||||||
|
pid.step(1.0, 0.1)
|
||||||
|
assert pid.integral > 0.0
|
||||||
|
pid.reset()
|
||||||
|
assert pid.integral == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_reset_clears_derivative_state(self):
|
||||||
|
pid = self._make(kp=0.0, kd=5.0) # isolate D term
|
||||||
|
pid.step(1.0, 0.1) # sets prev_error
|
||||||
|
pid.reset()
|
||||||
|
out = pid.step(1.0, 0.1) # prev_error=None after reset → D=0
|
||||||
|
assert out == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_update_gains_takes_effect(self):
|
||||||
|
pid = self._make(kp=1.0)
|
||||||
|
pid.update_gains(kp=3.0, ki=0.0, kd=0.0)
|
||||||
|
assert pid.step(1.0, 0.01) == pytest.approx(3.0)
|
||||||
|
|
||||||
|
def test_zero_dt_returns_zero(self):
|
||||||
|
pid = self._make(kp=5.0)
|
||||||
|
assert pid.step(1.0, 0.0) == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_negative_dt_returns_zero(self):
|
||||||
|
pid = self._make(kp=5.0)
|
||||||
|
assert pid.step(1.0, -0.1) == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_zero_error_no_pid_output(self):
|
||||||
|
pid = self._make(kp=5.0, ki=1.0, kd=1.0)
|
||||||
|
pid.step(0.0, 0.1)
|
||||||
|
out = pid.step(0.0, 0.1)
|
||||||
|
assert out == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_gain_properties(self):
|
||||||
|
pid = self._make(kp=2.0, ki=0.5, kd=0.3)
|
||||||
|
assert pid.kp == pytest.approx(2.0)
|
||||||
|
assert pid.ki == pytest.approx(0.5)
|
||||||
|
assert pid.kd == pytest.approx(0.3)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── GainScheduler ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _make_scheduler(**kwargs) -> GainScheduler:
|
||||||
|
table = {
|
||||||
|
"empty": GainSet(10.0, 0.5, 1.0),
|
||||||
|
"light": GainSet(15.0, 0.7, 1.5),
|
||||||
|
"heavy": GainSet(20.0, 1.0, 2.0),
|
||||||
|
}
|
||||||
|
thresholds = {"empty_max": 0.5, "light_max": 2.0}
|
||||||
|
defaults = dict(blend_alpha=0.05, kp_min=0.0, kp_max=100.0,
|
||||||
|
ki_min=0.0, ki_max=10.0, kd_min=0.0, kd_max=10.0)
|
||||||
|
defaults.update(kwargs)
|
||||||
|
return GainScheduler(table, thresholds, **defaults)
|
||||||
|
|
||||||
|
|
||||||
|
class TestGainScheduler:
|
||||||
|
|
||||||
|
def test_classify_empty(self):
|
||||||
|
s = _make_scheduler()
|
||||||
|
assert s.classify(0.0) == "empty"
|
||||||
|
assert s.classify(0.5) == "empty"
|
||||||
|
|
||||||
|
def test_classify_light(self):
|
||||||
|
s = _make_scheduler()
|
||||||
|
assert s.classify(0.51) == "light"
|
||||||
|
assert s.classify(2.0) == "light"
|
||||||
|
|
||||||
|
def test_classify_heavy(self):
|
||||||
|
s = _make_scheduler()
|
||||||
|
assert s.classify(2.01) == "heavy"
|
||||||
|
assert s.classify(10.0) == "heavy"
|
||||||
|
|
||||||
|
def test_classify_boundary_empty_light(self):
|
||||||
|
s = _make_scheduler()
|
||||||
|
assert s.classify(0.5) == "empty"
|
||||||
|
assert s.classify(0.500001) == "light"
|
||||||
|
|
||||||
|
def test_initial_class_is_empty(self):
|
||||||
|
s = _make_scheduler()
|
||||||
|
assert s.active_class == "empty"
|
||||||
|
|
||||||
|
def test_update_blends_toward_target(self):
|
||||||
|
s = _make_scheduler(blend_alpha=0.1)
|
||||||
|
initial_kp = s.active_gains.kp
|
||||||
|
# Push toward "heavy"
|
||||||
|
for _ in range(50):
|
||||||
|
gains, _, _ = s.update(5.0)
|
||||||
|
# Should have moved significantly toward heavy (kp=20)
|
||||||
|
assert gains.kp > initial_kp + 1.0
|
||||||
|
|
||||||
|
def test_blend_does_not_overshoot(self):
|
||||||
|
s = _make_scheduler(blend_alpha=0.5)
|
||||||
|
target_kp = 20.0 # heavy
|
||||||
|
for _ in range(200):
|
||||||
|
gains, _, _ = s.update(5.0)
|
||||||
|
assert gains.kp <= target_kp + 1e-6
|
||||||
|
|
||||||
|
def test_class_changed_flag_true_on_change(self):
|
||||||
|
s = _make_scheduler()
|
||||||
|
s.update(0.1) # empty
|
||||||
|
_, _, changed = s.update(5.0) # → heavy
|
||||||
|
assert changed is True
|
||||||
|
|
||||||
|
def test_class_changed_flag_false_same_class(self):
|
||||||
|
s = _make_scheduler()
|
||||||
|
s.update(0.1)
|
||||||
|
_, _, changed = s.update(0.2) # still empty
|
||||||
|
assert changed is False
|
||||||
|
|
||||||
|
def test_override_gains_applied(self):
|
||||||
|
s = _make_scheduler()
|
||||||
|
s.override_gains(kp=7.0, ki=0.3, kd=0.8)
|
||||||
|
assert s.active_gains.kp == pytest.approx(7.0)
|
||||||
|
assert s.active_gains.ki == pytest.approx(0.3)
|
||||||
|
assert s.override_active is True
|
||||||
|
|
||||||
|
def test_override_bypasses_scheduling(self):
|
||||||
|
s = _make_scheduler()
|
||||||
|
s.override_gains(kp=7.0, ki=0.3, kd=0.8)
|
||||||
|
for _ in range(20):
|
||||||
|
gains, _, _ = s.update(5.0) # heavy weight, should not change
|
||||||
|
assert gains.kp == pytest.approx(7.0)
|
||||||
|
|
||||||
|
def test_override_gains_clamped(self):
|
||||||
|
s = _make_scheduler(kp_max=10.0)
|
||||||
|
s.override_gains(kp=999.0, ki=0.5, kd=1.0)
|
||||||
|
assert s.active_gains.kp == pytest.approx(10.0)
|
||||||
|
|
||||||
|
def test_clear_override_resumes_scheduling(self):
|
||||||
|
s = _make_scheduler()
|
||||||
|
s.override_gains(kp=7.0, ki=0.3, kd=0.8)
|
||||||
|
s.clear_override()
|
||||||
|
assert s.override_active is False
|
||||||
|
|
||||||
|
def test_revert_to_defaults_snaps_immediately(self):
|
||||||
|
s = _make_scheduler(blend_alpha=0.001)
|
||||||
|
for _ in range(5):
|
||||||
|
s.update(5.0) # start blending toward heavy (kp=20)
|
||||||
|
s.revert_to_defaults(0.0) # snap back to empty (kp=10)
|
||||||
|
assert s.active_gains.kp == pytest.approx(10.0)
|
||||||
|
assert s.active_class == "empty"
|
||||||
|
|
||||||
|
def test_revert_clears_override(self):
|
||||||
|
s = _make_scheduler()
|
||||||
|
s.override_gains(kp=7.0, ki=0.3, kd=0.8)
|
||||||
|
s.revert_to_defaults(0.0)
|
||||||
|
assert s.override_active is False
|
||||||
|
|
||||||
|
def test_safety_bounds_clamp_table_gains(self):
|
||||||
|
s = _make_scheduler(kp_max=12.0) # cap below heavy table value (20)
|
||||||
|
for _ in range(200):
|
||||||
|
gains, _, _ = s.update(5.0) # heavy
|
||||||
|
assert gains.kp <= 12.0 + 1e-9
|
||||||
|
|
||||||
|
|
||||||
|
# ─── InstabilityDetector ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestInstabilityDetector:
|
||||||
|
|
||||||
|
def _make(self, **kwargs):
|
||||||
|
defaults = dict(tilt_threshold_rad=0.4, oscillation_count=6, window_size=50)
|
||||||
|
defaults.update(kwargs)
|
||||||
|
return InstabilityDetector(**defaults)
|
||||||
|
|
||||||
|
def test_stable_small_tilts(self):
|
||||||
|
det = self._make()
|
||||||
|
for _ in range(50):
|
||||||
|
assert det.update(0.1) is False
|
||||||
|
|
||||||
|
def test_too_few_samples_always_stable(self):
|
||||||
|
det = self._make()
|
||||||
|
assert det.update(1.0) is False # only 1 sample
|
||||||
|
assert det.update(1.0) is False
|
||||||
|
assert det.update(1.0) is False
|
||||||
|
|
||||||
|
def test_unstable_tilt_exceeds_threshold(self):
|
||||||
|
det = self._make(tilt_threshold_rad=0.3, window_size=10)
|
||||||
|
for _ in range(6):
|
||||||
|
det.update(0.5) # > 0.3, majority
|
||||||
|
assert det.update(0.5) is True
|
||||||
|
|
||||||
|
def test_stable_below_threshold(self):
|
||||||
|
det = self._make(tilt_threshold_rad=0.5)
|
||||||
|
for _ in range(20):
|
||||||
|
result = det.update(0.3)
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
def test_oscillation_triggers_unstable(self):
|
||||||
|
det = self._make(oscillation_count=4, window_size=50)
|
||||||
|
# Inject 8 alternating signs → 7 reversals ≥ 4
|
||||||
|
tilts = [0.1, -0.1] * 8
|
||||||
|
results = [det.update(t) for t in tilts]
|
||||||
|
assert any(results)
|
||||||
|
|
||||||
|
def test_reset_clears_window(self):
|
||||||
|
det = self._make(tilt_threshold_rad=0.3, window_size=10)
|
||||||
|
for _ in range(10):
|
||||||
|
det.update(0.5)
|
||||||
|
det.reset()
|
||||||
|
# After reset, too few samples → stable
|
||||||
|
assert det.update(0.5) is False
|
||||||
|
|
||||||
|
|
||||||
|
# ─── WeightEstimator ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _make_estimator(**kwargs) -> WeightEstimator:
|
||||||
|
defaults = dict(
|
||||||
|
idle_current_a=0.5, current_per_kg_a=0.3,
|
||||||
|
window_s=1.0, control_rate_hz=10.0, # window_size=10
|
||||||
|
steady_tilt_threshold_rad=0.05, change_threshold_kg=0.3,
|
||||||
|
)
|
||||||
|
defaults.update(kwargs)
|
||||||
|
return WeightEstimator(**defaults)
|
||||||
|
|
||||||
|
|
||||||
|
class TestWeightEstimator:
|
||||||
|
|
||||||
|
def test_insufficient_samples_returns_none(self):
|
||||||
|
est = _make_estimator()
|
||||||
|
result, _ = est.update(0.5, tilt_rad=0.0)
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_sufficient_samples_returns_estimate(self):
|
||||||
|
est = _make_estimator()
|
||||||
|
for _ in range(10):
|
||||||
|
est.update(0.8, tilt_rad=0.0) # 0.8A → (0.8-0.5)/0.3 = 1.0 kg
|
||||||
|
wt, _ = est.update(0.8, tilt_rad=0.0)
|
||||||
|
assert wt == pytest.approx(1.0, abs=0.05)
|
||||||
|
|
||||||
|
def test_non_steady_samples_rejected(self):
|
||||||
|
est = _make_estimator()
|
||||||
|
for _ in range(20):
|
||||||
|
est.update(0.8, tilt_rad=0.1) # |0.1| > 0.05 → rejected
|
||||||
|
result, _ = est.update(0.8, tilt_rad=0.1)
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_known_current_gives_correct_weight(self):
|
||||||
|
# 1.1A, idle=0.5, per_kg=0.3 → (1.1-0.5)/0.3 = 2.0 kg
|
||||||
|
est = _make_estimator()
|
||||||
|
for _ in range(15):
|
||||||
|
est.update(1.1, tilt_rad=0.0)
|
||||||
|
wt, _ = est.update(1.1, tilt_rad=0.0)
|
||||||
|
assert wt == pytest.approx(2.0, abs=0.05)
|
||||||
|
|
||||||
|
def test_negative_estimate_clamped_to_zero(self):
|
||||||
|
# current below idle → weight should be 0
|
||||||
|
est = _make_estimator()
|
||||||
|
for _ in range(15):
|
||||||
|
est.update(0.1, tilt_rad=0.0)
|
||||||
|
wt, _ = est.update(0.1, tilt_rad=0.0)
|
||||||
|
assert wt == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_weight_changed_flag_on_first_estimate(self):
|
||||||
|
est = _make_estimator()
|
||||||
|
# Find the first non-None estimate — it must have changed=True
|
||||||
|
for _ in range(15):
|
||||||
|
wt, changed = est.update(0.8, tilt_rad=0.0)
|
||||||
|
if wt is not None:
|
||||||
|
assert changed is True
|
||||||
|
break
|
||||||
|
|
||||||
|
def test_weight_unchanged_below_threshold(self):
|
||||||
|
est = _make_estimator(change_threshold_kg=0.5)
|
||||||
|
for _ in range(15):
|
||||||
|
est.update(0.8, tilt_rad=0.0)
|
||||||
|
# Now feed very similar current (tiny perturbation)
|
||||||
|
for _ in range(15):
|
||||||
|
wt, changed = est.update(0.81, tilt_rad=0.0)
|
||||||
|
# delta is ~0.03 kg << threshold 0.5
|
||||||
|
assert changed is False
|
||||||
|
|
||||||
|
def test_reset_clears_samples_and_estimate(self):
|
||||||
|
est = _make_estimator()
|
||||||
|
for _ in range(15):
|
||||||
|
est.update(0.8, tilt_rad=0.0)
|
||||||
|
est.reset()
|
||||||
|
assert est.last_estimate is None
|
||||||
|
result, _ = est.update(0.8, tilt_rad=0.0)
|
||||||
|
assert result is None # need to re-fill window
|
||||||
|
|
||||||
|
|
||||||
|
# ─── CalibrationRoutine ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _make_cal(**kwargs) -> CalibrationRoutine:
|
||||||
|
defaults = dict(
|
||||||
|
amplitude_rad=0.05, frequency_hz=1.0,
|
||||||
|
rock_cycles=2, settle_s=1.0,
|
||||||
|
settle_tilt_threshold_rad=0.03,
|
||||||
|
idle_current_a=0.5, current_per_kg_a=0.3,
|
||||||
|
)
|
||||||
|
defaults.update(kwargs)
|
||||||
|
return CalibrationRoutine(**defaults)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCalibrationRoutine:
|
||||||
|
|
||||||
|
def test_initial_state_idle(self):
|
||||||
|
cal = _make_cal()
|
||||||
|
assert cal.state == CalibrationState.IDLE
|
||||||
|
assert cal.is_active is False
|
||||||
|
|
||||||
|
def test_start_transitions_to_rocking(self):
|
||||||
|
cal = _make_cal()
|
||||||
|
assert cal.start() is True
|
||||||
|
assert cal.state == CalibrationState.ROCKING
|
||||||
|
assert cal.is_active is True
|
||||||
|
|
||||||
|
def test_start_returns_false_when_active(self):
|
||||||
|
cal = _make_cal()
|
||||||
|
cal.start()
|
||||||
|
assert cal.start() is False
|
||||||
|
|
||||||
|
def test_start_succeeds_from_done(self):
|
||||||
|
cal = _make_cal(rock_cycles=1, settle_s=0.01)
|
||||||
|
cal.start()
|
||||||
|
dt = 0.01
|
||||||
|
for _ in range(200):
|
||||||
|
cal.step(0.8, tilt_rad=0.0, dt=dt)
|
||||||
|
# Should be DONE or FAILED at this point
|
||||||
|
assert cal.state in (CalibrationState.DONE, CalibrationState.FAILED)
|
||||||
|
assert cal.start() is True # can restart
|
||||||
|
|
||||||
|
def test_abort_returns_to_idle(self):
|
||||||
|
cal = _make_cal()
|
||||||
|
cal.start()
|
||||||
|
cal.abort()
|
||||||
|
assert cal.state == CalibrationState.IDLE
|
||||||
|
assert cal.is_active is False
|
||||||
|
|
||||||
|
def test_rocking_output_sinusoidal(self):
|
||||||
|
# freq=1Hz, cycles=2 → duration=2s; use 110 steps at dt=0.01 to cover >0.5 period
|
||||||
|
cal = _make_cal(amplitude_rad=0.1, frequency_hz=1.0)
|
||||||
|
cal.start()
|
||||||
|
outputs = [cal.step(0.5, 0.0, 0.01) for _ in range(110)]
|
||||||
|
# Sinusoidal over >1 s: must cross zero and include both signs
|
||||||
|
assert any(o > 0.0 for o in outputs)
|
||||||
|
assert any(o < 0.0 for o in outputs)
|
||||||
|
|
||||||
|
def test_rocking_output_bounded_by_amplitude(self):
|
||||||
|
cal = _make_cal(amplitude_rad=0.05)
|
||||||
|
cal.start()
|
||||||
|
for _ in range(200):
|
||||||
|
out = cal.step(0.5, 0.0, 0.005)
|
||||||
|
assert abs(out) <= 0.05 + 1e-9
|
||||||
|
|
||||||
|
def test_idle_output_is_zero(self):
|
||||||
|
cal = _make_cal()
|
||||||
|
assert cal.step(0.5, 0.0, 0.01) == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_rocking_transitions_to_settling(self):
|
||||||
|
# rock_cycles=2, freq=1Hz → rock_duration=2s; dt=0.1 → 20 steps
|
||||||
|
cal = _make_cal(rock_cycles=2, frequency_hz=1.0)
|
||||||
|
cal.start()
|
||||||
|
for _ in range(22):
|
||||||
|
cal.step(0.5, 0.0, 0.1)
|
||||||
|
assert cal.state == CalibrationState.SETTLING
|
||||||
|
|
||||||
|
def test_settling_output_is_zero(self):
|
||||||
|
cal = _make_cal(rock_cycles=1, frequency_hz=2.0)
|
||||||
|
cal.start()
|
||||||
|
# Skip through rocking
|
||||||
|
for _ in range(15):
|
||||||
|
cal.step(0.5, 0.0, 0.1)
|
||||||
|
# Now in settling
|
||||||
|
assert cal.state == CalibrationState.SETTLING
|
||||||
|
out = cal.step(0.5, 0.0, 0.1)
|
||||||
|
assert out == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_done_with_enough_settled_samples(self):
|
||||||
|
# freq=2Hz, cycles=1 → rock_duration=0.5s; dt=0.05 → 10 steps
|
||||||
|
# settle_s=0.5s → 10 more steps
|
||||||
|
cal = _make_cal(rock_cycles=1, frequency_hz=2.0, settle_s=0.5)
|
||||||
|
cal.start()
|
||||||
|
for _ in range(100):
|
||||||
|
cal.step(0.8, tilt_rad=0.0, dt=0.05)
|
||||||
|
assert cal.state == CalibrationState.DONE
|
||||||
|
assert cal.weight_estimate is not None
|
||||||
|
assert cal.weight_estimate >= 0.0
|
||||||
|
|
||||||
|
def test_weight_estimate_known_current(self):
|
||||||
|
# 0.8A, idle=0.5, per_kg=0.3 → 1.0 kg
|
||||||
|
cal = _make_cal(rock_cycles=1, frequency_hz=2.0, settle_s=0.5)
|
||||||
|
cal.start()
|
||||||
|
for _ in range(100):
|
||||||
|
cal.step(0.8, tilt_rad=0.0, dt=0.05)
|
||||||
|
assert cal.state == CalibrationState.DONE
|
||||||
|
assert cal.weight_estimate == pytest.approx(1.0, abs=0.1)
|
||||||
|
|
||||||
|
def test_failed_when_no_settled_samples(self):
|
||||||
|
# Robot never settles (large tilt)
|
||||||
|
cal = _make_cal(
|
||||||
|
rock_cycles=1, frequency_hz=2.0, settle_s=0.5,
|
||||||
|
settle_tilt_threshold_rad=0.01
|
||||||
|
)
|
||||||
|
cal.start()
|
||||||
|
for _ in range(100):
|
||||||
|
cal.step(0.8, tilt_rad=0.1, dt=0.05) # tilt > threshold → no samples
|
||||||
|
assert cal.state == CalibrationState.FAILED
|
||||||
|
assert cal.weight_estimate is None
|
||||||
|
|
||||||
|
def test_rocking_duration(self):
|
||||||
|
"""Rocking lasts exactly rock_cycles / frequency_hz seconds."""
|
||||||
|
freq = 1.0
|
||||||
|
cycles = 3
|
||||||
|
cal = _make_cal(rock_cycles=cycles, frequency_hz=freq, settle_s=100.0)
|
||||||
|
cal.start()
|
||||||
|
rock_dur = cycles / freq # 3 s
|
||||||
|
dt = 0.01
|
||||||
|
t = 0.0
|
||||||
|
while cal.state == CalibrationState.ROCKING and t < rock_dur + 0.1:
|
||||||
|
cal.step(0.5, 0.0, dt)
|
||||||
|
t += dt
|
||||||
|
assert t == pytest.approx(rock_dur, abs=0.02)
|
||||||
|
|
||||||
|
def test_weight_clamped_to_zero_below_idle(self):
|
||||||
|
# Current below idle → should clamp to 0, not negative
|
||||||
|
cal = _make_cal(rock_cycles=1, frequency_hz=2.0, settle_s=0.5)
|
||||||
|
cal.start()
|
||||||
|
for _ in range(100):
|
||||||
|
cal.step(0.2, tilt_rad=0.0, dt=0.05) # 0.2 < 0.5 idle
|
||||||
|
if cal.state == CalibrationState.DONE:
|
||||||
|
assert cal.weight_estimate == pytest.approx(0.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Integration ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestIntegration:
|
||||||
|
|
||||||
|
def test_pid_with_gain_scheduler_round_trip(self):
|
||||||
|
"""GainScheduler feeds gains into PID; both operate consistently."""
|
||||||
|
sched = _make_scheduler()
|
||||||
|
pid = PIDController(kp=10.0, ki=0.5, kd=1.0,
|
||||||
|
output_min=-50.0, output_max=50.0)
|
||||||
|
|
||||||
|
# Simulate 1 s at 100 Hz, weight=0.2 kg (empty class)
|
||||||
|
dt = 0.01
|
||||||
|
total_effort = 0.0
|
||||||
|
for _ in range(100):
|
||||||
|
gains, _, _ = sched.update(0.2)
|
||||||
|
pid.update_gains(gains.kp, gains.ki, gains.kd)
|
||||||
|
error = 0.1 # constant tilt error
|
||||||
|
total_effort += pid.step(error, dt)
|
||||||
|
|
||||||
|
assert total_effort > 0.0 # robot should be correcting
|
||||||
|
|
||||||
|
def test_instability_reverts_pid_gains(self):
|
||||||
|
sched = _make_scheduler()
|
||||||
|
det = InstabilityDetector(tilt_threshold_rad=0.3, window_size=10)
|
||||||
|
pid = PIDController(kp=10.0, ki=0.0, kd=0.0)
|
||||||
|
|
||||||
|
# Inject large oscillations
|
||||||
|
tilts = [0.5, -0.5] * 10
|
||||||
|
for t in tilts:
|
||||||
|
det.update(t)
|
||||||
|
if det.update(t):
|
||||||
|
sched.revert_to_defaults(0.0)
|
||||||
|
gains = sched.active_gains
|
||||||
|
pid.update_gains(gains.kp, gains.ki, gains.kd)
|
||||||
|
pid.reset()
|
||||||
|
break
|
||||||
|
|
||||||
|
assert sched.active_class == "empty"
|
||||||
|
assert pid.integral == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_calibration_updates_weight_estimator_chain(self):
|
||||||
|
"""Full: calibration → weight estimate → gain scheduling."""
|
||||||
|
cal = _make_cal(rock_cycles=1, frequency_hz=2.0, settle_s=0.5)
|
||||||
|
sched = _make_scheduler()
|
||||||
|
|
||||||
|
# Run calibration
|
||||||
|
cal.start()
|
||||||
|
for _ in range(100):
|
||||||
|
cal.step(0.8, tilt_rad=0.0, dt=0.05)
|
||||||
|
|
||||||
|
assert cal.state == CalibrationState.DONE
|
||||||
|
wt = cal.weight_estimate
|
||||||
|
assert wt is not None
|
||||||
|
|
||||||
|
# Feed weight into scheduler
|
||||||
|
for _ in range(5):
|
||||||
|
gains, cls, _ = sched.update(wt)
|
||||||
|
|
||||||
|
# ~1 kg → "light" class
|
||||||
|
assert cls == "light"
|
||||||
Loading…
x
Reference in New Issue
Block a user