feat(social): trigger-based ROS2 bag recorder (Issue #332) #335
@ -0,0 +1,20 @@
|
||||
rosbag_recorder_node:
|
||||
ros__parameters:
|
||||
trigger_topic: "/saltybot/record_trigger"
|
||||
status_topic: "/saltybot/recording_status"
|
||||
|
||||
# Comma-separated topic list to record.
|
||||
# Empty string = record all topics (ros2 bag record --all).
|
||||
# Example: "/saltybot/camera_status,/saltybot/wake_word_detected,/cmd_vel"
|
||||
topics: ""
|
||||
|
||||
bag_dir: "/tmp/saltybot_bags" # output directory (created if absent)
|
||||
bag_prefix: "saltybot" # filename prefix; timestamp appended
|
||||
|
||||
auto_stop_s: 60.0 # auto-stop after N seconds; 0 = disabled
|
||||
stop_timeout_s: 5.0 # force-kill if subprocess won't stop within N s
|
||||
|
||||
compression: false # enable zstd file-level compression
|
||||
max_bag_size_mb: 0.0 # split bags at this size (MiB); 0 = no limit
|
||||
|
||||
poll_rate: 2.0 # state-machine check frequency (Hz)
|
||||
@ -0,0 +1,47 @@
|
||||
"""rosbag_recorder.launch.py — Launch trigger-based bag recorder (Issue #332).
|
||||
|
||||
Usage:
|
||||
ros2 launch saltybot_social rosbag_recorder.launch.py
|
||||
ros2 launch saltybot_social rosbag_recorder.launch.py auto_stop_s:=120.0
|
||||
ros2 launch saltybot_social rosbag_recorder.launch.py \\
|
||||
topics:="/saltybot/camera_status,/cmd_vel" bag_dir:=/data/bags
|
||||
"""
|
||||
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
pkg = get_package_share_directory("saltybot_social")
|
||||
cfg = os.path.join(pkg, "config", "rosbag_recorder_params.yaml")
|
||||
|
||||
return LaunchDescription([
|
||||
DeclareLaunchArgument("topics", default_value="",
|
||||
description="Comma-separated topics (empty=all)"),
|
||||
DeclareLaunchArgument("bag_dir", default_value="/tmp/saltybot_bags",
|
||||
description="Output directory for bag files"),
|
||||
DeclareLaunchArgument("auto_stop_s", default_value="60.0",
|
||||
description="Auto-stop timeout in seconds (0=off)"),
|
||||
DeclareLaunchArgument("compression", default_value="false",
|
||||
description="Enable zstd compression"),
|
||||
|
||||
Node(
|
||||
package="saltybot_social",
|
||||
executable="rosbag_recorder_node",
|
||||
name="rosbag_recorder_node",
|
||||
output="screen",
|
||||
parameters=[
|
||||
cfg,
|
||||
{
|
||||
"topics": LaunchConfiguration("topics"),
|
||||
"bag_dir": LaunchConfiguration("bag_dir"),
|
||||
"auto_stop_s": LaunchConfiguration("auto_stop_s"),
|
||||
"compression": LaunchConfiguration("compression"),
|
||||
},
|
||||
],
|
||||
),
|
||||
])
|
||||
@ -0,0 +1,373 @@
|
||||
"""rosbag_recorder_node.py — Trigger-based ROS2 bag recorder.
|
||||
Issue #332
|
||||
|
||||
Subscribes to /saltybot/record_trigger (Bool). True starts recording;
|
||||
False stops it. Auto-stop fires after ``auto_stop_s`` seconds if still
|
||||
running. Recording is performed by spawning a ``ros2 bag record``
|
||||
subprocess which is sent SIGINT for graceful shutdown and SIGKILL if it
|
||||
does not exit within ``stop_timeout_s``.
|
||||
|
||||
Status values
|
||||
─────────────
|
||||
"idle" — not recording
|
||||
"recording" — subprocess active, writing to bag file
|
||||
"stopping" — SIGINT sent, waiting for subprocess to exit
|
||||
"error" — subprocess died unexpectedly; new trigger retries
|
||||
|
||||
Subscriptions
|
||||
─────────────
|
||||
/saltybot/record_trigger std_msgs/Bool — True = start, False = stop
|
||||
|
||||
Publications
|
||||
────────────
|
||||
/saltybot/recording_status std_msgs/String — status value (see above)
|
||||
|
||||
Parameters
|
||||
──────────
|
||||
trigger_topic (str, "/saltybot/record_trigger")
|
||||
status_topic (str, "/saltybot/recording_status")
|
||||
topics (str, "") comma-separated topic list;
|
||||
empty string → record all topics (-a)
|
||||
bag_dir (str, "/tmp/saltybot_bags")
|
||||
bag_prefix (str, "saltybot")
|
||||
auto_stop_s (float, 60.0) 0 = no auto-stop
|
||||
stop_timeout_s (float, 5.0) force-kill after this many seconds
|
||||
compression (bool, False) enable zstd file compression
|
||||
max_bag_size_mb (float, 0.0) 0 = unlimited
|
||||
poll_rate (float, 2.0) state-machine check frequency (Hz)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile
|
||||
from std_msgs.msg import Bool, String
|
||||
|
||||
|
||||
# ── Status constants ───────────────────────────────────────────────────────────
|
||||
|
||||
STATUS_IDLE = "idle"
|
||||
STATUS_RECORDING = "recording"
|
||||
STATUS_STOPPING = "stopping"
|
||||
STATUS_ERROR = "error"
|
||||
|
||||
|
||||
# ── Pure helpers ───────────────────────────────────────────────────────────────
|
||||
|
||||
def make_bag_path(bag_dir: str, prefix: str) -> str:
|
||||
"""Return a timestamped output path for a new bag (no file created)."""
|
||||
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
return os.path.join(bag_dir, f"{prefix}_{ts}")
|
||||
|
||||
|
||||
def parse_topics(topics_str: str) -> List[str]:
|
||||
"""Parse a comma-separated topic string into a clean list.
|
||||
|
||||
Returns an empty list when *topics_str* is blank (meaning record-all).
|
||||
"""
|
||||
if not topics_str or not topics_str.strip():
|
||||
return []
|
||||
return [t.strip() for t in topics_str.split(",") if t.strip()]
|
||||
|
||||
|
||||
def compute_recording_transition(
|
||||
state: str,
|
||||
trigger: Optional[bool],
|
||||
proc_running: bool,
|
||||
now: float,
|
||||
record_start_t: float,
|
||||
stop_start_t: float,
|
||||
auto_stop_s: float,
|
||||
stop_timeout_s: float,
|
||||
) -> Tuple[str, bool]:
|
||||
"""Pure state-machine step — no I/O, no ROS.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
state : current status string
|
||||
trigger : latest trigger value (True=start, False=stop, None=none)
|
||||
proc_running : whether the recorder subprocess is alive
|
||||
now : current monotonic time (s)
|
||||
record_start_t : monotonic time recording began (0 if not recording)
|
||||
stop_start_t : monotonic time STOPPING began (0 if not stopping)
|
||||
auto_stop_s : auto-stop after this many seconds (0 = disabled)
|
||||
stop_timeout_s : force-kill if stopping > this long (0 = disabled)
|
||||
|
||||
Returns
|
||||
-------
|
||||
(new_state, force_kill)
|
||||
force_kill=True signals the caller to SIGKILL the process.
|
||||
"""
|
||||
if state == STATUS_IDLE:
|
||||
if trigger is True:
|
||||
return STATUS_RECORDING, False
|
||||
return STATUS_IDLE, False
|
||||
|
||||
if state == STATUS_RECORDING:
|
||||
if not proc_running:
|
||||
return STATUS_ERROR, False
|
||||
if trigger is False:
|
||||
return STATUS_STOPPING, False
|
||||
if (auto_stop_s > 0 and record_start_t > 0
|
||||
and (now - record_start_t) >= auto_stop_s):
|
||||
return STATUS_STOPPING, False
|
||||
return STATUS_RECORDING, False
|
||||
|
||||
if state == STATUS_STOPPING:
|
||||
if not proc_running:
|
||||
return STATUS_IDLE, False
|
||||
if (stop_timeout_s > 0 and stop_start_t > 0
|
||||
and (now - stop_start_t) >= stop_timeout_s):
|
||||
return STATUS_IDLE, True # force-kill
|
||||
return STATUS_STOPPING, False
|
||||
|
||||
# STATUS_ERROR
|
||||
if trigger is True:
|
||||
return STATUS_RECORDING, False
|
||||
return STATUS_ERROR, False
|
||||
|
||||
|
||||
# ── Subprocess wrapper ─────────────────────────────────────────────────────────
|
||||
|
||||
class BagRecorderProcess:
|
||||
"""Thin wrapper around a ``ros2 bag record`` subprocess."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._proc: Optional[subprocess.Popen] = None
|
||||
|
||||
def start(self, topics: List[str], output_path: str,
|
||||
compression: bool = False,
|
||||
max_size_mb: float = 0.0) -> bool:
|
||||
"""Launch the recorder. Returns False if already running or on error."""
|
||||
if self.is_running():
|
||||
return False
|
||||
|
||||
cmd = ["ros2", "bag", "record", "--output", output_path]
|
||||
if topics:
|
||||
cmd += topics
|
||||
else:
|
||||
cmd += ["--all"]
|
||||
if compression:
|
||||
cmd += ["--compression-mode", "file",
|
||||
"--compression-format", "zstd"]
|
||||
if max_size_mb > 0:
|
||||
cmd += ["--max-bag-size",
|
||||
str(int(max_size_mb * 1024 * 1024))]
|
||||
|
||||
try:
|
||||
self._proc = subprocess.Popen(
|
||||
cmd,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.PIPE,
|
||||
preexec_fn=os.setsid,
|
||||
)
|
||||
return True
|
||||
except Exception:
|
||||
self._proc = None
|
||||
return False
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Send SIGINT to the process group for graceful shutdown."""
|
||||
if self._proc is not None and self._proc.poll() is None:
|
||||
try:
|
||||
os.killpg(os.getpgid(self._proc.pid), signal.SIGINT)
|
||||
except (ProcessLookupError, PermissionError, OSError):
|
||||
pass
|
||||
|
||||
def kill(self) -> None:
|
||||
"""Send SIGKILL to the process group for forced shutdown."""
|
||||
if self._proc is not None:
|
||||
try:
|
||||
os.killpg(os.getpgid(self._proc.pid), signal.SIGKILL)
|
||||
except (ProcessLookupError, PermissionError, OSError):
|
||||
pass
|
||||
self._proc = None
|
||||
|
||||
def is_running(self) -> bool:
|
||||
return self._proc is not None and self._proc.poll() is None
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Clear internal proc reference (call after graceful exit)."""
|
||||
self._proc = None
|
||||
|
||||
|
||||
# ── ROS2 node ──────────────────────────────────────────────────────────────────
|
||||
|
||||
class RosbagRecorderNode(Node):
|
||||
"""Trigger-based ROS bag recorder with auto-stop and status reporting."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__("rosbag_recorder_node")
|
||||
|
||||
self.declare_parameter("trigger_topic", "/saltybot/record_trigger")
|
||||
self.declare_parameter("status_topic", "/saltybot/recording_status")
|
||||
self.declare_parameter("topics", "")
|
||||
self.declare_parameter("bag_dir", "/tmp/saltybot_bags")
|
||||
self.declare_parameter("bag_prefix", "saltybot")
|
||||
self.declare_parameter("auto_stop_s", 60.0)
|
||||
self.declare_parameter("stop_timeout_s", 5.0)
|
||||
self.declare_parameter("compression", False)
|
||||
self.declare_parameter("max_bag_size_mb", 0.0)
|
||||
self.declare_parameter("poll_rate", 2.0)
|
||||
|
||||
trigger_topic = self.get_parameter("trigger_topic").value
|
||||
status_topic = self.get_parameter("status_topic").value
|
||||
topics_str = self.get_parameter("topics").value
|
||||
self._bag_dir = str(self.get_parameter("bag_dir").value)
|
||||
self._bag_prefix = str(self.get_parameter("bag_prefix").value)
|
||||
self._auto_stop_s = float(self.get_parameter("auto_stop_s").value)
|
||||
self._stop_tmo_s = float(self.get_parameter("stop_timeout_s").value)
|
||||
self._compression = bool(self.get_parameter("compression").value)
|
||||
self._max_mb = float(self.get_parameter("max_bag_size_mb").value)
|
||||
poll_rate = float(self.get_parameter("poll_rate").value)
|
||||
|
||||
self._topics: List[str] = parse_topics(str(topics_str))
|
||||
|
||||
# Recorder process — injectable for tests
|
||||
self._recorder: BagRecorderProcess = BagRecorderProcess()
|
||||
|
||||
# State
|
||||
self._state = STATUS_IDLE
|
||||
self._trigger: Optional[bool] = None
|
||||
self._record_start_t: float = 0.0
|
||||
self._stop_start_t: float = 0.0
|
||||
self._current_bag: str = ""
|
||||
self._lock = threading.Lock()
|
||||
|
||||
qos = QoSProfile(depth=10)
|
||||
self._status_pub = self.create_publisher(String, status_topic, qos)
|
||||
self._trigger_sub = self.create_subscription(
|
||||
Bool, trigger_topic, self._on_trigger, qos
|
||||
)
|
||||
self._timer = self.create_timer(1.0 / poll_rate, self._poll_cb)
|
||||
|
||||
# Publish initial state
|
||||
self._publish(STATUS_IDLE)
|
||||
|
||||
topic_desc = ",".join(self._topics) if self._topics else "<all>"
|
||||
self.get_logger().info(
|
||||
f"RosbagRecorderNode ready — topics={topic_desc}, "
|
||||
f"bag_dir={self._bag_dir}, auto_stop={self._auto_stop_s}s"
|
||||
)
|
||||
|
||||
# ── Subscription ───────────────────────────────────────────────────
|
||||
|
||||
def _on_trigger(self, msg) -> None:
|
||||
with self._lock:
|
||||
self._trigger = bool(msg.data)
|
||||
|
||||
# ── Poll / state machine ────────────────────────────────────────────
|
||||
|
||||
def _poll_cb(self) -> None:
|
||||
now = time.monotonic()
|
||||
|
||||
with self._lock:
|
||||
trigger = self._trigger
|
||||
self._trigger = None # consume
|
||||
state = self._state
|
||||
rec_t = self._record_start_t
|
||||
stop_t = self._stop_start_t
|
||||
|
||||
proc_running = self._recorder.is_running()
|
||||
|
||||
new_state, force_kill = compute_recording_transition(
|
||||
state, trigger, proc_running, now,
|
||||
rec_t, stop_t, self._auto_stop_s, self._stop_tmo_s,
|
||||
)
|
||||
|
||||
if force_kill:
|
||||
self._recorder.kill()
|
||||
self.get_logger().warn(
|
||||
f"RosbagRecorder: force-killed (stop_timeout={self._stop_tmo_s}s)"
|
||||
)
|
||||
|
||||
if new_state != state:
|
||||
self._enter_state(new_state, now)
|
||||
|
||||
def _enter_state(self, new_state: str, now: float) -> None:
|
||||
if new_state == STATUS_RECORDING:
|
||||
bag_path = make_bag_path(self._bag_dir, self._bag_prefix)
|
||||
started = self._recorder.start(
|
||||
self._topics, bag_path,
|
||||
compression=self._compression,
|
||||
max_size_mb=self._max_mb,
|
||||
)
|
||||
if not started:
|
||||
new_state = STATUS_ERROR
|
||||
self.get_logger().error(
|
||||
"RosbagRecorder: failed to start recorder subprocess"
|
||||
)
|
||||
else:
|
||||
with self._lock:
|
||||
self._record_start_t = now
|
||||
self._current_bag = bag_path
|
||||
self.get_logger().info(
|
||||
f"RosbagRecorder: recording started → {bag_path}"
|
||||
)
|
||||
|
||||
elif new_state == STATUS_STOPPING:
|
||||
self._recorder.stop()
|
||||
with self._lock:
|
||||
self._stop_start_t = now
|
||||
self.get_logger().info("RosbagRecorder: stopping (SIGINT sent)")
|
||||
|
||||
elif new_state == STATUS_IDLE:
|
||||
bag = self._current_bag
|
||||
with self._lock:
|
||||
self._record_start_t = 0.0
|
||||
self._stop_start_t = 0.0
|
||||
self._current_bag = ""
|
||||
self._recorder.reset()
|
||||
self.get_logger().info(
|
||||
f"RosbagRecorder: recording complete → {bag}"
|
||||
)
|
||||
|
||||
elif new_state == STATUS_ERROR:
|
||||
self.get_logger().error(
|
||||
"RosbagRecorder: subprocess exited unexpectedly"
|
||||
)
|
||||
|
||||
with self._lock:
|
||||
self._state = new_state
|
||||
|
||||
self._publish(new_state)
|
||||
|
||||
# ── Publish ─────────────────────────────────────────────────────────
|
||||
|
||||
def _publish(self, status: str) -> None:
|
||||
msg = String()
|
||||
msg.data = status
|
||||
self._status_pub.publish(msg)
|
||||
|
||||
# ── Public accessors ─────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def state(self) -> str:
|
||||
with self._lock:
|
||||
return self._state
|
||||
|
||||
@property
|
||||
def current_bag(self) -> str:
|
||||
with self._lock:
|
||||
return self._current_bag
|
||||
|
||||
|
||||
def main(args=None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = RosbagRecorderNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
@ -61,6 +61,8 @@ setup(
|
||||
'wake_word_node = saltybot_social.wake_word_node:main',
|
||||
# USB camera hot-plug monitor (Issue #320)
|
||||
'camera_hotplug_node = saltybot_social.camera_hotplug_node:main',
|
||||
# Trigger-based ROS2 bag recorder (Issue #332)
|
||||
'rosbag_recorder_node = saltybot_social.rosbag_recorder_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
719
jetson/ros2_ws/src/saltybot_social/test/test_rosbag_recorder.py
Normal file
719
jetson/ros2_ws/src/saltybot_social/test/test_rosbag_recorder.py
Normal file
@ -0,0 +1,719 @@
|
||||
"""test_rosbag_recorder.py — Offline tests for rosbag_recorder_node (Issue #332).
|
||||
|
||||
Stubs out rclpy and ROS message types.
|
||||
BagRecorderProcess is replaced with MockRecorder — no subprocesses are spawned.
|
||||
"""
|
||||
|
||||
import importlib.util
|
||||
import sys
|
||||
import time
|
||||
import types
|
||||
import unittest
|
||||
|
||||
|
||||
# ── ROS2 stubs ────────────────────────────────────────────────────────────────
|
||||
|
||||
def _make_ros_stubs():
|
||||
for mod_name in ("rclpy", "rclpy.node", "rclpy.qos",
|
||||
"std_msgs", "std_msgs.msg"):
|
||||
if mod_name not in sys.modules:
|
||||
sys.modules[mod_name] = types.ModuleType(mod_name)
|
||||
|
||||
class _Node:
|
||||
def __init__(self, name="node"):
|
||||
self._name = name
|
||||
if not hasattr(self, "_params"):
|
||||
self._params = {}
|
||||
self._pubs = {}
|
||||
self._subs = {}
|
||||
self._logs = []
|
||||
self._timers = []
|
||||
|
||||
def declare_parameter(self, name, default):
|
||||
if name not in self._params:
|
||||
self._params[name] = default
|
||||
|
||||
def get_parameter(self, name):
|
||||
class _P:
|
||||
def __init__(self, v): self.value = v
|
||||
return _P(self._params.get(name))
|
||||
|
||||
def create_publisher(self, msg_type, topic, qos):
|
||||
pub = _FakePub()
|
||||
self._pubs[topic] = pub
|
||||
return pub
|
||||
|
||||
def create_subscription(self, msg_type, topic, cb, qos):
|
||||
self._subs[topic] = cb
|
||||
return object()
|
||||
|
||||
def create_timer(self, period, cb):
|
||||
self._timers.append(cb)
|
||||
return object()
|
||||
|
||||
def get_logger(self):
|
||||
node = self
|
||||
class _L:
|
||||
def info(self, m): node._logs.append(("INFO", m))
|
||||
def warn(self, m): node._logs.append(("WARN", m))
|
||||
def error(self, m): node._logs.append(("ERROR", m))
|
||||
return _L()
|
||||
|
||||
def destroy_node(self): pass
|
||||
|
||||
class _FakePub:
|
||||
def __init__(self):
|
||||
self.msgs = []
|
||||
def publish(self, msg):
|
||||
self.msgs.append(msg)
|
||||
|
||||
class _QoSProfile:
|
||||
def __init__(self, depth=10): self.depth = depth
|
||||
|
||||
class _Bool:
|
||||
def __init__(self): self.data = False
|
||||
|
||||
class _String:
|
||||
def __init__(self): self.data = ""
|
||||
|
||||
rclpy_mod = sys.modules["rclpy"]
|
||||
rclpy_mod.init = lambda args=None: None
|
||||
rclpy_mod.spin = lambda node: None
|
||||
rclpy_mod.shutdown = lambda: None
|
||||
|
||||
sys.modules["rclpy.node"].Node = _Node
|
||||
sys.modules["rclpy.qos"].QoSProfile = _QoSProfile
|
||||
sys.modules["std_msgs.msg"].Bool = _Bool
|
||||
sys.modules["std_msgs.msg"].String = _String
|
||||
|
||||
return _Node, _FakePub, _Bool, _String
|
||||
|
||||
|
||||
_Node, _FakePub, _Bool, _String = _make_ros_stubs()
|
||||
|
||||
|
||||
# ── Module loader ─────────────────────────────────────────────────────────────
|
||||
|
||||
_SRC = (
|
||||
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||
"saltybot_social/saltybot_social/rosbag_recorder_node.py"
|
||||
)
|
||||
|
||||
|
||||
def _load_mod():
|
||||
spec = importlib.util.spec_from_file_location("rosbag_recorder_testmod", _SRC)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
|
||||
class _MockRecorder:
|
||||
"""Injectable replacement for BagRecorderProcess."""
|
||||
|
||||
def __init__(self, start_succeeds: bool = True) -> None:
|
||||
self.start_succeeds = start_succeeds
|
||||
self._running = False
|
||||
self.calls: list = []
|
||||
|
||||
def start(self, topics, output_path, compression=False, max_size_mb=0.0):
|
||||
self.calls.append(("start", list(topics), output_path))
|
||||
if self.start_succeeds:
|
||||
self._running = True
|
||||
return True
|
||||
return False
|
||||
|
||||
def stop(self):
|
||||
self.calls.append(("stop",))
|
||||
self._running = False # immediately "gone" for deterministic tests
|
||||
|
||||
def kill(self):
|
||||
self.calls.append(("kill",))
|
||||
self._running = False
|
||||
|
||||
def is_running(self):
|
||||
return self._running
|
||||
|
||||
def reset(self):
|
||||
self.calls.append(("reset",))
|
||||
self._running = False
|
||||
|
||||
def call_types(self):
|
||||
return [c[0] for c in self.calls]
|
||||
|
||||
|
||||
def _make_node(mod, recorder=None, **kwargs):
|
||||
node = mod.RosbagRecorderNode.__new__(mod.RosbagRecorderNode)
|
||||
defaults = {
|
||||
"trigger_topic": "/saltybot/record_trigger",
|
||||
"status_topic": "/saltybot/recording_status",
|
||||
"topics": "",
|
||||
"bag_dir": "/tmp/test_bags",
|
||||
"bag_prefix": "test",
|
||||
"auto_stop_s": 60.0,
|
||||
"stop_timeout_s": 5.0,
|
||||
"compression": False,
|
||||
"max_bag_size_mb": 0.0,
|
||||
"poll_rate": 2.0,
|
||||
}
|
||||
defaults.update(kwargs)
|
||||
node._params = dict(defaults)
|
||||
mod.RosbagRecorderNode.__init__(node)
|
||||
if recorder is not None:
|
||||
node._recorder = recorder
|
||||
return node
|
||||
|
||||
|
||||
def _trigger(node, value: bool):
|
||||
"""Deliver a Bool trigger message."""
|
||||
msg = _Bool()
|
||||
msg.data = value
|
||||
node._subs["/saltybot/record_trigger"](msg)
|
||||
|
||||
|
||||
def _pub(node):
|
||||
return node._pubs["/saltybot/recording_status"]
|
||||
|
||||
|
||||
# ── Tests: pure helpers ────────────────────────────────────────────────────────
|
||||
|
||||
class TestMakeBagPath(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def test_contains_prefix(self):
|
||||
p = self.mod.make_bag_path("/tmp/bags", "saltybot")
|
||||
self.assertIn("saltybot", p)
|
||||
|
||||
def test_contains_bag_dir(self):
|
||||
p = self.mod.make_bag_path("/tmp/bags", "saltybot")
|
||||
self.assertTrue(p.startswith("/tmp/bags"))
|
||||
|
||||
def test_unique_per_call(self):
|
||||
# Two calls in tight succession may share a second but that's fine;
|
||||
# just ensure the function doesn't crash and returns a string.
|
||||
p1 = self.mod.make_bag_path("/tmp", "t")
|
||||
self.assertIsInstance(p1, str)
|
||||
self.assertTrue(len(p1) > 0)
|
||||
|
||||
|
||||
class TestParseTopics(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def test_empty_string_returns_empty_list(self):
|
||||
self.assertEqual(self.mod.parse_topics(""), [])
|
||||
|
||||
def test_whitespace_only_returns_empty(self):
|
||||
self.assertEqual(self.mod.parse_topics(" "), [])
|
||||
|
||||
def test_single_topic(self):
|
||||
self.assertEqual(self.mod.parse_topics("/topic/foo"), ["/topic/foo"])
|
||||
|
||||
def test_multiple_topics(self):
|
||||
r = self.mod.parse_topics("/a,/b,/c")
|
||||
self.assertEqual(r, ["/a", "/b", "/c"])
|
||||
|
||||
def test_strips_whitespace(self):
|
||||
r = self.mod.parse_topics(" /a , /b ")
|
||||
self.assertEqual(r, ["/a", "/b"])
|
||||
|
||||
def test_ignores_empty_segments(self):
|
||||
r = self.mod.parse_topics("/a,,/b")
|
||||
self.assertEqual(r, ["/a", "/b"])
|
||||
|
||||
|
||||
class TestStatusConstants(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def test_idle(self): self.assertEqual(self.mod.STATUS_IDLE, "idle")
|
||||
def test_recording(self): self.assertEqual(self.mod.STATUS_RECORDING, "recording")
|
||||
def test_stopping(self): self.assertEqual(self.mod.STATUS_STOPPING, "stopping")
|
||||
def test_error(self): self.assertEqual(self.mod.STATUS_ERROR, "error")
|
||||
|
||||
|
||||
# ── Tests: compute_recording_transition ──────────────────────────────────────
|
||||
|
||||
class TestComputeRecordingTransition(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def _tr(self, state, trigger=None, proc_running=True,
|
||||
now=100.0, rec_t=0.0, stop_t=0.0,
|
||||
auto_stop=60.0, stop_tmo=5.0):
|
||||
return self.mod.compute_recording_transition(
|
||||
state, trigger, proc_running, now,
|
||||
rec_t, stop_t, auto_stop, stop_tmo,
|
||||
)
|
||||
|
||||
# ── IDLE ──────────────────────────────────────────────────────────
|
||||
|
||||
def test_idle_no_trigger_stays_idle(self):
|
||||
s, fk = self._tr("idle", trigger=None)
|
||||
self.assertEqual(s, "idle"); self.assertFalse(fk)
|
||||
|
||||
def test_idle_false_trigger_stays_idle(self):
|
||||
s, fk = self._tr("idle", trigger=False)
|
||||
self.assertEqual(s, "idle"); self.assertFalse(fk)
|
||||
|
||||
def test_idle_true_trigger_starts_recording(self):
|
||||
s, fk = self._tr("idle", trigger=True)
|
||||
self.assertEqual(s, "recording"); self.assertFalse(fk)
|
||||
|
||||
# ── RECORDING ─────────────────────────────────────────────────────
|
||||
|
||||
def test_recording_stable_no_change(self):
|
||||
s, fk = self._tr("recording", proc_running=True)
|
||||
self.assertEqual(s, "recording"); self.assertFalse(fk)
|
||||
|
||||
def test_recording_false_trigger_stops(self):
|
||||
s, fk = self._tr("recording", trigger=False, proc_running=True)
|
||||
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
||||
|
||||
def test_recording_proc_dies_error(self):
|
||||
s, fk = self._tr("recording", proc_running=False)
|
||||
self.assertEqual(s, "error"); self.assertFalse(fk)
|
||||
|
||||
def test_recording_auto_stop_fires(self):
|
||||
# started at t=40, now=t=101 → 61 s elapsed > auto_stop=60
|
||||
s, fk = self._tr("recording", proc_running=True,
|
||||
now=101.0, rec_t=40.0, auto_stop=60.0)
|
||||
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
||||
|
||||
def test_recording_auto_stop_not_yet(self):
|
||||
# started at t=50, now=100 → 50 s < 60 s
|
||||
s, fk = self._tr("recording", proc_running=True,
|
||||
now=100.0, rec_t=50.0, auto_stop=60.0)
|
||||
self.assertEqual(s, "recording")
|
||||
|
||||
def test_recording_auto_stop_at_exactly_timeout(self):
|
||||
s, fk = self._tr("recording", proc_running=True,
|
||||
now=110.0, rec_t=50.0, auto_stop=60.0)
|
||||
self.assertEqual(s, "stopping")
|
||||
|
||||
def test_recording_auto_stop_disabled(self):
|
||||
# auto_stop_s=0 → never auto-stops
|
||||
s, fk = self._tr("recording", proc_running=True,
|
||||
now=9999.0, rec_t=0.0, auto_stop=0.0)
|
||||
self.assertEqual(s, "recording")
|
||||
|
||||
# ── STOPPING ──────────────────────────────────────────────────────
|
||||
|
||||
def test_stopping_proc_running_stays(self):
|
||||
s, fk = self._tr("stopping", proc_running=True)
|
||||
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
||||
|
||||
def test_stopping_proc_exits_idle(self):
|
||||
s, fk = self._tr("stopping", proc_running=False)
|
||||
self.assertEqual(s, "idle"); self.assertFalse(fk)
|
||||
|
||||
def test_stopping_force_kill_after_timeout(self):
|
||||
# entered stopping at t=95, now=101 → 6 s > stop_tmo=5
|
||||
s, fk = self._tr("stopping", proc_running=True,
|
||||
now=101.0, stop_t=95.0, stop_tmo=5.0)
|
||||
self.assertEqual(s, "idle"); self.assertTrue(fk)
|
||||
|
||||
def test_stopping_not_yet_force_kill(self):
|
||||
# entered at t=98, now=100 → 2 s < 5 s
|
||||
s, fk = self._tr("stopping", proc_running=True,
|
||||
now=100.0, stop_t=98.0, stop_tmo=5.0)
|
||||
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
||||
|
||||
def test_stopping_timeout_disabled(self):
|
||||
# stop_tmo=0 → never force-kills
|
||||
s, fk = self._tr("stopping", proc_running=True,
|
||||
now=9999.0, stop_t=0.0, stop_tmo=0.0)
|
||||
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
||||
|
||||
def test_stopping_force_kill_exactly_at_timeout(self):
|
||||
s, fk = self._tr("stopping", proc_running=True,
|
||||
now=100.0, stop_t=95.0, stop_tmo=5.0)
|
||||
self.assertEqual(s, "idle"); self.assertTrue(fk)
|
||||
|
||||
# ── ERROR ─────────────────────────────────────────────────────────
|
||||
|
||||
def test_error_stays_without_trigger(self):
|
||||
s, fk = self._tr("error", trigger=None)
|
||||
self.assertEqual(s, "error"); self.assertFalse(fk)
|
||||
|
||||
def test_error_false_trigger_stays(self):
|
||||
s, fk = self._tr("error", trigger=False)
|
||||
self.assertEqual(s, "error")
|
||||
|
||||
def test_error_true_trigger_retries(self):
|
||||
s, fk = self._tr("error", trigger=True)
|
||||
self.assertEqual(s, "recording"); self.assertFalse(fk)
|
||||
|
||||
# ── Return shape ──────────────────────────────────────────────────
|
||||
|
||||
def test_returns_tuple_of_two(self):
|
||||
result = self._tr("idle")
|
||||
self.assertEqual(len(result), 2)
|
||||
|
||||
def test_force_kill_is_bool(self):
|
||||
_, fk = self._tr("idle")
|
||||
self.assertIsInstance(fk, bool)
|
||||
|
||||
|
||||
# ── Tests: node init ──────────────────────────────────────────────────────────
|
||||
|
||||
class TestNodeInit(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def test_instantiates(self):
|
||||
self.assertIsNotNone(_make_node(self.mod))
|
||||
|
||||
def test_initial_state_idle(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertEqual(node.state, "idle")
|
||||
|
||||
def test_publishes_initial_idle(self):
|
||||
node = _make_node(self.mod)
|
||||
pub = _pub(node)
|
||||
self.assertEqual(pub.msgs[0].data, "idle")
|
||||
|
||||
def test_publisher_registered(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertIn("/saltybot/recording_status", node._pubs)
|
||||
|
||||
def test_subscriber_registered(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertIn("/saltybot/record_trigger", node._subs)
|
||||
|
||||
def test_timer_registered(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertGreater(len(node._timers), 0)
|
||||
|
||||
def test_custom_topics(self):
|
||||
node = _make_node(self.mod,
|
||||
trigger_topic="/my/trigger",
|
||||
status_topic="/my/status")
|
||||
self.assertIn("/my/trigger", node._subs)
|
||||
self.assertIn("/my/status", node._pubs)
|
||||
|
||||
def test_topics_parsed_correctly(self):
|
||||
node = _make_node(self.mod, topics="/a,/b,/c")
|
||||
self.assertEqual(node._topics, ["/a", "/b", "/c"])
|
||||
|
||||
def test_empty_topics_means_all(self):
|
||||
node = _make_node(self.mod, topics="")
|
||||
self.assertEqual(node._topics, [])
|
||||
|
||||
def test_auto_stop_s_stored(self):
|
||||
node = _make_node(self.mod, auto_stop_s=30.0)
|
||||
self.assertAlmostEqual(node._auto_stop_s, 30.0)
|
||||
|
||||
def test_current_bag_empty_initially(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertEqual(node.current_bag, "")
|
||||
|
||||
|
||||
# ── Tests: _on_trigger ────────────────────────────────────────────────────────
|
||||
|
||||
class TestOnTrigger(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def test_stores_true_trigger(self):
|
||||
node = _make_node(self.mod)
|
||||
_trigger(node, True)
|
||||
with node._lock:
|
||||
self.assertTrue(node._trigger)
|
||||
|
||||
def test_stores_false_trigger(self):
|
||||
node = _make_node(self.mod)
|
||||
_trigger(node, False)
|
||||
with node._lock:
|
||||
self.assertFalse(node._trigger)
|
||||
|
||||
def test_trigger_consumed_after_poll(self):
|
||||
rec = _MockRecorder()
|
||||
node = _make_node(self.mod, recorder=rec)
|
||||
_trigger(node, True)
|
||||
node._poll_cb()
|
||||
with node._lock:
|
||||
self.assertIsNone(node._trigger)
|
||||
|
||||
|
||||
# ── Tests: poll loop — full state machine ────────────────────────────────────
|
||||
|
||||
class TestPollCb(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def _node(self, **kwargs):
|
||||
rec = _MockRecorder()
|
||||
node = _make_node(self.mod, recorder=rec, **kwargs)
|
||||
return node, rec
|
||||
|
||||
def test_true_trigger_starts_recording(self):
|
||||
node, rec = self._node()
|
||||
_trigger(node, True)
|
||||
node._poll_cb()
|
||||
self.assertEqual(node.state, "recording")
|
||||
self.assertIn("start", rec.call_types())
|
||||
|
||||
def test_recording_publishes_status(self):
|
||||
node, rec = self._node()
|
||||
_trigger(node, True)
|
||||
node._poll_cb()
|
||||
self.assertEqual(_pub(node).msgs[-1].data, "recording")
|
||||
|
||||
def test_false_trigger_stops_recording(self):
|
||||
node, rec = self._node()
|
||||
_trigger(node, True); node._poll_cb() # → recording
|
||||
_trigger(node, False); node._poll_cb() # → stopping
|
||||
self.assertEqual(node.state, "stopping")
|
||||
self.assertIn("stop", rec.call_types())
|
||||
|
||||
def test_stopping_publishes_status(self):
|
||||
node, rec = self._node()
|
||||
_trigger(node, True); node._poll_cb()
|
||||
_trigger(node, False); node._poll_cb()
|
||||
self.assertEqual(_pub(node).msgs[-1].data, "stopping")
|
||||
|
||||
def test_after_stop_proc_exit_idle(self):
|
||||
"""Once the mock recorder stops, next poll resolves to idle."""
|
||||
node, rec = self._node()
|
||||
_trigger(node, True); node._poll_cb() # recording
|
||||
_trigger(node, False); node._poll_cb() # stopping (rec.stop() sets running=False)
|
||||
node._poll_cb() # proc not running → idle
|
||||
self.assertEqual(node.state, "idle")
|
||||
|
||||
def test_idle_publishes_after_stop(self):
|
||||
node, rec = self._node()
|
||||
_trigger(node, True); node._poll_cb()
|
||||
_trigger(node, False); node._poll_cb()
|
||||
node._poll_cb()
|
||||
self.assertEqual(_pub(node).msgs[-1].data, "idle")
|
||||
|
||||
def test_auto_stop_triggers_stopping(self):
|
||||
node, rec = self._node(auto_stop_s=1.0)
|
||||
_trigger(node, True)
|
||||
node._poll_cb() # → recording
|
||||
# Back-date start time so auto-stop fires
|
||||
with node._lock:
|
||||
node._record_start_t = time.monotonic() - 10.0
|
||||
node._poll_cb() # → stopping
|
||||
self.assertEqual(node.state, "stopping")
|
||||
|
||||
def test_auto_stop_disabled(self):
|
||||
node, rec = self._node(auto_stop_s=0.0)
|
||||
_trigger(node, True)
|
||||
node._poll_cb()
|
||||
with node._lock:
|
||||
node._record_start_t = time.monotonic() - 9999.0
|
||||
node._poll_cb()
|
||||
# Should still be recording (auto-stop disabled)
|
||||
self.assertEqual(node.state, "recording")
|
||||
|
||||
def test_proc_dies_error(self):
|
||||
node, rec = self._node()
|
||||
_trigger(node, True); node._poll_cb() # → recording
|
||||
rec._running = False # simulate unexpected exit
|
||||
node._poll_cb()
|
||||
self.assertEqual(node.state, "error")
|
||||
|
||||
def test_error_publishes_status(self):
|
||||
node, rec = self._node()
|
||||
_trigger(node, True); node._poll_cb()
|
||||
rec._running = False
|
||||
node._poll_cb()
|
||||
self.assertEqual(_pub(node).msgs[-1].data, "error")
|
||||
|
||||
def test_error_retries_on_true_trigger(self):
|
||||
node, rec = self._node()
|
||||
_trigger(node, True); node._poll_cb() # recording
|
||||
rec._running = False
|
||||
node._poll_cb() # error
|
||||
_trigger(node, True); node._poll_cb() # retry → recording
|
||||
self.assertEqual(node.state, "recording")
|
||||
|
||||
def test_start_failure_enters_error(self):
|
||||
rec = _MockRecorder(start_succeeds=False)
|
||||
node = _make_node(self.mod, recorder=rec)
|
||||
_trigger(node, True)
|
||||
node._poll_cb()
|
||||
self.assertEqual(node.state, "error")
|
||||
|
||||
def test_force_kill_on_stop_timeout(self):
|
||||
"""Stubborn process that ignores SIGINT → force-killed after timeout."""
|
||||
class _StubbornRecorder(_MockRecorder):
|
||||
def stop(self):
|
||||
self.calls.append(("stop",))
|
||||
# Don't set _running = False — simulates process ignoring SIGINT
|
||||
|
||||
stubborn = _StubbornRecorder()
|
||||
node = _make_node(self.mod, recorder=stubborn, stop_timeout_s=2.0)
|
||||
_trigger(node, True); node._poll_cb() # → recording
|
||||
_trigger(node, False); node._poll_cb() # → stopping (process stays alive)
|
||||
self.assertEqual(node.state, "stopping")
|
||||
# Expire the stop timeout
|
||||
with node._lock:
|
||||
node._stop_start_t = time.monotonic() - 10.0
|
||||
node._poll_cb() # → force kill → idle
|
||||
self.assertIn("kill", stubborn.call_types())
|
||||
self.assertEqual(node.state, "idle")
|
||||
|
||||
def test_bag_path_set_when_recording(self):
|
||||
node, rec = self._node(bag_prefix="mytest")
|
||||
_trigger(node, True)
|
||||
node._poll_cb()
|
||||
self.assertIn("mytest", node.current_bag)
|
||||
|
||||
def test_bag_path_cleared_after_idle(self):
|
||||
node, rec = self._node()
|
||||
_trigger(node, True); node._poll_cb()
|
||||
_trigger(node, False); node._poll_cb()
|
||||
node._poll_cb()
|
||||
self.assertEqual(node.current_bag, "")
|
||||
|
||||
def test_topics_passed_to_recorder(self):
|
||||
rec = _MockRecorder()
|
||||
node = _make_node(self.mod, recorder=rec, topics="/a,/b")
|
||||
_trigger(node, True)
|
||||
node._poll_cb()
|
||||
start_calls = [c for c in rec.calls if c[0] == "start"]
|
||||
self.assertEqual(len(start_calls), 1)
|
||||
self.assertEqual(start_calls[0][1], ["/a", "/b"])
|
||||
|
||||
def test_empty_topics_passes_empty_list(self):
|
||||
rec = _MockRecorder()
|
||||
node = _make_node(self.mod, recorder=rec, topics="")
|
||||
_trigger(node, True)
|
||||
node._poll_cb()
|
||||
start_calls = [c for c in rec.calls if c[0] == "start"]
|
||||
self.assertEqual(start_calls[0][1], [])
|
||||
|
||||
def test_recorder_reset_on_idle(self):
|
||||
node, rec = self._node()
|
||||
_trigger(node, True); node._poll_cb()
|
||||
_trigger(node, False); node._poll_cb()
|
||||
node._poll_cb() # idle
|
||||
self.assertIn("reset", rec.call_types())
|
||||
|
||||
def test_full_lifecycle_status_sequence(self):
|
||||
"""idle → recording → stopping → idle."""
|
||||
node, rec = self._node()
|
||||
pub = _pub(node)
|
||||
|
||||
_trigger(node, True); node._poll_cb()
|
||||
_trigger(node, False); node._poll_cb()
|
||||
node._poll_cb()
|
||||
|
||||
statuses = [m.data for m in pub.msgs]
|
||||
self.assertIn("idle", statuses)
|
||||
self.assertIn("recording", statuses)
|
||||
self.assertIn("stopping", statuses)
|
||||
|
||||
def test_logging_on_start(self):
|
||||
node, rec = self._node()
|
||||
_trigger(node, True)
|
||||
node._poll_cb()
|
||||
infos = [m for lvl, m in node._logs if lvl == "INFO"]
|
||||
self.assertTrue(any("recording" in m.lower() for m in infos))
|
||||
|
||||
def test_logging_on_stop(self):
|
||||
node, rec = self._node()
|
||||
_trigger(node, True); node._poll_cb()
|
||||
_trigger(node, False); node._poll_cb()
|
||||
infos = [m for lvl, m in node._logs if lvl == "INFO"]
|
||||
self.assertTrue(any("stop" in m.lower() for m in infos))
|
||||
|
||||
def test_logging_on_error(self):
|
||||
node, rec = self._node()
|
||||
_trigger(node, True); node._poll_cb()
|
||||
rec._running = False
|
||||
node._poll_cb()
|
||||
errors = [m for lvl, m in node._logs if lvl == "ERROR"]
|
||||
self.assertTrue(len(errors) > 0)
|
||||
|
||||
|
||||
# ── Tests: source content ─────────────────────────────────────────────────────
|
||||
|
||||
class TestNodeSrc(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
with open(_SRC) as f: cls.src = f.read()
|
||||
|
||||
def test_issue_tag(self): self.assertIn("#332", self.src)
|
||||
def test_trigger_topic(self): self.assertIn("/saltybot/record_trigger", self.src)
|
||||
def test_status_topic(self): self.assertIn("/saltybot/recording_status", self.src)
|
||||
def test_status_idle(self): self.assertIn('"idle"', self.src)
|
||||
def test_status_recording(self): self.assertIn('"recording"', self.src)
|
||||
def test_status_stopping(self): self.assertIn('"stopping"', self.src)
|
||||
def test_status_error(self): self.assertIn('"error"', self.src)
|
||||
def test_compute_transition_fn(self): self.assertIn("compute_recording_transition", self.src)
|
||||
def test_bag_recorder_process(self): self.assertIn("BagRecorderProcess", self.src)
|
||||
def test_make_bag_path(self): self.assertIn("make_bag_path", self.src)
|
||||
def test_parse_topics(self): self.assertIn("parse_topics", self.src)
|
||||
def test_auto_stop_param(self): self.assertIn("auto_stop_s", self.src)
|
||||
def test_stop_timeout_param(self): self.assertIn("stop_timeout_s", self.src)
|
||||
def test_compression_param(self): self.assertIn("compression", self.src)
|
||||
def test_subprocess_used(self): self.assertIn("subprocess", self.src)
|
||||
def test_sigint_used(self): self.assertIn("SIGINT", self.src)
|
||||
def test_threading_lock(self): self.assertIn("threading.Lock", self.src)
|
||||
def test_recorder_injectable(self): self.assertIn("_recorder", self.src)
|
||||
def test_main_defined(self): self.assertIn("def main", self.src)
|
||||
def test_bool_subscription(self): self.assertIn("Bool", self.src)
|
||||
def test_string_publication(self): self.assertIn("String", self.src)
|
||||
|
||||
|
||||
# ── Tests: config / launch / setup ────────────────────────────────────────────
|
||||
|
||||
class TestConfig(unittest.TestCase):
|
||||
_CONFIG = (
|
||||
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||
"saltybot_social/config/rosbag_recorder_params.yaml"
|
||||
)
|
||||
_LAUNCH = (
|
||||
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||
"saltybot_social/launch/rosbag_recorder.launch.py"
|
||||
)
|
||||
_SETUP = (
|
||||
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||
"saltybot_social/setup.py"
|
||||
)
|
||||
|
||||
def test_config_exists(self):
|
||||
import os; self.assertTrue(os.path.exists(self._CONFIG))
|
||||
|
||||
def test_config_auto_stop(self):
|
||||
with open(self._CONFIG) as f: c = f.read()
|
||||
self.assertIn("auto_stop_s", c)
|
||||
|
||||
def test_config_bag_dir(self):
|
||||
with open(self._CONFIG) as f: c = f.read()
|
||||
self.assertIn("bag_dir", c)
|
||||
|
||||
def test_config_topics(self):
|
||||
with open(self._CONFIG) as f: c = f.read()
|
||||
self.assertIn("topics", c)
|
||||
|
||||
def test_config_compression(self):
|
||||
with open(self._CONFIG) as f: c = f.read()
|
||||
self.assertIn("compression", c)
|
||||
|
||||
def test_config_stop_timeout(self):
|
||||
with open(self._CONFIG) as f: c = f.read()
|
||||
self.assertIn("stop_timeout_s", c)
|
||||
|
||||
def test_launch_exists(self):
|
||||
import os; self.assertTrue(os.path.exists(self._LAUNCH))
|
||||
|
||||
def test_launch_auto_stop_arg(self):
|
||||
with open(self._LAUNCH) as f: c = f.read()
|
||||
self.assertIn("auto_stop_s", c)
|
||||
|
||||
def test_launch_topics_arg(self):
|
||||
with open(self._LAUNCH) as f: c = f.read()
|
||||
self.assertIn("topics", c)
|
||||
|
||||
def test_entry_point_in_setup(self):
|
||||
with open(self._SETUP) as f: c = f.read()
|
||||
self.assertIn("rosbag_recorder_node", c)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Loading…
x
Reference in New Issue
Block a user