Adaptive PID gain scheduler (Issue #309) #311

Merged
sl-jetson merged 2 commits from sl-controls/issue-309-gain-schedule into main 2026-03-03 00:19:53 -05:00
11 changed files with 399 additions and 1 deletions

View File

@ -44,7 +44,7 @@
// tabs audibly engage (23 mm deflection), test rotation lock. // tabs audibly engage (23 mm deflection), test rotation lock.
// ============================================================================= // =============================================================================
$fn = 64; \$fn = 64;
e = 0.01; e = 0.01;
// ============================================================================= // =============================================================================

View File

@ -0,0 +1,16 @@
pid_scheduler:
ros__parameters:
# Base PID gains
base_kp: 1.0
base_ki: 0.1
base_kd: 0.05
# Gain scheduling parameters
# P gain increases by this factor * terrain_roughness
kp_terrain_scale: 0.5
# D gain scale: -1 means full reduction at zero speed
kd_speed_scale: -0.3
# I gain modulation factor
ki_scale: 0.1

View File

@ -0,0 +1,31 @@
"""Launch file for PID gain scheduler node."""
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch.actions import DeclareLaunchArgument
import os
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
"""Generate launch description for PID scheduler."""
pkg_dir = get_package_share_directory("saltybot_pid_scheduler")
config_file = os.path.join(pkg_dir, "config", "pid_scheduler_config.yaml")
return LaunchDescription(
[
DeclareLaunchArgument(
"config_file",
default_value=config_file,
description="Path to configuration YAML file",
),
Node(
package="saltybot_pid_scheduler",
executable="pid_scheduler_node",
name="pid_scheduler",
output="screen",
parameters=[LaunchConfiguration("config_file")],
),
]
)

View File

@ -0,0 +1,19 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_pid_scheduler</name>
<version>0.1.0</version>
<description>Adaptive PID gain scheduler for SaltyBot</description>
<maintainer email="sl-controls@saltylab.local">SaltyLab Controls</maintainer>
<license>MIT</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,125 @@
#!/usr/bin/env python3
"""Adaptive PID gain scheduler for SaltyBot.
Monitors speed scale and terrain roughness, adjusts PID gains dynamically.
Higher P on rough terrain, lower D at low speed.
Subscribed topics:
/saltybot/speed_scale (std_msgs/Float32) - Speed reduction factor (0.0-1.0)
/saltybot/terrain_roughness (std_msgs/Float32) - Terrain roughness (0.0-1.0)
Published topics:
/saltybot/pid_gains (std_msgs/Float32MultiArray) - [Kp, Ki, Kd] gains
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32, Float32MultiArray
import numpy as np
class PIDSchedulerNode(Node):
"""ROS2 node for adaptive PID gain scheduling."""
def __init__(self):
super().__init__("pid_scheduler")
# Base PID gains
self.declare_parameter("base_kp", 1.0)
self.declare_parameter("base_ki", 0.1)
self.declare_parameter("base_kd", 0.05)
# Gain scheduling parameters
self.declare_parameter("kp_terrain_scale", 0.5) # P gain increase on rough terrain
self.declare_parameter("kd_speed_scale", -0.3) # D gain decrease at low speed
self.declare_parameter("ki_scale", 0.1) # I gain smoothing
self.base_kp = self.get_parameter("base_kp").value
self.base_ki = self.get_parameter("base_ki").value
self.base_kd = self.get_parameter("base_kd").value
self.kp_terrain_scale = self.get_parameter("kp_terrain_scale").value
self.kd_speed_scale = self.get_parameter("kd_speed_scale").value
self.ki_scale = self.get_parameter("ki_scale").value
# Current sensor inputs
self.speed_scale = 1.0
self.terrain_roughness = 0.0
# Current gains
self.current_kp = self.base_kp
self.current_ki = self.base_ki
self.current_kd = self.base_kd
# Subscriptions
self.sub_speed = self.create_subscription(
Float32, "/saltybot/speed_scale", self._on_speed_scale, 10
)
self.sub_terrain = self.create_subscription(
Float32, "/saltybot/terrain_roughness", self._on_terrain_roughness, 10
)
# Publisher for PID gains
self.pub_gains = self.create_publisher(Float32MultiArray, "/saltybot/pid_gains", 10)
self.get_logger().info(
f"PID gain scheduler initialized. Base gains: "
f"Kp={self.base_kp}, Ki={self.base_ki}, Kd={self.base_kd}"
)
def _on_speed_scale(self, msg: Float32) -> None:
"""Update speed scale and recalculate gains."""
self.speed_scale = np.clip(msg.data, 0.0, 1.0)
self._update_gains()
def _on_terrain_roughness(self, msg: Float32) -> None:
"""Update terrain roughness and recalculate gains."""
self.terrain_roughness = np.clip(msg.data, 0.0, 1.0)
self._update_gains()
def _update_gains(self) -> None:
"""Compute adaptive PID gains based on speed and terrain."""
# P gain increases with terrain roughness (better response on rough surfaces)
self.current_kp = self.base_kp * (1.0 + self.kp_terrain_scale * self.terrain_roughness)
# D gain decreases at low speed (avoid oscillation at low speeds)
# Low speed_scale means robot is moving slow, so reduce D damping
speed_factor = 1.0 + self.kd_speed_scale * (1.0 - self.speed_scale)
self.current_kd = self.base_kd * max(0.1, speed_factor) # Don't let D go negative
# I gain scales smoothly with both factors
terrain_factor = 1.0 + self.ki_scale * self.terrain_roughness
speed_damping = 1.0 - 0.3 * (1.0 - self.speed_scale) # Reduce I at low speed
self.current_ki = self.base_ki * terrain_factor * speed_damping
# Publish gains
self._publish_gains()
def _publish_gains(self) -> None:
"""Publish current PID gains as Float32MultiArray."""
msg = Float32MultiArray()
msg.data = [self.current_kp, self.current_ki, self.current_kd]
self.pub_gains.publish(msg)
self.get_logger().debug(
f"PID gains updated: Kp={self.current_kp:.4f}, "
f"Ki={self.current_ki:.4f}, Kd={self.current_kd:.4f} "
f"(speed={self.speed_scale:.2f}, terrain={self.terrain_roughness:.2f})"
)
def main(args=None):
rclpy.init(args=args)
node = PIDSchedulerNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_pid_scheduler
[install]
install_scripts=$base/lib/saltybot_pid_scheduler

View File

@ -0,0 +1,24 @@
from setuptools import setup, find_packages
setup(
name='saltybot_pid_scheduler',
version='0.1.0',
packages=find_packages(),
data_files=[
('share/ament_index/resource_index/packages', ['resource/saltybot_pid_scheduler']),
('share/saltybot_pid_scheduler', ['package.xml']),
('share/saltybot_pid_scheduler/config', ['config/pid_scheduler_config.yaml']),
('share/saltybot_pid_scheduler/launch', ['launch/pid_scheduler.launch.py']),
],
install_requires=['setuptools'],
zip_safe=True,
author='SaltyLab Controls',
author_email='sl-controls@saltylab.local',
description='Adaptive PID gain scheduler for SaltyBot',
license='MIT',
entry_points={
'console_scripts': [
'pid_scheduler_node=saltybot_pid_scheduler.pid_scheduler_node:main',
],
},
)

View File

@ -0,0 +1,178 @@
"""Tests for PID gain scheduler."""
import pytest
from std_msgs.msg import Float32
import rclpy
from saltybot_pid_scheduler.pid_scheduler_node import PIDSchedulerNode
@pytest.fixture
def rclpy_fixture():
rclpy.init()
yield
rclpy.shutdown()
@pytest.fixture
def node(rclpy_fixture):
node = PIDSchedulerNode()
yield node
node.destroy_node()
class TestInit:
def test_node_initialization(self, node):
assert node.base_kp == 1.0
assert node.base_ki == 0.1
assert node.base_kd == 0.05
assert node.speed_scale == 1.0
assert node.terrain_roughness == 0.0
class TestGainScheduling:
def test_gains_at_baseline(self, node):
"""Test gains at baseline (no speed reduction, no terrain roughness)."""
node._update_gains()
assert node.current_kp == pytest.approx(1.0)
assert node.current_kd == pytest.approx(0.05)
def test_kp_increases_with_terrain(self, node):
"""Test P gain increases with terrain roughness."""
node.terrain_roughness = 0.0
node._update_gains()
kp_smooth = node.current_kp
node.terrain_roughness = 1.0
node._update_gains()
kp_rough = node.current_kp
assert kp_rough > kp_smooth
assert kp_rough == pytest.approx(1.5) # 1.0 * (1 + 0.5 * 1.0)
def test_kd_decreases_at_low_speed(self, node):
"""Test D gain decreases when robot slows down."""
node.speed_scale = 1.0
node._update_gains()
kd_fast = node.current_kd
node.speed_scale = 0.0 # Robot stopped
node._update_gains()
kd_slow = node.current_kd
assert kd_slow < kd_fast
def test_ki_scales_with_terrain_and_speed(self, node):
"""Test I gain scales with both terrain and speed."""
node.speed_scale = 1.0
node.terrain_roughness = 0.0
node._update_gains()
ki_baseline = node.current_ki
node.terrain_roughness = 1.0
node._update_gains()
ki_rough = node.current_ki
assert ki_rough > ki_baseline
def test_kd_never_negative(self, node):
"""Test D gain never goes negative."""
node.speed_scale = 0.0
node._update_gains()
assert node.current_kd >= 0.0
def test_speed_scale_clipping(self, node):
"""Test speed scale is clipped to [0, 1]."""
msg = Float32()
msg.data = 2.0 # Out of range
node._on_speed_scale(msg)
assert node.speed_scale == 1.0
def test_terrain_roughness_clipping(self, node):
"""Test terrain roughness is clipped to [0, 1]."""
msg = Float32()
msg.data = -0.5 # Out of range
node._on_terrain_roughness(msg)
assert node.terrain_roughness == 0.0
class TestSensorInputs:
def test_speed_scale_callback(self, node):
"""Test speed scale subscription and update."""
msg = Float32()
msg.data = 0.5
node._on_speed_scale(msg)
assert node.speed_scale == 0.5
def test_terrain_roughness_callback(self, node):
"""Test terrain roughness subscription and update."""
msg = Float32()
msg.data = 0.75
node._on_terrain_roughness(msg)
assert node.terrain_roughness == 0.75
def test_combined_effects(self, node):
"""Test combined effect of speed and terrain on gains."""
# Slow speed + rough terrain = high P, low D
node.speed_scale = 0.2
node.terrain_roughness = 0.9
node._update_gains()
# P should be high (due to terrain)
assert node.current_kp > node.base_kp
# D should be low (due to slow speed)
# D factor = 1 + (-0.3) * (1 - 0.2) = 1 - 0.24 = 0.76
assert node.current_kd < node.base_kd
class TestEdgeCases:
def test_zero_speed_scale(self, node):
"""Test behavior at zero speed (robot stopped)."""
node.speed_scale = 0.0
node.terrain_roughness = 0.5
node._update_gains()
# All gains should be positive
assert node.current_kp > 0
assert node.current_ki > 0
assert node.current_kd > 0
def test_max_terrain_roughness(self, node):
"""Test behavior on extremely rough terrain."""
node.speed_scale = 1.0
node.terrain_roughness = 1.0
node._update_gains()
# Kp should be maximum
assert node.current_kp == pytest.approx(1.5)
def test_rapid_sensor_changes(self, node):
"""Test rapid changes in sensor inputs."""
for speed in [1.0, 0.5, 0.1, 0.9, 1.0]:
msg = Float32()
msg.data = speed
node._on_speed_scale(msg)
for roughness in [0.0, 0.5, 1.0, 0.3, 0.0]:
msg = Float32()
msg.data = roughness
node._on_terrain_roughness(msg)
# Should end at baseline
node._update_gains()
assert node.speed_scale == 1.0
assert node.terrain_roughness == 0.0