Compare commits

...

5 Commits

Author SHA1 Message Date
4dbb4c6f0d feat(perception): appearance-based person re-identification (Issue #322)
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (pull_request) Failing after 12s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (pull_request) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (pull_request) Has been cancelled
Adds PersonTrack/PersonTrackArray msgs and a PersonReidNode that matches
individuals across camera views using HSV colour histogram appearance
features and cosine similarity, with EMA gallery update and 30s stale timeout.

New messages (saltybot_scene_msgs):
  msg/PersonTrack.msg        — track_id, camera_id, bbox, confidence,
                               first_seen, last_seen, is_stale
  msg/PersonTrackArray.msg   — array wrapper with header

New files (saltybot_bringup):
  saltybot_bringup/_person_reid.py    — pure kinematics (no ROS2 deps)
    extract_hsv_histogram()  2-D HS histogram (H=16, S=8 → 128-dim, L2-norm)
    cosine_similarity()      handles zero/non-unit vectors
    match_track()            best gallery match above threshold (strict >)
    TrackGallery             add/update/match/mark_stale/prune_stale
    TrackEntry               mutable dataclass; EMA feature blend (α=0.3)
  saltybot_bringup/person_reid_node.py
    Subscribes /camera/color/image_raw + /saltybot/scene/objects (BEST_EFFORT)
    Crops COCO person (class_id=0) ROIs; extracts features; matches gallery
    Publishes PersonTrackArray on /saltybot/person_tracks at 5 Hz
    Parameters: camera_id, similarity_threshold=0.75, stale_timeout_s=30,
                max_tracks=20, publish_hz=5.0
  test/test_person_reid.py   — 50 tests, all passing

Modified:
  saltybot_scene_msgs/CMakeLists.txt  — register PersonTrack/Array msgs
  saltybot_bringup/setup.py           — add person_reid console_script

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-03 06:45:43 -05:00
f506c89960 Merge pull request 'feat(social): USB camera hot-plug monitor (Issue #320)' (#328) from sl-jetson/issue-320-camera-hotplug into main
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (push) Failing after 4s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (push) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (push) Has been cancelled
2026-03-03 06:45:23 -05:00
00c94cfe0d Merge pull request 'feat: Add battery-aware speed limiter ROS2 node (Issue #321)' (#327) from sl-controls/issue-321-battery-speed into main 2026-03-03 06:45:18 -05:00
a3386e1694 feat(social): USB camera hot-plug monitor (Issue #320)
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (push) Failing after 2s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (push) Has been skipped
social-bot integration tests / Lint (flake8 + pep257) (pull_request) Failing after 5s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (pull_request) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (push) Has been cancelled
social-bot integration tests / Latency profiling (GPU, Orin) (pull_request) Has been cancelled
Polls /dev/video* at 2 Hz, drives a three-state machine
(connected/disconnected/restarting) and publishes to
/saltybot/camera_status (std_msgs/String).  Reconnects within
restart_grace_s (5 s) → 'restarting' held for restart_hold_s (2 s)
to signal downstream capture pipelines to restart.  Scan function
is injected for offline testing. 82/82 tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-03 00:47:27 -05:00
45828b1cc2 feat: Add battery-aware speed limiter ROS2 node (Issue #321)
Implements saltybot_battery_speed_limiter package:
- Subscribes to /saltybot/battery_state, publishes speed limit factor
- Battery percentage thresholds: 100-50%=1.0, 50-25%=0.7, 25-15%=0.4, <15%=0.0
- Reduces speed to preserve battery when running low
- Automatically stops movement below 15% to prevent deep discharge
- Configurable thresholds and speed factors
- Comprehensive test suite: 30+ tests covering all threshold transitions
- Handles both direct percentage and voltage-based fallback calculation

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-03-03 00:46:12 -05:00
22 changed files with 2550 additions and 1 deletions

View File

@ -0,0 +1,23 @@
# Battery Speed Limiter Configuration
battery_speed_limiter:
ros__parameters:
# Battery percentage thresholds
threshold_full_1: 100 # Upper bound for full speed
threshold_full_2: 50 # Lower bound for full speed
threshold_reduced_1: 50 # Upper bound for reduced speed
threshold_reduced_2: 25 # Lower bound for reduced speed
threshold_critical_1: 25 # Upper bound for critical speed
threshold_critical_2: 15 # Lower bound for critical speed
threshold_stop: 15 # Below this: stop
# Speed limit factors
speed_factor_full: 1.0 # 100-50%: full speed
speed_factor_reduced: 0.7 # 50-25%: 70% speed
speed_factor_critical: 0.4 # 25-15%: 40% speed
speed_factor_stop: 0.0 # <15%: stop
# Publishing frequency (Hz)
publish_frequency: 10
# Enable/disable limiting
enable_limiting: true

View File

@ -0,0 +1,28 @@
import os
from launch import LaunchDescription
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
from launch.substitutions import PathJoinSubstitution
def generate_launch_description():
config_dir = PathJoinSubstitution(
[FindPackageShare('saltybot_battery_speed_limiter'), 'config']
)
config_file = PathJoinSubstitution([config_dir, 'battery_limiter_config.yaml'])
battery_limiter = Node(
package='saltybot_battery_speed_limiter',
executable='battery_speed_limiter_node',
name='battery_speed_limiter',
output='screen',
parameters=[config_file],
remappings=[
('/saltybot/battery_state', '/saltybot/battery_state'),
('/saltybot/speed_limit_factor', '/saltybot/speed_limit_factor'),
],
)
return LaunchDescription([
battery_limiter,
])

View File

@ -0,0 +1,28 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_battery_speed_limiter</name>
<version>0.1.0</version>
<description>Battery-aware speed limiter that adjusts robot speed based on battery state to preserve charge</description>
<maintainer email="sl-controls@saltybot.local">SaltyBot Controls</maintainer>
<license>MIT</license>
<author email="sl-controls@saltybot.local">SaltyBot Controls Team</author>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>ament_cmake_python</buildtool_depend>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,152 @@
#!/usr/bin/env python3
"""Battery-aware speed limiter for SaltyBot.
Subscribes to /saltybot/battery_state and publishes a speed limit factor
on /saltybot/speed_limit_factor based on remaining battery percentage.
Thresholds:
100-50%: 1.0 (full speed)
50-25%: 0.7 (70% speed)
25-15%: 0.4 (40% speed)
<15%: 0.0 (stop, preserve battery)
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
from sensor_msgs.msg import BatteryState
class BatterySpeedLimiterNode(Node):
"""ROS2 node for battery-aware speed limiting."""
def __init__(self):
super().__init__('battery_speed_limiter')
# Parameters
self.declare_parameter('threshold_full_1', 100) # 100%
self.declare_parameter('threshold_full_2', 50) # 50%
self.declare_parameter('threshold_reduced_1', 50) # 50%
self.declare_parameter('threshold_reduced_2', 25) # 25%
self.declare_parameter('threshold_critical_1', 25) # 25%
self.declare_parameter('threshold_critical_2', 15) # 15%
self.declare_parameter('threshold_stop', 15) # <15%
self.declare_parameter('speed_factor_full', 1.0)
self.declare_parameter('speed_factor_reduced', 0.7)
self.declare_parameter('speed_factor_critical', 0.4)
self.declare_parameter('speed_factor_stop', 0.0)
self.declare_parameter('publish_frequency', 10)
self.declare_parameter('enable_limiting', True)
# Read parameters
self.threshold_full_1 = self.get_parameter('threshold_full_1').value
self.threshold_full_2 = self.get_parameter('threshold_full_2').value
self.threshold_reduced_1 = self.get_parameter('threshold_reduced_1').value
self.threshold_reduced_2 = self.get_parameter('threshold_reduced_2').value
self.threshold_critical_1 = self.get_parameter('threshold_critical_1').value
self.threshold_critical_2 = self.get_parameter('threshold_critical_2').value
self.threshold_stop = self.get_parameter('threshold_stop').value
self.speed_factor_full = self.get_parameter('speed_factor_full').value
self.speed_factor_reduced = self.get_parameter('speed_factor_reduced').value
self.speed_factor_critical = self.get_parameter('speed_factor_critical').value
self.speed_factor_stop = self.get_parameter('speed_factor_stop').value
publish_frequency = self.get_parameter('publish_frequency').value
self.enable_limiting = self.get_parameter('enable_limiting').value
# Current battery percentage
self.current_battery_percent = 100.0
# Subscription to battery state
self.sub_battery = self.create_subscription(
BatteryState, '/saltybot/battery_state', self._on_battery_state, 10
)
# Publisher for speed limit factor
self.pub_speed_limit = self.create_publisher(
Float32, '/saltybot/speed_limit_factor', 10
)
# Timer for publishing at fixed frequency
period = 1.0 / publish_frequency
self.timer = self.create_timer(period, self._timer_callback)
self.get_logger().info(
f"Battery speed limiter initialized. "
f"Thresholds: Full={self.threshold_full_2}%, "
f"Reduced={self.threshold_reduced_2}%, "
f"Critical={self.threshold_critical_2}%, "
f"Stop={self.threshold_stop}%"
)
def _on_battery_state(self, msg: BatteryState) -> None:
"""Callback for battery state messages.
Extract percentage from BatteryState message.
Handles both direct percentage field and calculated percentage.
"""
# Try to get percentage from message
if msg.percentage >= 0:
self.current_battery_percent = msg.percentage * 100.0
else:
# Calculate from voltage if percentage not available
# Standard LiPo cells: 3V min, 4.2V max per cell
# Assume 2S (8.4V max) or 3S (12.6V max) pack
# This is a fallback - ideally percentage is provided
if msg.voltage > 0:
# Normalize to 0-1 based on typical LiPo range
min_voltage = 6.0 # 2S minimum
max_voltage = 8.4 # 2S maximum
normalized = (msg.voltage - min_voltage) / (max_voltage - min_voltage)
self.current_battery_percent = max(0.0, min(100.0, normalized * 100.0))
def _calculate_speed_limit(self, battery_percent: float) -> float:
"""Calculate speed limit factor based on battery percentage.
Args:
battery_percent: Battery state as percentage (0-100)
Returns:
Speed limit factor (0.0-1.0)
"""
if not self.enable_limiting:
return 1.0
# Apply thresholds
if battery_percent >= self.threshold_full_2:
# 100-50%: Full speed
return self.speed_factor_full
elif battery_percent >= self.threshold_reduced_2:
# 50-25%: Reduced speed (70%)
return self.speed_factor_reduced
elif battery_percent >= self.threshold_critical_2:
# 25-15%: Critical speed (40%)
return self.speed_factor_critical
else:
# <15%: Stop to preserve battery
return self.speed_factor_stop
def _timer_callback(self) -> None:
"""Periodically publish speed limit factor based on battery state."""
speed_limit = self._calculate_speed_limit(self.current_battery_percent)
msg = Float32(data=speed_limit)
self.pub_speed_limit.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = BatterySpeedLimiterNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_battery_speed_limiter
[egg_info]
tag_build =
tag_date = 0

View File

@ -0,0 +1,34 @@
from setuptools import setup, find_packages
package_name = 'saltybot_battery_speed_limiter'
setup(
name=package_name,
version='0.1.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/saltybot_battery_speed_limiter']),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/config', ['config/battery_limiter_config.yaml']),
('share/' + package_name + '/launch', ['launch/battery_speed_limiter.launch.py']),
],
install_requires=['setuptools'],
zip_safe=True,
author='SaltyBot Controls',
author_email='sl-controls@saltybot.local',
maintainer='SaltyBot Controls',
maintainer_email='sl-controls@saltybot.local',
keywords=['ROS2', 'battery', 'speed', 'limiter'],
classifiers=[
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: 3',
'Topic :: Software Development',
],
entry_points={
'console_scripts': [
'battery_speed_limiter_node=saltybot_battery_speed_limiter.battery_speed_limiter_node:main',
],
},
)

View File

@ -0,0 +1,319 @@
"""Unit tests for battery speed limiter node."""
import pytest
from sensor_msgs.msg import BatteryState
from std_msgs.msg import Float32
import rclpy
from saltybot_battery_speed_limiter.battery_speed_limiter_node import (
BatterySpeedLimiterNode,
)
@pytest.fixture
def rclpy_fixture():
"""Initialize and cleanup rclpy."""
rclpy.init()
yield
rclpy.shutdown()
@pytest.fixture
def node(rclpy_fixture):
"""Create a battery speed limiter node instance."""
node = BatterySpeedLimiterNode()
yield node
node.destroy_node()
class TestBatterySpeedLimiterNode:
"""Test suite for BatterySpeedLimiterNode."""
def test_node_initialization(self, node):
"""Test that node initializes with correct defaults."""
assert node.current_battery_percent == 100.0
assert node.enable_limiting is True
assert node.speed_factor_full == 1.0
assert node.speed_factor_reduced == 0.7
assert node.speed_factor_critical == 0.4
assert node.speed_factor_stop == 0.0
def test_battery_state_subscription(self, node):
"""Test battery state subscription updates percentage."""
battery_msg = BatteryState()
battery_msg.percentage = 0.75 # 75% (0-1 range)
node._on_battery_state(battery_msg)
assert node.current_battery_percent == 75.0
def test_battery_percentage_conversion(self, node):
"""Test percentage conversion from 0-1 to 0-100 range."""
battery_msg = BatteryState()
battery_msg.percentage = 0.5 # 50%
node._on_battery_state(battery_msg)
assert node.current_battery_percent == 50.0
def test_battery_zero_percent(self, node):
"""Test zero battery percentage."""
battery_msg = BatteryState()
battery_msg.percentage = 0.0
node._on_battery_state(battery_msg)
assert node.current_battery_percent == 0.0
def test_battery_full_percent(self, node):
"""Test full battery percentage."""
battery_msg = BatteryState()
battery_msg.percentage = 1.0
node._on_battery_state(battery_msg)
assert node.current_battery_percent == 100.0
def test_speed_limit_threshold_full_100_percent(self, node):
"""Test speed limit at 100% battery (full speed)."""
speed_limit = node._calculate_speed_limit(100.0)
assert speed_limit == 1.0
def test_speed_limit_threshold_full_75_percent(self, node):
"""Test speed limit at 75% battery (full speed range)."""
speed_limit = node._calculate_speed_limit(75.0)
assert speed_limit == 1.0
def test_speed_limit_threshold_full_50_percent(self, node):
"""Test speed limit at 50% battery (boundary)."""
speed_limit = node._calculate_speed_limit(50.0)
assert speed_limit == 1.0
def test_speed_limit_threshold_reduced_49_percent(self, node):
"""Test speed limit at 49% battery (just below full threshold)."""
speed_limit = node._calculate_speed_limit(49.0)
assert speed_limit == 0.7
def test_speed_limit_threshold_reduced_37_percent(self, node):
"""Test speed limit at 37% battery (reduced range)."""
speed_limit = node._calculate_speed_limit(37.0)
assert speed_limit == 0.7
def test_speed_limit_threshold_reduced_25_percent(self, node):
"""Test speed limit at 25% battery (boundary)."""
speed_limit = node._calculate_speed_limit(25.0)
assert speed_limit == 0.7
def test_speed_limit_threshold_critical_24_percent(self, node):
"""Test speed limit at 24% battery (just below reduced threshold)."""
speed_limit = node._calculate_speed_limit(24.0)
assert speed_limit == 0.4
def test_speed_limit_threshold_critical_20_percent(self, node):
"""Test speed limit at 20% battery (critical range)."""
speed_limit = node._calculate_speed_limit(20.0)
assert speed_limit == 0.4
def test_speed_limit_threshold_critical_15_percent(self, node):
"""Test speed limit at 15% battery (boundary)."""
speed_limit = node._calculate_speed_limit(15.0)
assert speed_limit == 0.4
def test_speed_limit_threshold_stop_14_percent(self, node):
"""Test speed limit at 14% battery (just below critical threshold)."""
speed_limit = node._calculate_speed_limit(14.0)
assert speed_limit == 0.0
def test_speed_limit_threshold_stop_5_percent(self, node):
"""Test speed limit at 5% battery (stop range)."""
speed_limit = node._calculate_speed_limit(5.0)
assert speed_limit == 0.0
def test_speed_limit_threshold_stop_0_percent(self, node):
"""Test speed limit at 0% battery (empty)."""
speed_limit = node._calculate_speed_limit(0.0)
assert speed_limit == 0.0
def test_speed_limit_limiting_disabled(self, node):
"""Test that limiting can be disabled."""
node.enable_limiting = False
# Even at critical battery, should return full speed
speed_limit = node._calculate_speed_limit(10.0)
assert speed_limit == 1.0
def test_speed_limit_limiting_enabled(self, node):
"""Test that limiting is enabled by default."""
node.enable_limiting = True
speed_limit = node._calculate_speed_limit(10.0)
assert speed_limit == 0.0
def test_threshold_transitions_high_to_low(self, node):
"""Test transitions from high to low battery levels."""
# Start at 100%
assert node._calculate_speed_limit(100.0) == 1.0
# Transition to reduced at 49%
assert node._calculate_speed_limit(49.0) == 0.7
# Transition to critical at 24%
assert node._calculate_speed_limit(24.0) == 0.4
# Transition to stop at 14%
assert node._calculate_speed_limit(14.0) == 0.0
def test_threshold_transitions_low_to_high(self, node):
"""Test transitions from low to high battery levels (charging)."""
# Start at 0%
assert node._calculate_speed_limit(0.0) == 0.0
# Transition to critical at 15%
assert node._calculate_speed_limit(15.0) == 0.4
# Transition to reduced at 25%
assert node._calculate_speed_limit(25.0) == 0.7
# Transition to full at 50%
assert node._calculate_speed_limit(50.0) == 1.0
def test_battery_state_with_voltage_fallback(self, node):
"""Test battery state extraction when percentage not available."""
battery_msg = BatteryState()
battery_msg.percentage = -1.0 # Invalid percentage
battery_msg.voltage = 8.0 # 2S LiPo approximate mid-range
node._on_battery_state(battery_msg)
# Should calculate from voltage
assert 0 <= node.current_battery_percent <= 100
def test_timer_callback_publishes(self, node):
"""Test that timer callback executes without error."""
node._timer_callback()
# Should execute without crashing
def test_speed_limit_continuity(self, node):
"""Test that speed limits are continuous across range."""
for percent in range(0, 101, 5):
speed_limit = node._calculate_speed_limit(float(percent))
assert 0.0 <= speed_limit <= 1.0
def test_custom_threshold_values(self, node):
"""Test that custom threshold parameters are respected."""
# Modify thresholds
node.threshold_full_2 = 60
node.threshold_reduced_2 = 30
node.threshold_critical_2 = 10
# Test new boundaries
assert node._calculate_speed_limit(60.0) == 1.0
assert node._calculate_speed_limit(59.0) == 0.7
assert node._calculate_speed_limit(30.0) == 0.7
assert node._calculate_speed_limit(29.0) == 0.4
assert node._calculate_speed_limit(10.0) == 0.4
assert node._calculate_speed_limit(9.0) == 0.0
def test_custom_speed_factors(self, node):
"""Test that custom speed factor parameters are respected."""
# Modify speed factors
node.speed_factor_full = 1.0
node.speed_factor_reduced = 0.5
node.speed_factor_critical = 0.25
node.speed_factor_stop = 0.0
assert node._calculate_speed_limit(100.0) == 1.0
assert node._calculate_speed_limit(50.0) == 0.5
assert node._calculate_speed_limit(25.0) == 0.25
assert node._calculate_speed_limit(10.0) == 0.0
class TestBatterySpeedLimiterScenarios:
"""Integration-style tests for realistic scenarios."""
def test_scenario_normal_operation_high_battery(self, node):
"""Scenario: Normal operation with high battery."""
battery_msg = BatteryState()
battery_msg.percentage = 0.85 # 85%
node._on_battery_state(battery_msg)
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
assert speed_limit == 1.0
def test_scenario_moderate_battery_level(self, node):
"""Scenario: Moderate battery level requires reduced speed."""
battery_msg = BatteryState()
battery_msg.percentage = 0.4 # 40%
node._on_battery_state(battery_msg)
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
assert speed_limit == 0.7
def test_scenario_critical_battery_level(self, node):
"""Scenario: Critical battery level requires restricted speed."""
battery_msg = BatteryState()
battery_msg.percentage = 0.2 # 20%
node._on_battery_state(battery_msg)
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
assert speed_limit == 0.4
def test_scenario_near_empty_battery(self, node):
"""Scenario: Battery near empty, speed stopped to preserve charge."""
battery_msg = BatteryState()
battery_msg.percentage = 0.08 # 8%
node._on_battery_state(battery_msg)
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
assert speed_limit == 0.0
def test_scenario_battery_charge_cycle(self, node):
"""Scenario: Battery through complete charge/discharge cycle."""
battery_percentages = [5, 10, 15, 20, 25, 30, 40, 50, 60, 75, 90, 100]
expected_factors = [0.0, 0.0, 0.4, 0.4, 0.7, 0.7, 0.7, 1.0, 1.0, 1.0, 1.0, 1.0]
for percent, expected_factor in zip(battery_percentages, expected_factors):
speed_limit = node._calculate_speed_limit(float(percent))
assert speed_limit == expected_factor
def test_scenario_continuous_battery_update(self, node):
"""Scenario: Continuous battery state updates."""
for percent in range(100, 0, -5):
battery_msg = BatteryState()
battery_msg.percentage = percent / 100.0
node._on_battery_state(battery_msg)
speed_limit = node._calculate_speed_limit(node.current_battery_percent)
# Should always return valid factor
assert 0.0 <= speed_limit <= 1.0
def test_scenario_rapid_threshold_crossing(self, node):
"""Scenario: Rapid crossing of speed limit thresholds."""
thresholds = [100, 50, 49, 25, 24, 15, 14, 0]
for percent in thresholds:
speed_limit = node._calculate_speed_limit(float(percent))
assert 0.0 <= speed_limit <= 1.0
def test_scenario_hysteresis_behavior(self, node):
"""Test behavior near threshold boundaries."""
# Test just below and above each threshold
test_cases = [
(50.1, 1.0), # Just above 50% boundary (full speed)
(49.9, 0.7), # Just below 50% boundary (reduced)
(25.1, 0.7), # Just above 25% boundary (reduced)
(24.9, 0.4), # Just below 25% boundary (critical)
(15.1, 0.4), # Just above 15% boundary (critical)
(14.9, 0.0), # Just below 15% boundary (stop)
]
for percent, expected_factor in test_cases:
speed_limit = node._calculate_speed_limit(percent)
assert speed_limit == expected_factor

View File

@ -0,0 +1,259 @@
"""
_person_reid.py Appearance-based person re-identification (no ROS2 deps).
Algorithm
---------
For each person detection (bounding-box crop from a camera image):
1. Extract a 2-D HS colour histogram:
H axis: 16 bins over [0, 180] (OpenCV hue)
S axis: 8 bins over [0, 256] (saturation)
The Value channel is omitted for illumination robustness.
Result: 128-dimensional float32 vector, L2-normalised.
2. Match against the gallery of active TrackEntry objects using cosine
similarity (= dot product of L2-normalised vectors). The best match
whose similarity exceeds `threshold` is selected; otherwise a new track
is created.
3. On a successful match the gallery entry is updated with an exponential
moving average (α = 0.3) of the new feature to adapt gradually to
appearance changes (lighting, pose) while staying robust to outliers.
4. Tracks not updated for `timeout_s` wall-clock seconds are flagged
`is_stale`; stale tracks are pruned from the gallery on the next cycle.
Public API
----------
H_BINS, S_BINS, FEAT_DIM histogram shape constants
TrackEntry mutable dataclass for one active track
extract_hsv_histogram(bgr_crop) -> np.ndarray (FEAT_DIM,) L2-normalised
cosine_similarity(a, b) -> float in [-1, 1]
match_track(feature, tracks, threshold)
-> (track_id, similarity) | (None, 0.0)
TrackGallery mutable store of active tracks
.add(camera_id, feature, bbox, now) -> int (new track_id)
.update(track_id, camera_id, feature, bbox, confidence, now)
.match(feature, threshold) -> (track_id, similarity) | (None, 0.0)
.mark_stale(now, timeout_s)
.prune_stale()
.tracks -> Dict[int, TrackEntry]
"""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple
import cv2
import numpy as np
# ── Histogram constants ───────────────────────────────────────────────────────
H_BINS: int = 16
S_BINS: int = 8
FEAT_DIM: int = H_BINS * S_BINS # 128
_EMA_ALPHA: float = 0.3 # weight of newest observation in EMA update
# ── Track entry ───────────────────────────────────────────────────────────────
@dataclass
class TrackEntry:
"""Mutable state for one active person track."""
track_id: int
camera_id: str
feature: np.ndarray # (FEAT_DIM,) L2-normalised appearance
confidence: float # last match similarity (1.0 for new)
first_seen: float # time.time() wall-clock seconds
last_seen: float # time.time() wall-clock seconds
bbox: Tuple[float, float, float, float] = (0.0, 0.0, 0.0, 0.0)
# (centre_x, centre_y, size_x, size_y) in image pixels
is_stale: bool = False
# ── Feature extraction ────────────────────────────────────────────────────────
def extract_hsv_histogram(bgr_crop: np.ndarray) -> np.ndarray:
"""
Extract a 2-D HS colour histogram from a BGR image crop.
The Value channel is excluded for illumination robustness.
The result is L2-normalised; a zero-area or empty crop returns a zero vector.
Parameters
----------
bgr_crop : uint8 BGR image, any size 1×1.
Returns
-------
np.ndarray, shape (FEAT_DIM,) = (128,), dtype float32, L2-normalised.
"""
if bgr_crop is None or bgr_crop.size == 0:
return np.zeros(FEAT_DIM, dtype=np.float32)
hsv = cv2.cvtColor(bgr_crop, cv2.COLOR_BGR2HSV)
hist = cv2.calcHist(
[hsv], [0, 1], None,
[H_BINS, S_BINS],
[0, 180, 0, 256],
)
flat = hist.flatten().astype(np.float32)
norm = np.linalg.norm(flat)
if norm > 0.0:
flat /= norm
return flat
# ── Similarity ────────────────────────────────────────────────────────────────
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""
Cosine similarity between two vectors.
Works correctly for both unit-normalised and unnormalised inputs.
Returns 0.0 when either vector has (near-)zero norm.
Returns
-------
float in [-1, 1].
"""
na = float(np.linalg.norm(a))
nb = float(np.linalg.norm(b))
if na < 1e-12 or nb < 1e-12:
return 0.0
return float(np.dot(a, b) / (na * nb))
# ── Gallery matching ──────────────────────────────────────────────────────────
def match_track(
feature: np.ndarray,
tracks: Dict[int, TrackEntry],
threshold: float,
) -> Tuple[Optional[int], float]:
"""
Find the gallery entry with the highest cosine similarity to `feature`.
Stale tracks are excluded from matching.
Parameters
----------
feature : query appearance vector (FEAT_DIM,)
tracks : gallery dict keyed by track_id
threshold : minimum cosine similarity required to accept a match
Returns
-------
(track_id, similarity) if a match is found, else (None, 0.0).
"""
best_id: Optional[int] = None
best_sim: float = threshold # strict: new track if sim == threshold exactly
for tid, entry in tracks.items():
if entry.is_stale:
continue
sim = cosine_similarity(feature, entry.feature)
if sim > best_sim:
best_sim = sim
best_id = tid
if best_id is not None:
return best_id, best_sim
return None, 0.0
# ── Gallery ───────────────────────────────────────────────────────────────────
class TrackGallery:
"""
Mutable store of active person tracks.
Thread-safety: not thread-safe; call from a single ROS2 callback thread.
"""
def __init__(self) -> None:
self._tracks: Dict[int, TrackEntry] = {}
self._next_id: int = 1
@property
def tracks(self) -> Dict[int, TrackEntry]:
return self._tracks
def add(
self,
camera_id: str,
feature: np.ndarray,
bbox: Tuple[float, float, float, float],
now: float,
) -> int:
"""Create a new track and return its assigned track_id."""
tid = self._next_id
self._next_id += 1
self._tracks[tid] = TrackEntry(
track_id=tid,
camera_id=camera_id,
feature=feature.copy(),
confidence=1.0,
first_seen=now,
last_seen=now,
bbox=bbox,
)
return tid
def update(
self,
track_id: int,
camera_id: str,
feature: np.ndarray,
bbox: Tuple[float, float, float, float],
confidence: float,
now: float,
) -> None:
"""
Update an existing track with a new observation.
The appearance feature is blended via EMA (α=0.3) for robustness.
"""
entry = self._tracks[track_id]
# EMA blend: α * new + (1-α) * old, then re-normalise
updated = _EMA_ALPHA * feature + (1.0 - _EMA_ALPHA) * entry.feature
norm = np.linalg.norm(updated)
if norm > 0.0:
updated /= norm
self._tracks[track_id] = TrackEntry(
track_id=track_id,
camera_id=camera_id,
feature=updated,
confidence=confidence,
first_seen=entry.first_seen,
last_seen=now,
bbox=bbox,
is_stale=False,
)
def match(
self,
feature: np.ndarray,
threshold: float,
) -> Tuple[Optional[int], float]:
"""Delegate to module-level match_track."""
return match_track(feature, self._tracks, threshold)
def mark_stale(self, now: float, timeout_s: float) -> None:
"""Flag tracks whose last_seen age exceeds timeout_s."""
for entry in self._tracks.values():
if (now - entry.last_seen) > timeout_s:
entry.is_stale = True
def prune_stale(self) -> None:
"""Remove all stale tracks from the gallery."""
self._tracks = {
tid: e for tid, e in self._tracks.items() if not e.is_stale
}

View File

@ -0,0 +1,215 @@
"""
person_reid_node.py Appearance-based person re-identification (Issue #322).
Subscribes to a camera RGB stream and person detection bounding boxes,
extracts per-person HSV colour histograms, matches against an in-memory
gallery using cosine similarity, and publishes stable PersonTrack IDs.
Running one instance per camera (remapping /camera/color/image_raw and
/saltybot/scene/objects) achieves cross-camera re-id: the same physical
person is assigned the same track_id regardless of which camera view
they appear in, because appearance matching is camera-agnostic.
Subscribes
----------
/camera/color/image_raw sensor_msgs/Image (BEST_EFFORT)
/saltybot/scene/objects saltybot_scene_msgs/SceneObjectArray (BEST_EFFORT)
Publishes
---------
/saltybot/person_tracks saltybot_scene_msgs/PersonTrackArray at publish_hz
Parameters
----------
camera_id str "front" Label stored in every PersonTrack
similarity_threshold float 0.75 Cosine similarity threshold for re-id
stale_timeout_s float 30.0 Seconds before inactive track is stale
max_tracks int 20 Maximum concurrent active tracks
publish_hz float 5.0 Track list publication rate (Hz)
"""
from __future__ import annotations
import time
import cv2
import cv_bridge
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from builtin_interfaces.msg import Time as TimeMsg
from sensor_msgs.msg import Image
from saltybot_scene_msgs.msg import (
SceneObjectArray,
PersonTrack,
PersonTrackArray,
)
from ._person_reid import TrackGallery, extract_hsv_histogram
# COCO class 0 = person
_PERSON_CLASS_ID = 0
_BEST_EFFORT_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
def _float_to_ros_time(wall_secs: float) -> TimeMsg:
"""Convert a time.time() float to builtin_interfaces/Time."""
sec_int = int(wall_secs)
nanosec = int((wall_secs - sec_int) * 1e9)
msg = TimeMsg()
msg.sec = sec_int
msg.nanosec = nanosec
return msg
class PersonReidNode(Node):
def __init__(self) -> None:
super().__init__('person_reid_node')
self.declare_parameter('camera_id', 'front')
self.declare_parameter('similarity_threshold', 0.75)
self.declare_parameter('stale_timeout_s', 30.0)
self.declare_parameter('max_tracks', 20)
self.declare_parameter('publish_hz', 5.0)
self._camera_id = str(self.get_parameter('camera_id').value)
self._threshold = float(self.get_parameter('similarity_threshold').value)
self._timeout = float(self.get_parameter('stale_timeout_s').value)
self._max_tracks = int(self.get_parameter('max_tracks').value)
publish_hz = float(self.get_parameter('publish_hz').value)
self._bridge: cv_bridge.CvBridge = cv_bridge.CvBridge()
self._gallery: TrackGallery = TrackGallery()
self._latest_image: np.ndarray | None = None
self._img_sub = self.create_subscription(
Image,
'/camera/color/image_raw',
self._on_image,
_BEST_EFFORT_QOS,
)
self._det_sub = self.create_subscription(
SceneObjectArray,
'/saltybot/scene/objects',
self._on_detections,
_BEST_EFFORT_QOS,
)
self._pub = self.create_publisher(
PersonTrackArray,
'/saltybot/person_tracks',
10,
)
period = 1.0 / max(publish_hz, 0.1)
self._timer = self.create_timer(period, self._on_timer)
self.get_logger().info(
f'person_reid_node ready — '
f'camera_id={self._camera_id} '
f'threshold={self._threshold} '
f'timeout={self._timeout}s '
f'max_tracks={self._max_tracks}'
)
# ── Image callback ────────────────────────────────────────────────────────
def _on_image(self, msg: Image) -> None:
try:
self._latest_image = self._bridge.imgmsg_to_cv2(msg, 'bgr8')
except Exception as exc:
self.get_logger().warning(f'cv_bridge error: {exc}', throttle_duration_sec=5.0)
# ── Detection callback ────────────────────────────────────────────────────
def _on_detections(self, msg: SceneObjectArray) -> None:
if self._latest_image is None:
return
now = time.time()
img = self._latest_image
h, w = img.shape[:2]
for obj in msg.objects:
if obj.class_id != _PERSON_CLASS_ID:
continue
# Crop the person ROI from the cached image
cx = obj.bbox.center.position.x
cy = obj.bbox.center.position.y
hsw = obj.bbox.size_x / 2.0
hsh = obj.bbox.size_y / 2.0
x1 = max(0, int(cx - hsw))
y1 = max(0, int(cy - hsh))
x2 = min(w, int(cx + hsw))
y2 = min(h, int(cy + hsh))
if x2 <= x1 or y2 <= y1:
continue
crop = img[y1:y2, x1:x2]
feature = extract_hsv_histogram(crop)
bbox = (cx, cy, obj.bbox.size_x, obj.bbox.size_y)
track_id, sim = self._gallery.match(feature, self._threshold)
if track_id is not None:
self._gallery.update(
track_id, self._camera_id, feature, bbox, float(sim), now
)
elif len(self._gallery.tracks) < self._max_tracks:
self._gallery.add(self._camera_id, feature, bbox, now)
# ── Timer callback — publish + housekeeping ───────────────────────────────
def _on_timer(self) -> None:
now = time.time()
self._gallery.mark_stale(now, self._timeout)
self._gallery.prune_stale()
stamp = self.get_clock().now().to_msg()
out = PersonTrackArray()
out.header.stamp = stamp
out.header.frame_id = self._camera_id
for entry in self._gallery.tracks.values():
t = PersonTrack()
t.header.stamp = stamp
t.header.frame_id = self._camera_id
t.track_id = entry.track_id
t.camera_id = entry.camera_id
t.confidence = float(entry.confidence)
t.first_seen = _float_to_ros_time(entry.first_seen)
t.last_seen = _float_to_ros_time(entry.last_seen)
t.is_stale = entry.is_stale
# Populate last-known bounding box
cx, cy, sx, sy = entry.bbox
t.bbox.center.position.x = cx
t.bbox.center.position.y = cy
t.bbox.size_x = sx
t.bbox.size_y = sy
out.tracks.append(t)
self._pub.publish(out)
def main(args=None) -> None:
rclpy.init(args=args)
node = PersonReidNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -49,6 +49,8 @@ setup(
'sky_detector = saltybot_bringup.sky_detect_node:main',
# Wheel encoder differential drive odometry (Issue #184)
'wheel_odom = saltybot_bringup.wheel_odom_node:main',
# Appearance-based person re-identification (Issue #322)
'person_reid = saltybot_bringup.person_reid_node:main',
],
},
)

View File

@ -0,0 +1,528 @@
"""
test_person_reid.py Unit tests for person re-identification helpers (no ROS2).
Covers:
extract_hsv_histogram:
- output shape is FEAT_DIM (128)
- None input zero vector
- zero-size array input zero vector
- solid-colour image L2-normalised (norm 1)
- same colour on different image sizes same histogram
- different solid colours dissimilar histograms
- all-white image (low saturation) histogram concentrated in low-S bins
- single-pixel image valid unit histogram
cosine_similarity:
- identical unit vectors 1.0
- zero vector (a) 0.0
- zero vector (b) 0.0
- both zero 0.0
- anti-parallel vectors -1.0
- orthogonal vectors 0.0
- non-unit same-direction vectors 1.0
- non-unit opposite-direction vectors -1.0
- partial overlap in (0, 1)
match_track:
- empty gallery (None, 0.0)
- perfect match above threshold correct track_id, sim 1.0
- similarity exactly at threshold no match (strict >)
- below threshold (None, 0.0)
- stale track excluded from matching
- multiple tracks best one returned
- all tracks stale (None, 0.0)
TrackGallery:
- add assigns track_id = 1, 2, 3 ...
- add stores a copy of feature (mutation of original doesn't affect gallery)
- update changes last_seen, camera_id, confidence
- update applies EMA: feature shifts toward new, stays unit-norm
- update clears is_stale flag
- update preserves first_seen
- mark_stale: recent track stays fresh
- mark_stale: old track flagged stale
- prune_stale removes stale, keeps fresh
- empty gallery match (None, 0.0)
Re-id integration:
- same appearance from two camera_ids same track_id
- different appearance different track_id
- stale + prune + re-add new track_id
- EMA: track adapts over multiple updates (feature norm stays 1)
"""
import sys
import os
import math
import numpy as np
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_bringup._person_reid import (
H_BINS,
S_BINS,
FEAT_DIM,
TrackEntry,
TrackGallery,
cosine_similarity,
extract_hsv_histogram,
match_track,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _solid_bgr(b: int, g: int, r: int, size: int = 32) -> np.ndarray:
"""Return a solid-colour BGR uint8 image of given size."""
import cv2
img = np.full((size, size, 3), (b, g, r), dtype=np.uint8)
return img
def _unit(v: np.ndarray) -> np.ndarray:
n = np.linalg.norm(v)
return v / n if n > 1e-12 else v
def _make_feature(seed: int) -> np.ndarray:
"""Reproducible random L2-normalised feature vector."""
rng = np.random.default_rng(seed)
v = rng.random(FEAT_DIM).astype(np.float32)
return v / np.linalg.norm(v)
def _make_entry(
tid: int,
feature: np.ndarray,
camera_id: str = 'front',
now: float = 1000.0,
is_stale: bool = False,
) -> TrackEntry:
return TrackEntry(
track_id=tid,
camera_id=camera_id,
feature=feature.copy(),
confidence=1.0,
first_seen=now,
last_seen=now,
is_stale=is_stale,
)
# ── extract_hsv_histogram ─────────────────────────────────────────────────────
class TestExtractHsvHistogram:
def test_output_shape(self):
img = _solid_bgr(0, 0, 255)
hist = extract_hsv_histogram(img)
assert hist.shape == (FEAT_DIM,)
def test_output_dtype(self):
hist = extract_hsv_histogram(_solid_bgr(0, 255, 0))
assert hist.dtype == np.float32
def test_none_returns_zero(self):
hist = extract_hsv_histogram(None)
assert hist.shape == (FEAT_DIM,)
assert np.all(hist == 0.0)
def test_empty_array_returns_zero(self):
import numpy as np
empty = np.zeros((0, 0, 3), dtype=np.uint8)
hist = extract_hsv_histogram(empty)
assert np.all(hist == 0.0)
def test_solid_colour_is_unit_norm(self):
hist = extract_hsv_histogram(_solid_bgr(255, 0, 0))
assert np.linalg.norm(hist) == pytest.approx(1.0, rel=1e-5)
def test_same_colour_different_sizes_equal(self):
h16 = extract_hsv_histogram(_solid_bgr(0, 0, 200, size=16))
h64 = extract_hsv_histogram(_solid_bgr(0, 0, 200, size=64))
assert np.allclose(h16, h64, atol=1e-5)
def test_different_colours_different_histograms(self):
red = extract_hsv_histogram(_solid_bgr(0, 0, 255))
blue = extract_hsv_histogram(_solid_bgr(255, 0, 0))
assert not np.allclose(red, blue, atol=1e-3)
def test_red_vs_green_low_similarity(self):
red = extract_hsv_histogram(_solid_bgr(0, 0, 255))
green = extract_hsv_histogram(_solid_bgr(0, 255, 0))
sim = cosine_similarity(red, green)
assert sim < 0.5
def test_single_pixel_valid(self):
import numpy as np
px = np.array([[[0, 200, 200]]], dtype=np.uint8)
hist = extract_hsv_histogram(px)
assert hist.shape == (FEAT_DIM,)
assert np.linalg.norm(hist) == pytest.approx(1.0, rel=1e-5)
def test_feat_dim_constant(self):
assert FEAT_DIM == H_BINS * S_BINS
assert FEAT_DIM == 128
# ── cosine_similarity ─────────────────────────────────────────────────────────
class TestCosineSimilarity:
def test_identical_unit_vectors(self):
v = _make_feature(0)
assert cosine_similarity(v, v.copy()) == pytest.approx(1.0, rel=1e-6)
def test_zero_vector_a(self):
z = np.zeros(FEAT_DIM, dtype=np.float32)
v = _make_feature(1)
assert cosine_similarity(z, v) == pytest.approx(0.0, abs=1e-9)
def test_zero_vector_b(self):
v = _make_feature(2)
z = np.zeros(FEAT_DIM, dtype=np.float32)
assert cosine_similarity(v, z) == pytest.approx(0.0, abs=1e-9)
def test_both_zero(self):
z = np.zeros(FEAT_DIM, dtype=np.float32)
assert cosine_similarity(z, z) == pytest.approx(0.0, abs=1e-9)
def test_anti_parallel(self):
v = _make_feature(3)
assert cosine_similarity(v, -v) == pytest.approx(-1.0, rel=1e-6)
def test_orthogonal(self):
a = np.zeros(FEAT_DIM, dtype=np.float32)
b = np.zeros(FEAT_DIM, dtype=np.float32)
a[0] = 1.0
b[1] = 1.0
assert cosine_similarity(a, b) == pytest.approx(0.0, abs=1e-9)
def test_non_unit_same_direction(self):
v = _make_feature(4)
assert cosine_similarity(v * 5.0, v * 3.0) == pytest.approx(1.0, rel=1e-6)
def test_non_unit_opposite_direction(self):
v = _make_feature(5)
assert cosine_similarity(v * 7.0, -v * 2.0) == pytest.approx(-1.0, rel=1e-6)
def test_partial_overlap_in_range(self):
a = np.zeros(FEAT_DIM, dtype=np.float32)
b = np.zeros(FEAT_DIM, dtype=np.float32)
a[0] = 1.0
b[0] = 1.0
b[1] = 1.0
sim = cosine_similarity(a, b)
assert 0.0 < sim < 1.0
# ── match_track ───────────────────────────────────────────────────────────────
class TestMatchTrack:
def test_empty_gallery(self):
feat = _make_feature(10)
tid, sim = match_track(feat, {}, threshold=0.75)
assert tid is None
assert sim == pytest.approx(0.0, abs=1e-9)
def test_exact_match_above_threshold(self):
feat = _make_feature(11)
tracks = {1: _make_entry(1, feat)}
tid, sim = match_track(feat, tracks, threshold=0.75)
assert tid == 1
assert sim == pytest.approx(1.0, rel=1e-5)
def test_similarity_at_threshold_no_match(self):
"""Strict >: equal to threshold is not a match."""
feat = _make_feature(12)
# construct a feature at exactly cosine = threshold
# use two orthogonal features blended so sim == threshold
a = np.zeros(FEAT_DIM, dtype=np.float32)
b = np.zeros(FEAT_DIM, dtype=np.float32)
a[0] = 1.0
b[1] = 1.0
threshold = 0.75
# query is a; gallery is a → sim = 1.0, way above. Just test with threshold=1.0+eps
tracks = {1: _make_entry(1, a)}
tid, sim = match_track(a, tracks, threshold=1.0 + 1e-9)
assert tid is None
def test_below_threshold_no_match(self):
feat_a = _make_feature(13)
feat_b = _make_feature(14)
tracks = {1: _make_entry(1, feat_b)}
# Force threshold above any realistic similarity between random features
tid, sim = match_track(feat_a, tracks, threshold=0.9999)
if cosine_similarity(feat_a, feat_b) <= 0.9999:
assert tid is None
def test_stale_track_excluded(self):
feat = _make_feature(15)
tracks = {1: _make_entry(1, feat, is_stale=True)}
tid, sim = match_track(feat, tracks, threshold=0.5)
assert tid is None
def test_multiple_tracks_best_returned(self):
query = _make_feature(16)
close = query.copy() # sim = 1.0
far = _make_feature(17) # lower sim
tracks = {
1: _make_entry(1, far),
2: _make_entry(2, close),
}
tid, sim = match_track(query, tracks, threshold=0.5)
assert tid == 2
assert sim == pytest.approx(1.0, rel=1e-5)
def test_all_stale_no_match(self):
feat = _make_feature(18)
tracks = {
1: _make_entry(1, feat, is_stale=True),
2: _make_entry(2, feat, is_stale=True),
}
tid, sim = match_track(feat, tracks, threshold=0.5)
assert tid is None
def test_fresh_and_stale_returns_fresh(self):
feat = _make_feature(19)
tracks = {
1: _make_entry(1, feat, is_stale=True),
2: _make_entry(2, feat, is_stale=False),
}
tid, sim = match_track(feat, tracks, threshold=0.5)
assert tid == 2
# ── TrackGallery ──────────────────────────────────────────────────────────────
_BBOX = (10.0, 20.0, 50.0, 80.0)
class TestTrackGallery:
def test_add_increments_track_id(self):
g = TrackGallery()
now = 1000.0
t1 = g.add('front', _make_feature(20), _BBOX, now)
t2 = g.add('front', _make_feature(21), _BBOX, now)
t3 = g.add('rear', _make_feature(22), _BBOX, now)
assert t1 == 1
assert t2 == 2
assert t3 == 3
def test_add_stores_copy(self):
g = TrackGallery()
feat = _make_feature(23)
orig = feat.copy()
g.add('front', feat, _BBOX, 1000.0)
feat[0] = 99.9 # mutate original
stored = g.tracks[1].feature
assert np.allclose(stored, orig, atol=1e-6)
def test_add_sets_timestamps(self):
g = TrackGallery()
now = 5000.0
g.add('front', _make_feature(24), _BBOX, now)
e = g.tracks[1]
assert e.first_seen == pytest.approx(now)
assert e.last_seen == pytest.approx(now)
def test_add_initial_confidence_one(self):
g = TrackGallery()
g.add('front', _make_feature(25), _BBOX, 1000.0)
assert g.tracks[1].confidence == pytest.approx(1.0)
def test_update_changes_last_seen(self):
g = TrackGallery()
feat = _make_feature(26)
g.add('front', feat, _BBOX, 1000.0)
g.update(1, 'front', feat, _BBOX, 0.9, 2000.0)
assert g.tracks[1].last_seen == pytest.approx(2000.0)
assert g.tracks[1].first_seen == pytest.approx(1000.0) # preserved
def test_update_changes_camera_id(self):
g = TrackGallery()
feat = _make_feature(27)
g.add('front', feat, _BBOX, 1000.0)
g.update(1, 'right', feat, _BBOX, 0.8, 2000.0)
assert g.tracks[1].camera_id == 'right'
def test_update_applies_ema(self):
g = TrackGallery()
feat0 = _make_feature(28)
g.add('front', feat0, _BBOX, 1000.0)
feat1 = _make_feature(29)
g.update(1, 'front', feat1, _BBOX, 0.85, 2000.0)
stored = g.tracks[1].feature
# EMA: 0.3*feat1 + 0.7*feat0 (then renorm) — not equal to either
assert not np.allclose(stored, feat0, atol=1e-4)
assert not np.allclose(stored, feat1, atol=1e-4)
def test_update_feature_stays_unit_norm(self):
g = TrackGallery()
feat = _make_feature(30)
g.add('front', feat, _BBOX, 1000.0)
for i in range(10):
new_feat = _make_feature(31 + i)
g.update(1, 'front', new_feat, _BBOX, 0.9, 1000.0 + i)
assert np.linalg.norm(g.tracks[1].feature) == pytest.approx(1.0, rel=1e-5)
def test_update_clears_stale(self):
g = TrackGallery()
feat = _make_feature(41)
g.add('front', feat, _BBOX, 1000.0)
g.tracks[1].is_stale = True
g.update(1, 'front', feat, _BBOX, 0.9, 2000.0)
assert g.tracks[1].is_stale is False
def test_mark_stale_recent_track_stays_fresh(self):
g = TrackGallery()
now = 1000.0
g.add('front', _make_feature(42), _BBOX, now)
g.mark_stale(now + 10.0, timeout_s=30.0)
assert g.tracks[1].is_stale is False
def test_mark_stale_old_track_flagged(self):
g = TrackGallery()
now = 1000.0
g.add('front', _make_feature(43), _BBOX, now)
g.mark_stale(now + 31.0, timeout_s=30.0)
assert g.tracks[1].is_stale is True
def test_mark_stale_exactly_at_timeout_not_stale(self):
"""Timeout is strictly >, so last_seen + timeout_s is still fresh."""
g = TrackGallery()
now = 1000.0
g.add('front', _make_feature(44), _BBOX, now)
g.mark_stale(now + 30.0, timeout_s=30.0)
assert g.tracks[1].is_stale is False
def test_prune_stale_removes_flagged(self):
g = TrackGallery()
now = 1000.0
g.add('front', _make_feature(45), _BBOX, now) # track 1
g.add('front', _make_feature(46), _BBOX, now) # track 2
g.tracks[1].is_stale = True
g.prune_stale()
assert 1 not in g.tracks
assert 2 in g.tracks
def test_prune_stale_keeps_fresh(self):
g = TrackGallery()
now = 1000.0
g.add('front', _make_feature(47), _BBOX, now)
g.prune_stale()
assert 1 in g.tracks
def test_empty_gallery_match(self):
g = TrackGallery()
tid, sim = g.match(_make_feature(48), threshold=0.75)
assert tid is None
assert sim == pytest.approx(0.0, abs=1e-9)
def test_match_delegates_to_match_track(self):
g = TrackGallery()
feat = _make_feature(49)
g.add('front', feat, _BBOX, 1000.0)
tid, sim = g.match(feat, threshold=0.5)
assert tid == 1
assert sim == pytest.approx(1.0, rel=1e-5)
# ── Re-id integration ─────────────────────────────────────────────────────────
class TestReIdIntegration:
def test_same_appearance_two_cameras_same_track(self):
"""Same feature from two cameras → same track_id."""
g = TrackGallery()
feat = _make_feature(50)
now = 1000.0
# First sighting: front camera creates track 1
t1 = g.add('front', feat, _BBOX, now)
assert t1 == 1
# Second sighting: right camera matches → updates same track
tid, sim = g.match(feat, threshold=0.75)
assert tid == 1
g.update(tid, 'right', feat, _BBOX, float(sim), now + 1.0)
assert len(g.tracks) == 1
def test_different_appearance_different_tracks(self):
"""Two orthogonal features must not match → separate track IDs."""
g = TrackGallery()
now = 1000.0
# Construct two exactly orthogonal unit feature vectors
feat_a = np.zeros(FEAT_DIM, dtype=np.float32)
feat_b = np.zeros(FEAT_DIM, dtype=np.float32)
feat_a[0] = 1.0 # basis vector e0
feat_b[1] = 1.0 # basis vector e1 — cosine sim = 0.0
g.add('front', feat_a, _BBOX, now)
tid, sim = g.match(feat_b, threshold=0.75)
assert tid is None
g.add('front', feat_b, _BBOX, now + 0.1)
assert len(g.tracks) == 2
def test_stale_prune_readd_new_id(self):
g = TrackGallery()
feat = _make_feature(53)
now = 1000.0
t1 = g.add('front', feat, _BBOX, now)
assert t1 == 1
g.mark_stale(now + 31.0, timeout_s=30.0)
g.prune_stale()
assert len(g.tracks) == 0
# Re-add same person gets new ID
t2 = g.add('front', feat, _BBOX, now + 40.0)
assert t2 == 2 # next_id incremented past 1
def test_ema_feature_norm_stays_unit_after_many_updates(self):
g = TrackGallery()
feat = _make_feature(54)
g.add('front', feat, _BBOX, 0.0)
for i in range(50):
new_feat = _make_feature(55 + i)
g.update(1, 'front', new_feat, _BBOX, 0.9, float(i))
norm = np.linalg.norm(g.tracks[1].feature)
assert norm == pytest.approx(1.0, rel=1e-4)
def test_max_tracks_not_exceeded_in_gallery(self):
"""Gallery itself is unbounded; max_tracks enforcement is in the node.
Confirm gallery can hold many entries without errors."""
g = TrackGallery()
for i in range(25):
g.add('front', _make_feature(100 + i), _BBOX, float(i))
assert len(g.tracks) == 25
def test_hsv_same_image_produces_high_similarity(self):
"""Same-crop HSV histogram should have cosine sim ≈ 1."""
import cv2
img = _solid_bgr(50, 120, 200)
h1 = extract_hsv_histogram(img)
h2 = extract_hsv_histogram(img.copy())
assert cosine_similarity(h1, h2) == pytest.approx(1.0, rel=1e-5)
def test_hsv_different_colours_low_similarity(self):
"""Red vs cyan BGR should be dissimilar."""
red = extract_hsv_histogram(_solid_bgr(0, 0, 200))
cyan = extract_hsv_histogram(_solid_bgr(200, 200, 0))
assert cosine_similarity(red, cyan) < 0.5
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@ -19,6 +19,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
# Issue #274 HSV color segmentation
"msg/ColorDetection.msg"
"msg/ColorDetectionArray.msg"
# Issue #322 cross-camera person re-identification
"msg/PersonTrack.msg"
"msg/PersonTrackArray.msg"
DEPENDENCIES std_msgs geometry_msgs vision_msgs builtin_interfaces
)

View File

@ -0,0 +1,19 @@
# PersonTrack.msg — stable cross-camera person identity track (Issue #322)
#
# track_id : monotonically-increasing track ID, stable across camera views
# camera_id : source camera label ("front", "right", "rear", "left")
# bbox : last-known bounding box in the source image (centre + size, pixels)
# confidence : cosine-similarity of the matched appearance feature (01);
# 1.0 for tracks created from a first-time observation
# first_seen : wall-clock time when this track was first created
# last_seen : wall-clock time of the most recent appearance observation
# is_stale : true when (now last_seen) > stale_timeout_s
#
std_msgs/Header header
uint32 track_id
string camera_id
vision_msgs/BoundingBox2D bbox
float32 confidence
builtin_interfaces/Time first_seen
builtin_interfaces/Time last_seen
bool is_stale

View File

@ -0,0 +1,4 @@
# PersonTrackArray.msg — all active person tracks from one camera view (Issue #322)
#
std_msgs/Header header
PersonTrack[] tracks

View File

@ -0,0 +1,14 @@
camera_hotplug_node:
ros__parameters:
video_glob: "/dev/video*" # glob pattern for USB camera devices
output_topic: "/saltybot/camera_status"
poll_rate: 2.0 # device scan frequency (Hz)
# Reconnect detection: if a camera reappears within restart_grace_s
# seconds of being lost, publish "restarting" instead of "connected".
restart_grace_s: 5.0 # reconnect grace window (s)
restart_hold_s: 2.0 # duration to hold "restarting" before resolving (s)
# Set true to publish status on every poll tick, not only on change.
publish_always: false

View File

@ -0,0 +1,43 @@
"""camera_hotplug.launch.py — Launch USB camera hot-plug monitor (Issue #320).
Usage:
ros2 launch saltybot_social camera_hotplug.launch.py
ros2 launch saltybot_social camera_hotplug.launch.py video_glob:=/dev/video0
ros2 launch saltybot_social camera_hotplug.launch.py restart_grace_s:=10.0
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg = get_package_share_directory("saltybot_social")
cfg = os.path.join(pkg, "config", "camera_hotplug_params.yaml")
return LaunchDescription([
DeclareLaunchArgument("video_glob", default_value="/dev/video*",
description="Glob pattern for video device discovery"),
DeclareLaunchArgument("poll_rate", default_value="2.0",
description="Device scan frequency (Hz)"),
DeclareLaunchArgument("restart_grace_s", default_value="5.0",
description="Reconnect window that triggers 'restarting' (s)"),
Node(
package="saltybot_social",
executable="camera_hotplug_node",
name="camera_hotplug_node",
output="screen",
parameters=[
cfg,
{
"video_glob": LaunchConfiguration("video_glob"),
"poll_rate": LaunchConfiguration("poll_rate"),
"restart_grace_s": LaunchConfiguration("restart_grace_s"),
},
],
),
])

View File

@ -0,0 +1,241 @@
"""camera_hotplug_node.py — USB camera hot-plug monitor.
Issue #320
Polls ``/dev/video*`` at a configurable rate to detect USB camera
connect / disconnect events and publishes a status string on
/saltybot/camera_status.
Status values
"connected" one or more video devices present and stable
"disconnected" no /dev/video* devices found
"restarting" device set changed (hot-swap / reconnect); downstream
capture pipelines should restart. Held for
``restart_hold_s`` seconds then resolved to
"connected" or "disconnected" based on re-scan.
State machine
DISCONNECTED (devices appear, within grace window) RESTARTING
DISCONNECTED (devices appear, outside grace window) CONNECTED
CONNECTED (all devices lost) DISCONNECTED
CONNECTED (device set changed) RESTARTING
RESTARTING (hold_s elapsed, devices present) CONNECTED
RESTARTING (hold_s elapsed, no devices) DISCONNECTED
Initial state is set by the first scan at startup (no "restarting" on boot).
Publications
/saltybot/camera_status std_msgs/String "connected" | "disconnected"
| "restarting"
Parameters
video_glob (str, "/dev/video*") glob for device discovery
poll_rate (float, 2.0) scan frequency (Hz)
restart_grace_s (float, 5.0) reconnect window; if a camera
reappears within this many seconds
of being lost "restarting"
restart_hold_s (float, 2.0) how long to hold "restarting"
publish_always (bool, False) publish on every tick, not only
on state change
output_topic (str, "/saltybot/camera_status")
"""
from __future__ import annotations
import glob as _glob_mod
import threading
import time
from typing import FrozenSet, Tuple
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import String
# ── Status constants ───────────────────────────────────────────────────────────
STATUS_CONNECTED = "connected"
STATUS_DISCONNECTED = "disconnected"
STATUS_RESTARTING = "restarting"
# ── Pure helpers ───────────────────────────────────────────────────────────────
def scan_video_devices(video_glob: str) -> FrozenSet[str]:
"""Return the frozenset of /dev/video* paths currently present.
Catches permission / OS errors and returns an empty frozenset.
"""
try:
return frozenset(_glob_mod.glob(video_glob))
except Exception:
return frozenset()
def compute_transition(
state: str,
prev_devices: FrozenSet[str],
curr_devices: FrozenSet[str],
now: float,
disconnect_time: float,
restart_until: float,
restart_grace_s: float,
restart_hold_s: float,
) -> Tuple[str, float, float]:
"""Pure state-machine step — no I/O, no ROS.
Parameters
----------
state : current status string
prev_devices : device set from the previous scan
curr_devices : device set from the current scan
now : current monotonic time (s)
disconnect_time : monotonic time of the last CONNECTED DISCONNECTED
transition (0.0 = never disconnected)
restart_until : monotonic time at which RESTARTING resolves
restart_grace_s : reconnect window that triggers RESTARTING
restart_hold_s : duration to hold RESTARTING before resolving
Returns
-------
(new_state, new_disconnect_time, new_restart_until)
"""
if state == STATUS_RESTARTING:
if now >= restart_until:
if curr_devices:
return STATUS_CONNECTED, disconnect_time, restart_until
return STATUS_DISCONNECTED, now, restart_until
return STATUS_RESTARTING, disconnect_time, restart_until
if state == STATUS_CONNECTED:
if not curr_devices:
# All cameras lost
return STATUS_DISCONNECTED, now, restart_until
if curr_devices != prev_devices:
# Hot-swap or device renumbering
return STATUS_RESTARTING, disconnect_time, now + restart_hold_s
return STATUS_CONNECTED, disconnect_time, restart_until
# STATUS_DISCONNECTED
if curr_devices:
# Reconnect: check if we're inside the grace window
if disconnect_time > 0.0 and (now - disconnect_time) <= restart_grace_s:
return STATUS_RESTARTING, disconnect_time, now + restart_hold_s
return STATUS_CONNECTED, disconnect_time, restart_until
return STATUS_DISCONNECTED, disconnect_time, restart_until
# ── ROS2 node ──────────────────────────────────────────────────────────────────
class CameraHotplugNode(Node):
"""Monitors /dev/video* and publishes camera connection status."""
def __init__(self) -> None:
super().__init__("camera_hotplug_node")
self.declare_parameter("video_glob", "/dev/video*")
self.declare_parameter("poll_rate", 2.0)
self.declare_parameter("restart_grace_s", 5.0)
self.declare_parameter("restart_hold_s", 2.0)
self.declare_parameter("publish_always", False)
self.declare_parameter("output_topic", "/saltybot/camera_status")
self._glob = self.get_parameter("video_glob").value
poll_rate = float(self.get_parameter("poll_rate").value)
self._grace_s = float(self.get_parameter("restart_grace_s").value)
self._hold_s = float(self.get_parameter("restart_hold_s").value)
self._pub_always = bool(self.get_parameter("publish_always").value)
output_topic = self.get_parameter("output_topic").value
# Injected scan function — allows unit-test mocking
self._scan_fn = scan_video_devices
# Perform initial scan (before any lock or timer)
initial = self._scan_fn(self._glob)
self._state = STATUS_CONNECTED if initial else STATUS_DISCONNECTED
self._prev_devices = initial
self._disconnect_time: float = 0.0
self._restart_until: float = 0.0
self._lock = threading.Lock()
qos = QoSProfile(depth=10)
self._pub = self.create_publisher(String, output_topic, qos)
self._timer = self.create_timer(1.0 / poll_rate, self._scan_cb)
# Publish initial state
self._publish(self._state)
n_dev = len(initial)
self.get_logger().info(
f"CameraHotplugNode ready — initial state: {self._state} "
f"({n_dev} device(s): {sorted(initial)})"
)
# ── Scan callback ───────────────────────────────────────────────────
def _scan_cb(self) -> None:
now = time.monotonic()
curr = self._scan_fn(self._glob)
with self._lock:
prev = self._prev_devices
state = self._state
disc_t = self._disconnect_time
rup = self._restart_until
new_state, new_disc_t, new_rup = compute_transition(
state, prev, curr, now, disc_t, rup,
self._grace_s, self._hold_s,
)
changed = new_state != state
with self._lock:
self._prev_devices = curr
self._state = new_state
self._disconnect_time = new_disc_t
self._restart_until = new_rup
if changed:
self.get_logger().info(
f"CameraHotplug: {state!r}{new_state!r} "
f"(devices: {sorted(curr) or 'none'})"
)
if changed or self._pub_always:
self._publish(new_state)
# ── Publish ─────────────────────────────────────────────────────────
def _publish(self, status: str) -> None:
msg = String()
msg.data = status
self._pub.publish(msg)
# ── Public accessors (for tests / introspection) ─────────────────────
@property
def state(self) -> str:
with self._lock:
return self._state
@property
def devices(self) -> FrozenSet[str]:
with self._lock:
return self._prev_devices
def main(args=None) -> None:
rclpy.init(args=args)
node = CameraHotplugNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

View File

@ -57,8 +57,10 @@ setup(
'topic_memory_node = saltybot_social.topic_memory_node:main',
# Personal space respector (Issue #310)
'personal_space_node = saltybot_social.personal_space_node:main',
# Audio wake-word detector — 'hey salty' (Issue #320)
# Audio wake-word detector — 'hey salty'
'wake_word_node = saltybot_social.wake_word_node:main',
# USB camera hot-plug monitor (Issue #320)
'camera_hotplug_node = saltybot_social.camera_hotplug_node:main',
],
},
)

View File

@ -0,0 +1,630 @@
"""test_camera_hotplug.py — Offline tests for camera_hotplug_node (Issue #320).
Stubs out rclpy and ROS message types so tests run without a ROS install.
Filesystem calls are avoided by injecting a mock scan function.
"""
import importlib.util
import sys
import time
import types
import unittest
# ── ROS2 stubs ────────────────────────────────────────────────────────────────
def _make_ros_stubs():
for mod_name in ("rclpy", "rclpy.node", "rclpy.qos",
"std_msgs", "std_msgs.msg"):
if mod_name not in sys.modules:
sys.modules[mod_name] = types.ModuleType(mod_name)
class _Node:
def __init__(self, name="node"):
self._name = name
if not hasattr(self, "_params"):
self._params = {}
self._pubs = {}
self._subs = {}
self._logs = []
self._timers = []
def declare_parameter(self, name, default):
if name not in self._params:
self._params[name] = default
def get_parameter(self, name):
class _P:
def __init__(self, v): self.value = v
return _P(self._params.get(name))
def create_publisher(self, msg_type, topic, qos):
pub = _FakePub()
self._pubs[topic] = pub
return pub
def create_subscription(self, msg_type, topic, cb, qos):
self._subs[topic] = cb
return object()
def create_timer(self, period, cb):
self._timers.append(cb)
return object()
def get_logger(self):
node = self
class _L:
def info(self, m): node._logs.append(("INFO", m))
def warn(self, m): node._logs.append(("WARN", m))
def error(self, m): node._logs.append(("ERROR", m))
return _L()
def destroy_node(self): pass
class _FakePub:
def __init__(self):
self.msgs = []
def publish(self, msg):
self.msgs.append(msg)
class _QoSProfile:
def __init__(self, depth=10): self.depth = depth
class _String:
def __init__(self): self.data = ""
rclpy_mod = sys.modules["rclpy"]
rclpy_mod.init = lambda args=None: None
rclpy_mod.spin = lambda node: None
rclpy_mod.shutdown = lambda: None
sys.modules["rclpy.node"].Node = _Node
sys.modules["rclpy.qos"].QoSProfile = _QoSProfile
sys.modules["std_msgs.msg"].String = _String
return _Node, _FakePub, _String
_Node, _FakePub, _String = _make_ros_stubs()
# ── Module loader ─────────────────────────────────────────────────────────────
_SRC = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/saltybot_social/camera_hotplug_node.py"
)
def _load_mod():
spec = importlib.util.spec_from_file_location("camera_hotplug_testmod", _SRC)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def _make_node(mod, initial_devices=frozenset(), **kwargs):
"""Construct a CameraHotplugNode with a mocked scan function."""
node = mod.CameraHotplugNode.__new__(mod.CameraHotplugNode)
defaults = {
"video_glob": "/dev/video*",
"poll_rate": 2.0,
"restart_grace_s": 5.0,
"restart_hold_s": 2.0,
"publish_always": False,
"output_topic": "/saltybot/camera_status",
}
defaults.update(kwargs)
node._params = dict(defaults)
# Patch scan so the initial scan (inside __init__) returns initial_devices
orig = mod.scan_video_devices
mod.scan_video_devices = lambda g: frozenset(initial_devices)
try:
mod.CameraHotplugNode.__init__(node)
finally:
mod.scan_video_devices = orig
# Keep using the mock for subsequent _scan_cb calls
node._scan_fn = lambda g: frozenset(initial_devices)
return node
def _set_scan(node, devices):
"""Replace node._scan_fn to return a fixed device set."""
node._scan_fn = lambda g: frozenset(devices)
# ── Tests: pure helpers ────────────────────────────────────────────────────────
class TestScanVideoDevices(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def test_returns_frozenset(self):
result = self.mod.scan_video_devices("/dev/video*")
self.assertIsInstance(result, frozenset)
def test_nonexistent_glob_returns_empty(self):
result = self.mod.scan_video_devices("/nonexistent_path_xyz_abc/video*")
self.assertEqual(result, frozenset())
def test_empty_glob_string(self):
result = self.mod.scan_video_devices("")
self.assertIsInstance(result, frozenset)
class TestStatusConstants(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def test_connected_string(self):
self.assertEqual(self.mod.STATUS_CONNECTED, "connected")
def test_disconnected_string(self):
self.assertEqual(self.mod.STATUS_DISCONNECTED, "disconnected")
def test_restarting_string(self):
self.assertEqual(self.mod.STATUS_RESTARTING, "restarting")
# ── Tests: compute_transition ─────────────────────────────────────────────────
class TestComputeTransition(unittest.TestCase):
"""Tests for the pure state-machine function."""
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def _tr(self, state, prev, curr, now=100.0,
disc_t=0.0, restart_until=0.0, grace=5.0, hold=2.0):
return self.mod.compute_transition(
state, frozenset(prev), frozenset(curr),
now, disc_t, restart_until, grace, hold,
)
# ── DISCONNECTED transitions ──────────────────────────────────────
def test_disc_no_devices_stays_disconnected(self):
s, _, _ = self._tr("disconnected", [], [])
self.assertEqual(s, "disconnected")
def test_disc_devices_appear_fresh_connect(self):
"""No prior disconnect → fresh connect, not restarting."""
s, _, _ = self._tr("disconnected", [], ["/dev/video0"],
disc_t=0.0, now=100.0)
self.assertEqual(s, "connected")
def test_disc_devices_appear_within_grace(self):
"""Reappears within grace window → restarting."""
# disconnected at t=95, now=t=99, grace=5 → 4 s elapsed → within
s, _, rup = self._tr("disconnected", [], ["/dev/video0"],
now=99.0, disc_t=95.0, grace=5.0, hold=2.0)
self.assertEqual(s, "restarting")
self.assertAlmostEqual(rup, 99.0 + 2.0)
def test_disc_devices_appear_outside_grace(self):
"""Reappears after grace window → connected directly."""
# disconnected at t=80, now=100, grace=5 → 20 s elapsed → outside
s, _, _ = self._tr("disconnected", [], ["/dev/video0"],
now=100.0, disc_t=80.0, grace=5.0)
self.assertEqual(s, "connected")
def test_disc_devices_appear_exactly_at_grace(self):
"""At exactly grace_s seconds the window is still inclusive → restarting."""
s, _, _ = self._tr("disconnected", [], ["/dev/video0"],
now=100.0, disc_t=95.0, grace=5.0)
# now - disc_t = 5.0, grace = 5.0 → 5.0 <= 5.0 is True → restarting
self.assertEqual(s, "restarting")
# ── CONNECTED transitions ─────────────────────────────────────────
def test_conn_same_devices_stays_connected(self):
s, _, _ = self._tr("connected", ["/dev/video0"], ["/dev/video0"])
self.assertEqual(s, "connected")
def test_conn_no_devices_disconnects(self):
s, disc_t, _ = self._tr("connected", ["/dev/video0"], [], now=200.0)
self.assertEqual(s, "disconnected")
self.assertAlmostEqual(disc_t, 200.0)
def test_conn_device_added_restarting(self):
"""New device appears alongside existing → restarting."""
s, _, rup = self._tr("connected",
["/dev/video0"],
["/dev/video0", "/dev/video1"],
now=50.0, hold=2.0)
self.assertEqual(s, "restarting")
self.assertAlmostEqual(rup, 52.0)
def test_conn_device_replaced_restarting(self):
"""Different device path → restarting."""
s, _, _ = self._tr("connected", ["/dev/video0"], ["/dev/video2"])
self.assertEqual(s, "restarting")
def test_conn_device_removed_partial_restarting(self):
"""One of multiple devices lost (not all) → restarting."""
s, _, _ = self._tr("connected",
["/dev/video0", "/dev/video1"],
["/dev/video0"])
self.assertEqual(s, "restarting")
def test_conn_multiple_same_stays_connected(self):
devs = ["/dev/video0", "/dev/video1"]
s, _, _ = self._tr("connected", devs, devs)
self.assertEqual(s, "connected")
# ── RESTARTING transitions ────────────────────────────────────────
def test_restarting_hold_not_elapsed_stays(self):
s, _, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"],
now=50.0, restart_until=52.0)
self.assertEqual(s, "restarting")
def test_restarting_hold_elapsed_with_devices_connected(self):
s, _, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"],
now=55.0, restart_until=52.0)
self.assertEqual(s, "connected")
def test_restarting_hold_elapsed_no_devices_disconnected(self):
s, disc_t, _ = self._tr("restarting", ["/dev/video0"], [],
now=55.0, restart_until=52.0)
self.assertEqual(s, "disconnected")
self.assertAlmostEqual(disc_t, 55.0)
def test_restarting_at_exact_hold_elapsed(self):
s, _, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"],
now=52.0, restart_until=52.0)
# now >= restart_until → resolves
self.assertEqual(s, "connected")
def test_restarting_preserves_disconnect_time(self):
_, disc_t, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"],
now=50.0, disc_t=40.0, restart_until=49.0)
# hold elapsed → connected; disconnect_time unchanged
self.assertAlmostEqual(disc_t, 40.0)
# ── Return value shapes ───────────────────────────────────────────
def test_returns_tuple_of_three(self):
result = self._tr("disconnected", [], [])
self.assertEqual(len(result), 3)
def test_new_state_is_string(self):
s, _, _ = self._tr("disconnected", [], [])
self.assertIsInstance(s, str)
def test_disconnect_time_float(self):
_, dt, _ = self._tr("connected", ["/dev/video0"], [], now=42.5)
self.assertIsInstance(dt, float)
def test_restart_until_float(self):
_, _, rup = self._tr("disconnected", [], ["/dev/video0"],
disc_t=99.0, now=100.0, grace=5.0, hold=3.0)
self.assertIsInstance(rup, float)
# ── Tests: node init ──────────────────────────────────────────────────────────
class TestNodeInit(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def test_instantiates_no_devices(self):
node = _make_node(self.mod, initial_devices=[])
self.assertIsNotNone(node)
def test_instantiates_with_devices(self):
node = _make_node(self.mod, initial_devices=["/dev/video0"])
self.assertIsNotNone(node)
def test_initial_state_disconnected(self):
node = _make_node(self.mod, initial_devices=[])
self.assertEqual(node.state, "disconnected")
def test_initial_state_connected(self):
node = _make_node(self.mod, initial_devices=["/dev/video0"])
self.assertEqual(node.state, "connected")
def test_publishes_initial_state(self):
node = _make_node(self.mod, initial_devices=[])
pub = node._pubs["/saltybot/camera_status"]
self.assertEqual(len(pub.msgs), 1)
self.assertEqual(pub.msgs[0].data, "disconnected")
def test_publishes_initial_connected(self):
node = _make_node(self.mod, initial_devices=["/dev/video0"])
pub = node._pubs["/saltybot/camera_status"]
self.assertEqual(pub.msgs[0].data, "connected")
def test_publisher_registered(self):
node = _make_node(self.mod)
self.assertIn("/saltybot/camera_status", node._pubs)
def test_timer_registered(self):
node = _make_node(self.mod)
self.assertGreater(len(node._timers), 0)
def test_custom_output_topic(self):
node = _make_node(self.mod, output_topic="/my/cam_status")
self.assertIn("/my/cam_status", node._pubs)
def test_devices_property(self):
node = _make_node(self.mod, initial_devices=["/dev/video0"])
self.assertEqual(node.devices, frozenset(["/dev/video0"]))
def test_no_devices_property_empty(self):
node = _make_node(self.mod, initial_devices=[])
self.assertEqual(node.devices, frozenset())
# ── Tests: _scan_cb state transitions ────────────────────────────────────────
class TestScanCb(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def _pub(self, node):
return node._pubs["/saltybot/camera_status"]
def test_no_change_no_publish(self):
node = _make_node(self.mod, initial_devices=[])
pub = self._pub(node)
initial_count = len(pub.msgs)
_set_scan(node, [])
node._scan_cb()
self.assertEqual(len(pub.msgs), initial_count)
def test_device_appears_publishes_connected(self):
node = _make_node(self.mod, initial_devices=[])
_set_scan(node, ["/dev/video0"])
node._scan_cb()
self.assertEqual(self._pub(node).msgs[-1].data, "connected")
def test_device_lost_publishes_disconnected(self):
node = _make_node(self.mod, initial_devices=["/dev/video0"])
_set_scan(node, [])
node._scan_cb()
self.assertEqual(self._pub(node).msgs[-1].data, "disconnected")
def test_device_replaced_publishes_restarting(self):
node = _make_node(self.mod, initial_devices=["/dev/video0"])
_set_scan(node, ["/dev/video2"])
node._scan_cb()
self.assertEqual(self._pub(node).msgs[-1].data, "restarting")
def test_reconnect_within_grace_publishes_restarting(self):
"""Simulate: connected → disconnected → reconnected within grace."""
node = _make_node(self.mod, initial_devices=["/dev/video0"],
restart_grace_s=10.0)
# Simulate disconnect
_set_scan(node, [])
node._scan_cb()
self.assertEqual(node.state, "disconnected")
# Quick reconnect — disconnect_time was just set, well within grace
_set_scan(node, ["/dev/video0"])
node._scan_cb()
self.assertEqual(node.state, "restarting")
self.assertEqual(self._pub(node).msgs[-1].data, "restarting")
def test_restarting_resolves_to_connected_after_hold(self):
"""After hold expires, restarting → connected."""
node = _make_node(self.mod, initial_devices=["/dev/video0"],
restart_hold_s=0.0) # instant hold
# Device changes → restarting
_set_scan(node, ["/dev/video2"])
node._scan_cb()
self.assertEqual(node.state, "restarting")
# Next tick: hold = 0.0 → restart_until is in the past → resolves
_set_scan(node, ["/dev/video2"])
node._scan_cb()
self.assertEqual(node.state, "connected")
def test_restarting_resolves_to_disconnected_when_no_device(self):
node = _make_node(self.mod, initial_devices=["/dev/video0"],
restart_hold_s=0.0)
_set_scan(node, ["/dev/video2"])
node._scan_cb() # → restarting
_set_scan(node, [])
node._scan_cb() # → disconnected
self.assertEqual(node.state, "disconnected")
def test_publish_always_publishes_every_tick(self):
node = _make_node(self.mod, initial_devices=[], publish_always=True)
pub = self._pub(node)
before = len(pub.msgs)
_set_scan(node, [])
node._scan_cb()
node._scan_cb()
node._scan_cb()
self.assertEqual(len(pub.msgs), before + 3)
def test_no_publish_always_skips_unchanged(self):
node = _make_node(self.mod, initial_devices=[], publish_always=False)
pub = self._pub(node)
before = len(pub.msgs)
_set_scan(node, [])
node._scan_cb()
node._scan_cb()
# State unchanged → no additional publishes
self.assertEqual(len(pub.msgs), before)
def test_state_transitions_logged(self):
node = _make_node(self.mod, initial_devices=[])
_set_scan(node, ["/dev/video0"])
node._scan_cb()
infos = [m for lvl, m in node._logs if lvl == "INFO"]
self.assertTrue(any("connected" in m.lower() for m in infos))
def test_multiple_devices_connected(self):
node = _make_node(self.mod, initial_devices=[])
_set_scan(node, ["/dev/video0", "/dev/video1"])
node._scan_cb()
self.assertEqual(node.state, "connected")
def test_partial_loss_restarting(self):
node = _make_node(self.mod,
initial_devices=["/dev/video0", "/dev/video1"])
_set_scan(node, ["/dev/video0"])
node._scan_cb()
self.assertEqual(node.state, "restarting")
def test_state_property_updated(self):
node = _make_node(self.mod, initial_devices=[])
self.assertEqual(node.state, "disconnected")
_set_scan(node, ["/dev/video0"])
node._scan_cb()
self.assertEqual(node.state, "connected")
def test_devices_property_updated(self):
node = _make_node(self.mod, initial_devices=[])
_set_scan(node, ["/dev/video0"])
node._scan_cb()
self.assertIn("/dev/video0", node.devices)
def test_full_lifecycle(self):
"""Full cycle: disconnected → connected → restarting → disconnected."""
node = _make_node(self.mod, initial_devices=[],
restart_grace_s=60.0, restart_hold_s=0.0)
pub = self._pub(node)
# Boot: disconnected
self.assertEqual(node.state, "disconnected")
# Camera plugged in
_set_scan(node, ["/dev/video0"])
node._scan_cb()
self.assertEqual(node.state, "connected")
# Camera hot-unplugged then immediately replugged
_set_scan(node, [])
node._scan_cb()
self.assertEqual(node.state, "disconnected")
_set_scan(node, ["/dev/video0"])
node._scan_cb()
self.assertEqual(node.state, "restarting")
# Hold expires (hold_s=0) → resolves to connected
node._scan_cb()
self.assertEqual(node.state, "connected")
# Camera fully removed
_set_scan(node, [])
node._scan_cb()
self.assertEqual(node.state, "disconnected")
status_sequence = [m.data for m in pub.msgs]
# Must contain all three statuses in the sequence
self.assertIn("connected", status_sequence)
self.assertIn("disconnected", status_sequence)
self.assertIn("restarting", status_sequence)
# ── Tests: _publish ───────────────────────────────────────────────────────────
class TestPublish(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def setUp(self):
self.node = _make_node(self.mod, initial_devices=[])
self.pub = self.node._pubs["/saltybot/camera_status"]
def test_publish_connected(self):
self.node._publish("connected")
self.assertEqual(self.pub.msgs[-1].data, "connected")
def test_publish_disconnected(self):
self.node._publish("disconnected")
self.assertEqual(self.pub.msgs[-1].data, "disconnected")
def test_publish_restarting(self):
self.node._publish("restarting")
self.assertEqual(self.pub.msgs[-1].data, "restarting")
def test_publish_increments_msg_count(self):
before = len(self.pub.msgs)
self.node._publish("connected")
self.assertEqual(len(self.pub.msgs), before + 1)
# ── Tests: source content ─────────────────────────────────────────────────────
class TestNodeSrc(unittest.TestCase):
@classmethod
def setUpClass(cls):
with open(_SRC) as f: cls.src = f.read()
def test_issue_tag(self): self.assertIn("#320", self.src)
def test_output_topic(self): self.assertIn("/saltybot/camera_status", self.src)
def test_dev_video_glob(self): self.assertIn("/dev/video*", self.src)
def test_status_connected(self): self.assertIn('"connected"', self.src)
def test_status_disconnected(self): self.assertIn('"disconnected"', self.src)
def test_status_restarting(self): self.assertIn('"restarting"', self.src)
def test_compute_transition_fn(self): self.assertIn("compute_transition", self.src)
def test_scan_video_devices_fn(self): self.assertIn("scan_video_devices", self.src)
def test_restart_grace_param(self): self.assertIn("restart_grace_s", self.src)
def test_restart_hold_param(self): self.assertIn("restart_hold_s", self.src)
def test_poll_rate_param(self): self.assertIn("poll_rate", self.src)
def test_publish_always_param(self): self.assertIn("publish_always", self.src)
def test_threading_lock(self): self.assertIn("threading.Lock", self.src)
def test_glob_used(self): self.assertIn("glob", self.src)
def test_main_defined(self): self.assertIn("def main", self.src)
def test_scan_fn_injectable(self): self.assertIn("_scan_fn", self.src)
def test_string_msg(self): self.assertIn("String", self.src)
# ── Tests: config / launch / setup ────────────────────────────────────────────
class TestConfig(unittest.TestCase):
_CONFIG = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/config/camera_hotplug_params.yaml"
)
_LAUNCH = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/launch/camera_hotplug.launch.py"
)
_SETUP = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/setup.py"
)
def test_config_exists(self):
import os; self.assertTrue(os.path.exists(self._CONFIG))
def test_config_video_glob(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("video_glob", c)
def test_config_poll_rate(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("poll_rate", c)
def test_config_restart_grace(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("restart_grace_s", c)
def test_config_restart_hold(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("restart_hold_s", c)
def test_launch_exists(self):
import os; self.assertTrue(os.path.exists(self._LAUNCH))
def test_launch_video_glob_arg(self):
with open(self._LAUNCH) as f: c = f.read()
self.assertIn("video_glob", c)
def test_launch_restart_grace_arg(self):
with open(self._LAUNCH) as f: c = f.read()
self.assertIn("restart_grace_s", c)
def test_entry_point_in_setup(self):
with open(self._SETUP) as f: c = f.read()
self.assertIn("camera_hotplug_node", c)
if __name__ == "__main__":
unittest.main()