feat(social): trigger-based ROS2 bag recorder (Issue #332) #335

Merged
sl-jetson merged 1 commits from sl-jetson/issue-332-rosbag-recorder into main 2026-03-03 13:25:56 -05:00
5 changed files with 1161 additions and 0 deletions

View File

@ -0,0 +1,20 @@
rosbag_recorder_node:
ros__parameters:
trigger_topic: "/saltybot/record_trigger"
status_topic: "/saltybot/recording_status"
# Comma-separated topic list to record.
# Empty string = record all topics (ros2 bag record --all).
# Example: "/saltybot/camera_status,/saltybot/wake_word_detected,/cmd_vel"
topics: ""
bag_dir: "/tmp/saltybot_bags" # output directory (created if absent)
bag_prefix: "saltybot" # filename prefix; timestamp appended
auto_stop_s: 60.0 # auto-stop after N seconds; 0 = disabled
stop_timeout_s: 5.0 # force-kill if subprocess won't stop within N s
compression: false # enable zstd file-level compression
max_bag_size_mb: 0.0 # split bags at this size (MiB); 0 = no limit
poll_rate: 2.0 # state-machine check frequency (Hz)

View File

@ -0,0 +1,47 @@
"""rosbag_recorder.launch.py — Launch trigger-based bag recorder (Issue #332).
Usage:
ros2 launch saltybot_social rosbag_recorder.launch.py
ros2 launch saltybot_social rosbag_recorder.launch.py auto_stop_s:=120.0
ros2 launch saltybot_social rosbag_recorder.launch.py \\
topics:="/saltybot/camera_status,/cmd_vel" bag_dir:=/data/bags
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg = get_package_share_directory("saltybot_social")
cfg = os.path.join(pkg, "config", "rosbag_recorder_params.yaml")
return LaunchDescription([
DeclareLaunchArgument("topics", default_value="",
description="Comma-separated topics (empty=all)"),
DeclareLaunchArgument("bag_dir", default_value="/tmp/saltybot_bags",
description="Output directory for bag files"),
DeclareLaunchArgument("auto_stop_s", default_value="60.0",
description="Auto-stop timeout in seconds (0=off)"),
DeclareLaunchArgument("compression", default_value="false",
description="Enable zstd compression"),
Node(
package="saltybot_social",
executable="rosbag_recorder_node",
name="rosbag_recorder_node",
output="screen",
parameters=[
cfg,
{
"topics": LaunchConfiguration("topics"),
"bag_dir": LaunchConfiguration("bag_dir"),
"auto_stop_s": LaunchConfiguration("auto_stop_s"),
"compression": LaunchConfiguration("compression"),
},
],
),
])

View File

@ -0,0 +1,373 @@
"""rosbag_recorder_node.py — Trigger-based ROS2 bag recorder.
Issue #332
Subscribes to /saltybot/record_trigger (Bool). True starts recording;
False stops it. Auto-stop fires after ``auto_stop_s`` seconds if still
running. Recording is performed by spawning a ``ros2 bag record``
subprocess which is sent SIGINT for graceful shutdown and SIGKILL if it
does not exit within ``stop_timeout_s``.
Status values
"idle" not recording
"recording" subprocess active, writing to bag file
"stopping" SIGINT sent, waiting for subprocess to exit
"error" subprocess died unexpectedly; new trigger retries
Subscriptions
/saltybot/record_trigger std_msgs/Bool True = start, False = stop
Publications
/saltybot/recording_status std_msgs/String status value (see above)
Parameters
trigger_topic (str, "/saltybot/record_trigger")
status_topic (str, "/saltybot/recording_status")
topics (str, "") comma-separated topic list;
empty string record all topics (-a)
bag_dir (str, "/tmp/saltybot_bags")
bag_prefix (str, "saltybot")
auto_stop_s (float, 60.0) 0 = no auto-stop
stop_timeout_s (float, 5.0) force-kill after this many seconds
compression (bool, False) enable zstd file compression
max_bag_size_mb (float, 0.0) 0 = unlimited
poll_rate (float, 2.0) state-machine check frequency (Hz)
"""
from __future__ import annotations
import datetime
import os
import signal
import subprocess
import threading
import time
from typing import List, Optional, Tuple
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import Bool, String
# ── Status constants ───────────────────────────────────────────────────────────
STATUS_IDLE = "idle"
STATUS_RECORDING = "recording"
STATUS_STOPPING = "stopping"
STATUS_ERROR = "error"
# ── Pure helpers ───────────────────────────────────────────────────────────────
def make_bag_path(bag_dir: str, prefix: str) -> str:
"""Return a timestamped output path for a new bag (no file created)."""
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
return os.path.join(bag_dir, f"{prefix}_{ts}")
def parse_topics(topics_str: str) -> List[str]:
"""Parse a comma-separated topic string into a clean list.
Returns an empty list when *topics_str* is blank (meaning record-all).
"""
if not topics_str or not topics_str.strip():
return []
return [t.strip() for t in topics_str.split(",") if t.strip()]
def compute_recording_transition(
state: str,
trigger: Optional[bool],
proc_running: bool,
now: float,
record_start_t: float,
stop_start_t: float,
auto_stop_s: float,
stop_timeout_s: float,
) -> Tuple[str, bool]:
"""Pure state-machine step — no I/O, no ROS.
Parameters
----------
state : current status string
trigger : latest trigger value (True=start, False=stop, None=none)
proc_running : whether the recorder subprocess is alive
now : current monotonic time (s)
record_start_t : monotonic time recording began (0 if not recording)
stop_start_t : monotonic time STOPPING began (0 if not stopping)
auto_stop_s : auto-stop after this many seconds (0 = disabled)
stop_timeout_s : force-kill if stopping > this long (0 = disabled)
Returns
-------
(new_state, force_kill)
force_kill=True signals the caller to SIGKILL the process.
"""
if state == STATUS_IDLE:
if trigger is True:
return STATUS_RECORDING, False
return STATUS_IDLE, False
if state == STATUS_RECORDING:
if not proc_running:
return STATUS_ERROR, False
if trigger is False:
return STATUS_STOPPING, False
if (auto_stop_s > 0 and record_start_t > 0
and (now - record_start_t) >= auto_stop_s):
return STATUS_STOPPING, False
return STATUS_RECORDING, False
if state == STATUS_STOPPING:
if not proc_running:
return STATUS_IDLE, False
if (stop_timeout_s > 0 and stop_start_t > 0
and (now - stop_start_t) >= stop_timeout_s):
return STATUS_IDLE, True # force-kill
return STATUS_STOPPING, False
# STATUS_ERROR
if trigger is True:
return STATUS_RECORDING, False
return STATUS_ERROR, False
# ── Subprocess wrapper ─────────────────────────────────────────────────────────
class BagRecorderProcess:
"""Thin wrapper around a ``ros2 bag record`` subprocess."""
def __init__(self) -> None:
self._proc: Optional[subprocess.Popen] = None
def start(self, topics: List[str], output_path: str,
compression: bool = False,
max_size_mb: float = 0.0) -> bool:
"""Launch the recorder. Returns False if already running or on error."""
if self.is_running():
return False
cmd = ["ros2", "bag", "record", "--output", output_path]
if topics:
cmd += topics
else:
cmd += ["--all"]
if compression:
cmd += ["--compression-mode", "file",
"--compression-format", "zstd"]
if max_size_mb > 0:
cmd += ["--max-bag-size",
str(int(max_size_mb * 1024 * 1024))]
try:
self._proc = subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
preexec_fn=os.setsid,
)
return True
except Exception:
self._proc = None
return False
def stop(self) -> None:
"""Send SIGINT to the process group for graceful shutdown."""
if self._proc is not None and self._proc.poll() is None:
try:
os.killpg(os.getpgid(self._proc.pid), signal.SIGINT)
except (ProcessLookupError, PermissionError, OSError):
pass
def kill(self) -> None:
"""Send SIGKILL to the process group for forced shutdown."""
if self._proc is not None:
try:
os.killpg(os.getpgid(self._proc.pid), signal.SIGKILL)
except (ProcessLookupError, PermissionError, OSError):
pass
self._proc = None
def is_running(self) -> bool:
return self._proc is not None and self._proc.poll() is None
def reset(self) -> None:
"""Clear internal proc reference (call after graceful exit)."""
self._proc = None
# ── ROS2 node ──────────────────────────────────────────────────────────────────
class RosbagRecorderNode(Node):
"""Trigger-based ROS bag recorder with auto-stop and status reporting."""
def __init__(self) -> None:
super().__init__("rosbag_recorder_node")
self.declare_parameter("trigger_topic", "/saltybot/record_trigger")
self.declare_parameter("status_topic", "/saltybot/recording_status")
self.declare_parameter("topics", "")
self.declare_parameter("bag_dir", "/tmp/saltybot_bags")
self.declare_parameter("bag_prefix", "saltybot")
self.declare_parameter("auto_stop_s", 60.0)
self.declare_parameter("stop_timeout_s", 5.0)
self.declare_parameter("compression", False)
self.declare_parameter("max_bag_size_mb", 0.0)
self.declare_parameter("poll_rate", 2.0)
trigger_topic = self.get_parameter("trigger_topic").value
status_topic = self.get_parameter("status_topic").value
topics_str = self.get_parameter("topics").value
self._bag_dir = str(self.get_parameter("bag_dir").value)
self._bag_prefix = str(self.get_parameter("bag_prefix").value)
self._auto_stop_s = float(self.get_parameter("auto_stop_s").value)
self._stop_tmo_s = float(self.get_parameter("stop_timeout_s").value)
self._compression = bool(self.get_parameter("compression").value)
self._max_mb = float(self.get_parameter("max_bag_size_mb").value)
poll_rate = float(self.get_parameter("poll_rate").value)
self._topics: List[str] = parse_topics(str(topics_str))
# Recorder process — injectable for tests
self._recorder: BagRecorderProcess = BagRecorderProcess()
# State
self._state = STATUS_IDLE
self._trigger: Optional[bool] = None
self._record_start_t: float = 0.0
self._stop_start_t: float = 0.0
self._current_bag: str = ""
self._lock = threading.Lock()
qos = QoSProfile(depth=10)
self._status_pub = self.create_publisher(String, status_topic, qos)
self._trigger_sub = self.create_subscription(
Bool, trigger_topic, self._on_trigger, qos
)
self._timer = self.create_timer(1.0 / poll_rate, self._poll_cb)
# Publish initial state
self._publish(STATUS_IDLE)
topic_desc = ",".join(self._topics) if self._topics else "<all>"
self.get_logger().info(
f"RosbagRecorderNode ready — topics={topic_desc}, "
f"bag_dir={self._bag_dir}, auto_stop={self._auto_stop_s}s"
)
# ── Subscription ───────────────────────────────────────────────────
def _on_trigger(self, msg) -> None:
with self._lock:
self._trigger = bool(msg.data)
# ── Poll / state machine ────────────────────────────────────────────
def _poll_cb(self) -> None:
now = time.monotonic()
with self._lock:
trigger = self._trigger
self._trigger = None # consume
state = self._state
rec_t = self._record_start_t
stop_t = self._stop_start_t
proc_running = self._recorder.is_running()
new_state, force_kill = compute_recording_transition(
state, trigger, proc_running, now,
rec_t, stop_t, self._auto_stop_s, self._stop_tmo_s,
)
if force_kill:
self._recorder.kill()
self.get_logger().warn(
f"RosbagRecorder: force-killed (stop_timeout={self._stop_tmo_s}s)"
)
if new_state != state:
self._enter_state(new_state, now)
def _enter_state(self, new_state: str, now: float) -> None:
if new_state == STATUS_RECORDING:
bag_path = make_bag_path(self._bag_dir, self._bag_prefix)
started = self._recorder.start(
self._topics, bag_path,
compression=self._compression,
max_size_mb=self._max_mb,
)
if not started:
new_state = STATUS_ERROR
self.get_logger().error(
"RosbagRecorder: failed to start recorder subprocess"
)
else:
with self._lock:
self._record_start_t = now
self._current_bag = bag_path
self.get_logger().info(
f"RosbagRecorder: recording started → {bag_path}"
)
elif new_state == STATUS_STOPPING:
self._recorder.stop()
with self._lock:
self._stop_start_t = now
self.get_logger().info("RosbagRecorder: stopping (SIGINT sent)")
elif new_state == STATUS_IDLE:
bag = self._current_bag
with self._lock:
self._record_start_t = 0.0
self._stop_start_t = 0.0
self._current_bag = ""
self._recorder.reset()
self.get_logger().info(
f"RosbagRecorder: recording complete → {bag}"
)
elif new_state == STATUS_ERROR:
self.get_logger().error(
"RosbagRecorder: subprocess exited unexpectedly"
)
with self._lock:
self._state = new_state
self._publish(new_state)
# ── Publish ─────────────────────────────────────────────────────────
def _publish(self, status: str) -> None:
msg = String()
msg.data = status
self._status_pub.publish(msg)
# ── Public accessors ─────────────────────────────────────────────────
@property
def state(self) -> str:
with self._lock:
return self._state
@property
def current_bag(self) -> str:
with self._lock:
return self._current_bag
def main(args=None) -> None:
rclpy.init(args=args)
node = RosbagRecorderNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

View File

@ -61,6 +61,8 @@ setup(
'wake_word_node = saltybot_social.wake_word_node:main',
# USB camera hot-plug monitor (Issue #320)
'camera_hotplug_node = saltybot_social.camera_hotplug_node:main',
# Trigger-based ROS2 bag recorder (Issue #332)
'rosbag_recorder_node = saltybot_social.rosbag_recorder_node:main',
],
},
)

View File

@ -0,0 +1,719 @@
"""test_rosbag_recorder.py — Offline tests for rosbag_recorder_node (Issue #332).
Stubs out rclpy and ROS message types.
BagRecorderProcess is replaced with MockRecorder no subprocesses are spawned.
"""
import importlib.util
import sys
import time
import types
import unittest
# ── ROS2 stubs ────────────────────────────────────────────────────────────────
def _make_ros_stubs():
for mod_name in ("rclpy", "rclpy.node", "rclpy.qos",
"std_msgs", "std_msgs.msg"):
if mod_name not in sys.modules:
sys.modules[mod_name] = types.ModuleType(mod_name)
class _Node:
def __init__(self, name="node"):
self._name = name
if not hasattr(self, "_params"):
self._params = {}
self._pubs = {}
self._subs = {}
self._logs = []
self._timers = []
def declare_parameter(self, name, default):
if name not in self._params:
self._params[name] = default
def get_parameter(self, name):
class _P:
def __init__(self, v): self.value = v
return _P(self._params.get(name))
def create_publisher(self, msg_type, topic, qos):
pub = _FakePub()
self._pubs[topic] = pub
return pub
def create_subscription(self, msg_type, topic, cb, qos):
self._subs[topic] = cb
return object()
def create_timer(self, period, cb):
self._timers.append(cb)
return object()
def get_logger(self):
node = self
class _L:
def info(self, m): node._logs.append(("INFO", m))
def warn(self, m): node._logs.append(("WARN", m))
def error(self, m): node._logs.append(("ERROR", m))
return _L()
def destroy_node(self): pass
class _FakePub:
def __init__(self):
self.msgs = []
def publish(self, msg):
self.msgs.append(msg)
class _QoSProfile:
def __init__(self, depth=10): self.depth = depth
class _Bool:
def __init__(self): self.data = False
class _String:
def __init__(self): self.data = ""
rclpy_mod = sys.modules["rclpy"]
rclpy_mod.init = lambda args=None: None
rclpy_mod.spin = lambda node: None
rclpy_mod.shutdown = lambda: None
sys.modules["rclpy.node"].Node = _Node
sys.modules["rclpy.qos"].QoSProfile = _QoSProfile
sys.modules["std_msgs.msg"].Bool = _Bool
sys.modules["std_msgs.msg"].String = _String
return _Node, _FakePub, _Bool, _String
_Node, _FakePub, _Bool, _String = _make_ros_stubs()
# ── Module loader ─────────────────────────────────────────────────────────────
_SRC = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/saltybot_social/rosbag_recorder_node.py"
)
def _load_mod():
spec = importlib.util.spec_from_file_location("rosbag_recorder_testmod", _SRC)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
class _MockRecorder:
"""Injectable replacement for BagRecorderProcess."""
def __init__(self, start_succeeds: bool = True) -> None:
self.start_succeeds = start_succeeds
self._running = False
self.calls: list = []
def start(self, topics, output_path, compression=False, max_size_mb=0.0):
self.calls.append(("start", list(topics), output_path))
if self.start_succeeds:
self._running = True
return True
return False
def stop(self):
self.calls.append(("stop",))
self._running = False # immediately "gone" for deterministic tests
def kill(self):
self.calls.append(("kill",))
self._running = False
def is_running(self):
return self._running
def reset(self):
self.calls.append(("reset",))
self._running = False
def call_types(self):
return [c[0] for c in self.calls]
def _make_node(mod, recorder=None, **kwargs):
node = mod.RosbagRecorderNode.__new__(mod.RosbagRecorderNode)
defaults = {
"trigger_topic": "/saltybot/record_trigger",
"status_topic": "/saltybot/recording_status",
"topics": "",
"bag_dir": "/tmp/test_bags",
"bag_prefix": "test",
"auto_stop_s": 60.0,
"stop_timeout_s": 5.0,
"compression": False,
"max_bag_size_mb": 0.0,
"poll_rate": 2.0,
}
defaults.update(kwargs)
node._params = dict(defaults)
mod.RosbagRecorderNode.__init__(node)
if recorder is not None:
node._recorder = recorder
return node
def _trigger(node, value: bool):
"""Deliver a Bool trigger message."""
msg = _Bool()
msg.data = value
node._subs["/saltybot/record_trigger"](msg)
def _pub(node):
return node._pubs["/saltybot/recording_status"]
# ── Tests: pure helpers ────────────────────────────────────────────────────────
class TestMakeBagPath(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def test_contains_prefix(self):
p = self.mod.make_bag_path("/tmp/bags", "saltybot")
self.assertIn("saltybot", p)
def test_contains_bag_dir(self):
p = self.mod.make_bag_path("/tmp/bags", "saltybot")
self.assertTrue(p.startswith("/tmp/bags"))
def test_unique_per_call(self):
# Two calls in tight succession may share a second but that's fine;
# just ensure the function doesn't crash and returns a string.
p1 = self.mod.make_bag_path("/tmp", "t")
self.assertIsInstance(p1, str)
self.assertTrue(len(p1) > 0)
class TestParseTopics(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def test_empty_string_returns_empty_list(self):
self.assertEqual(self.mod.parse_topics(""), [])
def test_whitespace_only_returns_empty(self):
self.assertEqual(self.mod.parse_topics(" "), [])
def test_single_topic(self):
self.assertEqual(self.mod.parse_topics("/topic/foo"), ["/topic/foo"])
def test_multiple_topics(self):
r = self.mod.parse_topics("/a,/b,/c")
self.assertEqual(r, ["/a", "/b", "/c"])
def test_strips_whitespace(self):
r = self.mod.parse_topics(" /a , /b ")
self.assertEqual(r, ["/a", "/b"])
def test_ignores_empty_segments(self):
r = self.mod.parse_topics("/a,,/b")
self.assertEqual(r, ["/a", "/b"])
class TestStatusConstants(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def test_idle(self): self.assertEqual(self.mod.STATUS_IDLE, "idle")
def test_recording(self): self.assertEqual(self.mod.STATUS_RECORDING, "recording")
def test_stopping(self): self.assertEqual(self.mod.STATUS_STOPPING, "stopping")
def test_error(self): self.assertEqual(self.mod.STATUS_ERROR, "error")
# ── Tests: compute_recording_transition ──────────────────────────────────────
class TestComputeRecordingTransition(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def _tr(self, state, trigger=None, proc_running=True,
now=100.0, rec_t=0.0, stop_t=0.0,
auto_stop=60.0, stop_tmo=5.0):
return self.mod.compute_recording_transition(
state, trigger, proc_running, now,
rec_t, stop_t, auto_stop, stop_tmo,
)
# ── IDLE ──────────────────────────────────────────────────────────
def test_idle_no_trigger_stays_idle(self):
s, fk = self._tr("idle", trigger=None)
self.assertEqual(s, "idle"); self.assertFalse(fk)
def test_idle_false_trigger_stays_idle(self):
s, fk = self._tr("idle", trigger=False)
self.assertEqual(s, "idle"); self.assertFalse(fk)
def test_idle_true_trigger_starts_recording(self):
s, fk = self._tr("idle", trigger=True)
self.assertEqual(s, "recording"); self.assertFalse(fk)
# ── RECORDING ─────────────────────────────────────────────────────
def test_recording_stable_no_change(self):
s, fk = self._tr("recording", proc_running=True)
self.assertEqual(s, "recording"); self.assertFalse(fk)
def test_recording_false_trigger_stops(self):
s, fk = self._tr("recording", trigger=False, proc_running=True)
self.assertEqual(s, "stopping"); self.assertFalse(fk)
def test_recording_proc_dies_error(self):
s, fk = self._tr("recording", proc_running=False)
self.assertEqual(s, "error"); self.assertFalse(fk)
def test_recording_auto_stop_fires(self):
# started at t=40, now=t=101 → 61 s elapsed > auto_stop=60
s, fk = self._tr("recording", proc_running=True,
now=101.0, rec_t=40.0, auto_stop=60.0)
self.assertEqual(s, "stopping"); self.assertFalse(fk)
def test_recording_auto_stop_not_yet(self):
# started at t=50, now=100 → 50 s < 60 s
s, fk = self._tr("recording", proc_running=True,
now=100.0, rec_t=50.0, auto_stop=60.0)
self.assertEqual(s, "recording")
def test_recording_auto_stop_at_exactly_timeout(self):
s, fk = self._tr("recording", proc_running=True,
now=110.0, rec_t=50.0, auto_stop=60.0)
self.assertEqual(s, "stopping")
def test_recording_auto_stop_disabled(self):
# auto_stop_s=0 → never auto-stops
s, fk = self._tr("recording", proc_running=True,
now=9999.0, rec_t=0.0, auto_stop=0.0)
self.assertEqual(s, "recording")
# ── STOPPING ──────────────────────────────────────────────────────
def test_stopping_proc_running_stays(self):
s, fk = self._tr("stopping", proc_running=True)
self.assertEqual(s, "stopping"); self.assertFalse(fk)
def test_stopping_proc_exits_idle(self):
s, fk = self._tr("stopping", proc_running=False)
self.assertEqual(s, "idle"); self.assertFalse(fk)
def test_stopping_force_kill_after_timeout(self):
# entered stopping at t=95, now=101 → 6 s > stop_tmo=5
s, fk = self._tr("stopping", proc_running=True,
now=101.0, stop_t=95.0, stop_tmo=5.0)
self.assertEqual(s, "idle"); self.assertTrue(fk)
def test_stopping_not_yet_force_kill(self):
# entered at t=98, now=100 → 2 s < 5 s
s, fk = self._tr("stopping", proc_running=True,
now=100.0, stop_t=98.0, stop_tmo=5.0)
self.assertEqual(s, "stopping"); self.assertFalse(fk)
def test_stopping_timeout_disabled(self):
# stop_tmo=0 → never force-kills
s, fk = self._tr("stopping", proc_running=True,
now=9999.0, stop_t=0.0, stop_tmo=0.0)
self.assertEqual(s, "stopping"); self.assertFalse(fk)
def test_stopping_force_kill_exactly_at_timeout(self):
s, fk = self._tr("stopping", proc_running=True,
now=100.0, stop_t=95.0, stop_tmo=5.0)
self.assertEqual(s, "idle"); self.assertTrue(fk)
# ── ERROR ─────────────────────────────────────────────────────────
def test_error_stays_without_trigger(self):
s, fk = self._tr("error", trigger=None)
self.assertEqual(s, "error"); self.assertFalse(fk)
def test_error_false_trigger_stays(self):
s, fk = self._tr("error", trigger=False)
self.assertEqual(s, "error")
def test_error_true_trigger_retries(self):
s, fk = self._tr("error", trigger=True)
self.assertEqual(s, "recording"); self.assertFalse(fk)
# ── Return shape ──────────────────────────────────────────────────
def test_returns_tuple_of_two(self):
result = self._tr("idle")
self.assertEqual(len(result), 2)
def test_force_kill_is_bool(self):
_, fk = self._tr("idle")
self.assertIsInstance(fk, bool)
# ── Tests: node init ──────────────────────────────────────────────────────────
class TestNodeInit(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def test_instantiates(self):
self.assertIsNotNone(_make_node(self.mod))
def test_initial_state_idle(self):
node = _make_node(self.mod)
self.assertEqual(node.state, "idle")
def test_publishes_initial_idle(self):
node = _make_node(self.mod)
pub = _pub(node)
self.assertEqual(pub.msgs[0].data, "idle")
def test_publisher_registered(self):
node = _make_node(self.mod)
self.assertIn("/saltybot/recording_status", node._pubs)
def test_subscriber_registered(self):
node = _make_node(self.mod)
self.assertIn("/saltybot/record_trigger", node._subs)
def test_timer_registered(self):
node = _make_node(self.mod)
self.assertGreater(len(node._timers), 0)
def test_custom_topics(self):
node = _make_node(self.mod,
trigger_topic="/my/trigger",
status_topic="/my/status")
self.assertIn("/my/trigger", node._subs)
self.assertIn("/my/status", node._pubs)
def test_topics_parsed_correctly(self):
node = _make_node(self.mod, topics="/a,/b,/c")
self.assertEqual(node._topics, ["/a", "/b", "/c"])
def test_empty_topics_means_all(self):
node = _make_node(self.mod, topics="")
self.assertEqual(node._topics, [])
def test_auto_stop_s_stored(self):
node = _make_node(self.mod, auto_stop_s=30.0)
self.assertAlmostEqual(node._auto_stop_s, 30.0)
def test_current_bag_empty_initially(self):
node = _make_node(self.mod)
self.assertEqual(node.current_bag, "")
# ── Tests: _on_trigger ────────────────────────────────────────────────────────
class TestOnTrigger(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def test_stores_true_trigger(self):
node = _make_node(self.mod)
_trigger(node, True)
with node._lock:
self.assertTrue(node._trigger)
def test_stores_false_trigger(self):
node = _make_node(self.mod)
_trigger(node, False)
with node._lock:
self.assertFalse(node._trigger)
def test_trigger_consumed_after_poll(self):
rec = _MockRecorder()
node = _make_node(self.mod, recorder=rec)
_trigger(node, True)
node._poll_cb()
with node._lock:
self.assertIsNone(node._trigger)
# ── Tests: poll loop — full state machine ────────────────────────────────────
class TestPollCb(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def _node(self, **kwargs):
rec = _MockRecorder()
node = _make_node(self.mod, recorder=rec, **kwargs)
return node, rec
def test_true_trigger_starts_recording(self):
node, rec = self._node()
_trigger(node, True)
node._poll_cb()
self.assertEqual(node.state, "recording")
self.assertIn("start", rec.call_types())
def test_recording_publishes_status(self):
node, rec = self._node()
_trigger(node, True)
node._poll_cb()
self.assertEqual(_pub(node).msgs[-1].data, "recording")
def test_false_trigger_stops_recording(self):
node, rec = self._node()
_trigger(node, True); node._poll_cb() # → recording
_trigger(node, False); node._poll_cb() # → stopping
self.assertEqual(node.state, "stopping")
self.assertIn("stop", rec.call_types())
def test_stopping_publishes_status(self):
node, rec = self._node()
_trigger(node, True); node._poll_cb()
_trigger(node, False); node._poll_cb()
self.assertEqual(_pub(node).msgs[-1].data, "stopping")
def test_after_stop_proc_exit_idle(self):
"""Once the mock recorder stops, next poll resolves to idle."""
node, rec = self._node()
_trigger(node, True); node._poll_cb() # recording
_trigger(node, False); node._poll_cb() # stopping (rec.stop() sets running=False)
node._poll_cb() # proc not running → idle
self.assertEqual(node.state, "idle")
def test_idle_publishes_after_stop(self):
node, rec = self._node()
_trigger(node, True); node._poll_cb()
_trigger(node, False); node._poll_cb()
node._poll_cb()
self.assertEqual(_pub(node).msgs[-1].data, "idle")
def test_auto_stop_triggers_stopping(self):
node, rec = self._node(auto_stop_s=1.0)
_trigger(node, True)
node._poll_cb() # → recording
# Back-date start time so auto-stop fires
with node._lock:
node._record_start_t = time.monotonic() - 10.0
node._poll_cb() # → stopping
self.assertEqual(node.state, "stopping")
def test_auto_stop_disabled(self):
node, rec = self._node(auto_stop_s=0.0)
_trigger(node, True)
node._poll_cb()
with node._lock:
node._record_start_t = time.monotonic() - 9999.0
node._poll_cb()
# Should still be recording (auto-stop disabled)
self.assertEqual(node.state, "recording")
def test_proc_dies_error(self):
node, rec = self._node()
_trigger(node, True); node._poll_cb() # → recording
rec._running = False # simulate unexpected exit
node._poll_cb()
self.assertEqual(node.state, "error")
def test_error_publishes_status(self):
node, rec = self._node()
_trigger(node, True); node._poll_cb()
rec._running = False
node._poll_cb()
self.assertEqual(_pub(node).msgs[-1].data, "error")
def test_error_retries_on_true_trigger(self):
node, rec = self._node()
_trigger(node, True); node._poll_cb() # recording
rec._running = False
node._poll_cb() # error
_trigger(node, True); node._poll_cb() # retry → recording
self.assertEqual(node.state, "recording")
def test_start_failure_enters_error(self):
rec = _MockRecorder(start_succeeds=False)
node = _make_node(self.mod, recorder=rec)
_trigger(node, True)
node._poll_cb()
self.assertEqual(node.state, "error")
def test_force_kill_on_stop_timeout(self):
"""Stubborn process that ignores SIGINT → force-killed after timeout."""
class _StubbornRecorder(_MockRecorder):
def stop(self):
self.calls.append(("stop",))
# Don't set _running = False — simulates process ignoring SIGINT
stubborn = _StubbornRecorder()
node = _make_node(self.mod, recorder=stubborn, stop_timeout_s=2.0)
_trigger(node, True); node._poll_cb() # → recording
_trigger(node, False); node._poll_cb() # → stopping (process stays alive)
self.assertEqual(node.state, "stopping")
# Expire the stop timeout
with node._lock:
node._stop_start_t = time.monotonic() - 10.0
node._poll_cb() # → force kill → idle
self.assertIn("kill", stubborn.call_types())
self.assertEqual(node.state, "idle")
def test_bag_path_set_when_recording(self):
node, rec = self._node(bag_prefix="mytest")
_trigger(node, True)
node._poll_cb()
self.assertIn("mytest", node.current_bag)
def test_bag_path_cleared_after_idle(self):
node, rec = self._node()
_trigger(node, True); node._poll_cb()
_trigger(node, False); node._poll_cb()
node._poll_cb()
self.assertEqual(node.current_bag, "")
def test_topics_passed_to_recorder(self):
rec = _MockRecorder()
node = _make_node(self.mod, recorder=rec, topics="/a,/b")
_trigger(node, True)
node._poll_cb()
start_calls = [c for c in rec.calls if c[0] == "start"]
self.assertEqual(len(start_calls), 1)
self.assertEqual(start_calls[0][1], ["/a", "/b"])
def test_empty_topics_passes_empty_list(self):
rec = _MockRecorder()
node = _make_node(self.mod, recorder=rec, topics="")
_trigger(node, True)
node._poll_cb()
start_calls = [c for c in rec.calls if c[0] == "start"]
self.assertEqual(start_calls[0][1], [])
def test_recorder_reset_on_idle(self):
node, rec = self._node()
_trigger(node, True); node._poll_cb()
_trigger(node, False); node._poll_cb()
node._poll_cb() # idle
self.assertIn("reset", rec.call_types())
def test_full_lifecycle_status_sequence(self):
"""idle → recording → stopping → idle."""
node, rec = self._node()
pub = _pub(node)
_trigger(node, True); node._poll_cb()
_trigger(node, False); node._poll_cb()
node._poll_cb()
statuses = [m.data for m in pub.msgs]
self.assertIn("idle", statuses)
self.assertIn("recording", statuses)
self.assertIn("stopping", statuses)
def test_logging_on_start(self):
node, rec = self._node()
_trigger(node, True)
node._poll_cb()
infos = [m for lvl, m in node._logs if lvl == "INFO"]
self.assertTrue(any("recording" in m.lower() for m in infos))
def test_logging_on_stop(self):
node, rec = self._node()
_trigger(node, True); node._poll_cb()
_trigger(node, False); node._poll_cb()
infos = [m for lvl, m in node._logs if lvl == "INFO"]
self.assertTrue(any("stop" in m.lower() for m in infos))
def test_logging_on_error(self):
node, rec = self._node()
_trigger(node, True); node._poll_cb()
rec._running = False
node._poll_cb()
errors = [m for lvl, m in node._logs if lvl == "ERROR"]
self.assertTrue(len(errors) > 0)
# ── Tests: source content ─────────────────────────────────────────────────────
class TestNodeSrc(unittest.TestCase):
@classmethod
def setUpClass(cls):
with open(_SRC) as f: cls.src = f.read()
def test_issue_tag(self): self.assertIn("#332", self.src)
def test_trigger_topic(self): self.assertIn("/saltybot/record_trigger", self.src)
def test_status_topic(self): self.assertIn("/saltybot/recording_status", self.src)
def test_status_idle(self): self.assertIn('"idle"', self.src)
def test_status_recording(self): self.assertIn('"recording"', self.src)
def test_status_stopping(self): self.assertIn('"stopping"', self.src)
def test_status_error(self): self.assertIn('"error"', self.src)
def test_compute_transition_fn(self): self.assertIn("compute_recording_transition", self.src)
def test_bag_recorder_process(self): self.assertIn("BagRecorderProcess", self.src)
def test_make_bag_path(self): self.assertIn("make_bag_path", self.src)
def test_parse_topics(self): self.assertIn("parse_topics", self.src)
def test_auto_stop_param(self): self.assertIn("auto_stop_s", self.src)
def test_stop_timeout_param(self): self.assertIn("stop_timeout_s", self.src)
def test_compression_param(self): self.assertIn("compression", self.src)
def test_subprocess_used(self): self.assertIn("subprocess", self.src)
def test_sigint_used(self): self.assertIn("SIGINT", self.src)
def test_threading_lock(self): self.assertIn("threading.Lock", self.src)
def test_recorder_injectable(self): self.assertIn("_recorder", self.src)
def test_main_defined(self): self.assertIn("def main", self.src)
def test_bool_subscription(self): self.assertIn("Bool", self.src)
def test_string_publication(self): self.assertIn("String", self.src)
# ── Tests: config / launch / setup ────────────────────────────────────────────
class TestConfig(unittest.TestCase):
_CONFIG = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/config/rosbag_recorder_params.yaml"
)
_LAUNCH = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/launch/rosbag_recorder.launch.py"
)
_SETUP = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/setup.py"
)
def test_config_exists(self):
import os; self.assertTrue(os.path.exists(self._CONFIG))
def test_config_auto_stop(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("auto_stop_s", c)
def test_config_bag_dir(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("bag_dir", c)
def test_config_topics(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("topics", c)
def test_config_compression(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("compression", c)
def test_config_stop_timeout(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("stop_timeout_s", c)
def test_launch_exists(self):
import os; self.assertTrue(os.path.exists(self._LAUNCH))
def test_launch_auto_stop_arg(self):
with open(self._LAUNCH) as f: c = f.read()
self.assertIn("auto_stop_s", c)
def test_launch_topics_arg(self):
with open(self._LAUNCH) as f: c = f.read()
self.assertIn("topics", c)
def test_entry_point_in_setup(self):
with open(self._SETUP) as f: c = f.read()
self.assertIn("rosbag_recorder_node", c)
if __name__ == "__main__":
unittest.main()