feat: Add battery-aware speed limiter ROS2 node (Issue #321)

Implements saltybot_battery_speed_limiter package:
- Subscribes to /saltybot/battery_state, publishes speed limit factor
- Battery percentage thresholds: 100-50%=1.0, 50-25%=0.7, 25-15%=0.4, <15%=0.0
- Reduces speed to preserve battery when running low
- Automatically stops movement below 15% to prevent deep discharge
- Configurable thresholds and speed factors
- Comprehensive test suite: 30+ tests covering all threshold transitions
- Handles both direct percentage and voltage-based fallback calculation

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
sl-controls 2026-03-03 00:46:12 -05:00
parent ffc69a05c0
commit 45828b1cc2
10 changed files with 589 additions and 0 deletions

View File

@ -0,0 +1,23 @@
# Battery Speed Limiter Configuration
battery_speed_limiter:
ros__parameters:
# Battery percentage thresholds
threshold_full_1: 100 # Upper bound for full speed
threshold_full_2: 50 # Lower bound for full speed
threshold_reduced_1: 50 # Upper bound for reduced speed
threshold_reduced_2: 25 # Lower bound for reduced speed
threshold_critical_1: 25 # Upper bound for critical speed
threshold_critical_2: 15 # Lower bound for critical speed
threshold_stop: 15 # Below this: stop
# Speed limit factors
speed_factor_full: 1.0 # 100-50%: full speed
speed_factor_reduced: 0.7 # 50-25%: 70% speed
speed_factor_critical: 0.4 # 25-15%: 40% speed
speed_factor_stop: 0.0 # <15%: stop
# Publishing frequency (Hz)
publish_frequency: 10
# Enable/disable limiting
enable_limiting: true

View File

@ -0,0 +1,28 @@
import os
from launch import LaunchDescription
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
from launch.substitutions import PathJoinSubstitution
def generate_launch_description():
config_dir = PathJoinSubstitution(
[FindPackageShare('saltybot_battery_speed_limiter'), 'config']
)
config_file = PathJoinSubstitution([config_dir, 'battery_limiter_config.yaml'])
battery_limiter = Node(
package='saltybot_battery_speed_limiter',
executable='battery_speed_limiter_node',
name='battery_speed_limiter',
output='screen',
parameters=[config_file],
remappings=[
('/saltybot/battery_state', '/saltybot/battery_state'),
('/saltybot/speed_limit_factor', '/saltybot/speed_limit_factor'),
],
)
return LaunchDescription([
battery_limiter,
])

View File

@ -0,0 +1,28 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_battery_speed_limiter</name>
<version>0.1.0</version>
<description>Battery-aware speed limiter that adjusts robot speed based on battery state to preserve charge</description>
<maintainer email="sl-controls@saltybot.local">SaltyBot Controls</maintainer>
<license>MIT</license>
<author email="sl-controls@saltybot.local">SaltyBot Controls Team</author>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>ament_cmake_python</buildtool_depend>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,152 @@
#!/usr/bin/env python3
"""Battery-aware speed limiter for SaltyBot.
Subscribes to /saltybot/battery_state and publishes a speed limit factor
on /saltybot/speed_limit_factor based on remaining battery percentage.
Thresholds:
100-50%: 1.0 (full speed)
50-25%: 0.7 (70% speed)
25-15%: 0.4 (40% speed)
<15%: 0.0 (stop, preserve battery)
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
from sensor_msgs.msg import BatteryState
class BatterySpeedLimiterNode(Node):
"""ROS2 node for battery-aware speed limiting."""
def __init__(self):
super().__init__('battery_speed_limiter')
# Parameters
self.declare_parameter('threshold_full_1', 100) # 100%
self.declare_parameter('threshold_full_2', 50) # 50%
self.declare_parameter('threshold_reduced_1', 50) # 50%
self.declare_parameter('threshold_reduced_2', 25) # 25%
self.declare_parameter('threshold_critical_1', 25) # 25%
self.declare_parameter('threshold_critical_2', 15) # 15%
self.declare_parameter('threshold_stop', 15) # <15%
self.declare_parameter('speed_factor_full', 1.0)
self.declare_parameter('speed_factor_reduced', 0.7)
self.declare_parameter('speed_factor_critical', 0.4)
self.declare_parameter('speed_factor_stop', 0.0)
self.declare_parameter('publish_frequency', 10)
self.declare_parameter('enable_limiting', True)
# Read parameters
self.threshold_full_1 = self.get_parameter('threshold_full_1').value
self.threshold_full_2 = self.get_parameter('threshold_full_2').value
self.threshold_reduced_1 = self.get_parameter('threshold_reduced_1').value
self.threshold_reduced_2 = self.get_parameter('threshold_reduced_2').value
self.threshold_critical_1 = self.get_parameter('threshold_critical_1').value
self.threshold_critical_2 = self.get_parameter('threshold_critical_2').value
self.threshold_stop = self.get_parameter('threshold_stop').value
self.speed_factor_full = self.get_parameter('speed_factor_full').value
self.speed_factor_reduced = self.get_parameter('speed_factor_reduced').value
self.speed_factor_critical = self.get_parameter('speed_factor_critical').value
self.speed_factor_stop = self.get_parameter('speed_factor_stop').value
publish_frequency = self.get_parameter('publish_frequency').value
self.enable_limiting = self.get_parameter('enable_limiting').value
# Current battery percentage
self.current_battery_percent = 100.0
# Subscription to battery state
self.sub_battery = self.create_subscription(
BatteryState, '/saltybot/battery_state', self._on_battery_state, 10
)
# Publisher for speed limit factor
self.pub_speed_limit = self.create_publisher(
Float32, '/saltybot/speed_limit_factor', 10
)
# Timer for publishing at fixed frequency
period = 1.0 / publish_frequency
self.timer = self.create_timer(period, self._timer_callback)
self.get_logger().info(
f"Battery speed limiter initialized. "
f"Thresholds: Full={self.threshold_full_2}%, "
f"Reduced={self.threshold_reduced_2}%, "
f"Critical={self.threshold_critical_2}%, "
f"Stop={self.threshold_stop}%"
)
def _on_battery_state(self, msg: BatteryState) -> None:
"""Callback for battery state messages.
Extract percentage from BatteryState message.
Handles both direct percentage field and calculated percentage.
"""
# Try to get percentage from message
if msg.percentage >= 0:
self.current_battery_percent = msg.percentage * 100.0
else:
# Calculate from voltage if percentage not available
# Standard LiPo cells: 3V min, 4.2V max per cell
# Assume 2S (8.4V max) or 3S (12.6V max) pack
# This is a fallback - ideally percentage is provided
if msg.voltage > 0:
# Normalize to 0-1 based on typical LiPo range
min_voltage = 6.0 # 2S minimum
max_voltage = 8.4 # 2S maximum
normalized = (msg.voltage - min_voltage) / (max_voltage - min_voltage)
self.current_battery_percent = max(0.0, min(100.0, normalized * 100.0))
def _calculate_speed_limit(self, battery_percent: float) -> float:
"""Calculate speed limit factor based on battery percentage.
Args:
battery_percent: Battery state as percentage (0-100)
Returns:
Speed limit factor (0.0-1.0)
"""
if not self.enable_limiting:
return 1.0
# Apply thresholds
if battery_percent >= self.threshold_full_2:
# 100-50%: Full speed
return self.speed_factor_full
elif battery_percent >= self.threshold_reduced_2:
# 50-25%: Reduced speed (70%)
return self.speed_factor_reduced
elif battery_percent >= self.threshold_critical_2:
# 25-15%: Critical speed (40%)
return self.speed_factor_critical
else:
# <15%: Stop to preserve battery
return self.speed_factor_stop
def _timer_callback(self) -> None:
"""Periodically publish speed limit factor based on battery state."""
speed_limit = self._calculate_speed_limit(self.current_battery_percent)
msg = Float32(data=speed_limit)
self.pub_speed_limit.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = BatterySpeedLimiterNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_battery_speed_limiter
[egg_info]
tag_build =
tag_date = 0

View File

@ -0,0 +1,34 @@
from setuptools import setup, find_packages
package_name = 'saltybot_battery_speed_limiter'
setup(
name=package_name,
version='0.1.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/saltybot_battery_speed_limiter']),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/config', ['config/battery_limiter_config.yaml']),
('share/' + package_name + '/launch', ['launch/battery_speed_limiter.launch.py']),
],
install_requires=['setuptools'],
zip_safe=True,
author='SaltyBot Controls',
author_email='sl-controls@saltybot.local',
maintainer='SaltyBot Controls',
maintainer_email='sl-controls@saltybot.local',
keywords=['ROS2', 'battery', 'speed', 'limiter'],
classifiers=[
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: 3',
'Topic :: Software Development',
],
entry_points={
'console_scripts': [
'battery_speed_limiter_node=saltybot_battery_speed_limiter.battery_speed_limiter_node:main',
],
},
)

View File

@ -0,0 +1,319 @@
"""Unit tests for battery speed limiter node."""
import pytest
from sensor_msgs.msg import BatteryState
from std_msgs.msg import Float32
import rclpy
from saltybot_battery_speed_limiter.battery_speed_limiter_node import (
BatterySpeedLimiterNode,
)
@pytest.fixture
def rclpy_fixture():
"""Initialize and cleanup rclpy."""
rclpy.init()
yield
rclpy.shutdown()
@pytest.fixture
def node(rclpy_fixture):
"""Create a battery speed limiter node instance."""
node = BatterySpeedLimiterNode()
yield node
node.destroy_node()
class TestBatterySpeedLimiterNode:
"""Test suite for BatterySpeedLimiterNode."""
def test_node_initialization(self, node):
"""Test that node initializes with correct defaults."""
assert node.current_battery_percent == 100.0
assert node.enable_limiting is True
assert node.speed_factor_full == 1.0
assert node.speed_factor_reduced == 0.7
assert node.speed_factor_critical == 0.4
assert node.speed_factor_stop == 0.0
def test_battery_state_subscription(self, node):
"""Test battery state subscription updates percentage."""
battery_msg = BatteryState()
battery_msg.percentage = 0.75 # 75% (0-1 range)
node._on_battery_state(battery_msg)
assert node.current_battery_percent == 75.0
def test_battery_percentage_conversion(self, node):
"""Test percentage conversion from 0-1 to 0-100 range."""
battery_msg = BatteryState()
battery_msg.percentage = 0.5 # 50%
node._on_battery_state(battery_msg)
assert node.current_battery_percent == 50.0
def test_battery_zero_percent(self, node):
"""Test zero battery percentage."""
battery_msg = BatteryState()
battery_msg.percentage = 0.0
node._on_battery_state(battery_msg)
assert node.current_battery_percent == 0.0
def test_battery_full_percent(self, node):
"""Test full battery percentage."""
battery_msg = BatteryState()
battery_msg.percentage = 1.0
node._on_battery_state(battery_msg)
assert node.current_battery_percent == 100.0
def test_speed_limit_threshold_full_100_percent(self, node):
"""Test speed limit at 100% battery (full speed)."""
speed_limit = node._calculate_speed_limit(100.0)
assert speed_limit == 1.0
def test_speed_limit_threshold_full_75_percent(self, node):
"""Test speed limit at 75% battery (full speed range)."""
speed_limit = node._calculate_speed_limit(75.0)
assert speed_limit == 1.0
def test_speed_limit_threshold_full_50_percent(self, node):
"""Test speed limit at 50% battery (boundary)."""
speed_limit = node._calculate_speed_limit(50.0)
assert speed_limit == 1.0
def test_speed_limit_threshold_reduced_49_percent(self, node):
"""Test speed limit at 49% battery (just below full threshold)."""
speed_limit = node._calculate_speed_limit(49.0)
assert speed_limit == 0.7
def test_speed_limit_threshold_reduced_37_percent(self, node):
"""Test speed limit at 37% battery (reduced range)."""
speed_limit = node._calculate_speed_limit(37.0)
assert speed_limit == 0.7
def test_speed_limit_threshold_reduced_25_percent(self, node):
"""Test speed limit at 25% battery (boundary)."""
speed_limit = node._calculate_speed_limit(25.0)
assert speed_limit == 0.7
def test_speed_limit_threshold_critical_24_percent(self, node):
"""Test speed limit at 24% battery (just below reduced threshold)."""
speed_limit = node._calculate_speed_limit(24.0)
assert speed_limit == 0.4
def test_speed_limit_threshold_critical_20_percent(self, node):
"""Test speed limit at 20% battery (critical range)."""
speed_limit = node._calculate_speed_limit(20.0)
assert speed_limit == 0.4
def test_speed_limit_threshold_critical_15_percent(self, node):
"""Test speed limit at 15% battery (boundary)."""
speed_limit = node._calculate_speed_limit(15.0)
assert speed_limit == 0.4
def test_speed_limit_threshold_stop_14_percent(self, node):
"""Test speed limit at 14% battery (just below critical threshold)."""
speed_limit = node._calculate_speed_limit(14.0)
assert speed_limit == 0.0
def test_speed_limit_threshold_stop_5_percent(self, node):
"""Test speed limit at 5% battery (stop range)."""
speed_limit = node._calculate_speed_limit(5.0)
assert speed_limit == 0.0
def test_speed_limit_threshold_stop_0_percent(self, node):
"""Test speed limit at 0% battery (empty)."""
speed_limit = node._calculate_speed_limit(0.0)
assert speed_limit == 0.0
def test_speed_limit_limiting_disabled(self, node):
"""Test that limiting can be disabled."""
node.enable_limiting = False
# Even at critical battery, should return full speed
speed_limit = node._calculate_speed_limit(10.0)
assert speed_limit == 1.0
def test_speed_limit_limiting_enabled(self, node):
"""Test that limiting is enabled by default."""
node.enable_limiting = True
speed_limit = node._calculate_speed_limit(10.0)
assert speed_limit == 0.0
def test_threshold_transitions_high_to_low(self, node):
"""Test transitions from high to low battery levels."""
# Start at 100%
assert node._calculate_speed_limit(100.0) == 1.0
# Transition to reduced at 49%
assert node._calculate_speed_limit(49.0) == 0.7
# Transition to critical at 24%
assert node._calculate_speed_limit(24.0) == 0.4
# Transition to stop at 14%
assert node._calculate_speed_limit(14.0) == 0.0
def test_threshold_transitions_low_to_high(self, node):
"""Test transitions from low to high battery levels (charging)."""
# Start at 0%
assert node._calculate_speed_limit(0.0) == 0.0
# Transition to critical at 15%
assert node._calculate_speed_limit(15.0) == 0.4
# Transition to reduced at 25%
assert node._calculate_speed_limit(25.0) == 0.7
# Transition to full at 50%
assert node._calculate_speed_limit(50.0) == 1.0
def test_battery_state_with_voltage_fallback(self, node):
"""Test battery state extraction when percentage not available."""
battery_msg = BatteryState()
battery_msg.percentage = -1.0 # Invalid percentage
battery_msg.voltage = 8.0 # 2S LiPo approximate mid-range
node._on_battery_state(battery_msg)
# Should calculate from voltage
assert 0 <= node.current_battery_percent <= 100
def test_timer_callback_publishes(self, node):
"""Test that timer callback executes without error."""
node._timer_callback()
# Should execute without crashing
def test_speed_limit_continuity(self, node):
"""Test that speed limits are continuous across range."""
for percent in range(0, 101, 5):
speed_limit = node._calculate_speed_limit(float(percent))
assert 0.0 <= speed_limit <= 1.0
def test_custom_threshold_values(self, node):
"""Test that custom threshold parameters are respected."""
# Modify thresholds
node.threshold_full_2 = 60
node.threshold_reduced_2 = 30
node.threshold_critical_2 = 10
# Test new boundaries
assert node._calculate_speed_limit(60.0) == 1.0
assert node._calculate_speed_limit(59.0) == 0.7
assert node._calculate_speed_limit(30.0) == 0.7
assert node._calculate_speed_limit(29.0) == 0.4
assert node._calculate_speed_limit(10.0) == 0.4
assert node._calculate_speed_limit(9.0) == 0.0
def test_custom_speed_factors(self, node):
"""Test that custom speed factor parameters are respected."""
# Modify speed factors
node.speed_factor_full = 1.0
node.speed_factor_reduced = 0.5
node.speed_factor_critical = 0.25
node.speed_factor_stop = 0.0
assert node._calculate_speed_limit(100.0) == 1.0
assert node._calculate_speed_limit(50.0) == 0.5
assert node._calculate_speed_limit(25.0) == 0.25
assert node._calculate_speed_limit(10.0) == 0.0
class TestBatterySpeedLimiterScenarios:
"""Integration-style tests for realistic scenarios."""
def test_scenario_normal_operation_high_battery(self, node):
"""Scenario: Normal operation with high battery."""
battery_msg = BatteryState()
battery_msg.percentage = 0.85 # 85%
node._on_battery_state(battery_msg)
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
assert speed_limit == 1.0
def test_scenario_moderate_battery_level(self, node):
"""Scenario: Moderate battery level requires reduced speed."""
battery_msg = BatteryState()
battery_msg.percentage = 0.4 # 40%
node._on_battery_state(battery_msg)
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
assert speed_limit == 0.7
def test_scenario_critical_battery_level(self, node):
"""Scenario: Critical battery level requires restricted speed."""
battery_msg = BatteryState()
battery_msg.percentage = 0.2 # 20%
node._on_battery_state(battery_msg)
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
assert speed_limit == 0.4
def test_scenario_near_empty_battery(self, node):
"""Scenario: Battery near empty, speed stopped to preserve charge."""
battery_msg = BatteryState()
battery_msg.percentage = 0.08 # 8%
node._on_battery_state(battery_msg)
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
assert speed_limit == 0.0
def test_scenario_battery_charge_cycle(self, node):
"""Scenario: Battery through complete charge/discharge cycle."""
battery_percentages = [5, 10, 15, 20, 25, 30, 40, 50, 60, 75, 90, 100]
expected_factors = [0.0, 0.0, 0.4, 0.4, 0.7, 0.7, 0.7, 1.0, 1.0, 1.0, 1.0, 1.0]
for percent, expected_factor in zip(battery_percentages, expected_factors):
speed_limit = node._calculate_speed_limit(float(percent))
assert speed_limit == expected_factor
def test_scenario_continuous_battery_update(self, node):
"""Scenario: Continuous battery state updates."""
for percent in range(100, 0, -5):
battery_msg = BatteryState()
battery_msg.percentage = percent / 100.0
node._on_battery_state(battery_msg)
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
# Should always return valid factor
assert 0.0 <= speed_limit <= 1.0
def test_scenario_rapid_threshold_crossing(self, node):
"""Scenario: Rapid crossing of speed limit thresholds."""
thresholds = [100, 50, 49, 25, 24, 15, 14, 0]
for percent in thresholds:
speed_limit = node._calculate_speed_limit(float(percent))
assert 0.0 <= speed_limit <= 1.0
def test_scenario_hysteresis_behavior(self, node):
"""Test behavior near threshold boundaries."""
# Test just below and above each threshold
test_cases = [
(50.1, 1.0), # Just above 50% boundary (full speed)
(49.9, 0.7), # Just below 50% boundary (reduced)
(25.1, 0.7), # Just above 25% boundary (reduced)
(24.9, 0.4), # Just below 25% boundary (critical)
(15.1, 0.4), # Just above 15% boundary (critical)
(14.9, 0.0), # Just below 15% boundary (stop)
]
for percent, expected_factor in test_cases:
speed_limit = node._calculate_speed_limit(percent)
assert speed_limit == expected_factor