feat(social): USB camera hot-plug monitor (Issue #320) #328

Merged
sl-jetson merged 1 commits from sl-jetson/issue-320-camera-hotplug into main 2026-03-03 06:45:24 -05:00
5 changed files with 931 additions and 1 deletions
Showing only changes of commit a3386e1694 - Show all commits

View File

@ -0,0 +1,14 @@
camera_hotplug_node:
ros__parameters:
video_glob: "/dev/video*" # glob pattern for USB camera devices
output_topic: "/saltybot/camera_status"
poll_rate: 2.0 # device scan frequency (Hz)
# Reconnect detection: if a camera reappears within restart_grace_s
# seconds of being lost, publish "restarting" instead of "connected".
restart_grace_s: 5.0 # reconnect grace window (s)
restart_hold_s: 2.0 # duration to hold "restarting" before resolving (s)
# Set true to publish status on every poll tick, not only on change.
publish_always: false

View File

@ -0,0 +1,43 @@
"""camera_hotplug.launch.py — Launch USB camera hot-plug monitor (Issue #320).
Usage:
ros2 launch saltybot_social camera_hotplug.launch.py
ros2 launch saltybot_social camera_hotplug.launch.py video_glob:=/dev/video0
ros2 launch saltybot_social camera_hotplug.launch.py restart_grace_s:=10.0
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg = get_package_share_directory("saltybot_social")
cfg = os.path.join(pkg, "config", "camera_hotplug_params.yaml")
return LaunchDescription([
DeclareLaunchArgument("video_glob", default_value="/dev/video*",
description="Glob pattern for video device discovery"),
DeclareLaunchArgument("poll_rate", default_value="2.0",
description="Device scan frequency (Hz)"),
DeclareLaunchArgument("restart_grace_s", default_value="5.0",
description="Reconnect window that triggers 'restarting' (s)"),
Node(
package="saltybot_social",
executable="camera_hotplug_node",
name="camera_hotplug_node",
output="screen",
parameters=[
cfg,
{
"video_glob": LaunchConfiguration("video_glob"),
"poll_rate": LaunchConfiguration("poll_rate"),
"restart_grace_s": LaunchConfiguration("restart_grace_s"),
},
],
),
])

View File

@ -0,0 +1,241 @@
"""camera_hotplug_node.py — USB camera hot-plug monitor.
Issue #320
Polls ``/dev/video*`` at a configurable rate to detect USB camera
connect / disconnect events and publishes a status string on
/saltybot/camera_status.
Status values
"connected" one or more video devices present and stable
"disconnected" no /dev/video* devices found
"restarting" device set changed (hot-swap / reconnect); downstream
capture pipelines should restart. Held for
``restart_hold_s`` seconds then resolved to
"connected" or "disconnected" based on re-scan.
State machine
DISCONNECTED (devices appear, within grace window) RESTARTING
DISCONNECTED (devices appear, outside grace window) CONNECTED
CONNECTED (all devices lost) DISCONNECTED
CONNECTED (device set changed) RESTARTING
RESTARTING (hold_s elapsed, devices present) CONNECTED
RESTARTING (hold_s elapsed, no devices) DISCONNECTED
Initial state is set by the first scan at startup (no "restarting" on boot).
Publications
/saltybot/camera_status std_msgs/String "connected" | "disconnected"
| "restarting"
Parameters
video_glob (str, "/dev/video*") glob for device discovery
poll_rate (float, 2.0) scan frequency (Hz)
restart_grace_s (float, 5.0) reconnect window; if a camera
reappears within this many seconds
of being lost "restarting"
restart_hold_s (float, 2.0) how long to hold "restarting"
publish_always (bool, False) publish on every tick, not only
on state change
output_topic (str, "/saltybot/camera_status")
"""
from __future__ import annotations
import glob as _glob_mod
import threading
import time
from typing import FrozenSet, Tuple
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import String
# ── Status constants ───────────────────────────────────────────────────────────
STATUS_CONNECTED = "connected"
STATUS_DISCONNECTED = "disconnected"
STATUS_RESTARTING = "restarting"
# ── Pure helpers ───────────────────────────────────────────────────────────────
def scan_video_devices(video_glob: str) -> FrozenSet[str]:
"""Return the frozenset of /dev/video* paths currently present.
Catches permission / OS errors and returns an empty frozenset.
"""
try:
return frozenset(_glob_mod.glob(video_glob))
except Exception:
return frozenset()
def compute_transition(
state: str,
prev_devices: FrozenSet[str],
curr_devices: FrozenSet[str],
now: float,
disconnect_time: float,
restart_until: float,
restart_grace_s: float,
restart_hold_s: float,
) -> Tuple[str, float, float]:
"""Pure state-machine step — no I/O, no ROS.
Parameters
----------
state : current status string
prev_devices : device set from the previous scan
curr_devices : device set from the current scan
now : current monotonic time (s)
disconnect_time : monotonic time of the last CONNECTED DISCONNECTED
transition (0.0 = never disconnected)
restart_until : monotonic time at which RESTARTING resolves
restart_grace_s : reconnect window that triggers RESTARTING
restart_hold_s : duration to hold RESTARTING before resolving
Returns
-------
(new_state, new_disconnect_time, new_restart_until)
"""
if state == STATUS_RESTARTING:
if now >= restart_until:
if curr_devices:
return STATUS_CONNECTED, disconnect_time, restart_until
return STATUS_DISCONNECTED, now, restart_until
return STATUS_RESTARTING, disconnect_time, restart_until
if state == STATUS_CONNECTED:
if not curr_devices:
# All cameras lost
return STATUS_DISCONNECTED, now, restart_until
if curr_devices != prev_devices:
# Hot-swap or device renumbering
return STATUS_RESTARTING, disconnect_time, now + restart_hold_s
return STATUS_CONNECTED, disconnect_time, restart_until
# STATUS_DISCONNECTED
if curr_devices:
# Reconnect: check if we're inside the grace window
if disconnect_time > 0.0 and (now - disconnect_time) <= restart_grace_s:
return STATUS_RESTARTING, disconnect_time, now + restart_hold_s
return STATUS_CONNECTED, disconnect_time, restart_until
return STATUS_DISCONNECTED, disconnect_time, restart_until
# ── ROS2 node ──────────────────────────────────────────────────────────────────
class CameraHotplugNode(Node):
"""Monitors /dev/video* and publishes camera connection status."""
def __init__(self) -> None:
super().__init__("camera_hotplug_node")
self.declare_parameter("video_glob", "/dev/video*")
self.declare_parameter("poll_rate", 2.0)
self.declare_parameter("restart_grace_s", 5.0)
self.declare_parameter("restart_hold_s", 2.0)
self.declare_parameter("publish_always", False)
self.declare_parameter("output_topic", "/saltybot/camera_status")
self._glob = self.get_parameter("video_glob").value
poll_rate = float(self.get_parameter("poll_rate").value)
self._grace_s = float(self.get_parameter("restart_grace_s").value)
self._hold_s = float(self.get_parameter("restart_hold_s").value)
self._pub_always = bool(self.get_parameter("publish_always").value)
output_topic = self.get_parameter("output_topic").value
# Injected scan function — allows unit-test mocking
self._scan_fn = scan_video_devices
# Perform initial scan (before any lock or timer)
initial = self._scan_fn(self._glob)
self._state = STATUS_CONNECTED if initial else STATUS_DISCONNECTED
self._prev_devices = initial
self._disconnect_time: float = 0.0
self._restart_until: float = 0.0
self._lock = threading.Lock()
qos = QoSProfile(depth=10)
self._pub = self.create_publisher(String, output_topic, qos)
self._timer = self.create_timer(1.0 / poll_rate, self._scan_cb)
# Publish initial state
self._publish(self._state)
n_dev = len(initial)
self.get_logger().info(
f"CameraHotplugNode ready — initial state: {self._state} "
f"({n_dev} device(s): {sorted(initial)})"
)
# ── Scan callback ───────────────────────────────────────────────────
def _scan_cb(self) -> None:
now = time.monotonic()
curr = self._scan_fn(self._glob)
with self._lock:
prev = self._prev_devices
state = self._state
disc_t = self._disconnect_time
rup = self._restart_until
new_state, new_disc_t, new_rup = compute_transition(
state, prev, curr, now, disc_t, rup,
self._grace_s, self._hold_s,
)
changed = new_state != state
with self._lock:
self._prev_devices = curr
self._state = new_state
self._disconnect_time = new_disc_t
self._restart_until = new_rup
if changed:
self.get_logger().info(
f"CameraHotplug: {state!r}{new_state!r} "
f"(devices: {sorted(curr) or 'none'})"
)
if changed or self._pub_always:
self._publish(new_state)
# ── Publish ─────────────────────────────────────────────────────────
def _publish(self, status: str) -> None:
msg = String()
msg.data = status
self._pub.publish(msg)
# ── Public accessors (for tests / introspection) ─────────────────────
@property
def state(self) -> str:
with self._lock:
return self._state
@property
def devices(self) -> FrozenSet[str]:
with self._lock:
return self._prev_devices
def main(args=None) -> None:
rclpy.init(args=args)
node = CameraHotplugNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

View File

@ -57,8 +57,10 @@ setup(
'topic_memory_node = saltybot_social.topic_memory_node:main',
# Personal space respector (Issue #310)
'personal_space_node = saltybot_social.personal_space_node:main',
# Audio wake-word detector — 'hey salty' (Issue #320)
# Audio wake-word detector — 'hey salty'
'wake_word_node = saltybot_social.wake_word_node:main',
# USB camera hot-plug monitor (Issue #320)
'camera_hotplug_node = saltybot_social.camera_hotplug_node:main',
],
},
)

View File

@ -0,0 +1,630 @@
"""test_camera_hotplug.py — Offline tests for camera_hotplug_node (Issue #320).
Stubs out rclpy and ROS message types so tests run without a ROS install.
Filesystem calls are avoided by injecting a mock scan function.
"""
import importlib.util
import sys
import time
import types
import unittest
# ── ROS2 stubs ────────────────────────────────────────────────────────────────
def _make_ros_stubs():
for mod_name in ("rclpy", "rclpy.node", "rclpy.qos",
"std_msgs", "std_msgs.msg"):
if mod_name not in sys.modules:
sys.modules[mod_name] = types.ModuleType(mod_name)
class _Node:
def __init__(self, name="node"):
self._name = name
if not hasattr(self, "_params"):
self._params = {}
self._pubs = {}
self._subs = {}
self._logs = []
self._timers = []
def declare_parameter(self, name, default):
if name not in self._params:
self._params[name] = default
def get_parameter(self, name):
class _P:
def __init__(self, v): self.value = v
return _P(self._params.get(name))
def create_publisher(self, msg_type, topic, qos):
pub = _FakePub()
self._pubs[topic] = pub
return pub
def create_subscription(self, msg_type, topic, cb, qos):
self._subs[topic] = cb
return object()
def create_timer(self, period, cb):
self._timers.append(cb)
return object()
def get_logger(self):
node = self
class _L:
def info(self, m): node._logs.append(("INFO", m))
def warn(self, m): node._logs.append(("WARN", m))
def error(self, m): node._logs.append(("ERROR", m))
return _L()
def destroy_node(self): pass
class _FakePub:
def __init__(self):
self.msgs = []
def publish(self, msg):
self.msgs.append(msg)
class _QoSProfile:
def __init__(self, depth=10): self.depth = depth
class _String:
def __init__(self): self.data = ""
rclpy_mod = sys.modules["rclpy"]
rclpy_mod.init = lambda args=None: None
rclpy_mod.spin = lambda node: None
rclpy_mod.shutdown = lambda: None
sys.modules["rclpy.node"].Node = _Node
sys.modules["rclpy.qos"].QoSProfile = _QoSProfile
sys.modules["std_msgs.msg"].String = _String
return _Node, _FakePub, _String
_Node, _FakePub, _String = _make_ros_stubs()
# ── Module loader ─────────────────────────────────────────────────────────────
_SRC = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/saltybot_social/camera_hotplug_node.py"
)
def _load_mod():
spec = importlib.util.spec_from_file_location("camera_hotplug_testmod", _SRC)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def _make_node(mod, initial_devices=frozenset(), **kwargs):
"""Construct a CameraHotplugNode with a mocked scan function."""
node = mod.CameraHotplugNode.__new__(mod.CameraHotplugNode)
defaults = {
"video_glob": "/dev/video*",
"poll_rate": 2.0,
"restart_grace_s": 5.0,
"restart_hold_s": 2.0,
"publish_always": False,
"output_topic": "/saltybot/camera_status",
}
defaults.update(kwargs)
node._params = dict(defaults)
# Patch scan so the initial scan (inside __init__) returns initial_devices
orig = mod.scan_video_devices
mod.scan_video_devices = lambda g: frozenset(initial_devices)
try:
mod.CameraHotplugNode.__init__(node)
finally:
mod.scan_video_devices = orig
# Keep using the mock for subsequent _scan_cb calls
node._scan_fn = lambda g: frozenset(initial_devices)
return node
def _set_scan(node, devices):
"""Replace node._scan_fn to return a fixed device set."""
node._scan_fn = lambda g: frozenset(devices)
# ── Tests: pure helpers ────────────────────────────────────────────────────────
class TestScanVideoDevices(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def test_returns_frozenset(self):
result = self.mod.scan_video_devices("/dev/video*")
self.assertIsInstance(result, frozenset)
def test_nonexistent_glob_returns_empty(self):
result = self.mod.scan_video_devices("/nonexistent_path_xyz_abc/video*")
self.assertEqual(result, frozenset())
def test_empty_glob_string(self):
result = self.mod.scan_video_devices("")
self.assertIsInstance(result, frozenset)
class TestStatusConstants(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def test_connected_string(self):
self.assertEqual(self.mod.STATUS_CONNECTED, "connected")
def test_disconnected_string(self):
self.assertEqual(self.mod.STATUS_DISCONNECTED, "disconnected")
def test_restarting_string(self):
self.assertEqual(self.mod.STATUS_RESTARTING, "restarting")
# ── Tests: compute_transition ─────────────────────────────────────────────────
class TestComputeTransition(unittest.TestCase):
"""Tests for the pure state-machine function."""
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def _tr(self, state, prev, curr, now=100.0,
disc_t=0.0, restart_until=0.0, grace=5.0, hold=2.0):
return self.mod.compute_transition(
state, frozenset(prev), frozenset(curr),
now, disc_t, restart_until, grace, hold,
)
# ── DISCONNECTED transitions ──────────────────────────────────────
def test_disc_no_devices_stays_disconnected(self):
s, _, _ = self._tr("disconnected", [], [])
self.assertEqual(s, "disconnected")
def test_disc_devices_appear_fresh_connect(self):
"""No prior disconnect → fresh connect, not restarting."""
s, _, _ = self._tr("disconnected", [], ["/dev/video0"],
disc_t=0.0, now=100.0)
self.assertEqual(s, "connected")
def test_disc_devices_appear_within_grace(self):
"""Reappears within grace window → restarting."""
# disconnected at t=95, now=t=99, grace=5 → 4 s elapsed → within
s, _, rup = self._tr("disconnected", [], ["/dev/video0"],
now=99.0, disc_t=95.0, grace=5.0, hold=2.0)
self.assertEqual(s, "restarting")
self.assertAlmostEqual(rup, 99.0 + 2.0)
def test_disc_devices_appear_outside_grace(self):
"""Reappears after grace window → connected directly."""
# disconnected at t=80, now=100, grace=5 → 20 s elapsed → outside
s, _, _ = self._tr("disconnected", [], ["/dev/video0"],
now=100.0, disc_t=80.0, grace=5.0)
self.assertEqual(s, "connected")
def test_disc_devices_appear_exactly_at_grace(self):
"""At exactly grace_s seconds the window is still inclusive → restarting."""
s, _, _ = self._tr("disconnected", [], ["/dev/video0"],
now=100.0, disc_t=95.0, grace=5.0)
# now - disc_t = 5.0, grace = 5.0 → 5.0 <= 5.0 is True → restarting
self.assertEqual(s, "restarting")
# ── CONNECTED transitions ─────────────────────────────────────────
def test_conn_same_devices_stays_connected(self):
s, _, _ = self._tr("connected", ["/dev/video0"], ["/dev/video0"])
self.assertEqual(s, "connected")
def test_conn_no_devices_disconnects(self):
s, disc_t, _ = self._tr("connected", ["/dev/video0"], [], now=200.0)
self.assertEqual(s, "disconnected")
self.assertAlmostEqual(disc_t, 200.0)
def test_conn_device_added_restarting(self):
"""New device appears alongside existing → restarting."""
s, _, rup = self._tr("connected",
["/dev/video0"],
["/dev/video0", "/dev/video1"],
now=50.0, hold=2.0)
self.assertEqual(s, "restarting")
self.assertAlmostEqual(rup, 52.0)
def test_conn_device_replaced_restarting(self):
"""Different device path → restarting."""
s, _, _ = self._tr("connected", ["/dev/video0"], ["/dev/video2"])
self.assertEqual(s, "restarting")
def test_conn_device_removed_partial_restarting(self):
"""One of multiple devices lost (not all) → restarting."""
s, _, _ = self._tr("connected",
["/dev/video0", "/dev/video1"],
["/dev/video0"])
self.assertEqual(s, "restarting")
def test_conn_multiple_same_stays_connected(self):
devs = ["/dev/video0", "/dev/video1"]
s, _, _ = self._tr("connected", devs, devs)
self.assertEqual(s, "connected")
# ── RESTARTING transitions ────────────────────────────────────────
def test_restarting_hold_not_elapsed_stays(self):
s, _, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"],
now=50.0, restart_until=52.0)
self.assertEqual(s, "restarting")
def test_restarting_hold_elapsed_with_devices_connected(self):
s, _, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"],
now=55.0, restart_until=52.0)
self.assertEqual(s, "connected")
def test_restarting_hold_elapsed_no_devices_disconnected(self):
s, disc_t, _ = self._tr("restarting", ["/dev/video0"], [],
now=55.0, restart_until=52.0)
self.assertEqual(s, "disconnected")
self.assertAlmostEqual(disc_t, 55.0)
def test_restarting_at_exact_hold_elapsed(self):
s, _, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"],
now=52.0, restart_until=52.0)
# now >= restart_until → resolves
self.assertEqual(s, "connected")
def test_restarting_preserves_disconnect_time(self):
_, disc_t, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"],
now=50.0, disc_t=40.0, restart_until=49.0)
# hold elapsed → connected; disconnect_time unchanged
self.assertAlmostEqual(disc_t, 40.0)
# ── Return value shapes ───────────────────────────────────────────
def test_returns_tuple_of_three(self):
result = self._tr("disconnected", [], [])
self.assertEqual(len(result), 3)
def test_new_state_is_string(self):
s, _, _ = self._tr("disconnected", [], [])
self.assertIsInstance(s, str)
def test_disconnect_time_float(self):
_, dt, _ = self._tr("connected", ["/dev/video0"], [], now=42.5)
self.assertIsInstance(dt, float)
def test_restart_until_float(self):
_, _, rup = self._tr("disconnected", [], ["/dev/video0"],
disc_t=99.0, now=100.0, grace=5.0, hold=3.0)
self.assertIsInstance(rup, float)
# ── Tests: node init ──────────────────────────────────────────────────────────
class TestNodeInit(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def test_instantiates_no_devices(self):
node = _make_node(self.mod, initial_devices=[])
self.assertIsNotNone(node)
def test_instantiates_with_devices(self):
node = _make_node(self.mod, initial_devices=["/dev/video0"])
self.assertIsNotNone(node)
def test_initial_state_disconnected(self):
node = _make_node(self.mod, initial_devices=[])
self.assertEqual(node.state, "disconnected")
def test_initial_state_connected(self):
node = _make_node(self.mod, initial_devices=["/dev/video0"])
self.assertEqual(node.state, "connected")
def test_publishes_initial_state(self):
node = _make_node(self.mod, initial_devices=[])
pub = node._pubs["/saltybot/camera_status"]
self.assertEqual(len(pub.msgs), 1)
self.assertEqual(pub.msgs[0].data, "disconnected")
def test_publishes_initial_connected(self):
node = _make_node(self.mod, initial_devices=["/dev/video0"])
pub = node._pubs["/saltybot/camera_status"]
self.assertEqual(pub.msgs[0].data, "connected")
def test_publisher_registered(self):
node = _make_node(self.mod)
self.assertIn("/saltybot/camera_status", node._pubs)
def test_timer_registered(self):
node = _make_node(self.mod)
self.assertGreater(len(node._timers), 0)
def test_custom_output_topic(self):
node = _make_node(self.mod, output_topic="/my/cam_status")
self.assertIn("/my/cam_status", node._pubs)
def test_devices_property(self):
node = _make_node(self.mod, initial_devices=["/dev/video0"])
self.assertEqual(node.devices, frozenset(["/dev/video0"]))
def test_no_devices_property_empty(self):
node = _make_node(self.mod, initial_devices=[])
self.assertEqual(node.devices, frozenset())
# ── Tests: _scan_cb state transitions ────────────────────────────────────────
class TestScanCb(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def _pub(self, node):
return node._pubs["/saltybot/camera_status"]
def test_no_change_no_publish(self):
node = _make_node(self.mod, initial_devices=[])
pub = self._pub(node)
initial_count = len(pub.msgs)
_set_scan(node, [])
node._scan_cb()
self.assertEqual(len(pub.msgs), initial_count)
def test_device_appears_publishes_connected(self):
node = _make_node(self.mod, initial_devices=[])
_set_scan(node, ["/dev/video0"])
node._scan_cb()
self.assertEqual(self._pub(node).msgs[-1].data, "connected")
def test_device_lost_publishes_disconnected(self):
node = _make_node(self.mod, initial_devices=["/dev/video0"])
_set_scan(node, [])
node._scan_cb()
self.assertEqual(self._pub(node).msgs[-1].data, "disconnected")
def test_device_replaced_publishes_restarting(self):
node = _make_node(self.mod, initial_devices=["/dev/video0"])
_set_scan(node, ["/dev/video2"])
node._scan_cb()
self.assertEqual(self._pub(node).msgs[-1].data, "restarting")
def test_reconnect_within_grace_publishes_restarting(self):
"""Simulate: connected → disconnected → reconnected within grace."""
node = _make_node(self.mod, initial_devices=["/dev/video0"],
restart_grace_s=10.0)
# Simulate disconnect
_set_scan(node, [])
node._scan_cb()
self.assertEqual(node.state, "disconnected")
# Quick reconnect — disconnect_time was just set, well within grace
_set_scan(node, ["/dev/video0"])
node._scan_cb()
self.assertEqual(node.state, "restarting")
self.assertEqual(self._pub(node).msgs[-1].data, "restarting")
def test_restarting_resolves_to_connected_after_hold(self):
"""After hold expires, restarting → connected."""
node = _make_node(self.mod, initial_devices=["/dev/video0"],
restart_hold_s=0.0) # instant hold
# Device changes → restarting
_set_scan(node, ["/dev/video2"])
node._scan_cb()
self.assertEqual(node.state, "restarting")
# Next tick: hold = 0.0 → restart_until is in the past → resolves
_set_scan(node, ["/dev/video2"])
node._scan_cb()
self.assertEqual(node.state, "connected")
def test_restarting_resolves_to_disconnected_when_no_device(self):
node = _make_node(self.mod, initial_devices=["/dev/video0"],
restart_hold_s=0.0)
_set_scan(node, ["/dev/video2"])
node._scan_cb() # → restarting
_set_scan(node, [])
node._scan_cb() # → disconnected
self.assertEqual(node.state, "disconnected")
def test_publish_always_publishes_every_tick(self):
node = _make_node(self.mod, initial_devices=[], publish_always=True)
pub = self._pub(node)
before = len(pub.msgs)
_set_scan(node, [])
node._scan_cb()
node._scan_cb()
node._scan_cb()
self.assertEqual(len(pub.msgs), before + 3)
def test_no_publish_always_skips_unchanged(self):
node = _make_node(self.mod, initial_devices=[], publish_always=False)
pub = self._pub(node)
before = len(pub.msgs)
_set_scan(node, [])
node._scan_cb()
node._scan_cb()
# State unchanged → no additional publishes
self.assertEqual(len(pub.msgs), before)
def test_state_transitions_logged(self):
node = _make_node(self.mod, initial_devices=[])
_set_scan(node, ["/dev/video0"])
node._scan_cb()
infos = [m for lvl, m in node._logs if lvl == "INFO"]
self.assertTrue(any("connected" in m.lower() for m in infos))
def test_multiple_devices_connected(self):
node = _make_node(self.mod, initial_devices=[])
_set_scan(node, ["/dev/video0", "/dev/video1"])
node._scan_cb()
self.assertEqual(node.state, "connected")
def test_partial_loss_restarting(self):
node = _make_node(self.mod,
initial_devices=["/dev/video0", "/dev/video1"])
_set_scan(node, ["/dev/video0"])
node._scan_cb()
self.assertEqual(node.state, "restarting")
def test_state_property_updated(self):
node = _make_node(self.mod, initial_devices=[])
self.assertEqual(node.state, "disconnected")
_set_scan(node, ["/dev/video0"])
node._scan_cb()
self.assertEqual(node.state, "connected")
def test_devices_property_updated(self):
node = _make_node(self.mod, initial_devices=[])
_set_scan(node, ["/dev/video0"])
node._scan_cb()
self.assertIn("/dev/video0", node.devices)
def test_full_lifecycle(self):
"""Full cycle: disconnected → connected → restarting → disconnected."""
node = _make_node(self.mod, initial_devices=[],
restart_grace_s=60.0, restart_hold_s=0.0)
pub = self._pub(node)
# Boot: disconnected
self.assertEqual(node.state, "disconnected")
# Camera plugged in
_set_scan(node, ["/dev/video0"])
node._scan_cb()
self.assertEqual(node.state, "connected")
# Camera hot-unplugged then immediately replugged
_set_scan(node, [])
node._scan_cb()
self.assertEqual(node.state, "disconnected")
_set_scan(node, ["/dev/video0"])
node._scan_cb()
self.assertEqual(node.state, "restarting")
# Hold expires (hold_s=0) → resolves to connected
node._scan_cb()
self.assertEqual(node.state, "connected")
# Camera fully removed
_set_scan(node, [])
node._scan_cb()
self.assertEqual(node.state, "disconnected")
status_sequence = [m.data for m in pub.msgs]
# Must contain all three statuses in the sequence
self.assertIn("connected", status_sequence)
self.assertIn("disconnected", status_sequence)
self.assertIn("restarting", status_sequence)
# ── Tests: _publish ───────────────────────────────────────────────────────────
class TestPublish(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def setUp(self):
self.node = _make_node(self.mod, initial_devices=[])
self.pub = self.node._pubs["/saltybot/camera_status"]
def test_publish_connected(self):
self.node._publish("connected")
self.assertEqual(self.pub.msgs[-1].data, "connected")
def test_publish_disconnected(self):
self.node._publish("disconnected")
self.assertEqual(self.pub.msgs[-1].data, "disconnected")
def test_publish_restarting(self):
self.node._publish("restarting")
self.assertEqual(self.pub.msgs[-1].data, "restarting")
def test_publish_increments_msg_count(self):
before = len(self.pub.msgs)
self.node._publish("connected")
self.assertEqual(len(self.pub.msgs), before + 1)
# ── Tests: source content ─────────────────────────────────────────────────────
class TestNodeSrc(unittest.TestCase):
@classmethod
def setUpClass(cls):
with open(_SRC) as f: cls.src = f.read()
def test_issue_tag(self): self.assertIn("#320", self.src)
def test_output_topic(self): self.assertIn("/saltybot/camera_status", self.src)
def test_dev_video_glob(self): self.assertIn("/dev/video*", self.src)
def test_status_connected(self): self.assertIn('"connected"', self.src)
def test_status_disconnected(self): self.assertIn('"disconnected"', self.src)
def test_status_restarting(self): self.assertIn('"restarting"', self.src)
def test_compute_transition_fn(self): self.assertIn("compute_transition", self.src)
def test_scan_video_devices_fn(self): self.assertIn("scan_video_devices", self.src)
def test_restart_grace_param(self): self.assertIn("restart_grace_s", self.src)
def test_restart_hold_param(self): self.assertIn("restart_hold_s", self.src)
def test_poll_rate_param(self): self.assertIn("poll_rate", self.src)
def test_publish_always_param(self): self.assertIn("publish_always", self.src)
def test_threading_lock(self): self.assertIn("threading.Lock", self.src)
def test_glob_used(self): self.assertIn("glob", self.src)
def test_main_defined(self): self.assertIn("def main", self.src)
def test_scan_fn_injectable(self): self.assertIn("_scan_fn", self.src)
def test_string_msg(self): self.assertIn("String", self.src)
# ── Tests: config / launch / setup ────────────────────────────────────────────
class TestConfig(unittest.TestCase):
_CONFIG = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/config/camera_hotplug_params.yaml"
)
_LAUNCH = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/launch/camera_hotplug.launch.py"
)
_SETUP = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/setup.py"
)
def test_config_exists(self):
import os; self.assertTrue(os.path.exists(self._CONFIG))
def test_config_video_glob(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("video_glob", c)
def test_config_poll_rate(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("poll_rate", c)
def test_config_restart_grace(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("restart_grace_s", c)
def test_config_restart_hold(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("restart_hold_s", c)
def test_launch_exists(self):
import os; self.assertTrue(os.path.exists(self._LAUNCH))
def test_launch_video_glob_arg(self):
with open(self._LAUNCH) as f: c = f.read()
self.assertIn("video_glob", c)
def test_launch_restart_grace_arg(self):
with open(self._LAUNCH) as f: c = f.read()
self.assertIn("restart_grace_s", c)
def test_entry_point_in_setup(self):
with open(self._SETUP) as f: c = f.read()
self.assertIn("camera_hotplug_node", c)
if __name__ == "__main__":
unittest.main()