From 813d6f2529aa713bff30bbc94a81e48ae56a6807 Mon Sep 17 00:00:00 2001 From: sl-jetson Date: Tue, 3 Mar 2026 06:51:46 -0500 Subject: [PATCH] feat(social): trigger-based ROS2 bag recorder (Issue #332) BagRecorderNode: subscribes /saltybot/record_trigger (Bool), spawns ros2 bag record subprocess, drives idle/recording/stopping/error state machine; auto-stop timeout, SIGINT graceful shutdown with SIGKILL fallback. Publishes /saltybot/recording_status (String). Configurable topics (csv), bag_dir, prefix, compression, size limit. Subprocess injectable for offline testing. 101/101 tests pass. Co-Authored-By: Claude Sonnet 4.6 --- .../config/rosbag_recorder_params.yaml | 20 + .../launch/rosbag_recorder.launch.py | 47 ++ .../saltybot_social/rosbag_recorder_node.py | 373 +++++++++ jetson/ros2_ws/src/saltybot_social/setup.py | 2 + .../test/test_rosbag_recorder.py | 719 ++++++++++++++++++ 5 files changed, 1161 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_social/config/rosbag_recorder_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_social/launch/rosbag_recorder.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_social/saltybot_social/rosbag_recorder_node.py create mode 100644 jetson/ros2_ws/src/saltybot_social/test/test_rosbag_recorder.py diff --git a/jetson/ros2_ws/src/saltybot_social/config/rosbag_recorder_params.yaml b/jetson/ros2_ws/src/saltybot_social/config/rosbag_recorder_params.yaml new file mode 100644 index 0000000..c7b38f4 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/config/rosbag_recorder_params.yaml @@ -0,0 +1,20 @@ +rosbag_recorder_node: + ros__parameters: + trigger_topic: "/saltybot/record_trigger" + status_topic: "/saltybot/recording_status" + + # Comma-separated topic list to record. + # Empty string = record all topics (ros2 bag record --all). + # Example: "/saltybot/camera_status,/saltybot/wake_word_detected,/cmd_vel" + topics: "" + + bag_dir: "/tmp/saltybot_bags" # output directory (created if absent) + bag_prefix: "saltybot" # filename prefix; timestamp appended + + auto_stop_s: 60.0 # auto-stop after N seconds; 0 = disabled + stop_timeout_s: 5.0 # force-kill if subprocess won't stop within N s + + compression: false # enable zstd file-level compression + max_bag_size_mb: 0.0 # split bags at this size (MiB); 0 = no limit + + poll_rate: 2.0 # state-machine check frequency (Hz) diff --git a/jetson/ros2_ws/src/saltybot_social/launch/rosbag_recorder.launch.py b/jetson/ros2_ws/src/saltybot_social/launch/rosbag_recorder.launch.py new file mode 100644 index 0000000..906bb3e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/launch/rosbag_recorder.launch.py @@ -0,0 +1,47 @@ +"""rosbag_recorder.launch.py — Launch trigger-based bag recorder (Issue #332). + +Usage: + ros2 launch saltybot_social rosbag_recorder.launch.py + ros2 launch saltybot_social rosbag_recorder.launch.py auto_stop_s:=120.0 + ros2 launch saltybot_social rosbag_recorder.launch.py \\ + topics:="/saltybot/camera_status,/cmd_vel" bag_dir:=/data/bags +""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + pkg = get_package_share_directory("saltybot_social") + cfg = os.path.join(pkg, "config", "rosbag_recorder_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument("topics", default_value="", + description="Comma-separated topics (empty=all)"), + DeclareLaunchArgument("bag_dir", default_value="/tmp/saltybot_bags", + description="Output directory for bag files"), + DeclareLaunchArgument("auto_stop_s", default_value="60.0", + description="Auto-stop timeout in seconds (0=off)"), + DeclareLaunchArgument("compression", default_value="false", + description="Enable zstd compression"), + + Node( + package="saltybot_social", + executable="rosbag_recorder_node", + name="rosbag_recorder_node", + output="screen", + parameters=[ + cfg, + { + "topics": LaunchConfiguration("topics"), + "bag_dir": LaunchConfiguration("bag_dir"), + "auto_stop_s": LaunchConfiguration("auto_stop_s"), + "compression": LaunchConfiguration("compression"), + }, + ], + ), + ]) diff --git a/jetson/ros2_ws/src/saltybot_social/saltybot_social/rosbag_recorder_node.py b/jetson/ros2_ws/src/saltybot_social/saltybot_social/rosbag_recorder_node.py new file mode 100644 index 0000000..0788bc5 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/saltybot_social/rosbag_recorder_node.py @@ -0,0 +1,373 @@ +"""rosbag_recorder_node.py — Trigger-based ROS2 bag recorder. +Issue #332 + +Subscribes to /saltybot/record_trigger (Bool). True starts recording; +False stops it. Auto-stop fires after ``auto_stop_s`` seconds if still +running. Recording is performed by spawning a ``ros2 bag record`` +subprocess which is sent SIGINT for graceful shutdown and SIGKILL if it +does not exit within ``stop_timeout_s``. + +Status values +───────────── + "idle" — not recording + "recording" — subprocess active, writing to bag file + "stopping" — SIGINT sent, waiting for subprocess to exit + "error" — subprocess died unexpectedly; new trigger retries + +Subscriptions +───────────── + /saltybot/record_trigger std_msgs/Bool — True = start, False = stop + +Publications +──────────── + /saltybot/recording_status std_msgs/String — status value (see above) + +Parameters +────────── + trigger_topic (str, "/saltybot/record_trigger") + status_topic (str, "/saltybot/recording_status") + topics (str, "") comma-separated topic list; + empty string → record all topics (-a) + bag_dir (str, "/tmp/saltybot_bags") + bag_prefix (str, "saltybot") + auto_stop_s (float, 60.0) 0 = no auto-stop + stop_timeout_s (float, 5.0) force-kill after this many seconds + compression (bool, False) enable zstd file compression + max_bag_size_mb (float, 0.0) 0 = unlimited + poll_rate (float, 2.0) state-machine check frequency (Hz) +""" + +from __future__ import annotations + +import datetime +import os +import signal +import subprocess +import threading +import time +from typing import List, Optional, Tuple + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile +from std_msgs.msg import Bool, String + + +# ── Status constants ─────────────────────────────────────────────────────────── + +STATUS_IDLE = "idle" +STATUS_RECORDING = "recording" +STATUS_STOPPING = "stopping" +STATUS_ERROR = "error" + + +# ── Pure helpers ─────────────────────────────────────────────────────────────── + +def make_bag_path(bag_dir: str, prefix: str) -> str: + """Return a timestamped output path for a new bag (no file created).""" + ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + return os.path.join(bag_dir, f"{prefix}_{ts}") + + +def parse_topics(topics_str: str) -> List[str]: + """Parse a comma-separated topic string into a clean list. + + Returns an empty list when *topics_str* is blank (meaning record-all). + """ + if not topics_str or not topics_str.strip(): + return [] + return [t.strip() for t in topics_str.split(",") if t.strip()] + + +def compute_recording_transition( + state: str, + trigger: Optional[bool], + proc_running: bool, + now: float, + record_start_t: float, + stop_start_t: float, + auto_stop_s: float, + stop_timeout_s: float, +) -> Tuple[str, bool]: + """Pure state-machine step — no I/O, no ROS. + + Parameters + ---------- + state : current status string + trigger : latest trigger value (True=start, False=stop, None=none) + proc_running : whether the recorder subprocess is alive + now : current monotonic time (s) + record_start_t : monotonic time recording began (0 if not recording) + stop_start_t : monotonic time STOPPING began (0 if not stopping) + auto_stop_s : auto-stop after this many seconds (0 = disabled) + stop_timeout_s : force-kill if stopping > this long (0 = disabled) + + Returns + ------- + (new_state, force_kill) + force_kill=True signals the caller to SIGKILL the process. + """ + if state == STATUS_IDLE: + if trigger is True: + return STATUS_RECORDING, False + return STATUS_IDLE, False + + if state == STATUS_RECORDING: + if not proc_running: + return STATUS_ERROR, False + if trigger is False: + return STATUS_STOPPING, False + if (auto_stop_s > 0 and record_start_t > 0 + and (now - record_start_t) >= auto_stop_s): + return STATUS_STOPPING, False + return STATUS_RECORDING, False + + if state == STATUS_STOPPING: + if not proc_running: + return STATUS_IDLE, False + if (stop_timeout_s > 0 and stop_start_t > 0 + and (now - stop_start_t) >= stop_timeout_s): + return STATUS_IDLE, True # force-kill + return STATUS_STOPPING, False + + # STATUS_ERROR + if trigger is True: + return STATUS_RECORDING, False + return STATUS_ERROR, False + + +# ── Subprocess wrapper ───────────────────────────────────────────────────────── + +class BagRecorderProcess: + """Thin wrapper around a ``ros2 bag record`` subprocess.""" + + def __init__(self) -> None: + self._proc: Optional[subprocess.Popen] = None + + def start(self, topics: List[str], output_path: str, + compression: bool = False, + max_size_mb: float = 0.0) -> bool: + """Launch the recorder. Returns False if already running or on error.""" + if self.is_running(): + return False + + cmd = ["ros2", "bag", "record", "--output", output_path] + if topics: + cmd += topics + else: + cmd += ["--all"] + if compression: + cmd += ["--compression-mode", "file", + "--compression-format", "zstd"] + if max_size_mb > 0: + cmd += ["--max-bag-size", + str(int(max_size_mb * 1024 * 1024))] + + try: + self._proc = subprocess.Popen( + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.PIPE, + preexec_fn=os.setsid, + ) + return True + except Exception: + self._proc = None + return False + + def stop(self) -> None: + """Send SIGINT to the process group for graceful shutdown.""" + if self._proc is not None and self._proc.poll() is None: + try: + os.killpg(os.getpgid(self._proc.pid), signal.SIGINT) + except (ProcessLookupError, PermissionError, OSError): + pass + + def kill(self) -> None: + """Send SIGKILL to the process group for forced shutdown.""" + if self._proc is not None: + try: + os.killpg(os.getpgid(self._proc.pid), signal.SIGKILL) + except (ProcessLookupError, PermissionError, OSError): + pass + self._proc = None + + def is_running(self) -> bool: + return self._proc is not None and self._proc.poll() is None + + def reset(self) -> None: + """Clear internal proc reference (call after graceful exit).""" + self._proc = None + + +# ── ROS2 node ────────────────────────────────────────────────────────────────── + +class RosbagRecorderNode(Node): + """Trigger-based ROS bag recorder with auto-stop and status reporting.""" + + def __init__(self) -> None: + super().__init__("rosbag_recorder_node") + + self.declare_parameter("trigger_topic", "/saltybot/record_trigger") + self.declare_parameter("status_topic", "/saltybot/recording_status") + self.declare_parameter("topics", "") + self.declare_parameter("bag_dir", "/tmp/saltybot_bags") + self.declare_parameter("bag_prefix", "saltybot") + self.declare_parameter("auto_stop_s", 60.0) + self.declare_parameter("stop_timeout_s", 5.0) + self.declare_parameter("compression", False) + self.declare_parameter("max_bag_size_mb", 0.0) + self.declare_parameter("poll_rate", 2.0) + + trigger_topic = self.get_parameter("trigger_topic").value + status_topic = self.get_parameter("status_topic").value + topics_str = self.get_parameter("topics").value + self._bag_dir = str(self.get_parameter("bag_dir").value) + self._bag_prefix = str(self.get_parameter("bag_prefix").value) + self._auto_stop_s = float(self.get_parameter("auto_stop_s").value) + self._stop_tmo_s = float(self.get_parameter("stop_timeout_s").value) + self._compression = bool(self.get_parameter("compression").value) + self._max_mb = float(self.get_parameter("max_bag_size_mb").value) + poll_rate = float(self.get_parameter("poll_rate").value) + + self._topics: List[str] = parse_topics(str(topics_str)) + + # Recorder process — injectable for tests + self._recorder: BagRecorderProcess = BagRecorderProcess() + + # State + self._state = STATUS_IDLE + self._trigger: Optional[bool] = None + self._record_start_t: float = 0.0 + self._stop_start_t: float = 0.0 + self._current_bag: str = "" + self._lock = threading.Lock() + + qos = QoSProfile(depth=10) + self._status_pub = self.create_publisher(String, status_topic, qos) + self._trigger_sub = self.create_subscription( + Bool, trigger_topic, self._on_trigger, qos + ) + self._timer = self.create_timer(1.0 / poll_rate, self._poll_cb) + + # Publish initial state + self._publish(STATUS_IDLE) + + topic_desc = ",".join(self._topics) if self._topics else "" + self.get_logger().info( + f"RosbagRecorderNode ready — topics={topic_desc}, " + f"bag_dir={self._bag_dir}, auto_stop={self._auto_stop_s}s" + ) + + # ── Subscription ─────────────────────────────────────────────────── + + def _on_trigger(self, msg) -> None: + with self._lock: + self._trigger = bool(msg.data) + + # ── Poll / state machine ──────────────────────────────────────────── + + def _poll_cb(self) -> None: + now = time.monotonic() + + with self._lock: + trigger = self._trigger + self._trigger = None # consume + state = self._state + rec_t = self._record_start_t + stop_t = self._stop_start_t + + proc_running = self._recorder.is_running() + + new_state, force_kill = compute_recording_transition( + state, trigger, proc_running, now, + rec_t, stop_t, self._auto_stop_s, self._stop_tmo_s, + ) + + if force_kill: + self._recorder.kill() + self.get_logger().warn( + f"RosbagRecorder: force-killed (stop_timeout={self._stop_tmo_s}s)" + ) + + if new_state != state: + self._enter_state(new_state, now) + + def _enter_state(self, new_state: str, now: float) -> None: + if new_state == STATUS_RECORDING: + bag_path = make_bag_path(self._bag_dir, self._bag_prefix) + started = self._recorder.start( + self._topics, bag_path, + compression=self._compression, + max_size_mb=self._max_mb, + ) + if not started: + new_state = STATUS_ERROR + self.get_logger().error( + "RosbagRecorder: failed to start recorder subprocess" + ) + else: + with self._lock: + self._record_start_t = now + self._current_bag = bag_path + self.get_logger().info( + f"RosbagRecorder: recording started → {bag_path}" + ) + + elif new_state == STATUS_STOPPING: + self._recorder.stop() + with self._lock: + self._stop_start_t = now + self.get_logger().info("RosbagRecorder: stopping (SIGINT sent)") + + elif new_state == STATUS_IDLE: + bag = self._current_bag + with self._lock: + self._record_start_t = 0.0 + self._stop_start_t = 0.0 + self._current_bag = "" + self._recorder.reset() + self.get_logger().info( + f"RosbagRecorder: recording complete → {bag}" + ) + + elif new_state == STATUS_ERROR: + self.get_logger().error( + "RosbagRecorder: subprocess exited unexpectedly" + ) + + with self._lock: + self._state = new_state + + self._publish(new_state) + + # ── Publish ───────────────────────────────────────────────────────── + + def _publish(self, status: str) -> None: + msg = String() + msg.data = status + self._status_pub.publish(msg) + + # ── Public accessors ───────────────────────────────────────────────── + + @property + def state(self) -> str: + with self._lock: + return self._state + + @property + def current_bag(self) -> str: + with self._lock: + return self._current_bag + + +def main(args=None) -> None: + rclpy.init(args=args) + node = RosbagRecorderNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() diff --git a/jetson/ros2_ws/src/saltybot_social/setup.py b/jetson/ros2_ws/src/saltybot_social/setup.py index a2fcc0b..16337bf 100644 --- a/jetson/ros2_ws/src/saltybot_social/setup.py +++ b/jetson/ros2_ws/src/saltybot_social/setup.py @@ -61,6 +61,8 @@ setup( 'wake_word_node = saltybot_social.wake_word_node:main', # USB camera hot-plug monitor (Issue #320) 'camera_hotplug_node = saltybot_social.camera_hotplug_node:main', + # Trigger-based ROS2 bag recorder (Issue #332) + 'rosbag_recorder_node = saltybot_social.rosbag_recorder_node:main', ], }, ) diff --git a/jetson/ros2_ws/src/saltybot_social/test/test_rosbag_recorder.py b/jetson/ros2_ws/src/saltybot_social/test/test_rosbag_recorder.py new file mode 100644 index 0000000..7a2a86a --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/test/test_rosbag_recorder.py @@ -0,0 +1,719 @@ +"""test_rosbag_recorder.py — Offline tests for rosbag_recorder_node (Issue #332). + +Stubs out rclpy and ROS message types. +BagRecorderProcess is replaced with MockRecorder — no subprocesses are spawned. +""" + +import importlib.util +import sys +import time +import types +import unittest + + +# ── ROS2 stubs ──────────────────────────────────────────────────────────────── + +def _make_ros_stubs(): + for mod_name in ("rclpy", "rclpy.node", "rclpy.qos", + "std_msgs", "std_msgs.msg"): + if mod_name not in sys.modules: + sys.modules[mod_name] = types.ModuleType(mod_name) + + class _Node: + def __init__(self, name="node"): + self._name = name + if not hasattr(self, "_params"): + self._params = {} + self._pubs = {} + self._subs = {} + self._logs = [] + self._timers = [] + + def declare_parameter(self, name, default): + if name not in self._params: + self._params[name] = default + + def get_parameter(self, name): + class _P: + def __init__(self, v): self.value = v + return _P(self._params.get(name)) + + def create_publisher(self, msg_type, topic, qos): + pub = _FakePub() + self._pubs[topic] = pub + return pub + + def create_subscription(self, msg_type, topic, cb, qos): + self._subs[topic] = cb + return object() + + def create_timer(self, period, cb): + self._timers.append(cb) + return object() + + def get_logger(self): + node = self + class _L: + def info(self, m): node._logs.append(("INFO", m)) + def warn(self, m): node._logs.append(("WARN", m)) + def error(self, m): node._logs.append(("ERROR", m)) + return _L() + + def destroy_node(self): pass + + class _FakePub: + def __init__(self): + self.msgs = [] + def publish(self, msg): + self.msgs.append(msg) + + class _QoSProfile: + def __init__(self, depth=10): self.depth = depth + + class _Bool: + def __init__(self): self.data = False + + class _String: + def __init__(self): self.data = "" + + rclpy_mod = sys.modules["rclpy"] + rclpy_mod.init = lambda args=None: None + rclpy_mod.spin = lambda node: None + rclpy_mod.shutdown = lambda: None + + sys.modules["rclpy.node"].Node = _Node + sys.modules["rclpy.qos"].QoSProfile = _QoSProfile + sys.modules["std_msgs.msg"].Bool = _Bool + sys.modules["std_msgs.msg"].String = _String + + return _Node, _FakePub, _Bool, _String + + +_Node, _FakePub, _Bool, _String = _make_ros_stubs() + + +# ── Module loader ───────────────────────────────────────────────────────────── + +_SRC = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/saltybot_social/rosbag_recorder_node.py" +) + + +def _load_mod(): + spec = importlib.util.spec_from_file_location("rosbag_recorder_testmod", _SRC) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +class _MockRecorder: + """Injectable replacement for BagRecorderProcess.""" + + def __init__(self, start_succeeds: bool = True) -> None: + self.start_succeeds = start_succeeds + self._running = False + self.calls: list = [] + + def start(self, topics, output_path, compression=False, max_size_mb=0.0): + self.calls.append(("start", list(topics), output_path)) + if self.start_succeeds: + self._running = True + return True + return False + + def stop(self): + self.calls.append(("stop",)) + self._running = False # immediately "gone" for deterministic tests + + def kill(self): + self.calls.append(("kill",)) + self._running = False + + def is_running(self): + return self._running + + def reset(self): + self.calls.append(("reset",)) + self._running = False + + def call_types(self): + return [c[0] for c in self.calls] + + +def _make_node(mod, recorder=None, **kwargs): + node = mod.RosbagRecorderNode.__new__(mod.RosbagRecorderNode) + defaults = { + "trigger_topic": "/saltybot/record_trigger", + "status_topic": "/saltybot/recording_status", + "topics": "", + "bag_dir": "/tmp/test_bags", + "bag_prefix": "test", + "auto_stop_s": 60.0, + "stop_timeout_s": 5.0, + "compression": False, + "max_bag_size_mb": 0.0, + "poll_rate": 2.0, + } + defaults.update(kwargs) + node._params = dict(defaults) + mod.RosbagRecorderNode.__init__(node) + if recorder is not None: + node._recorder = recorder + return node + + +def _trigger(node, value: bool): + """Deliver a Bool trigger message.""" + msg = _Bool() + msg.data = value + node._subs["/saltybot/record_trigger"](msg) + + +def _pub(node): + return node._pubs["/saltybot/recording_status"] + + +# ── Tests: pure helpers ──────────────────────────────────────────────────────── + +class TestMakeBagPath(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def test_contains_prefix(self): + p = self.mod.make_bag_path("/tmp/bags", "saltybot") + self.assertIn("saltybot", p) + + def test_contains_bag_dir(self): + p = self.mod.make_bag_path("/tmp/bags", "saltybot") + self.assertTrue(p.startswith("/tmp/bags")) + + def test_unique_per_call(self): + # Two calls in tight succession may share a second but that's fine; + # just ensure the function doesn't crash and returns a string. + p1 = self.mod.make_bag_path("/tmp", "t") + self.assertIsInstance(p1, str) + self.assertTrue(len(p1) > 0) + + +class TestParseTopics(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def test_empty_string_returns_empty_list(self): + self.assertEqual(self.mod.parse_topics(""), []) + + def test_whitespace_only_returns_empty(self): + self.assertEqual(self.mod.parse_topics(" "), []) + + def test_single_topic(self): + self.assertEqual(self.mod.parse_topics("/topic/foo"), ["/topic/foo"]) + + def test_multiple_topics(self): + r = self.mod.parse_topics("/a,/b,/c") + self.assertEqual(r, ["/a", "/b", "/c"]) + + def test_strips_whitespace(self): + r = self.mod.parse_topics(" /a , /b ") + self.assertEqual(r, ["/a", "/b"]) + + def test_ignores_empty_segments(self): + r = self.mod.parse_topics("/a,,/b") + self.assertEqual(r, ["/a", "/b"]) + + +class TestStatusConstants(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def test_idle(self): self.assertEqual(self.mod.STATUS_IDLE, "idle") + def test_recording(self): self.assertEqual(self.mod.STATUS_RECORDING, "recording") + def test_stopping(self): self.assertEqual(self.mod.STATUS_STOPPING, "stopping") + def test_error(self): self.assertEqual(self.mod.STATUS_ERROR, "error") + + +# ── Tests: compute_recording_transition ────────────────────────────────────── + +class TestComputeRecordingTransition(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def _tr(self, state, trigger=None, proc_running=True, + now=100.0, rec_t=0.0, stop_t=0.0, + auto_stop=60.0, stop_tmo=5.0): + return self.mod.compute_recording_transition( + state, trigger, proc_running, now, + rec_t, stop_t, auto_stop, stop_tmo, + ) + + # ── IDLE ────────────────────────────────────────────────────────── + + def test_idle_no_trigger_stays_idle(self): + s, fk = self._tr("idle", trigger=None) + self.assertEqual(s, "idle"); self.assertFalse(fk) + + def test_idle_false_trigger_stays_idle(self): + s, fk = self._tr("idle", trigger=False) + self.assertEqual(s, "idle"); self.assertFalse(fk) + + def test_idle_true_trigger_starts_recording(self): + s, fk = self._tr("idle", trigger=True) + self.assertEqual(s, "recording"); self.assertFalse(fk) + + # ── RECORDING ───────────────────────────────────────────────────── + + def test_recording_stable_no_change(self): + s, fk = self._tr("recording", proc_running=True) + self.assertEqual(s, "recording"); self.assertFalse(fk) + + def test_recording_false_trigger_stops(self): + s, fk = self._tr("recording", trigger=False, proc_running=True) + self.assertEqual(s, "stopping"); self.assertFalse(fk) + + def test_recording_proc_dies_error(self): + s, fk = self._tr("recording", proc_running=False) + self.assertEqual(s, "error"); self.assertFalse(fk) + + def test_recording_auto_stop_fires(self): + # started at t=40, now=t=101 → 61 s elapsed > auto_stop=60 + s, fk = self._tr("recording", proc_running=True, + now=101.0, rec_t=40.0, auto_stop=60.0) + self.assertEqual(s, "stopping"); self.assertFalse(fk) + + def test_recording_auto_stop_not_yet(self): + # started at t=50, now=100 → 50 s < 60 s + s, fk = self._tr("recording", proc_running=True, + now=100.0, rec_t=50.0, auto_stop=60.0) + self.assertEqual(s, "recording") + + def test_recording_auto_stop_at_exactly_timeout(self): + s, fk = self._tr("recording", proc_running=True, + now=110.0, rec_t=50.0, auto_stop=60.0) + self.assertEqual(s, "stopping") + + def test_recording_auto_stop_disabled(self): + # auto_stop_s=0 → never auto-stops + s, fk = self._tr("recording", proc_running=True, + now=9999.0, rec_t=0.0, auto_stop=0.0) + self.assertEqual(s, "recording") + + # ── STOPPING ────────────────────────────────────────────────────── + + def test_stopping_proc_running_stays(self): + s, fk = self._tr("stopping", proc_running=True) + self.assertEqual(s, "stopping"); self.assertFalse(fk) + + def test_stopping_proc_exits_idle(self): + s, fk = self._tr("stopping", proc_running=False) + self.assertEqual(s, "idle"); self.assertFalse(fk) + + def test_stopping_force_kill_after_timeout(self): + # entered stopping at t=95, now=101 → 6 s > stop_tmo=5 + s, fk = self._tr("stopping", proc_running=True, + now=101.0, stop_t=95.0, stop_tmo=5.0) + self.assertEqual(s, "idle"); self.assertTrue(fk) + + def test_stopping_not_yet_force_kill(self): + # entered at t=98, now=100 → 2 s < 5 s + s, fk = self._tr("stopping", proc_running=True, + now=100.0, stop_t=98.0, stop_tmo=5.0) + self.assertEqual(s, "stopping"); self.assertFalse(fk) + + def test_stopping_timeout_disabled(self): + # stop_tmo=0 → never force-kills + s, fk = self._tr("stopping", proc_running=True, + now=9999.0, stop_t=0.0, stop_tmo=0.0) + self.assertEqual(s, "stopping"); self.assertFalse(fk) + + def test_stopping_force_kill_exactly_at_timeout(self): + s, fk = self._tr("stopping", proc_running=True, + now=100.0, stop_t=95.0, stop_tmo=5.0) + self.assertEqual(s, "idle"); self.assertTrue(fk) + + # ── ERROR ───────────────────────────────────────────────────────── + + def test_error_stays_without_trigger(self): + s, fk = self._tr("error", trigger=None) + self.assertEqual(s, "error"); self.assertFalse(fk) + + def test_error_false_trigger_stays(self): + s, fk = self._tr("error", trigger=False) + self.assertEqual(s, "error") + + def test_error_true_trigger_retries(self): + s, fk = self._tr("error", trigger=True) + self.assertEqual(s, "recording"); self.assertFalse(fk) + + # ── Return shape ────────────────────────────────────────────────── + + def test_returns_tuple_of_two(self): + result = self._tr("idle") + self.assertEqual(len(result), 2) + + def test_force_kill_is_bool(self): + _, fk = self._tr("idle") + self.assertIsInstance(fk, bool) + + +# ── Tests: node init ────────────────────────────────────────────────────────── + +class TestNodeInit(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def test_instantiates(self): + self.assertIsNotNone(_make_node(self.mod)) + + def test_initial_state_idle(self): + node = _make_node(self.mod) + self.assertEqual(node.state, "idle") + + def test_publishes_initial_idle(self): + node = _make_node(self.mod) + pub = _pub(node) + self.assertEqual(pub.msgs[0].data, "idle") + + def test_publisher_registered(self): + node = _make_node(self.mod) + self.assertIn("/saltybot/recording_status", node._pubs) + + def test_subscriber_registered(self): + node = _make_node(self.mod) + self.assertIn("/saltybot/record_trigger", node._subs) + + def test_timer_registered(self): + node = _make_node(self.mod) + self.assertGreater(len(node._timers), 0) + + def test_custom_topics(self): + node = _make_node(self.mod, + trigger_topic="/my/trigger", + status_topic="/my/status") + self.assertIn("/my/trigger", node._subs) + self.assertIn("/my/status", node._pubs) + + def test_topics_parsed_correctly(self): + node = _make_node(self.mod, topics="/a,/b,/c") + self.assertEqual(node._topics, ["/a", "/b", "/c"]) + + def test_empty_topics_means_all(self): + node = _make_node(self.mod, topics="") + self.assertEqual(node._topics, []) + + def test_auto_stop_s_stored(self): + node = _make_node(self.mod, auto_stop_s=30.0) + self.assertAlmostEqual(node._auto_stop_s, 30.0) + + def test_current_bag_empty_initially(self): + node = _make_node(self.mod) + self.assertEqual(node.current_bag, "") + + +# ── Tests: _on_trigger ──────────────────────────────────────────────────────── + +class TestOnTrigger(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def test_stores_true_trigger(self): + node = _make_node(self.mod) + _trigger(node, True) + with node._lock: + self.assertTrue(node._trigger) + + def test_stores_false_trigger(self): + node = _make_node(self.mod) + _trigger(node, False) + with node._lock: + self.assertFalse(node._trigger) + + def test_trigger_consumed_after_poll(self): + rec = _MockRecorder() + node = _make_node(self.mod, recorder=rec) + _trigger(node, True) + node._poll_cb() + with node._lock: + self.assertIsNone(node._trigger) + + +# ── Tests: poll loop — full state machine ──────────────────────────────────── + +class TestPollCb(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def _node(self, **kwargs): + rec = _MockRecorder() + node = _make_node(self.mod, recorder=rec, **kwargs) + return node, rec + + def test_true_trigger_starts_recording(self): + node, rec = self._node() + _trigger(node, True) + node._poll_cb() + self.assertEqual(node.state, "recording") + self.assertIn("start", rec.call_types()) + + def test_recording_publishes_status(self): + node, rec = self._node() + _trigger(node, True) + node._poll_cb() + self.assertEqual(_pub(node).msgs[-1].data, "recording") + + def test_false_trigger_stops_recording(self): + node, rec = self._node() + _trigger(node, True); node._poll_cb() # → recording + _trigger(node, False); node._poll_cb() # → stopping + self.assertEqual(node.state, "stopping") + self.assertIn("stop", rec.call_types()) + + def test_stopping_publishes_status(self): + node, rec = self._node() + _trigger(node, True); node._poll_cb() + _trigger(node, False); node._poll_cb() + self.assertEqual(_pub(node).msgs[-1].data, "stopping") + + def test_after_stop_proc_exit_idle(self): + """Once the mock recorder stops, next poll resolves to idle.""" + node, rec = self._node() + _trigger(node, True); node._poll_cb() # recording + _trigger(node, False); node._poll_cb() # stopping (rec.stop() sets running=False) + node._poll_cb() # proc not running → idle + self.assertEqual(node.state, "idle") + + def test_idle_publishes_after_stop(self): + node, rec = self._node() + _trigger(node, True); node._poll_cb() + _trigger(node, False); node._poll_cb() + node._poll_cb() + self.assertEqual(_pub(node).msgs[-1].data, "idle") + + def test_auto_stop_triggers_stopping(self): + node, rec = self._node(auto_stop_s=1.0) + _trigger(node, True) + node._poll_cb() # → recording + # Back-date start time so auto-stop fires + with node._lock: + node._record_start_t = time.monotonic() - 10.0 + node._poll_cb() # → stopping + self.assertEqual(node.state, "stopping") + + def test_auto_stop_disabled(self): + node, rec = self._node(auto_stop_s=0.0) + _trigger(node, True) + node._poll_cb() + with node._lock: + node._record_start_t = time.monotonic() - 9999.0 + node._poll_cb() + # Should still be recording (auto-stop disabled) + self.assertEqual(node.state, "recording") + + def test_proc_dies_error(self): + node, rec = self._node() + _trigger(node, True); node._poll_cb() # → recording + rec._running = False # simulate unexpected exit + node._poll_cb() + self.assertEqual(node.state, "error") + + def test_error_publishes_status(self): + node, rec = self._node() + _trigger(node, True); node._poll_cb() + rec._running = False + node._poll_cb() + self.assertEqual(_pub(node).msgs[-1].data, "error") + + def test_error_retries_on_true_trigger(self): + node, rec = self._node() + _trigger(node, True); node._poll_cb() # recording + rec._running = False + node._poll_cb() # error + _trigger(node, True); node._poll_cb() # retry → recording + self.assertEqual(node.state, "recording") + + def test_start_failure_enters_error(self): + rec = _MockRecorder(start_succeeds=False) + node = _make_node(self.mod, recorder=rec) + _trigger(node, True) + node._poll_cb() + self.assertEqual(node.state, "error") + + def test_force_kill_on_stop_timeout(self): + """Stubborn process that ignores SIGINT → force-killed after timeout.""" + class _StubbornRecorder(_MockRecorder): + def stop(self): + self.calls.append(("stop",)) + # Don't set _running = False — simulates process ignoring SIGINT + + stubborn = _StubbornRecorder() + node = _make_node(self.mod, recorder=stubborn, stop_timeout_s=2.0) + _trigger(node, True); node._poll_cb() # → recording + _trigger(node, False); node._poll_cb() # → stopping (process stays alive) + self.assertEqual(node.state, "stopping") + # Expire the stop timeout + with node._lock: + node._stop_start_t = time.monotonic() - 10.0 + node._poll_cb() # → force kill → idle + self.assertIn("kill", stubborn.call_types()) + self.assertEqual(node.state, "idle") + + def test_bag_path_set_when_recording(self): + node, rec = self._node(bag_prefix="mytest") + _trigger(node, True) + node._poll_cb() + self.assertIn("mytest", node.current_bag) + + def test_bag_path_cleared_after_idle(self): + node, rec = self._node() + _trigger(node, True); node._poll_cb() + _trigger(node, False); node._poll_cb() + node._poll_cb() + self.assertEqual(node.current_bag, "") + + def test_topics_passed_to_recorder(self): + rec = _MockRecorder() + node = _make_node(self.mod, recorder=rec, topics="/a,/b") + _trigger(node, True) + node._poll_cb() + start_calls = [c for c in rec.calls if c[0] == "start"] + self.assertEqual(len(start_calls), 1) + self.assertEqual(start_calls[0][1], ["/a", "/b"]) + + def test_empty_topics_passes_empty_list(self): + rec = _MockRecorder() + node = _make_node(self.mod, recorder=rec, topics="") + _trigger(node, True) + node._poll_cb() + start_calls = [c for c in rec.calls if c[0] == "start"] + self.assertEqual(start_calls[0][1], []) + + def test_recorder_reset_on_idle(self): + node, rec = self._node() + _trigger(node, True); node._poll_cb() + _trigger(node, False); node._poll_cb() + node._poll_cb() # idle + self.assertIn("reset", rec.call_types()) + + def test_full_lifecycle_status_sequence(self): + """idle → recording → stopping → idle.""" + node, rec = self._node() + pub = _pub(node) + + _trigger(node, True); node._poll_cb() + _trigger(node, False); node._poll_cb() + node._poll_cb() + + statuses = [m.data for m in pub.msgs] + self.assertIn("idle", statuses) + self.assertIn("recording", statuses) + self.assertIn("stopping", statuses) + + def test_logging_on_start(self): + node, rec = self._node() + _trigger(node, True) + node._poll_cb() + infos = [m for lvl, m in node._logs if lvl == "INFO"] + self.assertTrue(any("recording" in m.lower() for m in infos)) + + def test_logging_on_stop(self): + node, rec = self._node() + _trigger(node, True); node._poll_cb() + _trigger(node, False); node._poll_cb() + infos = [m for lvl, m in node._logs if lvl == "INFO"] + self.assertTrue(any("stop" in m.lower() for m in infos)) + + def test_logging_on_error(self): + node, rec = self._node() + _trigger(node, True); node._poll_cb() + rec._running = False + node._poll_cb() + errors = [m for lvl, m in node._logs if lvl == "ERROR"] + self.assertTrue(len(errors) > 0) + + +# ── Tests: source content ───────────────────────────────────────────────────── + +class TestNodeSrc(unittest.TestCase): + @classmethod + def setUpClass(cls): + with open(_SRC) as f: cls.src = f.read() + + def test_issue_tag(self): self.assertIn("#332", self.src) + def test_trigger_topic(self): self.assertIn("/saltybot/record_trigger", self.src) + def test_status_topic(self): self.assertIn("/saltybot/recording_status", self.src) + def test_status_idle(self): self.assertIn('"idle"', self.src) + def test_status_recording(self): self.assertIn('"recording"', self.src) + def test_status_stopping(self): self.assertIn('"stopping"', self.src) + def test_status_error(self): self.assertIn('"error"', self.src) + def test_compute_transition_fn(self): self.assertIn("compute_recording_transition", self.src) + def test_bag_recorder_process(self): self.assertIn("BagRecorderProcess", self.src) + def test_make_bag_path(self): self.assertIn("make_bag_path", self.src) + def test_parse_topics(self): self.assertIn("parse_topics", self.src) + def test_auto_stop_param(self): self.assertIn("auto_stop_s", self.src) + def test_stop_timeout_param(self): self.assertIn("stop_timeout_s", self.src) + def test_compression_param(self): self.assertIn("compression", self.src) + def test_subprocess_used(self): self.assertIn("subprocess", self.src) + def test_sigint_used(self): self.assertIn("SIGINT", self.src) + def test_threading_lock(self): self.assertIn("threading.Lock", self.src) + def test_recorder_injectable(self): self.assertIn("_recorder", self.src) + def test_main_defined(self): self.assertIn("def main", self.src) + def test_bool_subscription(self): self.assertIn("Bool", self.src) + def test_string_publication(self): self.assertIn("String", self.src) + + +# ── Tests: config / launch / setup ──────────────────────────────────────────── + +class TestConfig(unittest.TestCase): + _CONFIG = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/config/rosbag_recorder_params.yaml" + ) + _LAUNCH = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/launch/rosbag_recorder.launch.py" + ) + _SETUP = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/setup.py" + ) + + def test_config_exists(self): + import os; self.assertTrue(os.path.exists(self._CONFIG)) + + def test_config_auto_stop(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("auto_stop_s", c) + + def test_config_bag_dir(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("bag_dir", c) + + def test_config_topics(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("topics", c) + + def test_config_compression(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("compression", c) + + def test_config_stop_timeout(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("stop_timeout_s", c) + + def test_launch_exists(self): + import os; self.assertTrue(os.path.exists(self._LAUNCH)) + + def test_launch_auto_stop_arg(self): + with open(self._LAUNCH) as f: c = f.read() + self.assertIn("auto_stop_s", c) + + def test_launch_topics_arg(self): + with open(self._LAUNCH) as f: c = f.read() + self.assertIn("topics", c) + + def test_entry_point_in_setup(self): + with open(self._SETUP) as f: c = f.read() + self.assertIn("rosbag_recorder_node", c) + + +if __name__ == "__main__": + unittest.main() -- 2.47.2