Compare commits
5 Commits
071e577227
...
4dbb4c6f0d
| Author | SHA1 | Date | |
|---|---|---|---|
| 4dbb4c6f0d | |||
| f506c89960 | |||
| 00c94cfe0d | |||
| a3386e1694 | |||
| 45828b1cc2 |
@ -0,0 +1,23 @@
|
|||||||
|
# Battery Speed Limiter Configuration
|
||||||
|
battery_speed_limiter:
|
||||||
|
ros__parameters:
|
||||||
|
# Battery percentage thresholds
|
||||||
|
threshold_full_1: 100 # Upper bound for full speed
|
||||||
|
threshold_full_2: 50 # Lower bound for full speed
|
||||||
|
threshold_reduced_1: 50 # Upper bound for reduced speed
|
||||||
|
threshold_reduced_2: 25 # Lower bound for reduced speed
|
||||||
|
threshold_critical_1: 25 # Upper bound for critical speed
|
||||||
|
threshold_critical_2: 15 # Lower bound for critical speed
|
||||||
|
threshold_stop: 15 # Below this: stop
|
||||||
|
|
||||||
|
# Speed limit factors
|
||||||
|
speed_factor_full: 1.0 # 100-50%: full speed
|
||||||
|
speed_factor_reduced: 0.7 # 50-25%: 70% speed
|
||||||
|
speed_factor_critical: 0.4 # 25-15%: 40% speed
|
||||||
|
speed_factor_stop: 0.0 # <15%: stop
|
||||||
|
|
||||||
|
# Publishing frequency (Hz)
|
||||||
|
publish_frequency: 10
|
||||||
|
|
||||||
|
# Enable/disable limiting
|
||||||
|
enable_limiting: true
|
||||||
@ -0,0 +1,28 @@
|
|||||||
|
import os
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from launch_ros.substitutions import FindPackageShare
|
||||||
|
from launch.substitutions import PathJoinSubstitution
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
config_dir = PathJoinSubstitution(
|
||||||
|
[FindPackageShare('saltybot_battery_speed_limiter'), 'config']
|
||||||
|
)
|
||||||
|
config_file = PathJoinSubstitution([config_dir, 'battery_limiter_config.yaml'])
|
||||||
|
|
||||||
|
battery_limiter = Node(
|
||||||
|
package='saltybot_battery_speed_limiter',
|
||||||
|
executable='battery_speed_limiter_node',
|
||||||
|
name='battery_speed_limiter',
|
||||||
|
output='screen',
|
||||||
|
parameters=[config_file],
|
||||||
|
remappings=[
|
||||||
|
('/saltybot/battery_state', '/saltybot/battery_state'),
|
||||||
|
('/saltybot/speed_limit_factor', '/saltybot/speed_limit_factor'),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
battery_limiter,
|
||||||
|
])
|
||||||
@ -0,0 +1,28 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_battery_speed_limiter</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>Battery-aware speed limiter that adjusts robot speed based on battery state to preserve charge</description>
|
||||||
|
|
||||||
|
<maintainer email="sl-controls@saltybot.local">SaltyBot Controls</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<author email="sl-controls@saltybot.local">SaltyBot Controls Team</author>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_cmake</buildtool_depend>
|
||||||
|
<buildtool_depend>ament_cmake_python</buildtool_depend>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,152 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Battery-aware speed limiter for SaltyBot.
|
||||||
|
|
||||||
|
Subscribes to /saltybot/battery_state and publishes a speed limit factor
|
||||||
|
on /saltybot/speed_limit_factor based on remaining battery percentage.
|
||||||
|
|
||||||
|
Thresholds:
|
||||||
|
100-50%: 1.0 (full speed)
|
||||||
|
50-25%: 0.7 (70% speed)
|
||||||
|
25-15%: 0.4 (40% speed)
|
||||||
|
<15%: 0.0 (stop, preserve battery)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from std_msgs.msg import Float32
|
||||||
|
from sensor_msgs.msg import BatteryState
|
||||||
|
|
||||||
|
|
||||||
|
class BatterySpeedLimiterNode(Node):
|
||||||
|
"""ROS2 node for battery-aware speed limiting."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__('battery_speed_limiter')
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
self.declare_parameter('threshold_full_1', 100) # 100%
|
||||||
|
self.declare_parameter('threshold_full_2', 50) # 50%
|
||||||
|
self.declare_parameter('threshold_reduced_1', 50) # 50%
|
||||||
|
self.declare_parameter('threshold_reduced_2', 25) # 25%
|
||||||
|
self.declare_parameter('threshold_critical_1', 25) # 25%
|
||||||
|
self.declare_parameter('threshold_critical_2', 15) # 15%
|
||||||
|
self.declare_parameter('threshold_stop', 15) # <15%
|
||||||
|
|
||||||
|
self.declare_parameter('speed_factor_full', 1.0)
|
||||||
|
self.declare_parameter('speed_factor_reduced', 0.7)
|
||||||
|
self.declare_parameter('speed_factor_critical', 0.4)
|
||||||
|
self.declare_parameter('speed_factor_stop', 0.0)
|
||||||
|
|
||||||
|
self.declare_parameter('publish_frequency', 10)
|
||||||
|
self.declare_parameter('enable_limiting', True)
|
||||||
|
|
||||||
|
# Read parameters
|
||||||
|
self.threshold_full_1 = self.get_parameter('threshold_full_1').value
|
||||||
|
self.threshold_full_2 = self.get_parameter('threshold_full_2').value
|
||||||
|
self.threshold_reduced_1 = self.get_parameter('threshold_reduced_1').value
|
||||||
|
self.threshold_reduced_2 = self.get_parameter('threshold_reduced_2').value
|
||||||
|
self.threshold_critical_1 = self.get_parameter('threshold_critical_1').value
|
||||||
|
self.threshold_critical_2 = self.get_parameter('threshold_critical_2').value
|
||||||
|
self.threshold_stop = self.get_parameter('threshold_stop').value
|
||||||
|
|
||||||
|
self.speed_factor_full = self.get_parameter('speed_factor_full').value
|
||||||
|
self.speed_factor_reduced = self.get_parameter('speed_factor_reduced').value
|
||||||
|
self.speed_factor_critical = self.get_parameter('speed_factor_critical').value
|
||||||
|
self.speed_factor_stop = self.get_parameter('speed_factor_stop').value
|
||||||
|
|
||||||
|
publish_frequency = self.get_parameter('publish_frequency').value
|
||||||
|
self.enable_limiting = self.get_parameter('enable_limiting').value
|
||||||
|
|
||||||
|
# Current battery percentage
|
||||||
|
self.current_battery_percent = 100.0
|
||||||
|
|
||||||
|
# Subscription to battery state
|
||||||
|
self.sub_battery = self.create_subscription(
|
||||||
|
BatteryState, '/saltybot/battery_state', self._on_battery_state, 10
|
||||||
|
)
|
||||||
|
|
||||||
|
# Publisher for speed limit factor
|
||||||
|
self.pub_speed_limit = self.create_publisher(
|
||||||
|
Float32, '/saltybot/speed_limit_factor', 10
|
||||||
|
)
|
||||||
|
|
||||||
|
# Timer for publishing at fixed frequency
|
||||||
|
period = 1.0 / publish_frequency
|
||||||
|
self.timer = self.create_timer(period, self._timer_callback)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Battery speed limiter initialized. "
|
||||||
|
f"Thresholds: Full={self.threshold_full_2}%, "
|
||||||
|
f"Reduced={self.threshold_reduced_2}%, "
|
||||||
|
f"Critical={self.threshold_critical_2}%, "
|
||||||
|
f"Stop={self.threshold_stop}%"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _on_battery_state(self, msg: BatteryState) -> None:
|
||||||
|
"""Callback for battery state messages.
|
||||||
|
|
||||||
|
Extract percentage from BatteryState message.
|
||||||
|
Handles both direct percentage field and calculated percentage.
|
||||||
|
"""
|
||||||
|
# Try to get percentage from message
|
||||||
|
if msg.percentage >= 0:
|
||||||
|
self.current_battery_percent = msg.percentage * 100.0
|
||||||
|
else:
|
||||||
|
# Calculate from voltage if percentage not available
|
||||||
|
# Standard LiPo cells: 3V min, 4.2V max per cell
|
||||||
|
# Assume 2S (8.4V max) or 3S (12.6V max) pack
|
||||||
|
# This is a fallback - ideally percentage is provided
|
||||||
|
if msg.voltage > 0:
|
||||||
|
# Normalize to 0-1 based on typical LiPo range
|
||||||
|
min_voltage = 6.0 # 2S minimum
|
||||||
|
max_voltage = 8.4 # 2S maximum
|
||||||
|
normalized = (msg.voltage - min_voltage) / (max_voltage - min_voltage)
|
||||||
|
self.current_battery_percent = max(0.0, min(100.0, normalized * 100.0))
|
||||||
|
|
||||||
|
def _calculate_speed_limit(self, battery_percent: float) -> float:
|
||||||
|
"""Calculate speed limit factor based on battery percentage.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
battery_percent: Battery state as percentage (0-100)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Speed limit factor (0.0-1.0)
|
||||||
|
"""
|
||||||
|
if not self.enable_limiting:
|
||||||
|
return 1.0
|
||||||
|
|
||||||
|
# Apply thresholds
|
||||||
|
if battery_percent >= self.threshold_full_2:
|
||||||
|
# 100-50%: Full speed
|
||||||
|
return self.speed_factor_full
|
||||||
|
elif battery_percent >= self.threshold_reduced_2:
|
||||||
|
# 50-25%: Reduced speed (70%)
|
||||||
|
return self.speed_factor_reduced
|
||||||
|
elif battery_percent >= self.threshold_critical_2:
|
||||||
|
# 25-15%: Critical speed (40%)
|
||||||
|
return self.speed_factor_critical
|
||||||
|
else:
|
||||||
|
# <15%: Stop to preserve battery
|
||||||
|
return self.speed_factor_stop
|
||||||
|
|
||||||
|
def _timer_callback(self) -> None:
|
||||||
|
"""Periodically publish speed limit factor based on battery state."""
|
||||||
|
speed_limit = self._calculate_speed_limit(self.current_battery_percent)
|
||||||
|
msg = Float32(data=speed_limit)
|
||||||
|
self.pub_speed_limit.publish(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = BatterySpeedLimiterNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
@ -0,0 +1,5 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_battery_speed_limiter
|
||||||
|
[egg_info]
|
||||||
|
tag_build =
|
||||||
|
tag_date = 0
|
||||||
34
jetson/ros2_ws/src/saltybot_battery_speed_limiter/setup.py
Normal file
34
jetson/ros2_ws/src/saltybot_battery_speed_limiter/setup.py
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
from setuptools import setup, find_packages
|
||||||
|
|
||||||
|
package_name = 'saltybot_battery_speed_limiter'
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version='0.1.0',
|
||||||
|
packages=find_packages(exclude=['test']),
|
||||||
|
data_files=[
|
||||||
|
('share/ament_index/resource_index/packages',
|
||||||
|
['resource/saltybot_battery_speed_limiter']),
|
||||||
|
('share/' + package_name, ['package.xml']),
|
||||||
|
('share/' + package_name + '/config', ['config/battery_limiter_config.yaml']),
|
||||||
|
('share/' + package_name + '/launch', ['launch/battery_speed_limiter.launch.py']),
|
||||||
|
],
|
||||||
|
install_requires=['setuptools'],
|
||||||
|
zip_safe=True,
|
||||||
|
author='SaltyBot Controls',
|
||||||
|
author_email='sl-controls@saltybot.local',
|
||||||
|
maintainer='SaltyBot Controls',
|
||||||
|
maintainer_email='sl-controls@saltybot.local',
|
||||||
|
keywords=['ROS2', 'battery', 'speed', 'limiter'],
|
||||||
|
classifiers=[
|
||||||
|
'Intended Audience :: Developers',
|
||||||
|
'License :: OSI Approved :: MIT License',
|
||||||
|
'Programming Language :: Python :: 3',
|
||||||
|
'Topic :: Software Development',
|
||||||
|
],
|
||||||
|
entry_points={
|
||||||
|
'console_scripts': [
|
||||||
|
'battery_speed_limiter_node=saltybot_battery_speed_limiter.battery_speed_limiter_node:main',
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
@ -0,0 +1,319 @@
|
|||||||
|
"""Unit tests for battery speed limiter node."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from sensor_msgs.msg import BatteryState
|
||||||
|
from std_msgs.msg import Float32
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
|
||||||
|
from saltybot_battery_speed_limiter.battery_speed_limiter_node import (
|
||||||
|
BatterySpeedLimiterNode,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def rclpy_fixture():
|
||||||
|
"""Initialize and cleanup rclpy."""
|
||||||
|
rclpy.init()
|
||||||
|
yield
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def node(rclpy_fixture):
|
||||||
|
"""Create a battery speed limiter node instance."""
|
||||||
|
node = BatterySpeedLimiterNode()
|
||||||
|
yield node
|
||||||
|
node.destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
class TestBatterySpeedLimiterNode:
|
||||||
|
"""Test suite for BatterySpeedLimiterNode."""
|
||||||
|
|
||||||
|
def test_node_initialization(self, node):
|
||||||
|
"""Test that node initializes with correct defaults."""
|
||||||
|
assert node.current_battery_percent == 100.0
|
||||||
|
assert node.enable_limiting is True
|
||||||
|
assert node.speed_factor_full == 1.0
|
||||||
|
assert node.speed_factor_reduced == 0.7
|
||||||
|
assert node.speed_factor_critical == 0.4
|
||||||
|
assert node.speed_factor_stop == 0.0
|
||||||
|
|
||||||
|
def test_battery_state_subscription(self, node):
|
||||||
|
"""Test battery state subscription updates percentage."""
|
||||||
|
battery_msg = BatteryState()
|
||||||
|
battery_msg.percentage = 0.75 # 75% (0-1 range)
|
||||||
|
|
||||||
|
node._on_battery_state(battery_msg)
|
||||||
|
|
||||||
|
assert node.current_battery_percent == 75.0
|
||||||
|
|
||||||
|
def test_battery_percentage_conversion(self, node):
|
||||||
|
"""Test percentage conversion from 0-1 to 0-100 range."""
|
||||||
|
battery_msg = BatteryState()
|
||||||
|
battery_msg.percentage = 0.5 # 50%
|
||||||
|
|
||||||
|
node._on_battery_state(battery_msg)
|
||||||
|
|
||||||
|
assert node.current_battery_percent == 50.0
|
||||||
|
|
||||||
|
def test_battery_zero_percent(self, node):
|
||||||
|
"""Test zero battery percentage."""
|
||||||
|
battery_msg = BatteryState()
|
||||||
|
battery_msg.percentage = 0.0
|
||||||
|
|
||||||
|
node._on_battery_state(battery_msg)
|
||||||
|
|
||||||
|
assert node.current_battery_percent == 0.0
|
||||||
|
|
||||||
|
def test_battery_full_percent(self, node):
|
||||||
|
"""Test full battery percentage."""
|
||||||
|
battery_msg = BatteryState()
|
||||||
|
battery_msg.percentage = 1.0
|
||||||
|
|
||||||
|
node._on_battery_state(battery_msg)
|
||||||
|
|
||||||
|
assert node.current_battery_percent == 100.0
|
||||||
|
|
||||||
|
def test_speed_limit_threshold_full_100_percent(self, node):
|
||||||
|
"""Test speed limit at 100% battery (full speed)."""
|
||||||
|
speed_limit = node._calculate_speed_limit(100.0)
|
||||||
|
assert speed_limit == 1.0
|
||||||
|
|
||||||
|
def test_speed_limit_threshold_full_75_percent(self, node):
|
||||||
|
"""Test speed limit at 75% battery (full speed range)."""
|
||||||
|
speed_limit = node._calculate_speed_limit(75.0)
|
||||||
|
assert speed_limit == 1.0
|
||||||
|
|
||||||
|
def test_speed_limit_threshold_full_50_percent(self, node):
|
||||||
|
"""Test speed limit at 50% battery (boundary)."""
|
||||||
|
speed_limit = node._calculate_speed_limit(50.0)
|
||||||
|
assert speed_limit == 1.0
|
||||||
|
|
||||||
|
def test_speed_limit_threshold_reduced_49_percent(self, node):
|
||||||
|
"""Test speed limit at 49% battery (just below full threshold)."""
|
||||||
|
speed_limit = node._calculate_speed_limit(49.0)
|
||||||
|
assert speed_limit == 0.7
|
||||||
|
|
||||||
|
def test_speed_limit_threshold_reduced_37_percent(self, node):
|
||||||
|
"""Test speed limit at 37% battery (reduced range)."""
|
||||||
|
speed_limit = node._calculate_speed_limit(37.0)
|
||||||
|
assert speed_limit == 0.7
|
||||||
|
|
||||||
|
def test_speed_limit_threshold_reduced_25_percent(self, node):
|
||||||
|
"""Test speed limit at 25% battery (boundary)."""
|
||||||
|
speed_limit = node._calculate_speed_limit(25.0)
|
||||||
|
assert speed_limit == 0.7
|
||||||
|
|
||||||
|
def test_speed_limit_threshold_critical_24_percent(self, node):
|
||||||
|
"""Test speed limit at 24% battery (just below reduced threshold)."""
|
||||||
|
speed_limit = node._calculate_speed_limit(24.0)
|
||||||
|
assert speed_limit == 0.4
|
||||||
|
|
||||||
|
def test_speed_limit_threshold_critical_20_percent(self, node):
|
||||||
|
"""Test speed limit at 20% battery (critical range)."""
|
||||||
|
speed_limit = node._calculate_speed_limit(20.0)
|
||||||
|
assert speed_limit == 0.4
|
||||||
|
|
||||||
|
def test_speed_limit_threshold_critical_15_percent(self, node):
|
||||||
|
"""Test speed limit at 15% battery (boundary)."""
|
||||||
|
speed_limit = node._calculate_speed_limit(15.0)
|
||||||
|
assert speed_limit == 0.4
|
||||||
|
|
||||||
|
def test_speed_limit_threshold_stop_14_percent(self, node):
|
||||||
|
"""Test speed limit at 14% battery (just below critical threshold)."""
|
||||||
|
speed_limit = node._calculate_speed_limit(14.0)
|
||||||
|
assert speed_limit == 0.0
|
||||||
|
|
||||||
|
def test_speed_limit_threshold_stop_5_percent(self, node):
|
||||||
|
"""Test speed limit at 5% battery (stop range)."""
|
||||||
|
speed_limit = node._calculate_speed_limit(5.0)
|
||||||
|
assert speed_limit == 0.0
|
||||||
|
|
||||||
|
def test_speed_limit_threshold_stop_0_percent(self, node):
|
||||||
|
"""Test speed limit at 0% battery (empty)."""
|
||||||
|
speed_limit = node._calculate_speed_limit(0.0)
|
||||||
|
assert speed_limit == 0.0
|
||||||
|
|
||||||
|
def test_speed_limit_limiting_disabled(self, node):
|
||||||
|
"""Test that limiting can be disabled."""
|
||||||
|
node.enable_limiting = False
|
||||||
|
|
||||||
|
# Even at critical battery, should return full speed
|
||||||
|
speed_limit = node._calculate_speed_limit(10.0)
|
||||||
|
assert speed_limit == 1.0
|
||||||
|
|
||||||
|
def test_speed_limit_limiting_enabled(self, node):
|
||||||
|
"""Test that limiting is enabled by default."""
|
||||||
|
node.enable_limiting = True
|
||||||
|
|
||||||
|
speed_limit = node._calculate_speed_limit(10.0)
|
||||||
|
assert speed_limit == 0.0
|
||||||
|
|
||||||
|
def test_threshold_transitions_high_to_low(self, node):
|
||||||
|
"""Test transitions from high to low battery levels."""
|
||||||
|
# Start at 100%
|
||||||
|
assert node._calculate_speed_limit(100.0) == 1.0
|
||||||
|
|
||||||
|
# Transition to reduced at 49%
|
||||||
|
assert node._calculate_speed_limit(49.0) == 0.7
|
||||||
|
|
||||||
|
# Transition to critical at 24%
|
||||||
|
assert node._calculate_speed_limit(24.0) == 0.4
|
||||||
|
|
||||||
|
# Transition to stop at 14%
|
||||||
|
assert node._calculate_speed_limit(14.0) == 0.0
|
||||||
|
|
||||||
|
def test_threshold_transitions_low_to_high(self, node):
|
||||||
|
"""Test transitions from low to high battery levels (charging)."""
|
||||||
|
# Start at 0%
|
||||||
|
assert node._calculate_speed_limit(0.0) == 0.0
|
||||||
|
|
||||||
|
# Transition to critical at 15%
|
||||||
|
assert node._calculate_speed_limit(15.0) == 0.4
|
||||||
|
|
||||||
|
# Transition to reduced at 25%
|
||||||
|
assert node._calculate_speed_limit(25.0) == 0.7
|
||||||
|
|
||||||
|
# Transition to full at 50%
|
||||||
|
assert node._calculate_speed_limit(50.0) == 1.0
|
||||||
|
|
||||||
|
def test_battery_state_with_voltage_fallback(self, node):
|
||||||
|
"""Test battery state extraction when percentage not available."""
|
||||||
|
battery_msg = BatteryState()
|
||||||
|
battery_msg.percentage = -1.0 # Invalid percentage
|
||||||
|
battery_msg.voltage = 8.0 # 2S LiPo approximate mid-range
|
||||||
|
|
||||||
|
node._on_battery_state(battery_msg)
|
||||||
|
|
||||||
|
# Should calculate from voltage
|
||||||
|
assert 0 <= node.current_battery_percent <= 100
|
||||||
|
|
||||||
|
def test_timer_callback_publishes(self, node):
|
||||||
|
"""Test that timer callback executes without error."""
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Should execute without crashing
|
||||||
|
|
||||||
|
def test_speed_limit_continuity(self, node):
|
||||||
|
"""Test that speed limits are continuous across range."""
|
||||||
|
for percent in range(0, 101, 5):
|
||||||
|
speed_limit = node._calculate_speed_limit(float(percent))
|
||||||
|
assert 0.0 <= speed_limit <= 1.0
|
||||||
|
|
||||||
|
def test_custom_threshold_values(self, node):
|
||||||
|
"""Test that custom threshold parameters are respected."""
|
||||||
|
# Modify thresholds
|
||||||
|
node.threshold_full_2 = 60
|
||||||
|
node.threshold_reduced_2 = 30
|
||||||
|
node.threshold_critical_2 = 10
|
||||||
|
|
||||||
|
# Test new boundaries
|
||||||
|
assert node._calculate_speed_limit(60.0) == 1.0
|
||||||
|
assert node._calculate_speed_limit(59.0) == 0.7
|
||||||
|
assert node._calculate_speed_limit(30.0) == 0.7
|
||||||
|
assert node._calculate_speed_limit(29.0) == 0.4
|
||||||
|
assert node._calculate_speed_limit(10.0) == 0.4
|
||||||
|
assert node._calculate_speed_limit(9.0) == 0.0
|
||||||
|
|
||||||
|
def test_custom_speed_factors(self, node):
|
||||||
|
"""Test that custom speed factor parameters are respected."""
|
||||||
|
# Modify speed factors
|
||||||
|
node.speed_factor_full = 1.0
|
||||||
|
node.speed_factor_reduced = 0.5
|
||||||
|
node.speed_factor_critical = 0.25
|
||||||
|
node.speed_factor_stop = 0.0
|
||||||
|
|
||||||
|
assert node._calculate_speed_limit(100.0) == 1.0
|
||||||
|
assert node._calculate_speed_limit(50.0) == 0.5
|
||||||
|
assert node._calculate_speed_limit(25.0) == 0.25
|
||||||
|
assert node._calculate_speed_limit(10.0) == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
class TestBatterySpeedLimiterScenarios:
|
||||||
|
"""Integration-style tests for realistic scenarios."""
|
||||||
|
|
||||||
|
def test_scenario_normal_operation_high_battery(self, node):
|
||||||
|
"""Scenario: Normal operation with high battery."""
|
||||||
|
battery_msg = BatteryState()
|
||||||
|
battery_msg.percentage = 0.85 # 85%
|
||||||
|
|
||||||
|
node._on_battery_state(battery_msg)
|
||||||
|
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
|
||||||
|
|
||||||
|
assert speed_limit == 1.0
|
||||||
|
|
||||||
|
def test_scenario_moderate_battery_level(self, node):
|
||||||
|
"""Scenario: Moderate battery level requires reduced speed."""
|
||||||
|
battery_msg = BatteryState()
|
||||||
|
battery_msg.percentage = 0.4 # 40%
|
||||||
|
|
||||||
|
node._on_battery_state(battery_msg)
|
||||||
|
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
|
||||||
|
|
||||||
|
assert speed_limit == 0.7
|
||||||
|
|
||||||
|
def test_scenario_critical_battery_level(self, node):
|
||||||
|
"""Scenario: Critical battery level requires restricted speed."""
|
||||||
|
battery_msg = BatteryState()
|
||||||
|
battery_msg.percentage = 0.2 # 20%
|
||||||
|
|
||||||
|
node._on_battery_state(battery_msg)
|
||||||
|
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
|
||||||
|
|
||||||
|
assert speed_limit == 0.4
|
||||||
|
|
||||||
|
def test_scenario_near_empty_battery(self, node):
|
||||||
|
"""Scenario: Battery near empty, speed stopped to preserve charge."""
|
||||||
|
battery_msg = BatteryState()
|
||||||
|
battery_msg.percentage = 0.08 # 8%
|
||||||
|
|
||||||
|
node._on_battery_state(battery_msg)
|
||||||
|
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
|
||||||
|
|
||||||
|
assert speed_limit == 0.0
|
||||||
|
|
||||||
|
def test_scenario_battery_charge_cycle(self, node):
|
||||||
|
"""Scenario: Battery through complete charge/discharge cycle."""
|
||||||
|
battery_percentages = [5, 10, 15, 20, 25, 30, 40, 50, 60, 75, 90, 100]
|
||||||
|
expected_factors = [0.0, 0.0, 0.4, 0.4, 0.7, 0.7, 0.7, 1.0, 1.0, 1.0, 1.0, 1.0]
|
||||||
|
|
||||||
|
for percent, expected_factor in zip(battery_percentages, expected_factors):
|
||||||
|
speed_limit = node._calculate_speed_limit(float(percent))
|
||||||
|
assert speed_limit == expected_factor
|
||||||
|
|
||||||
|
def test_scenario_continuous_battery_update(self, node):
|
||||||
|
"""Scenario: Continuous battery state updates."""
|
||||||
|
for percent in range(100, 0, -5):
|
||||||
|
battery_msg = BatteryState()
|
||||||
|
battery_msg.percentage = percent / 100.0
|
||||||
|
|
||||||
|
node._on_battery_state(battery_msg)
|
||||||
|
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
|
||||||
|
|
||||||
|
# Should always return valid factor
|
||||||
|
assert 0.0 <= speed_limit <= 1.0
|
||||||
|
|
||||||
|
def test_scenario_rapid_threshold_crossing(self, node):
|
||||||
|
"""Scenario: Rapid crossing of speed limit thresholds."""
|
||||||
|
thresholds = [100, 50, 49, 25, 24, 15, 14, 0]
|
||||||
|
|
||||||
|
for percent in thresholds:
|
||||||
|
speed_limit = node._calculate_speed_limit(float(percent))
|
||||||
|
assert 0.0 <= speed_limit <= 1.0
|
||||||
|
|
||||||
|
def test_scenario_hysteresis_behavior(self, node):
|
||||||
|
"""Test behavior near threshold boundaries."""
|
||||||
|
# Test just below and above each threshold
|
||||||
|
test_cases = [
|
||||||
|
(50.1, 1.0), # Just above 50% boundary (full speed)
|
||||||
|
(49.9, 0.7), # Just below 50% boundary (reduced)
|
||||||
|
(25.1, 0.7), # Just above 25% boundary (reduced)
|
||||||
|
(24.9, 0.4), # Just below 25% boundary (critical)
|
||||||
|
(15.1, 0.4), # Just above 15% boundary (critical)
|
||||||
|
(14.9, 0.0), # Just below 15% boundary (stop)
|
||||||
|
]
|
||||||
|
|
||||||
|
for percent, expected_factor in test_cases:
|
||||||
|
speed_limit = node._calculate_speed_limit(percent)
|
||||||
|
assert speed_limit == expected_factor
|
||||||
@ -0,0 +1,14 @@
|
|||||||
|
camera_hotplug_node:
|
||||||
|
ros__parameters:
|
||||||
|
video_glob: "/dev/video*" # glob pattern for USB camera devices
|
||||||
|
output_topic: "/saltybot/camera_status"
|
||||||
|
|
||||||
|
poll_rate: 2.0 # device scan frequency (Hz)
|
||||||
|
|
||||||
|
# Reconnect detection: if a camera reappears within restart_grace_s
|
||||||
|
# seconds of being lost, publish "restarting" instead of "connected".
|
||||||
|
restart_grace_s: 5.0 # reconnect grace window (s)
|
||||||
|
restart_hold_s: 2.0 # duration to hold "restarting" before resolving (s)
|
||||||
|
|
||||||
|
# Set true to publish status on every poll tick, not only on change.
|
||||||
|
publish_always: false
|
||||||
@ -0,0 +1,43 @@
|
|||||||
|
"""camera_hotplug.launch.py — Launch USB camera hot-plug monitor (Issue #320).
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
ros2 launch saltybot_social camera_hotplug.launch.py
|
||||||
|
ros2 launch saltybot_social camera_hotplug.launch.py video_glob:=/dev/video0
|
||||||
|
ros2 launch saltybot_social camera_hotplug.launch.py restart_grace_s:=10.0
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
pkg = get_package_share_directory("saltybot_social")
|
||||||
|
cfg = os.path.join(pkg, "config", "camera_hotplug_params.yaml")
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument("video_glob", default_value="/dev/video*",
|
||||||
|
description="Glob pattern for video device discovery"),
|
||||||
|
DeclareLaunchArgument("poll_rate", default_value="2.0",
|
||||||
|
description="Device scan frequency (Hz)"),
|
||||||
|
DeclareLaunchArgument("restart_grace_s", default_value="5.0",
|
||||||
|
description="Reconnect window that triggers 'restarting' (s)"),
|
||||||
|
|
||||||
|
Node(
|
||||||
|
package="saltybot_social",
|
||||||
|
executable="camera_hotplug_node",
|
||||||
|
name="camera_hotplug_node",
|
||||||
|
output="screen",
|
||||||
|
parameters=[
|
||||||
|
cfg,
|
||||||
|
{
|
||||||
|
"video_glob": LaunchConfiguration("video_glob"),
|
||||||
|
"poll_rate": LaunchConfiguration("poll_rate"),
|
||||||
|
"restart_grace_s": LaunchConfiguration("restart_grace_s"),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
])
|
||||||
@ -0,0 +1,241 @@
|
|||||||
|
"""camera_hotplug_node.py — USB camera hot-plug monitor.
|
||||||
|
Issue #320
|
||||||
|
|
||||||
|
Polls ``/dev/video*`` at a configurable rate to detect USB camera
|
||||||
|
connect / disconnect events and publishes a status string on
|
||||||
|
/saltybot/camera_status.
|
||||||
|
|
||||||
|
Status values
|
||||||
|
─────────────
|
||||||
|
"connected" — one or more video devices present and stable
|
||||||
|
"disconnected" — no /dev/video* devices found
|
||||||
|
"restarting" — device set changed (hot-swap / reconnect); downstream
|
||||||
|
capture pipelines should restart. Held for
|
||||||
|
``restart_hold_s`` seconds then resolved to
|
||||||
|
"connected" or "disconnected" based on re-scan.
|
||||||
|
|
||||||
|
State machine
|
||||||
|
─────────────
|
||||||
|
DISCONNECTED ──(devices appear, within grace window)──► RESTARTING
|
||||||
|
DISCONNECTED ──(devices appear, outside grace window)─► CONNECTED
|
||||||
|
CONNECTED ──(all devices lost)──────────────────────► DISCONNECTED
|
||||||
|
CONNECTED ──(device set changed)────────────────────► RESTARTING
|
||||||
|
RESTARTING ──(hold_s elapsed, devices present)───────► CONNECTED
|
||||||
|
RESTARTING ──(hold_s elapsed, no devices)────────────► DISCONNECTED
|
||||||
|
|
||||||
|
Initial state is set by the first scan at startup (no "restarting" on boot).
|
||||||
|
|
||||||
|
Publications
|
||||||
|
────────────
|
||||||
|
/saltybot/camera_status std_msgs/String — "connected" | "disconnected"
|
||||||
|
| "restarting"
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
──────────
|
||||||
|
video_glob (str, "/dev/video*") glob for device discovery
|
||||||
|
poll_rate (float, 2.0) scan frequency (Hz)
|
||||||
|
restart_grace_s (float, 5.0) reconnect window; if a camera
|
||||||
|
reappears within this many seconds
|
||||||
|
of being lost → "restarting"
|
||||||
|
restart_hold_s (float, 2.0) how long to hold "restarting"
|
||||||
|
publish_always (bool, False) publish on every tick, not only
|
||||||
|
on state change
|
||||||
|
output_topic (str, "/saltybot/camera_status")
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import glob as _glob_mod
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from typing import FrozenSet, Tuple
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile
|
||||||
|
from std_msgs.msg import String
|
||||||
|
|
||||||
|
|
||||||
|
# ── Status constants ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
STATUS_CONNECTED = "connected"
|
||||||
|
STATUS_DISCONNECTED = "disconnected"
|
||||||
|
STATUS_RESTARTING = "restarting"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Pure helpers ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def scan_video_devices(video_glob: str) -> FrozenSet[str]:
|
||||||
|
"""Return the frozenset of /dev/video* paths currently present.
|
||||||
|
|
||||||
|
Catches permission / OS errors and returns an empty frozenset.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return frozenset(_glob_mod.glob(video_glob))
|
||||||
|
except Exception:
|
||||||
|
return frozenset()
|
||||||
|
|
||||||
|
|
||||||
|
def compute_transition(
|
||||||
|
state: str,
|
||||||
|
prev_devices: FrozenSet[str],
|
||||||
|
curr_devices: FrozenSet[str],
|
||||||
|
now: float,
|
||||||
|
disconnect_time: float,
|
||||||
|
restart_until: float,
|
||||||
|
restart_grace_s: float,
|
||||||
|
restart_hold_s: float,
|
||||||
|
) -> Tuple[str, float, float]:
|
||||||
|
"""Pure state-machine step — no I/O, no ROS.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
state : current status string
|
||||||
|
prev_devices : device set from the previous scan
|
||||||
|
curr_devices : device set from the current scan
|
||||||
|
now : current monotonic time (s)
|
||||||
|
disconnect_time : monotonic time of the last CONNECTED → DISCONNECTED
|
||||||
|
transition (0.0 = never disconnected)
|
||||||
|
restart_until : monotonic time at which RESTARTING resolves
|
||||||
|
restart_grace_s : reconnect window that triggers RESTARTING
|
||||||
|
restart_hold_s : duration to hold RESTARTING before resolving
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
(new_state, new_disconnect_time, new_restart_until)
|
||||||
|
"""
|
||||||
|
if state == STATUS_RESTARTING:
|
||||||
|
if now >= restart_until:
|
||||||
|
if curr_devices:
|
||||||
|
return STATUS_CONNECTED, disconnect_time, restart_until
|
||||||
|
return STATUS_DISCONNECTED, now, restart_until
|
||||||
|
return STATUS_RESTARTING, disconnect_time, restart_until
|
||||||
|
|
||||||
|
if state == STATUS_CONNECTED:
|
||||||
|
if not curr_devices:
|
||||||
|
# All cameras lost
|
||||||
|
return STATUS_DISCONNECTED, now, restart_until
|
||||||
|
if curr_devices != prev_devices:
|
||||||
|
# Hot-swap or device renumbering
|
||||||
|
return STATUS_RESTARTING, disconnect_time, now + restart_hold_s
|
||||||
|
return STATUS_CONNECTED, disconnect_time, restart_until
|
||||||
|
|
||||||
|
# STATUS_DISCONNECTED
|
||||||
|
if curr_devices:
|
||||||
|
# Reconnect: check if we're inside the grace window
|
||||||
|
if disconnect_time > 0.0 and (now - disconnect_time) <= restart_grace_s:
|
||||||
|
return STATUS_RESTARTING, disconnect_time, now + restart_hold_s
|
||||||
|
return STATUS_CONNECTED, disconnect_time, restart_until
|
||||||
|
return STATUS_DISCONNECTED, disconnect_time, restart_until
|
||||||
|
|
||||||
|
|
||||||
|
# ── ROS2 node ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class CameraHotplugNode(Node):
|
||||||
|
"""Monitors /dev/video* and publishes camera connection status."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__("camera_hotplug_node")
|
||||||
|
|
||||||
|
self.declare_parameter("video_glob", "/dev/video*")
|
||||||
|
self.declare_parameter("poll_rate", 2.0)
|
||||||
|
self.declare_parameter("restart_grace_s", 5.0)
|
||||||
|
self.declare_parameter("restart_hold_s", 2.0)
|
||||||
|
self.declare_parameter("publish_always", False)
|
||||||
|
self.declare_parameter("output_topic", "/saltybot/camera_status")
|
||||||
|
|
||||||
|
self._glob = self.get_parameter("video_glob").value
|
||||||
|
poll_rate = float(self.get_parameter("poll_rate").value)
|
||||||
|
self._grace_s = float(self.get_parameter("restart_grace_s").value)
|
||||||
|
self._hold_s = float(self.get_parameter("restart_hold_s").value)
|
||||||
|
self._pub_always = bool(self.get_parameter("publish_always").value)
|
||||||
|
output_topic = self.get_parameter("output_topic").value
|
||||||
|
|
||||||
|
# Injected scan function — allows unit-test mocking
|
||||||
|
self._scan_fn = scan_video_devices
|
||||||
|
|
||||||
|
# Perform initial scan (before any lock or timer)
|
||||||
|
initial = self._scan_fn(self._glob)
|
||||||
|
self._state = STATUS_CONNECTED if initial else STATUS_DISCONNECTED
|
||||||
|
self._prev_devices = initial
|
||||||
|
self._disconnect_time: float = 0.0
|
||||||
|
self._restart_until: float = 0.0
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
qos = QoSProfile(depth=10)
|
||||||
|
self._pub = self.create_publisher(String, output_topic, qos)
|
||||||
|
self._timer = self.create_timer(1.0 / poll_rate, self._scan_cb)
|
||||||
|
|
||||||
|
# Publish initial state
|
||||||
|
self._publish(self._state)
|
||||||
|
|
||||||
|
n_dev = len(initial)
|
||||||
|
self.get_logger().info(
|
||||||
|
f"CameraHotplugNode ready — initial state: {self._state} "
|
||||||
|
f"({n_dev} device(s): {sorted(initial)})"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Scan callback ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _scan_cb(self) -> None:
|
||||||
|
now = time.monotonic()
|
||||||
|
curr = self._scan_fn(self._glob)
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
prev = self._prev_devices
|
||||||
|
state = self._state
|
||||||
|
disc_t = self._disconnect_time
|
||||||
|
rup = self._restart_until
|
||||||
|
|
||||||
|
new_state, new_disc_t, new_rup = compute_transition(
|
||||||
|
state, prev, curr, now, disc_t, rup,
|
||||||
|
self._grace_s, self._hold_s,
|
||||||
|
)
|
||||||
|
|
||||||
|
changed = new_state != state
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
self._prev_devices = curr
|
||||||
|
self._state = new_state
|
||||||
|
self._disconnect_time = new_disc_t
|
||||||
|
self._restart_until = new_rup
|
||||||
|
|
||||||
|
if changed:
|
||||||
|
self.get_logger().info(
|
||||||
|
f"CameraHotplug: {state!r} → {new_state!r} "
|
||||||
|
f"(devices: {sorted(curr) or 'none'})"
|
||||||
|
)
|
||||||
|
|
||||||
|
if changed or self._pub_always:
|
||||||
|
self._publish(new_state)
|
||||||
|
|
||||||
|
# ── Publish ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _publish(self, status: str) -> None:
|
||||||
|
msg = String()
|
||||||
|
msg.data = status
|
||||||
|
self._pub.publish(msg)
|
||||||
|
|
||||||
|
# ── Public accessors (for tests / introspection) ─────────────────────
|
||||||
|
|
||||||
|
@property
|
||||||
|
def state(self) -> str:
|
||||||
|
with self._lock:
|
||||||
|
return self._state
|
||||||
|
|
||||||
|
@property
|
||||||
|
def devices(self) -> FrozenSet[str]:
|
||||||
|
with self._lock:
|
||||||
|
return self._prev_devices
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None) -> None:
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = CameraHotplugNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
@ -57,8 +57,10 @@ setup(
|
|||||||
'topic_memory_node = saltybot_social.topic_memory_node:main',
|
'topic_memory_node = saltybot_social.topic_memory_node:main',
|
||||||
# Personal space respector (Issue #310)
|
# Personal space respector (Issue #310)
|
||||||
'personal_space_node = saltybot_social.personal_space_node:main',
|
'personal_space_node = saltybot_social.personal_space_node:main',
|
||||||
# Audio wake-word detector — 'hey salty' (Issue #320)
|
# Audio wake-word detector — 'hey salty'
|
||||||
'wake_word_node = saltybot_social.wake_word_node:main',
|
'wake_word_node = saltybot_social.wake_word_node:main',
|
||||||
|
# USB camera hot-plug monitor (Issue #320)
|
||||||
|
'camera_hotplug_node = saltybot_social.camera_hotplug_node:main',
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
630
jetson/ros2_ws/src/saltybot_social/test/test_camera_hotplug.py
Normal file
630
jetson/ros2_ws/src/saltybot_social/test/test_camera_hotplug.py
Normal file
@ -0,0 +1,630 @@
|
|||||||
|
"""test_camera_hotplug.py — Offline tests for camera_hotplug_node (Issue #320).
|
||||||
|
|
||||||
|
Stubs out rclpy and ROS message types so tests run without a ROS install.
|
||||||
|
Filesystem calls are avoided by injecting a mock scan function.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import importlib.util
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import types
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
|
||||||
|
# ── ROS2 stubs ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _make_ros_stubs():
|
||||||
|
for mod_name in ("rclpy", "rclpy.node", "rclpy.qos",
|
||||||
|
"std_msgs", "std_msgs.msg"):
|
||||||
|
if mod_name not in sys.modules:
|
||||||
|
sys.modules[mod_name] = types.ModuleType(mod_name)
|
||||||
|
|
||||||
|
class _Node:
|
||||||
|
def __init__(self, name="node"):
|
||||||
|
self._name = name
|
||||||
|
if not hasattr(self, "_params"):
|
||||||
|
self._params = {}
|
||||||
|
self._pubs = {}
|
||||||
|
self._subs = {}
|
||||||
|
self._logs = []
|
||||||
|
self._timers = []
|
||||||
|
|
||||||
|
def declare_parameter(self, name, default):
|
||||||
|
if name not in self._params:
|
||||||
|
self._params[name] = default
|
||||||
|
|
||||||
|
def get_parameter(self, name):
|
||||||
|
class _P:
|
||||||
|
def __init__(self, v): self.value = v
|
||||||
|
return _P(self._params.get(name))
|
||||||
|
|
||||||
|
def create_publisher(self, msg_type, topic, qos):
|
||||||
|
pub = _FakePub()
|
||||||
|
self._pubs[topic] = pub
|
||||||
|
return pub
|
||||||
|
|
||||||
|
def create_subscription(self, msg_type, topic, cb, qos):
|
||||||
|
self._subs[topic] = cb
|
||||||
|
return object()
|
||||||
|
|
||||||
|
def create_timer(self, period, cb):
|
||||||
|
self._timers.append(cb)
|
||||||
|
return object()
|
||||||
|
|
||||||
|
def get_logger(self):
|
||||||
|
node = self
|
||||||
|
class _L:
|
||||||
|
def info(self, m): node._logs.append(("INFO", m))
|
||||||
|
def warn(self, m): node._logs.append(("WARN", m))
|
||||||
|
def error(self, m): node._logs.append(("ERROR", m))
|
||||||
|
return _L()
|
||||||
|
|
||||||
|
def destroy_node(self): pass
|
||||||
|
|
||||||
|
class _FakePub:
|
||||||
|
def __init__(self):
|
||||||
|
self.msgs = []
|
||||||
|
def publish(self, msg):
|
||||||
|
self.msgs.append(msg)
|
||||||
|
|
||||||
|
class _QoSProfile:
|
||||||
|
def __init__(self, depth=10): self.depth = depth
|
||||||
|
|
||||||
|
class _String:
|
||||||
|
def __init__(self): self.data = ""
|
||||||
|
|
||||||
|
rclpy_mod = sys.modules["rclpy"]
|
||||||
|
rclpy_mod.init = lambda args=None: None
|
||||||
|
rclpy_mod.spin = lambda node: None
|
||||||
|
rclpy_mod.shutdown = lambda: None
|
||||||
|
|
||||||
|
sys.modules["rclpy.node"].Node = _Node
|
||||||
|
sys.modules["rclpy.qos"].QoSProfile = _QoSProfile
|
||||||
|
sys.modules["std_msgs.msg"].String = _String
|
||||||
|
|
||||||
|
return _Node, _FakePub, _String
|
||||||
|
|
||||||
|
|
||||||
|
_Node, _FakePub, _String = _make_ros_stubs()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Module loader ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
_SRC = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/saltybot_social/camera_hotplug_node.py"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _load_mod():
|
||||||
|
spec = importlib.util.spec_from_file_location("camera_hotplug_testmod", _SRC)
|
||||||
|
mod = importlib.util.module_from_spec(spec)
|
||||||
|
spec.loader.exec_module(mod)
|
||||||
|
return mod
|
||||||
|
|
||||||
|
|
||||||
|
def _make_node(mod, initial_devices=frozenset(), **kwargs):
|
||||||
|
"""Construct a CameraHotplugNode with a mocked scan function."""
|
||||||
|
node = mod.CameraHotplugNode.__new__(mod.CameraHotplugNode)
|
||||||
|
defaults = {
|
||||||
|
"video_glob": "/dev/video*",
|
||||||
|
"poll_rate": 2.0,
|
||||||
|
"restart_grace_s": 5.0,
|
||||||
|
"restart_hold_s": 2.0,
|
||||||
|
"publish_always": False,
|
||||||
|
"output_topic": "/saltybot/camera_status",
|
||||||
|
}
|
||||||
|
defaults.update(kwargs)
|
||||||
|
node._params = dict(defaults)
|
||||||
|
|
||||||
|
# Patch scan so the initial scan (inside __init__) returns initial_devices
|
||||||
|
orig = mod.scan_video_devices
|
||||||
|
mod.scan_video_devices = lambda g: frozenset(initial_devices)
|
||||||
|
try:
|
||||||
|
mod.CameraHotplugNode.__init__(node)
|
||||||
|
finally:
|
||||||
|
mod.scan_video_devices = orig
|
||||||
|
|
||||||
|
# Keep using the mock for subsequent _scan_cb calls
|
||||||
|
node._scan_fn = lambda g: frozenset(initial_devices)
|
||||||
|
return node
|
||||||
|
|
||||||
|
|
||||||
|
def _set_scan(node, devices):
|
||||||
|
"""Replace node._scan_fn to return a fixed device set."""
|
||||||
|
node._scan_fn = lambda g: frozenset(devices)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: pure helpers ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestScanVideoDevices(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls): cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_returns_frozenset(self):
|
||||||
|
result = self.mod.scan_video_devices("/dev/video*")
|
||||||
|
self.assertIsInstance(result, frozenset)
|
||||||
|
|
||||||
|
def test_nonexistent_glob_returns_empty(self):
|
||||||
|
result = self.mod.scan_video_devices("/nonexistent_path_xyz_abc/video*")
|
||||||
|
self.assertEqual(result, frozenset())
|
||||||
|
|
||||||
|
def test_empty_glob_string(self):
|
||||||
|
result = self.mod.scan_video_devices("")
|
||||||
|
self.assertIsInstance(result, frozenset)
|
||||||
|
|
||||||
|
|
||||||
|
class TestStatusConstants(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls): cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_connected_string(self):
|
||||||
|
self.assertEqual(self.mod.STATUS_CONNECTED, "connected")
|
||||||
|
|
||||||
|
def test_disconnected_string(self):
|
||||||
|
self.assertEqual(self.mod.STATUS_DISCONNECTED, "disconnected")
|
||||||
|
|
||||||
|
def test_restarting_string(self):
|
||||||
|
self.assertEqual(self.mod.STATUS_RESTARTING, "restarting")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: compute_transition ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestComputeTransition(unittest.TestCase):
|
||||||
|
"""Tests for the pure state-machine function."""
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls): cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def _tr(self, state, prev, curr, now=100.0,
|
||||||
|
disc_t=0.0, restart_until=0.0, grace=5.0, hold=2.0):
|
||||||
|
return self.mod.compute_transition(
|
||||||
|
state, frozenset(prev), frozenset(curr),
|
||||||
|
now, disc_t, restart_until, grace, hold,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── DISCONNECTED transitions ──────────────────────────────────────
|
||||||
|
|
||||||
|
def test_disc_no_devices_stays_disconnected(self):
|
||||||
|
s, _, _ = self._tr("disconnected", [], [])
|
||||||
|
self.assertEqual(s, "disconnected")
|
||||||
|
|
||||||
|
def test_disc_devices_appear_fresh_connect(self):
|
||||||
|
"""No prior disconnect → fresh connect, not restarting."""
|
||||||
|
s, _, _ = self._tr("disconnected", [], ["/dev/video0"],
|
||||||
|
disc_t=0.0, now=100.0)
|
||||||
|
self.assertEqual(s, "connected")
|
||||||
|
|
||||||
|
def test_disc_devices_appear_within_grace(self):
|
||||||
|
"""Reappears within grace window → restarting."""
|
||||||
|
# disconnected at t=95, now=t=99, grace=5 → 4 s elapsed → within
|
||||||
|
s, _, rup = self._tr("disconnected", [], ["/dev/video0"],
|
||||||
|
now=99.0, disc_t=95.0, grace=5.0, hold=2.0)
|
||||||
|
self.assertEqual(s, "restarting")
|
||||||
|
self.assertAlmostEqual(rup, 99.0 + 2.0)
|
||||||
|
|
||||||
|
def test_disc_devices_appear_outside_grace(self):
|
||||||
|
"""Reappears after grace window → connected directly."""
|
||||||
|
# disconnected at t=80, now=100, grace=5 → 20 s elapsed → outside
|
||||||
|
s, _, _ = self._tr("disconnected", [], ["/dev/video0"],
|
||||||
|
now=100.0, disc_t=80.0, grace=5.0)
|
||||||
|
self.assertEqual(s, "connected")
|
||||||
|
|
||||||
|
def test_disc_devices_appear_exactly_at_grace(self):
|
||||||
|
"""At exactly grace_s seconds the window is still inclusive → restarting."""
|
||||||
|
s, _, _ = self._tr("disconnected", [], ["/dev/video0"],
|
||||||
|
now=100.0, disc_t=95.0, grace=5.0)
|
||||||
|
# now - disc_t = 5.0, grace = 5.0 → 5.0 <= 5.0 is True → restarting
|
||||||
|
self.assertEqual(s, "restarting")
|
||||||
|
|
||||||
|
# ── CONNECTED transitions ─────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_conn_same_devices_stays_connected(self):
|
||||||
|
s, _, _ = self._tr("connected", ["/dev/video0"], ["/dev/video0"])
|
||||||
|
self.assertEqual(s, "connected")
|
||||||
|
|
||||||
|
def test_conn_no_devices_disconnects(self):
|
||||||
|
s, disc_t, _ = self._tr("connected", ["/dev/video0"], [], now=200.0)
|
||||||
|
self.assertEqual(s, "disconnected")
|
||||||
|
self.assertAlmostEqual(disc_t, 200.0)
|
||||||
|
|
||||||
|
def test_conn_device_added_restarting(self):
|
||||||
|
"""New device appears alongside existing → restarting."""
|
||||||
|
s, _, rup = self._tr("connected",
|
||||||
|
["/dev/video0"],
|
||||||
|
["/dev/video0", "/dev/video1"],
|
||||||
|
now=50.0, hold=2.0)
|
||||||
|
self.assertEqual(s, "restarting")
|
||||||
|
self.assertAlmostEqual(rup, 52.0)
|
||||||
|
|
||||||
|
def test_conn_device_replaced_restarting(self):
|
||||||
|
"""Different device path → restarting."""
|
||||||
|
s, _, _ = self._tr("connected", ["/dev/video0"], ["/dev/video2"])
|
||||||
|
self.assertEqual(s, "restarting")
|
||||||
|
|
||||||
|
def test_conn_device_removed_partial_restarting(self):
|
||||||
|
"""One of multiple devices lost (not all) → restarting."""
|
||||||
|
s, _, _ = self._tr("connected",
|
||||||
|
["/dev/video0", "/dev/video1"],
|
||||||
|
["/dev/video0"])
|
||||||
|
self.assertEqual(s, "restarting")
|
||||||
|
|
||||||
|
def test_conn_multiple_same_stays_connected(self):
|
||||||
|
devs = ["/dev/video0", "/dev/video1"]
|
||||||
|
s, _, _ = self._tr("connected", devs, devs)
|
||||||
|
self.assertEqual(s, "connected")
|
||||||
|
|
||||||
|
# ── RESTARTING transitions ────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_restarting_hold_not_elapsed_stays(self):
|
||||||
|
s, _, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"],
|
||||||
|
now=50.0, restart_until=52.0)
|
||||||
|
self.assertEqual(s, "restarting")
|
||||||
|
|
||||||
|
def test_restarting_hold_elapsed_with_devices_connected(self):
|
||||||
|
s, _, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"],
|
||||||
|
now=55.0, restart_until=52.0)
|
||||||
|
self.assertEqual(s, "connected")
|
||||||
|
|
||||||
|
def test_restarting_hold_elapsed_no_devices_disconnected(self):
|
||||||
|
s, disc_t, _ = self._tr("restarting", ["/dev/video0"], [],
|
||||||
|
now=55.0, restart_until=52.0)
|
||||||
|
self.assertEqual(s, "disconnected")
|
||||||
|
self.assertAlmostEqual(disc_t, 55.0)
|
||||||
|
|
||||||
|
def test_restarting_at_exact_hold_elapsed(self):
|
||||||
|
s, _, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"],
|
||||||
|
now=52.0, restart_until=52.0)
|
||||||
|
# now >= restart_until → resolves
|
||||||
|
self.assertEqual(s, "connected")
|
||||||
|
|
||||||
|
def test_restarting_preserves_disconnect_time(self):
|
||||||
|
_, disc_t, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"],
|
||||||
|
now=50.0, disc_t=40.0, restart_until=49.0)
|
||||||
|
# hold elapsed → connected; disconnect_time unchanged
|
||||||
|
self.assertAlmostEqual(disc_t, 40.0)
|
||||||
|
|
||||||
|
# ── Return value shapes ───────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_returns_tuple_of_three(self):
|
||||||
|
result = self._tr("disconnected", [], [])
|
||||||
|
self.assertEqual(len(result), 3)
|
||||||
|
|
||||||
|
def test_new_state_is_string(self):
|
||||||
|
s, _, _ = self._tr("disconnected", [], [])
|
||||||
|
self.assertIsInstance(s, str)
|
||||||
|
|
||||||
|
def test_disconnect_time_float(self):
|
||||||
|
_, dt, _ = self._tr("connected", ["/dev/video0"], [], now=42.5)
|
||||||
|
self.assertIsInstance(dt, float)
|
||||||
|
|
||||||
|
def test_restart_until_float(self):
|
||||||
|
_, _, rup = self._tr("disconnected", [], ["/dev/video0"],
|
||||||
|
disc_t=99.0, now=100.0, grace=5.0, hold=3.0)
|
||||||
|
self.assertIsInstance(rup, float)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: node init ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestNodeInit(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls): cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_instantiates_no_devices(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=[])
|
||||||
|
self.assertIsNotNone(node)
|
||||||
|
|
||||||
|
def test_instantiates_with_devices(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=["/dev/video0"])
|
||||||
|
self.assertIsNotNone(node)
|
||||||
|
|
||||||
|
def test_initial_state_disconnected(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=[])
|
||||||
|
self.assertEqual(node.state, "disconnected")
|
||||||
|
|
||||||
|
def test_initial_state_connected(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=["/dev/video0"])
|
||||||
|
self.assertEqual(node.state, "connected")
|
||||||
|
|
||||||
|
def test_publishes_initial_state(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=[])
|
||||||
|
pub = node._pubs["/saltybot/camera_status"]
|
||||||
|
self.assertEqual(len(pub.msgs), 1)
|
||||||
|
self.assertEqual(pub.msgs[0].data, "disconnected")
|
||||||
|
|
||||||
|
def test_publishes_initial_connected(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=["/dev/video0"])
|
||||||
|
pub = node._pubs["/saltybot/camera_status"]
|
||||||
|
self.assertEqual(pub.msgs[0].data, "connected")
|
||||||
|
|
||||||
|
def test_publisher_registered(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertIn("/saltybot/camera_status", node._pubs)
|
||||||
|
|
||||||
|
def test_timer_registered(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertGreater(len(node._timers), 0)
|
||||||
|
|
||||||
|
def test_custom_output_topic(self):
|
||||||
|
node = _make_node(self.mod, output_topic="/my/cam_status")
|
||||||
|
self.assertIn("/my/cam_status", node._pubs)
|
||||||
|
|
||||||
|
def test_devices_property(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=["/dev/video0"])
|
||||||
|
self.assertEqual(node.devices, frozenset(["/dev/video0"]))
|
||||||
|
|
||||||
|
def test_no_devices_property_empty(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=[])
|
||||||
|
self.assertEqual(node.devices, frozenset())
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: _scan_cb state transitions ────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestScanCb(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls): cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def _pub(self, node):
|
||||||
|
return node._pubs["/saltybot/camera_status"]
|
||||||
|
|
||||||
|
def test_no_change_no_publish(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=[])
|
||||||
|
pub = self._pub(node)
|
||||||
|
initial_count = len(pub.msgs)
|
||||||
|
_set_scan(node, [])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(len(pub.msgs), initial_count)
|
||||||
|
|
||||||
|
def test_device_appears_publishes_connected(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=[])
|
||||||
|
_set_scan(node, ["/dev/video0"])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(self._pub(node).msgs[-1].data, "connected")
|
||||||
|
|
||||||
|
def test_device_lost_publishes_disconnected(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=["/dev/video0"])
|
||||||
|
_set_scan(node, [])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(self._pub(node).msgs[-1].data, "disconnected")
|
||||||
|
|
||||||
|
def test_device_replaced_publishes_restarting(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=["/dev/video0"])
|
||||||
|
_set_scan(node, ["/dev/video2"])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(self._pub(node).msgs[-1].data, "restarting")
|
||||||
|
|
||||||
|
def test_reconnect_within_grace_publishes_restarting(self):
|
||||||
|
"""Simulate: connected → disconnected → reconnected within grace."""
|
||||||
|
node = _make_node(self.mod, initial_devices=["/dev/video0"],
|
||||||
|
restart_grace_s=10.0)
|
||||||
|
# Simulate disconnect
|
||||||
|
_set_scan(node, [])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(node.state, "disconnected")
|
||||||
|
# Quick reconnect — disconnect_time was just set, well within grace
|
||||||
|
_set_scan(node, ["/dev/video0"])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(node.state, "restarting")
|
||||||
|
self.assertEqual(self._pub(node).msgs[-1].data, "restarting")
|
||||||
|
|
||||||
|
def test_restarting_resolves_to_connected_after_hold(self):
|
||||||
|
"""After hold expires, restarting → connected."""
|
||||||
|
node = _make_node(self.mod, initial_devices=["/dev/video0"],
|
||||||
|
restart_hold_s=0.0) # instant hold
|
||||||
|
# Device changes → restarting
|
||||||
|
_set_scan(node, ["/dev/video2"])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(node.state, "restarting")
|
||||||
|
# Next tick: hold = 0.0 → restart_until is in the past → resolves
|
||||||
|
_set_scan(node, ["/dev/video2"])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(node.state, "connected")
|
||||||
|
|
||||||
|
def test_restarting_resolves_to_disconnected_when_no_device(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=["/dev/video0"],
|
||||||
|
restart_hold_s=0.0)
|
||||||
|
_set_scan(node, ["/dev/video2"])
|
||||||
|
node._scan_cb() # → restarting
|
||||||
|
_set_scan(node, [])
|
||||||
|
node._scan_cb() # → disconnected
|
||||||
|
self.assertEqual(node.state, "disconnected")
|
||||||
|
|
||||||
|
def test_publish_always_publishes_every_tick(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=[], publish_always=True)
|
||||||
|
pub = self._pub(node)
|
||||||
|
before = len(pub.msgs)
|
||||||
|
_set_scan(node, [])
|
||||||
|
node._scan_cb()
|
||||||
|
node._scan_cb()
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(len(pub.msgs), before + 3)
|
||||||
|
|
||||||
|
def test_no_publish_always_skips_unchanged(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=[], publish_always=False)
|
||||||
|
pub = self._pub(node)
|
||||||
|
before = len(pub.msgs)
|
||||||
|
_set_scan(node, [])
|
||||||
|
node._scan_cb()
|
||||||
|
node._scan_cb()
|
||||||
|
# State unchanged → no additional publishes
|
||||||
|
self.assertEqual(len(pub.msgs), before)
|
||||||
|
|
||||||
|
def test_state_transitions_logged(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=[])
|
||||||
|
_set_scan(node, ["/dev/video0"])
|
||||||
|
node._scan_cb()
|
||||||
|
infos = [m for lvl, m in node._logs if lvl == "INFO"]
|
||||||
|
self.assertTrue(any("connected" in m.lower() for m in infos))
|
||||||
|
|
||||||
|
def test_multiple_devices_connected(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=[])
|
||||||
|
_set_scan(node, ["/dev/video0", "/dev/video1"])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(node.state, "connected")
|
||||||
|
|
||||||
|
def test_partial_loss_restarting(self):
|
||||||
|
node = _make_node(self.mod,
|
||||||
|
initial_devices=["/dev/video0", "/dev/video1"])
|
||||||
|
_set_scan(node, ["/dev/video0"])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(node.state, "restarting")
|
||||||
|
|
||||||
|
def test_state_property_updated(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=[])
|
||||||
|
self.assertEqual(node.state, "disconnected")
|
||||||
|
_set_scan(node, ["/dev/video0"])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(node.state, "connected")
|
||||||
|
|
||||||
|
def test_devices_property_updated(self):
|
||||||
|
node = _make_node(self.mod, initial_devices=[])
|
||||||
|
_set_scan(node, ["/dev/video0"])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertIn("/dev/video0", node.devices)
|
||||||
|
|
||||||
|
def test_full_lifecycle(self):
|
||||||
|
"""Full cycle: disconnected → connected → restarting → disconnected."""
|
||||||
|
node = _make_node(self.mod, initial_devices=[],
|
||||||
|
restart_grace_s=60.0, restart_hold_s=0.0)
|
||||||
|
pub = self._pub(node)
|
||||||
|
|
||||||
|
# Boot: disconnected
|
||||||
|
self.assertEqual(node.state, "disconnected")
|
||||||
|
|
||||||
|
# Camera plugged in
|
||||||
|
_set_scan(node, ["/dev/video0"])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(node.state, "connected")
|
||||||
|
|
||||||
|
# Camera hot-unplugged then immediately replugged
|
||||||
|
_set_scan(node, [])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(node.state, "disconnected")
|
||||||
|
|
||||||
|
_set_scan(node, ["/dev/video0"])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(node.state, "restarting")
|
||||||
|
|
||||||
|
# Hold expires (hold_s=0) → resolves to connected
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(node.state, "connected")
|
||||||
|
|
||||||
|
# Camera fully removed
|
||||||
|
_set_scan(node, [])
|
||||||
|
node._scan_cb()
|
||||||
|
self.assertEqual(node.state, "disconnected")
|
||||||
|
|
||||||
|
status_sequence = [m.data for m in pub.msgs]
|
||||||
|
# Must contain all three statuses in the sequence
|
||||||
|
self.assertIn("connected", status_sequence)
|
||||||
|
self.assertIn("disconnected", status_sequence)
|
||||||
|
self.assertIn("restarting", status_sequence)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: _publish ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestPublish(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls): cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.node = _make_node(self.mod, initial_devices=[])
|
||||||
|
self.pub = self.node._pubs["/saltybot/camera_status"]
|
||||||
|
|
||||||
|
def test_publish_connected(self):
|
||||||
|
self.node._publish("connected")
|
||||||
|
self.assertEqual(self.pub.msgs[-1].data, "connected")
|
||||||
|
|
||||||
|
def test_publish_disconnected(self):
|
||||||
|
self.node._publish("disconnected")
|
||||||
|
self.assertEqual(self.pub.msgs[-1].data, "disconnected")
|
||||||
|
|
||||||
|
def test_publish_restarting(self):
|
||||||
|
self.node._publish("restarting")
|
||||||
|
self.assertEqual(self.pub.msgs[-1].data, "restarting")
|
||||||
|
|
||||||
|
def test_publish_increments_msg_count(self):
|
||||||
|
before = len(self.pub.msgs)
|
||||||
|
self.node._publish("connected")
|
||||||
|
self.assertEqual(len(self.pub.msgs), before + 1)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: source content ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestNodeSrc(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
with open(_SRC) as f: cls.src = f.read()
|
||||||
|
|
||||||
|
def test_issue_tag(self): self.assertIn("#320", self.src)
|
||||||
|
def test_output_topic(self): self.assertIn("/saltybot/camera_status", self.src)
|
||||||
|
def test_dev_video_glob(self): self.assertIn("/dev/video*", self.src)
|
||||||
|
def test_status_connected(self): self.assertIn('"connected"', self.src)
|
||||||
|
def test_status_disconnected(self): self.assertIn('"disconnected"', self.src)
|
||||||
|
def test_status_restarting(self): self.assertIn('"restarting"', self.src)
|
||||||
|
def test_compute_transition_fn(self): self.assertIn("compute_transition", self.src)
|
||||||
|
def test_scan_video_devices_fn(self): self.assertIn("scan_video_devices", self.src)
|
||||||
|
def test_restart_grace_param(self): self.assertIn("restart_grace_s", self.src)
|
||||||
|
def test_restart_hold_param(self): self.assertIn("restart_hold_s", self.src)
|
||||||
|
def test_poll_rate_param(self): self.assertIn("poll_rate", self.src)
|
||||||
|
def test_publish_always_param(self): self.assertIn("publish_always", self.src)
|
||||||
|
def test_threading_lock(self): self.assertIn("threading.Lock", self.src)
|
||||||
|
def test_glob_used(self): self.assertIn("glob", self.src)
|
||||||
|
def test_main_defined(self): self.assertIn("def main", self.src)
|
||||||
|
def test_scan_fn_injectable(self): self.assertIn("_scan_fn", self.src)
|
||||||
|
def test_string_msg(self): self.assertIn("String", self.src)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: config / launch / setup ────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestConfig(unittest.TestCase):
|
||||||
|
_CONFIG = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/config/camera_hotplug_params.yaml"
|
||||||
|
)
|
||||||
|
_LAUNCH = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/launch/camera_hotplug.launch.py"
|
||||||
|
)
|
||||||
|
_SETUP = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/setup.py"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_config_exists(self):
|
||||||
|
import os; self.assertTrue(os.path.exists(self._CONFIG))
|
||||||
|
|
||||||
|
def test_config_video_glob(self):
|
||||||
|
with open(self._CONFIG) as f: c = f.read()
|
||||||
|
self.assertIn("video_glob", c)
|
||||||
|
|
||||||
|
def test_config_poll_rate(self):
|
||||||
|
with open(self._CONFIG) as f: c = f.read()
|
||||||
|
self.assertIn("poll_rate", c)
|
||||||
|
|
||||||
|
def test_config_restart_grace(self):
|
||||||
|
with open(self._CONFIG) as f: c = f.read()
|
||||||
|
self.assertIn("restart_grace_s", c)
|
||||||
|
|
||||||
|
def test_config_restart_hold(self):
|
||||||
|
with open(self._CONFIG) as f: c = f.read()
|
||||||
|
self.assertIn("restart_hold_s", c)
|
||||||
|
|
||||||
|
def test_launch_exists(self):
|
||||||
|
import os; self.assertTrue(os.path.exists(self._LAUNCH))
|
||||||
|
|
||||||
|
def test_launch_video_glob_arg(self):
|
||||||
|
with open(self._LAUNCH) as f: c = f.read()
|
||||||
|
self.assertIn("video_glob", c)
|
||||||
|
|
||||||
|
def test_launch_restart_grace_arg(self):
|
||||||
|
with open(self._LAUNCH) as f: c = f.read()
|
||||||
|
self.assertIn("restart_grace_s", c)
|
||||||
|
|
||||||
|
def test_entry_point_in_setup(self):
|
||||||
|
with open(self._SETUP) as f: c = f.read()
|
||||||
|
self.assertIn("camera_hotplug_node", c)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Loading…
x
Reference in New Issue
Block a user