diff --git a/jetson/ros2_ws/src/saltybot_social/config/camera_hotplug_params.yaml b/jetson/ros2_ws/src/saltybot_social/config/camera_hotplug_params.yaml new file mode 100644 index 0000000..5a8927d --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/config/camera_hotplug_params.yaml @@ -0,0 +1,14 @@ +camera_hotplug_node: + ros__parameters: + video_glob: "/dev/video*" # glob pattern for USB camera devices + output_topic: "/saltybot/camera_status" + + poll_rate: 2.0 # device scan frequency (Hz) + + # Reconnect detection: if a camera reappears within restart_grace_s + # seconds of being lost, publish "restarting" instead of "connected". + restart_grace_s: 5.0 # reconnect grace window (s) + restart_hold_s: 2.0 # duration to hold "restarting" before resolving (s) + + # Set true to publish status on every poll tick, not only on change. + publish_always: false diff --git a/jetson/ros2_ws/src/saltybot_social/launch/camera_hotplug.launch.py b/jetson/ros2_ws/src/saltybot_social/launch/camera_hotplug.launch.py new file mode 100644 index 0000000..55e5ece --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/launch/camera_hotplug.launch.py @@ -0,0 +1,43 @@ +"""camera_hotplug.launch.py — Launch USB camera hot-plug monitor (Issue #320). + +Usage: + ros2 launch saltybot_social camera_hotplug.launch.py + ros2 launch saltybot_social camera_hotplug.launch.py video_glob:=/dev/video0 + ros2 launch saltybot_social camera_hotplug.launch.py restart_grace_s:=10.0 +""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + pkg = get_package_share_directory("saltybot_social") + cfg = os.path.join(pkg, "config", "camera_hotplug_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument("video_glob", default_value="/dev/video*", + description="Glob pattern for video device discovery"), + DeclareLaunchArgument("poll_rate", default_value="2.0", + description="Device scan frequency (Hz)"), + DeclareLaunchArgument("restart_grace_s", default_value="5.0", + description="Reconnect window that triggers 'restarting' (s)"), + + Node( + package="saltybot_social", + executable="camera_hotplug_node", + name="camera_hotplug_node", + output="screen", + parameters=[ + cfg, + { + "video_glob": LaunchConfiguration("video_glob"), + "poll_rate": LaunchConfiguration("poll_rate"), + "restart_grace_s": LaunchConfiguration("restart_grace_s"), + }, + ], + ), + ]) diff --git a/jetson/ros2_ws/src/saltybot_social/saltybot_social/camera_hotplug_node.py b/jetson/ros2_ws/src/saltybot_social/saltybot_social/camera_hotplug_node.py new file mode 100644 index 0000000..f56d8d0 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/saltybot_social/camera_hotplug_node.py @@ -0,0 +1,241 @@ +"""camera_hotplug_node.py — USB camera hot-plug monitor. +Issue #320 + +Polls ``/dev/video*`` at a configurable rate to detect USB camera +connect / disconnect events and publishes a status string on +/saltybot/camera_status. + +Status values +───────────── + "connected" — one or more video devices present and stable + "disconnected" — no /dev/video* devices found + "restarting" — device set changed (hot-swap / reconnect); downstream + capture pipelines should restart. Held for + ``restart_hold_s`` seconds then resolved to + "connected" or "disconnected" based on re-scan. + +State machine +───────────── + DISCONNECTED ──(devices appear, within grace window)──► RESTARTING + DISCONNECTED ──(devices appear, outside grace window)─► CONNECTED + CONNECTED ──(all devices lost)──────────────────────► DISCONNECTED + CONNECTED ──(device set changed)────────────────────► RESTARTING + RESTARTING ──(hold_s elapsed, devices present)───────► CONNECTED + RESTARTING ──(hold_s elapsed, no devices)────────────► DISCONNECTED + + Initial state is set by the first scan at startup (no "restarting" on boot). + +Publications +──────────── + /saltybot/camera_status std_msgs/String — "connected" | "disconnected" + | "restarting" + +Parameters +────────── + video_glob (str, "/dev/video*") glob for device discovery + poll_rate (float, 2.0) scan frequency (Hz) + restart_grace_s (float, 5.0) reconnect window; if a camera + reappears within this many seconds + of being lost → "restarting" + restart_hold_s (float, 2.0) how long to hold "restarting" + publish_always (bool, False) publish on every tick, not only + on state change + output_topic (str, "/saltybot/camera_status") +""" + +from __future__ import annotations + +import glob as _glob_mod +import threading +import time +from typing import FrozenSet, Tuple + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile +from std_msgs.msg import String + + +# ── Status constants ─────────────────────────────────────────────────────────── + +STATUS_CONNECTED = "connected" +STATUS_DISCONNECTED = "disconnected" +STATUS_RESTARTING = "restarting" + + +# ── Pure helpers ─────────────────────────────────────────────────────────────── + +def scan_video_devices(video_glob: str) -> FrozenSet[str]: + """Return the frozenset of /dev/video* paths currently present. + + Catches permission / OS errors and returns an empty frozenset. + """ + try: + return frozenset(_glob_mod.glob(video_glob)) + except Exception: + return frozenset() + + +def compute_transition( + state: str, + prev_devices: FrozenSet[str], + curr_devices: FrozenSet[str], + now: float, + disconnect_time: float, + restart_until: float, + restart_grace_s: float, + restart_hold_s: float, +) -> Tuple[str, float, float]: + """Pure state-machine step — no I/O, no ROS. + + Parameters + ---------- + state : current status string + prev_devices : device set from the previous scan + curr_devices : device set from the current scan + now : current monotonic time (s) + disconnect_time : monotonic time of the last CONNECTED → DISCONNECTED + transition (0.0 = never disconnected) + restart_until : monotonic time at which RESTARTING resolves + restart_grace_s : reconnect window that triggers RESTARTING + restart_hold_s : duration to hold RESTARTING before resolving + + Returns + ------- + (new_state, new_disconnect_time, new_restart_until) + """ + if state == STATUS_RESTARTING: + if now >= restart_until: + if curr_devices: + return STATUS_CONNECTED, disconnect_time, restart_until + return STATUS_DISCONNECTED, now, restart_until + return STATUS_RESTARTING, disconnect_time, restart_until + + if state == STATUS_CONNECTED: + if not curr_devices: + # All cameras lost + return STATUS_DISCONNECTED, now, restart_until + if curr_devices != prev_devices: + # Hot-swap or device renumbering + return STATUS_RESTARTING, disconnect_time, now + restart_hold_s + return STATUS_CONNECTED, disconnect_time, restart_until + + # STATUS_DISCONNECTED + if curr_devices: + # Reconnect: check if we're inside the grace window + if disconnect_time > 0.0 and (now - disconnect_time) <= restart_grace_s: + return STATUS_RESTARTING, disconnect_time, now + restart_hold_s + return STATUS_CONNECTED, disconnect_time, restart_until + return STATUS_DISCONNECTED, disconnect_time, restart_until + + +# ── ROS2 node ────────────────────────────────────────────────────────────────── + +class CameraHotplugNode(Node): + """Monitors /dev/video* and publishes camera connection status.""" + + def __init__(self) -> None: + super().__init__("camera_hotplug_node") + + self.declare_parameter("video_glob", "/dev/video*") + self.declare_parameter("poll_rate", 2.0) + self.declare_parameter("restart_grace_s", 5.0) + self.declare_parameter("restart_hold_s", 2.0) + self.declare_parameter("publish_always", False) + self.declare_parameter("output_topic", "/saltybot/camera_status") + + self._glob = self.get_parameter("video_glob").value + poll_rate = float(self.get_parameter("poll_rate").value) + self._grace_s = float(self.get_parameter("restart_grace_s").value) + self._hold_s = float(self.get_parameter("restart_hold_s").value) + self._pub_always = bool(self.get_parameter("publish_always").value) + output_topic = self.get_parameter("output_topic").value + + # Injected scan function — allows unit-test mocking + self._scan_fn = scan_video_devices + + # Perform initial scan (before any lock or timer) + initial = self._scan_fn(self._glob) + self._state = STATUS_CONNECTED if initial else STATUS_DISCONNECTED + self._prev_devices = initial + self._disconnect_time: float = 0.0 + self._restart_until: float = 0.0 + self._lock = threading.Lock() + + qos = QoSProfile(depth=10) + self._pub = self.create_publisher(String, output_topic, qos) + self._timer = self.create_timer(1.0 / poll_rate, self._scan_cb) + + # Publish initial state + self._publish(self._state) + + n_dev = len(initial) + self.get_logger().info( + f"CameraHotplugNode ready — initial state: {self._state} " + f"({n_dev} device(s): {sorted(initial)})" + ) + + # ── Scan callback ─────────────────────────────────────────────────── + + def _scan_cb(self) -> None: + now = time.monotonic() + curr = self._scan_fn(self._glob) + + with self._lock: + prev = self._prev_devices + state = self._state + disc_t = self._disconnect_time + rup = self._restart_until + + new_state, new_disc_t, new_rup = compute_transition( + state, prev, curr, now, disc_t, rup, + self._grace_s, self._hold_s, + ) + + changed = new_state != state + + with self._lock: + self._prev_devices = curr + self._state = new_state + self._disconnect_time = new_disc_t + self._restart_until = new_rup + + if changed: + self.get_logger().info( + f"CameraHotplug: {state!r} → {new_state!r} " + f"(devices: {sorted(curr) or 'none'})" + ) + + if changed or self._pub_always: + self._publish(new_state) + + # ── Publish ───────────────────────────────────────────────────────── + + def _publish(self, status: str) -> None: + msg = String() + msg.data = status + self._pub.publish(msg) + + # ── Public accessors (for tests / introspection) ───────────────────── + + @property + def state(self) -> str: + with self._lock: + return self._state + + @property + def devices(self) -> FrozenSet[str]: + with self._lock: + return self._prev_devices + + +def main(args=None) -> None: + rclpy.init(args=args) + node = CameraHotplugNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() diff --git a/jetson/ros2_ws/src/saltybot_social/setup.py b/jetson/ros2_ws/src/saltybot_social/setup.py index b9ba59a..a2fcc0b 100644 --- a/jetson/ros2_ws/src/saltybot_social/setup.py +++ b/jetson/ros2_ws/src/saltybot_social/setup.py @@ -57,8 +57,10 @@ setup( 'topic_memory_node = saltybot_social.topic_memory_node:main', # Personal space respector (Issue #310) 'personal_space_node = saltybot_social.personal_space_node:main', - # Audio wake-word detector — 'hey salty' (Issue #320) + # Audio wake-word detector — 'hey salty' 'wake_word_node = saltybot_social.wake_word_node:main', + # USB camera hot-plug monitor (Issue #320) + 'camera_hotplug_node = saltybot_social.camera_hotplug_node:main', ], }, ) diff --git a/jetson/ros2_ws/src/saltybot_social/test/test_camera_hotplug.py b/jetson/ros2_ws/src/saltybot_social/test/test_camera_hotplug.py new file mode 100644 index 0000000..32f8110 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/test/test_camera_hotplug.py @@ -0,0 +1,630 @@ +"""test_camera_hotplug.py — Offline tests for camera_hotplug_node (Issue #320). + +Stubs out rclpy and ROS message types so tests run without a ROS install. +Filesystem calls are avoided by injecting a mock scan function. +""" + +import importlib.util +import sys +import time +import types +import unittest + + +# ── ROS2 stubs ──────────────────────────────────────────────────────────────── + +def _make_ros_stubs(): + for mod_name in ("rclpy", "rclpy.node", "rclpy.qos", + "std_msgs", "std_msgs.msg"): + if mod_name not in sys.modules: + sys.modules[mod_name] = types.ModuleType(mod_name) + + class _Node: + def __init__(self, name="node"): + self._name = name + if not hasattr(self, "_params"): + self._params = {} + self._pubs = {} + self._subs = {} + self._logs = [] + self._timers = [] + + def declare_parameter(self, name, default): + if name not in self._params: + self._params[name] = default + + def get_parameter(self, name): + class _P: + def __init__(self, v): self.value = v + return _P(self._params.get(name)) + + def create_publisher(self, msg_type, topic, qos): + pub = _FakePub() + self._pubs[topic] = pub + return pub + + def create_subscription(self, msg_type, topic, cb, qos): + self._subs[topic] = cb + return object() + + def create_timer(self, period, cb): + self._timers.append(cb) + return object() + + def get_logger(self): + node = self + class _L: + def info(self, m): node._logs.append(("INFO", m)) + def warn(self, m): node._logs.append(("WARN", m)) + def error(self, m): node._logs.append(("ERROR", m)) + return _L() + + def destroy_node(self): pass + + class _FakePub: + def __init__(self): + self.msgs = [] + def publish(self, msg): + self.msgs.append(msg) + + class _QoSProfile: + def __init__(self, depth=10): self.depth = depth + + class _String: + def __init__(self): self.data = "" + + rclpy_mod = sys.modules["rclpy"] + rclpy_mod.init = lambda args=None: None + rclpy_mod.spin = lambda node: None + rclpy_mod.shutdown = lambda: None + + sys.modules["rclpy.node"].Node = _Node + sys.modules["rclpy.qos"].QoSProfile = _QoSProfile + sys.modules["std_msgs.msg"].String = _String + + return _Node, _FakePub, _String + + +_Node, _FakePub, _String = _make_ros_stubs() + + +# ── Module loader ───────────────────────────────────────────────────────────── + +_SRC = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/saltybot_social/camera_hotplug_node.py" +) + + +def _load_mod(): + spec = importlib.util.spec_from_file_location("camera_hotplug_testmod", _SRC) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +def _make_node(mod, initial_devices=frozenset(), **kwargs): + """Construct a CameraHotplugNode with a mocked scan function.""" + node = mod.CameraHotplugNode.__new__(mod.CameraHotplugNode) + defaults = { + "video_glob": "/dev/video*", + "poll_rate": 2.0, + "restart_grace_s": 5.0, + "restart_hold_s": 2.0, + "publish_always": False, + "output_topic": "/saltybot/camera_status", + } + defaults.update(kwargs) + node._params = dict(defaults) + + # Patch scan so the initial scan (inside __init__) returns initial_devices + orig = mod.scan_video_devices + mod.scan_video_devices = lambda g: frozenset(initial_devices) + try: + mod.CameraHotplugNode.__init__(node) + finally: + mod.scan_video_devices = orig + + # Keep using the mock for subsequent _scan_cb calls + node._scan_fn = lambda g: frozenset(initial_devices) + return node + + +def _set_scan(node, devices): + """Replace node._scan_fn to return a fixed device set.""" + node._scan_fn = lambda g: frozenset(devices) + + +# ── Tests: pure helpers ──────────────────────────────────────────────────────── + +class TestScanVideoDevices(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def test_returns_frozenset(self): + result = self.mod.scan_video_devices("/dev/video*") + self.assertIsInstance(result, frozenset) + + def test_nonexistent_glob_returns_empty(self): + result = self.mod.scan_video_devices("/nonexistent_path_xyz_abc/video*") + self.assertEqual(result, frozenset()) + + def test_empty_glob_string(self): + result = self.mod.scan_video_devices("") + self.assertIsInstance(result, frozenset) + + +class TestStatusConstants(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def test_connected_string(self): + self.assertEqual(self.mod.STATUS_CONNECTED, "connected") + + def test_disconnected_string(self): + self.assertEqual(self.mod.STATUS_DISCONNECTED, "disconnected") + + def test_restarting_string(self): + self.assertEqual(self.mod.STATUS_RESTARTING, "restarting") + + +# ── Tests: compute_transition ───────────────────────────────────────────────── + +class TestComputeTransition(unittest.TestCase): + """Tests for the pure state-machine function.""" + + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def _tr(self, state, prev, curr, now=100.0, + disc_t=0.0, restart_until=0.0, grace=5.0, hold=2.0): + return self.mod.compute_transition( + state, frozenset(prev), frozenset(curr), + now, disc_t, restart_until, grace, hold, + ) + + # ── DISCONNECTED transitions ────────────────────────────────────── + + def test_disc_no_devices_stays_disconnected(self): + s, _, _ = self._tr("disconnected", [], []) + self.assertEqual(s, "disconnected") + + def test_disc_devices_appear_fresh_connect(self): + """No prior disconnect → fresh connect, not restarting.""" + s, _, _ = self._tr("disconnected", [], ["/dev/video0"], + disc_t=0.0, now=100.0) + self.assertEqual(s, "connected") + + def test_disc_devices_appear_within_grace(self): + """Reappears within grace window → restarting.""" + # disconnected at t=95, now=t=99, grace=5 → 4 s elapsed → within + s, _, rup = self._tr("disconnected", [], ["/dev/video0"], + now=99.0, disc_t=95.0, grace=5.0, hold=2.0) + self.assertEqual(s, "restarting") + self.assertAlmostEqual(rup, 99.0 + 2.0) + + def test_disc_devices_appear_outside_grace(self): + """Reappears after grace window → connected directly.""" + # disconnected at t=80, now=100, grace=5 → 20 s elapsed → outside + s, _, _ = self._tr("disconnected", [], ["/dev/video0"], + now=100.0, disc_t=80.0, grace=5.0) + self.assertEqual(s, "connected") + + def test_disc_devices_appear_exactly_at_grace(self): + """At exactly grace_s seconds the window is still inclusive → restarting.""" + s, _, _ = self._tr("disconnected", [], ["/dev/video0"], + now=100.0, disc_t=95.0, grace=5.0) + # now - disc_t = 5.0, grace = 5.0 → 5.0 <= 5.0 is True → restarting + self.assertEqual(s, "restarting") + + # ── CONNECTED transitions ───────────────────────────────────────── + + def test_conn_same_devices_stays_connected(self): + s, _, _ = self._tr("connected", ["/dev/video0"], ["/dev/video0"]) + self.assertEqual(s, "connected") + + def test_conn_no_devices_disconnects(self): + s, disc_t, _ = self._tr("connected", ["/dev/video0"], [], now=200.0) + self.assertEqual(s, "disconnected") + self.assertAlmostEqual(disc_t, 200.0) + + def test_conn_device_added_restarting(self): + """New device appears alongside existing → restarting.""" + s, _, rup = self._tr("connected", + ["/dev/video0"], + ["/dev/video0", "/dev/video1"], + now=50.0, hold=2.0) + self.assertEqual(s, "restarting") + self.assertAlmostEqual(rup, 52.0) + + def test_conn_device_replaced_restarting(self): + """Different device path → restarting.""" + s, _, _ = self._tr("connected", ["/dev/video0"], ["/dev/video2"]) + self.assertEqual(s, "restarting") + + def test_conn_device_removed_partial_restarting(self): + """One of multiple devices lost (not all) → restarting.""" + s, _, _ = self._tr("connected", + ["/dev/video0", "/dev/video1"], + ["/dev/video0"]) + self.assertEqual(s, "restarting") + + def test_conn_multiple_same_stays_connected(self): + devs = ["/dev/video0", "/dev/video1"] + s, _, _ = self._tr("connected", devs, devs) + self.assertEqual(s, "connected") + + # ── RESTARTING transitions ──────────────────────────────────────── + + def test_restarting_hold_not_elapsed_stays(self): + s, _, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"], + now=50.0, restart_until=52.0) + self.assertEqual(s, "restarting") + + def test_restarting_hold_elapsed_with_devices_connected(self): + s, _, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"], + now=55.0, restart_until=52.0) + self.assertEqual(s, "connected") + + def test_restarting_hold_elapsed_no_devices_disconnected(self): + s, disc_t, _ = self._tr("restarting", ["/dev/video0"], [], + now=55.0, restart_until=52.0) + self.assertEqual(s, "disconnected") + self.assertAlmostEqual(disc_t, 55.0) + + def test_restarting_at_exact_hold_elapsed(self): + s, _, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"], + now=52.0, restart_until=52.0) + # now >= restart_until → resolves + self.assertEqual(s, "connected") + + def test_restarting_preserves_disconnect_time(self): + _, disc_t, _ = self._tr("restarting", ["/dev/video0"], ["/dev/video0"], + now=50.0, disc_t=40.0, restart_until=49.0) + # hold elapsed → connected; disconnect_time unchanged + self.assertAlmostEqual(disc_t, 40.0) + + # ── Return value shapes ─────────────────────────────────────────── + + def test_returns_tuple_of_three(self): + result = self._tr("disconnected", [], []) + self.assertEqual(len(result), 3) + + def test_new_state_is_string(self): + s, _, _ = self._tr("disconnected", [], []) + self.assertIsInstance(s, str) + + def test_disconnect_time_float(self): + _, dt, _ = self._tr("connected", ["/dev/video0"], [], now=42.5) + self.assertIsInstance(dt, float) + + def test_restart_until_float(self): + _, _, rup = self._tr("disconnected", [], ["/dev/video0"], + disc_t=99.0, now=100.0, grace=5.0, hold=3.0) + self.assertIsInstance(rup, float) + + +# ── Tests: node init ────────────────────────────────────────────────────────── + +class TestNodeInit(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def test_instantiates_no_devices(self): + node = _make_node(self.mod, initial_devices=[]) + self.assertIsNotNone(node) + + def test_instantiates_with_devices(self): + node = _make_node(self.mod, initial_devices=["/dev/video0"]) + self.assertIsNotNone(node) + + def test_initial_state_disconnected(self): + node = _make_node(self.mod, initial_devices=[]) + self.assertEqual(node.state, "disconnected") + + def test_initial_state_connected(self): + node = _make_node(self.mod, initial_devices=["/dev/video0"]) + self.assertEqual(node.state, "connected") + + def test_publishes_initial_state(self): + node = _make_node(self.mod, initial_devices=[]) + pub = node._pubs["/saltybot/camera_status"] + self.assertEqual(len(pub.msgs), 1) + self.assertEqual(pub.msgs[0].data, "disconnected") + + def test_publishes_initial_connected(self): + node = _make_node(self.mod, initial_devices=["/dev/video0"]) + pub = node._pubs["/saltybot/camera_status"] + self.assertEqual(pub.msgs[0].data, "connected") + + def test_publisher_registered(self): + node = _make_node(self.mod) + self.assertIn("/saltybot/camera_status", node._pubs) + + def test_timer_registered(self): + node = _make_node(self.mod) + self.assertGreater(len(node._timers), 0) + + def test_custom_output_topic(self): + node = _make_node(self.mod, output_topic="/my/cam_status") + self.assertIn("/my/cam_status", node._pubs) + + def test_devices_property(self): + node = _make_node(self.mod, initial_devices=["/dev/video0"]) + self.assertEqual(node.devices, frozenset(["/dev/video0"])) + + def test_no_devices_property_empty(self): + node = _make_node(self.mod, initial_devices=[]) + self.assertEqual(node.devices, frozenset()) + + +# ── Tests: _scan_cb state transitions ──────────────────────────────────────── + +class TestScanCb(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def _pub(self, node): + return node._pubs["/saltybot/camera_status"] + + def test_no_change_no_publish(self): + node = _make_node(self.mod, initial_devices=[]) + pub = self._pub(node) + initial_count = len(pub.msgs) + _set_scan(node, []) + node._scan_cb() + self.assertEqual(len(pub.msgs), initial_count) + + def test_device_appears_publishes_connected(self): + node = _make_node(self.mod, initial_devices=[]) + _set_scan(node, ["/dev/video0"]) + node._scan_cb() + self.assertEqual(self._pub(node).msgs[-1].data, "connected") + + def test_device_lost_publishes_disconnected(self): + node = _make_node(self.mod, initial_devices=["/dev/video0"]) + _set_scan(node, []) + node._scan_cb() + self.assertEqual(self._pub(node).msgs[-1].data, "disconnected") + + def test_device_replaced_publishes_restarting(self): + node = _make_node(self.mod, initial_devices=["/dev/video0"]) + _set_scan(node, ["/dev/video2"]) + node._scan_cb() + self.assertEqual(self._pub(node).msgs[-1].data, "restarting") + + def test_reconnect_within_grace_publishes_restarting(self): + """Simulate: connected → disconnected → reconnected within grace.""" + node = _make_node(self.mod, initial_devices=["/dev/video0"], + restart_grace_s=10.0) + # Simulate disconnect + _set_scan(node, []) + node._scan_cb() + self.assertEqual(node.state, "disconnected") + # Quick reconnect — disconnect_time was just set, well within grace + _set_scan(node, ["/dev/video0"]) + node._scan_cb() + self.assertEqual(node.state, "restarting") + self.assertEqual(self._pub(node).msgs[-1].data, "restarting") + + def test_restarting_resolves_to_connected_after_hold(self): + """After hold expires, restarting → connected.""" + node = _make_node(self.mod, initial_devices=["/dev/video0"], + restart_hold_s=0.0) # instant hold + # Device changes → restarting + _set_scan(node, ["/dev/video2"]) + node._scan_cb() + self.assertEqual(node.state, "restarting") + # Next tick: hold = 0.0 → restart_until is in the past → resolves + _set_scan(node, ["/dev/video2"]) + node._scan_cb() + self.assertEqual(node.state, "connected") + + def test_restarting_resolves_to_disconnected_when_no_device(self): + node = _make_node(self.mod, initial_devices=["/dev/video0"], + restart_hold_s=0.0) + _set_scan(node, ["/dev/video2"]) + node._scan_cb() # → restarting + _set_scan(node, []) + node._scan_cb() # → disconnected + self.assertEqual(node.state, "disconnected") + + def test_publish_always_publishes_every_tick(self): + node = _make_node(self.mod, initial_devices=[], publish_always=True) + pub = self._pub(node) + before = len(pub.msgs) + _set_scan(node, []) + node._scan_cb() + node._scan_cb() + node._scan_cb() + self.assertEqual(len(pub.msgs), before + 3) + + def test_no_publish_always_skips_unchanged(self): + node = _make_node(self.mod, initial_devices=[], publish_always=False) + pub = self._pub(node) + before = len(pub.msgs) + _set_scan(node, []) + node._scan_cb() + node._scan_cb() + # State unchanged → no additional publishes + self.assertEqual(len(pub.msgs), before) + + def test_state_transitions_logged(self): + node = _make_node(self.mod, initial_devices=[]) + _set_scan(node, ["/dev/video0"]) + node._scan_cb() + infos = [m for lvl, m in node._logs if lvl == "INFO"] + self.assertTrue(any("connected" in m.lower() for m in infos)) + + def test_multiple_devices_connected(self): + node = _make_node(self.mod, initial_devices=[]) + _set_scan(node, ["/dev/video0", "/dev/video1"]) + node._scan_cb() + self.assertEqual(node.state, "connected") + + def test_partial_loss_restarting(self): + node = _make_node(self.mod, + initial_devices=["/dev/video0", "/dev/video1"]) + _set_scan(node, ["/dev/video0"]) + node._scan_cb() + self.assertEqual(node.state, "restarting") + + def test_state_property_updated(self): + node = _make_node(self.mod, initial_devices=[]) + self.assertEqual(node.state, "disconnected") + _set_scan(node, ["/dev/video0"]) + node._scan_cb() + self.assertEqual(node.state, "connected") + + def test_devices_property_updated(self): + node = _make_node(self.mod, initial_devices=[]) + _set_scan(node, ["/dev/video0"]) + node._scan_cb() + self.assertIn("/dev/video0", node.devices) + + def test_full_lifecycle(self): + """Full cycle: disconnected → connected → restarting → disconnected.""" + node = _make_node(self.mod, initial_devices=[], + restart_grace_s=60.0, restart_hold_s=0.0) + pub = self._pub(node) + + # Boot: disconnected + self.assertEqual(node.state, "disconnected") + + # Camera plugged in + _set_scan(node, ["/dev/video0"]) + node._scan_cb() + self.assertEqual(node.state, "connected") + + # Camera hot-unplugged then immediately replugged + _set_scan(node, []) + node._scan_cb() + self.assertEqual(node.state, "disconnected") + + _set_scan(node, ["/dev/video0"]) + node._scan_cb() + self.assertEqual(node.state, "restarting") + + # Hold expires (hold_s=0) → resolves to connected + node._scan_cb() + self.assertEqual(node.state, "connected") + + # Camera fully removed + _set_scan(node, []) + node._scan_cb() + self.assertEqual(node.state, "disconnected") + + status_sequence = [m.data for m in pub.msgs] + # Must contain all three statuses in the sequence + self.assertIn("connected", status_sequence) + self.assertIn("disconnected", status_sequence) + self.assertIn("restarting", status_sequence) + + +# ── Tests: _publish ─────────────────────────────────────────────────────────── + +class TestPublish(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def setUp(self): + self.node = _make_node(self.mod, initial_devices=[]) + self.pub = self.node._pubs["/saltybot/camera_status"] + + def test_publish_connected(self): + self.node._publish("connected") + self.assertEqual(self.pub.msgs[-1].data, "connected") + + def test_publish_disconnected(self): + self.node._publish("disconnected") + self.assertEqual(self.pub.msgs[-1].data, "disconnected") + + def test_publish_restarting(self): + self.node._publish("restarting") + self.assertEqual(self.pub.msgs[-1].data, "restarting") + + def test_publish_increments_msg_count(self): + before = len(self.pub.msgs) + self.node._publish("connected") + self.assertEqual(len(self.pub.msgs), before + 1) + + +# ── Tests: source content ───────────────────────────────────────────────────── + +class TestNodeSrc(unittest.TestCase): + @classmethod + def setUpClass(cls): + with open(_SRC) as f: cls.src = f.read() + + def test_issue_tag(self): self.assertIn("#320", self.src) + def test_output_topic(self): self.assertIn("/saltybot/camera_status", self.src) + def test_dev_video_glob(self): self.assertIn("/dev/video*", self.src) + def test_status_connected(self): self.assertIn('"connected"', self.src) + def test_status_disconnected(self): self.assertIn('"disconnected"', self.src) + def test_status_restarting(self): self.assertIn('"restarting"', self.src) + def test_compute_transition_fn(self): self.assertIn("compute_transition", self.src) + def test_scan_video_devices_fn(self): self.assertIn("scan_video_devices", self.src) + def test_restart_grace_param(self): self.assertIn("restart_grace_s", self.src) + def test_restart_hold_param(self): self.assertIn("restart_hold_s", self.src) + def test_poll_rate_param(self): self.assertIn("poll_rate", self.src) + def test_publish_always_param(self): self.assertIn("publish_always", self.src) + def test_threading_lock(self): self.assertIn("threading.Lock", self.src) + def test_glob_used(self): self.assertIn("glob", self.src) + def test_main_defined(self): self.assertIn("def main", self.src) + def test_scan_fn_injectable(self): self.assertIn("_scan_fn", self.src) + def test_string_msg(self): self.assertIn("String", self.src) + + +# ── Tests: config / launch / setup ──────────────────────────────────────────── + +class TestConfig(unittest.TestCase): + _CONFIG = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/config/camera_hotplug_params.yaml" + ) + _LAUNCH = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/launch/camera_hotplug.launch.py" + ) + _SETUP = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/setup.py" + ) + + def test_config_exists(self): + import os; self.assertTrue(os.path.exists(self._CONFIG)) + + def test_config_video_glob(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("video_glob", c) + + def test_config_poll_rate(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("poll_rate", c) + + def test_config_restart_grace(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("restart_grace_s", c) + + def test_config_restart_hold(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("restart_hold_s", c) + + def test_launch_exists(self): + import os; self.assertTrue(os.path.exists(self._LAUNCH)) + + def test_launch_video_glob_arg(self): + with open(self._LAUNCH) as f: c = f.read() + self.assertIn("video_glob", c) + + def test_launch_restart_grace_arg(self): + with open(self._LAUNCH) as f: c = f.read() + self.assertIn("restart_grace_s", c) + + def test_entry_point_in_setup(self): + with open(self._SETUP) as f: c = f.read() + self.assertIn("camera_hotplug_node", c) + + +if __name__ == "__main__": + unittest.main()