Compare commits
1 Commits
813d6f2529
...
e1813f734f
| Author | SHA1 | Date | |
|---|---|---|---|
| e1813f734f |
@ -0,0 +1,20 @@
|
|||||||
|
rosbag_recorder_node:
|
||||||
|
ros__parameters:
|
||||||
|
trigger_topic: "/saltybot/record_trigger"
|
||||||
|
status_topic: "/saltybot/recording_status"
|
||||||
|
|
||||||
|
# Comma-separated topic list to record.
|
||||||
|
# Empty string = record all topics (ros2 bag record --all).
|
||||||
|
# Example: "/saltybot/camera_status,/saltybot/wake_word_detected,/cmd_vel"
|
||||||
|
topics: ""
|
||||||
|
|
||||||
|
bag_dir: "/tmp/saltybot_bags" # output directory (created if absent)
|
||||||
|
bag_prefix: "saltybot" # filename prefix; timestamp appended
|
||||||
|
|
||||||
|
auto_stop_s: 60.0 # auto-stop after N seconds; 0 = disabled
|
||||||
|
stop_timeout_s: 5.0 # force-kill if subprocess won't stop within N s
|
||||||
|
|
||||||
|
compression: false # enable zstd file-level compression
|
||||||
|
max_bag_size_mb: 0.0 # split bags at this size (MiB); 0 = no limit
|
||||||
|
|
||||||
|
poll_rate: 2.0 # state-machine check frequency (Hz)
|
||||||
@ -0,0 +1,47 @@
|
|||||||
|
"""rosbag_recorder.launch.py — Launch trigger-based bag recorder (Issue #332).
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
ros2 launch saltybot_social rosbag_recorder.launch.py
|
||||||
|
ros2 launch saltybot_social rosbag_recorder.launch.py auto_stop_s:=120.0
|
||||||
|
ros2 launch saltybot_social rosbag_recorder.launch.py \\
|
||||||
|
topics:="/saltybot/camera_status,/cmd_vel" bag_dir:=/data/bags
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
pkg = get_package_share_directory("saltybot_social")
|
||||||
|
cfg = os.path.join(pkg, "config", "rosbag_recorder_params.yaml")
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument("topics", default_value="",
|
||||||
|
description="Comma-separated topics (empty=all)"),
|
||||||
|
DeclareLaunchArgument("bag_dir", default_value="/tmp/saltybot_bags",
|
||||||
|
description="Output directory for bag files"),
|
||||||
|
DeclareLaunchArgument("auto_stop_s", default_value="60.0",
|
||||||
|
description="Auto-stop timeout in seconds (0=off)"),
|
||||||
|
DeclareLaunchArgument("compression", default_value="false",
|
||||||
|
description="Enable zstd compression"),
|
||||||
|
|
||||||
|
Node(
|
||||||
|
package="saltybot_social",
|
||||||
|
executable="rosbag_recorder_node",
|
||||||
|
name="rosbag_recorder_node",
|
||||||
|
output="screen",
|
||||||
|
parameters=[
|
||||||
|
cfg,
|
||||||
|
{
|
||||||
|
"topics": LaunchConfiguration("topics"),
|
||||||
|
"bag_dir": LaunchConfiguration("bag_dir"),
|
||||||
|
"auto_stop_s": LaunchConfiguration("auto_stop_s"),
|
||||||
|
"compression": LaunchConfiguration("compression"),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
])
|
||||||
@ -0,0 +1,373 @@
|
|||||||
|
"""rosbag_recorder_node.py — Trigger-based ROS2 bag recorder.
|
||||||
|
Issue #332
|
||||||
|
|
||||||
|
Subscribes to /saltybot/record_trigger (Bool). True starts recording;
|
||||||
|
False stops it. Auto-stop fires after ``auto_stop_s`` seconds if still
|
||||||
|
running. Recording is performed by spawning a ``ros2 bag record``
|
||||||
|
subprocess which is sent SIGINT for graceful shutdown and SIGKILL if it
|
||||||
|
does not exit within ``stop_timeout_s``.
|
||||||
|
|
||||||
|
Status values
|
||||||
|
─────────────
|
||||||
|
"idle" — not recording
|
||||||
|
"recording" — subprocess active, writing to bag file
|
||||||
|
"stopping" — SIGINT sent, waiting for subprocess to exit
|
||||||
|
"error" — subprocess died unexpectedly; new trigger retries
|
||||||
|
|
||||||
|
Subscriptions
|
||||||
|
─────────────
|
||||||
|
/saltybot/record_trigger std_msgs/Bool — True = start, False = stop
|
||||||
|
|
||||||
|
Publications
|
||||||
|
────────────
|
||||||
|
/saltybot/recording_status std_msgs/String — status value (see above)
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
──────────
|
||||||
|
trigger_topic (str, "/saltybot/record_trigger")
|
||||||
|
status_topic (str, "/saltybot/recording_status")
|
||||||
|
topics (str, "") comma-separated topic list;
|
||||||
|
empty string → record all topics (-a)
|
||||||
|
bag_dir (str, "/tmp/saltybot_bags")
|
||||||
|
bag_prefix (str, "saltybot")
|
||||||
|
auto_stop_s (float, 60.0) 0 = no auto-stop
|
||||||
|
stop_timeout_s (float, 5.0) force-kill after this many seconds
|
||||||
|
compression (bool, False) enable zstd file compression
|
||||||
|
max_bag_size_mb (float, 0.0) 0 = unlimited
|
||||||
|
poll_rate (float, 2.0) state-machine check frequency (Hz)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import subprocess
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from typing import List, Optional, Tuple
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile
|
||||||
|
from std_msgs.msg import Bool, String
|
||||||
|
|
||||||
|
|
||||||
|
# ── Status constants ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
STATUS_IDLE = "idle"
|
||||||
|
STATUS_RECORDING = "recording"
|
||||||
|
STATUS_STOPPING = "stopping"
|
||||||
|
STATUS_ERROR = "error"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Pure helpers ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def make_bag_path(bag_dir: str, prefix: str) -> str:
|
||||||
|
"""Return a timestamped output path for a new bag (no file created)."""
|
||||||
|
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
|
return os.path.join(bag_dir, f"{prefix}_{ts}")
|
||||||
|
|
||||||
|
|
||||||
|
def parse_topics(topics_str: str) -> List[str]:
|
||||||
|
"""Parse a comma-separated topic string into a clean list.
|
||||||
|
|
||||||
|
Returns an empty list when *topics_str* is blank (meaning record-all).
|
||||||
|
"""
|
||||||
|
if not topics_str or not topics_str.strip():
|
||||||
|
return []
|
||||||
|
return [t.strip() for t in topics_str.split(",") if t.strip()]
|
||||||
|
|
||||||
|
|
||||||
|
def compute_recording_transition(
|
||||||
|
state: str,
|
||||||
|
trigger: Optional[bool],
|
||||||
|
proc_running: bool,
|
||||||
|
now: float,
|
||||||
|
record_start_t: float,
|
||||||
|
stop_start_t: float,
|
||||||
|
auto_stop_s: float,
|
||||||
|
stop_timeout_s: float,
|
||||||
|
) -> Tuple[str, bool]:
|
||||||
|
"""Pure state-machine step — no I/O, no ROS.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
state : current status string
|
||||||
|
trigger : latest trigger value (True=start, False=stop, None=none)
|
||||||
|
proc_running : whether the recorder subprocess is alive
|
||||||
|
now : current monotonic time (s)
|
||||||
|
record_start_t : monotonic time recording began (0 if not recording)
|
||||||
|
stop_start_t : monotonic time STOPPING began (0 if not stopping)
|
||||||
|
auto_stop_s : auto-stop after this many seconds (0 = disabled)
|
||||||
|
stop_timeout_s : force-kill if stopping > this long (0 = disabled)
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
(new_state, force_kill)
|
||||||
|
force_kill=True signals the caller to SIGKILL the process.
|
||||||
|
"""
|
||||||
|
if state == STATUS_IDLE:
|
||||||
|
if trigger is True:
|
||||||
|
return STATUS_RECORDING, False
|
||||||
|
return STATUS_IDLE, False
|
||||||
|
|
||||||
|
if state == STATUS_RECORDING:
|
||||||
|
if not proc_running:
|
||||||
|
return STATUS_ERROR, False
|
||||||
|
if trigger is False:
|
||||||
|
return STATUS_STOPPING, False
|
||||||
|
if (auto_stop_s > 0 and record_start_t > 0
|
||||||
|
and (now - record_start_t) >= auto_stop_s):
|
||||||
|
return STATUS_STOPPING, False
|
||||||
|
return STATUS_RECORDING, False
|
||||||
|
|
||||||
|
if state == STATUS_STOPPING:
|
||||||
|
if not proc_running:
|
||||||
|
return STATUS_IDLE, False
|
||||||
|
if (stop_timeout_s > 0 and stop_start_t > 0
|
||||||
|
and (now - stop_start_t) >= stop_timeout_s):
|
||||||
|
return STATUS_IDLE, True # force-kill
|
||||||
|
return STATUS_STOPPING, False
|
||||||
|
|
||||||
|
# STATUS_ERROR
|
||||||
|
if trigger is True:
|
||||||
|
return STATUS_RECORDING, False
|
||||||
|
return STATUS_ERROR, False
|
||||||
|
|
||||||
|
|
||||||
|
# ── Subprocess wrapper ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class BagRecorderProcess:
|
||||||
|
"""Thin wrapper around a ``ros2 bag record`` subprocess."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self._proc: Optional[subprocess.Popen] = None
|
||||||
|
|
||||||
|
def start(self, topics: List[str], output_path: str,
|
||||||
|
compression: bool = False,
|
||||||
|
max_size_mb: float = 0.0) -> bool:
|
||||||
|
"""Launch the recorder. Returns False if already running or on error."""
|
||||||
|
if self.is_running():
|
||||||
|
return False
|
||||||
|
|
||||||
|
cmd = ["ros2", "bag", "record", "--output", output_path]
|
||||||
|
if topics:
|
||||||
|
cmd += topics
|
||||||
|
else:
|
||||||
|
cmd += ["--all"]
|
||||||
|
if compression:
|
||||||
|
cmd += ["--compression-mode", "file",
|
||||||
|
"--compression-format", "zstd"]
|
||||||
|
if max_size_mb > 0:
|
||||||
|
cmd += ["--max-bag-size",
|
||||||
|
str(int(max_size_mb * 1024 * 1024))]
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._proc = subprocess.Popen(
|
||||||
|
cmd,
|
||||||
|
stdout=subprocess.DEVNULL,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
preexec_fn=os.setsid,
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
self._proc = None
|
||||||
|
return False
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
"""Send SIGINT to the process group for graceful shutdown."""
|
||||||
|
if self._proc is not None and self._proc.poll() is None:
|
||||||
|
try:
|
||||||
|
os.killpg(os.getpgid(self._proc.pid), signal.SIGINT)
|
||||||
|
except (ProcessLookupError, PermissionError, OSError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def kill(self) -> None:
|
||||||
|
"""Send SIGKILL to the process group for forced shutdown."""
|
||||||
|
if self._proc is not None:
|
||||||
|
try:
|
||||||
|
os.killpg(os.getpgid(self._proc.pid), signal.SIGKILL)
|
||||||
|
except (ProcessLookupError, PermissionError, OSError):
|
||||||
|
pass
|
||||||
|
self._proc = None
|
||||||
|
|
||||||
|
def is_running(self) -> bool:
|
||||||
|
return self._proc is not None and self._proc.poll() is None
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
"""Clear internal proc reference (call after graceful exit)."""
|
||||||
|
self._proc = None
|
||||||
|
|
||||||
|
|
||||||
|
# ── ROS2 node ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class RosbagRecorderNode(Node):
|
||||||
|
"""Trigger-based ROS bag recorder with auto-stop and status reporting."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__("rosbag_recorder_node")
|
||||||
|
|
||||||
|
self.declare_parameter("trigger_topic", "/saltybot/record_trigger")
|
||||||
|
self.declare_parameter("status_topic", "/saltybot/recording_status")
|
||||||
|
self.declare_parameter("topics", "")
|
||||||
|
self.declare_parameter("bag_dir", "/tmp/saltybot_bags")
|
||||||
|
self.declare_parameter("bag_prefix", "saltybot")
|
||||||
|
self.declare_parameter("auto_stop_s", 60.0)
|
||||||
|
self.declare_parameter("stop_timeout_s", 5.0)
|
||||||
|
self.declare_parameter("compression", False)
|
||||||
|
self.declare_parameter("max_bag_size_mb", 0.0)
|
||||||
|
self.declare_parameter("poll_rate", 2.0)
|
||||||
|
|
||||||
|
trigger_topic = self.get_parameter("trigger_topic").value
|
||||||
|
status_topic = self.get_parameter("status_topic").value
|
||||||
|
topics_str = self.get_parameter("topics").value
|
||||||
|
self._bag_dir = str(self.get_parameter("bag_dir").value)
|
||||||
|
self._bag_prefix = str(self.get_parameter("bag_prefix").value)
|
||||||
|
self._auto_stop_s = float(self.get_parameter("auto_stop_s").value)
|
||||||
|
self._stop_tmo_s = float(self.get_parameter("stop_timeout_s").value)
|
||||||
|
self._compression = bool(self.get_parameter("compression").value)
|
||||||
|
self._max_mb = float(self.get_parameter("max_bag_size_mb").value)
|
||||||
|
poll_rate = float(self.get_parameter("poll_rate").value)
|
||||||
|
|
||||||
|
self._topics: List[str] = parse_topics(str(topics_str))
|
||||||
|
|
||||||
|
# Recorder process — injectable for tests
|
||||||
|
self._recorder: BagRecorderProcess = BagRecorderProcess()
|
||||||
|
|
||||||
|
# State
|
||||||
|
self._state = STATUS_IDLE
|
||||||
|
self._trigger: Optional[bool] = None
|
||||||
|
self._record_start_t: float = 0.0
|
||||||
|
self._stop_start_t: float = 0.0
|
||||||
|
self._current_bag: str = ""
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
qos = QoSProfile(depth=10)
|
||||||
|
self._status_pub = self.create_publisher(String, status_topic, qos)
|
||||||
|
self._trigger_sub = self.create_subscription(
|
||||||
|
Bool, trigger_topic, self._on_trigger, qos
|
||||||
|
)
|
||||||
|
self._timer = self.create_timer(1.0 / poll_rate, self._poll_cb)
|
||||||
|
|
||||||
|
# Publish initial state
|
||||||
|
self._publish(STATUS_IDLE)
|
||||||
|
|
||||||
|
topic_desc = ",".join(self._topics) if self._topics else "<all>"
|
||||||
|
self.get_logger().info(
|
||||||
|
f"RosbagRecorderNode ready — topics={topic_desc}, "
|
||||||
|
f"bag_dir={self._bag_dir}, auto_stop={self._auto_stop_s}s"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Subscription ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _on_trigger(self, msg) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._trigger = bool(msg.data)
|
||||||
|
|
||||||
|
# ── Poll / state machine ────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _poll_cb(self) -> None:
|
||||||
|
now = time.monotonic()
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
trigger = self._trigger
|
||||||
|
self._trigger = None # consume
|
||||||
|
state = self._state
|
||||||
|
rec_t = self._record_start_t
|
||||||
|
stop_t = self._stop_start_t
|
||||||
|
|
||||||
|
proc_running = self._recorder.is_running()
|
||||||
|
|
||||||
|
new_state, force_kill = compute_recording_transition(
|
||||||
|
state, trigger, proc_running, now,
|
||||||
|
rec_t, stop_t, self._auto_stop_s, self._stop_tmo_s,
|
||||||
|
)
|
||||||
|
|
||||||
|
if force_kill:
|
||||||
|
self._recorder.kill()
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"RosbagRecorder: force-killed (stop_timeout={self._stop_tmo_s}s)"
|
||||||
|
)
|
||||||
|
|
||||||
|
if new_state != state:
|
||||||
|
self._enter_state(new_state, now)
|
||||||
|
|
||||||
|
def _enter_state(self, new_state: str, now: float) -> None:
|
||||||
|
if new_state == STATUS_RECORDING:
|
||||||
|
bag_path = make_bag_path(self._bag_dir, self._bag_prefix)
|
||||||
|
started = self._recorder.start(
|
||||||
|
self._topics, bag_path,
|
||||||
|
compression=self._compression,
|
||||||
|
max_size_mb=self._max_mb,
|
||||||
|
)
|
||||||
|
if not started:
|
||||||
|
new_state = STATUS_ERROR
|
||||||
|
self.get_logger().error(
|
||||||
|
"RosbagRecorder: failed to start recorder subprocess"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
with self._lock:
|
||||||
|
self._record_start_t = now
|
||||||
|
self._current_bag = bag_path
|
||||||
|
self.get_logger().info(
|
||||||
|
f"RosbagRecorder: recording started → {bag_path}"
|
||||||
|
)
|
||||||
|
|
||||||
|
elif new_state == STATUS_STOPPING:
|
||||||
|
self._recorder.stop()
|
||||||
|
with self._lock:
|
||||||
|
self._stop_start_t = now
|
||||||
|
self.get_logger().info("RosbagRecorder: stopping (SIGINT sent)")
|
||||||
|
|
||||||
|
elif new_state == STATUS_IDLE:
|
||||||
|
bag = self._current_bag
|
||||||
|
with self._lock:
|
||||||
|
self._record_start_t = 0.0
|
||||||
|
self._stop_start_t = 0.0
|
||||||
|
self._current_bag = ""
|
||||||
|
self._recorder.reset()
|
||||||
|
self.get_logger().info(
|
||||||
|
f"RosbagRecorder: recording complete → {bag}"
|
||||||
|
)
|
||||||
|
|
||||||
|
elif new_state == STATUS_ERROR:
|
||||||
|
self.get_logger().error(
|
||||||
|
"RosbagRecorder: subprocess exited unexpectedly"
|
||||||
|
)
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
self._state = new_state
|
||||||
|
|
||||||
|
self._publish(new_state)
|
||||||
|
|
||||||
|
# ── Publish ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _publish(self, status: str) -> None:
|
||||||
|
msg = String()
|
||||||
|
msg.data = status
|
||||||
|
self._status_pub.publish(msg)
|
||||||
|
|
||||||
|
# ── Public accessors ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@property
|
||||||
|
def state(self) -> str:
|
||||||
|
with self._lock:
|
||||||
|
return self._state
|
||||||
|
|
||||||
|
@property
|
||||||
|
def current_bag(self) -> str:
|
||||||
|
with self._lock:
|
||||||
|
return self._current_bag
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None) -> None:
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = RosbagRecorderNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
@ -61,6 +61,8 @@ setup(
|
|||||||
'wake_word_node = saltybot_social.wake_word_node:main',
|
'wake_word_node = saltybot_social.wake_word_node:main',
|
||||||
# USB camera hot-plug monitor (Issue #320)
|
# USB camera hot-plug monitor (Issue #320)
|
||||||
'camera_hotplug_node = saltybot_social.camera_hotplug_node:main',
|
'camera_hotplug_node = saltybot_social.camera_hotplug_node:main',
|
||||||
|
# Trigger-based ROS2 bag recorder (Issue #332)
|
||||||
|
'rosbag_recorder_node = saltybot_social.rosbag_recorder_node:main',
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
719
jetson/ros2_ws/src/saltybot_social/test/test_rosbag_recorder.py
Normal file
719
jetson/ros2_ws/src/saltybot_social/test/test_rosbag_recorder.py
Normal file
@ -0,0 +1,719 @@
|
|||||||
|
"""test_rosbag_recorder.py — Offline tests for rosbag_recorder_node (Issue #332).
|
||||||
|
|
||||||
|
Stubs out rclpy and ROS message types.
|
||||||
|
BagRecorderProcess is replaced with MockRecorder — no subprocesses are spawned.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import importlib.util
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import types
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
|
||||||
|
# ── ROS2 stubs ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _make_ros_stubs():
|
||||||
|
for mod_name in ("rclpy", "rclpy.node", "rclpy.qos",
|
||||||
|
"std_msgs", "std_msgs.msg"):
|
||||||
|
if mod_name not in sys.modules:
|
||||||
|
sys.modules[mod_name] = types.ModuleType(mod_name)
|
||||||
|
|
||||||
|
class _Node:
|
||||||
|
def __init__(self, name="node"):
|
||||||
|
self._name = name
|
||||||
|
if not hasattr(self, "_params"):
|
||||||
|
self._params = {}
|
||||||
|
self._pubs = {}
|
||||||
|
self._subs = {}
|
||||||
|
self._logs = []
|
||||||
|
self._timers = []
|
||||||
|
|
||||||
|
def declare_parameter(self, name, default):
|
||||||
|
if name not in self._params:
|
||||||
|
self._params[name] = default
|
||||||
|
|
||||||
|
def get_parameter(self, name):
|
||||||
|
class _P:
|
||||||
|
def __init__(self, v): self.value = v
|
||||||
|
return _P(self._params.get(name))
|
||||||
|
|
||||||
|
def create_publisher(self, msg_type, topic, qos):
|
||||||
|
pub = _FakePub()
|
||||||
|
self._pubs[topic] = pub
|
||||||
|
return pub
|
||||||
|
|
||||||
|
def create_subscription(self, msg_type, topic, cb, qos):
|
||||||
|
self._subs[topic] = cb
|
||||||
|
return object()
|
||||||
|
|
||||||
|
def create_timer(self, period, cb):
|
||||||
|
self._timers.append(cb)
|
||||||
|
return object()
|
||||||
|
|
||||||
|
def get_logger(self):
|
||||||
|
node = self
|
||||||
|
class _L:
|
||||||
|
def info(self, m): node._logs.append(("INFO", m))
|
||||||
|
def warn(self, m): node._logs.append(("WARN", m))
|
||||||
|
def error(self, m): node._logs.append(("ERROR", m))
|
||||||
|
return _L()
|
||||||
|
|
||||||
|
def destroy_node(self): pass
|
||||||
|
|
||||||
|
class _FakePub:
|
||||||
|
def __init__(self):
|
||||||
|
self.msgs = []
|
||||||
|
def publish(self, msg):
|
||||||
|
self.msgs.append(msg)
|
||||||
|
|
||||||
|
class _QoSProfile:
|
||||||
|
def __init__(self, depth=10): self.depth = depth
|
||||||
|
|
||||||
|
class _Bool:
|
||||||
|
def __init__(self): self.data = False
|
||||||
|
|
||||||
|
class _String:
|
||||||
|
def __init__(self): self.data = ""
|
||||||
|
|
||||||
|
rclpy_mod = sys.modules["rclpy"]
|
||||||
|
rclpy_mod.init = lambda args=None: None
|
||||||
|
rclpy_mod.spin = lambda node: None
|
||||||
|
rclpy_mod.shutdown = lambda: None
|
||||||
|
|
||||||
|
sys.modules["rclpy.node"].Node = _Node
|
||||||
|
sys.modules["rclpy.qos"].QoSProfile = _QoSProfile
|
||||||
|
sys.modules["std_msgs.msg"].Bool = _Bool
|
||||||
|
sys.modules["std_msgs.msg"].String = _String
|
||||||
|
|
||||||
|
return _Node, _FakePub, _Bool, _String
|
||||||
|
|
||||||
|
|
||||||
|
_Node, _FakePub, _Bool, _String = _make_ros_stubs()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Module loader ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
_SRC = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/saltybot_social/rosbag_recorder_node.py"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _load_mod():
|
||||||
|
spec = importlib.util.spec_from_file_location("rosbag_recorder_testmod", _SRC)
|
||||||
|
mod = importlib.util.module_from_spec(spec)
|
||||||
|
spec.loader.exec_module(mod)
|
||||||
|
return mod
|
||||||
|
|
||||||
|
|
||||||
|
class _MockRecorder:
|
||||||
|
"""Injectable replacement for BagRecorderProcess."""
|
||||||
|
|
||||||
|
def __init__(self, start_succeeds: bool = True) -> None:
|
||||||
|
self.start_succeeds = start_succeeds
|
||||||
|
self._running = False
|
||||||
|
self.calls: list = []
|
||||||
|
|
||||||
|
def start(self, topics, output_path, compression=False, max_size_mb=0.0):
|
||||||
|
self.calls.append(("start", list(topics), output_path))
|
||||||
|
if self.start_succeeds:
|
||||||
|
self._running = True
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.calls.append(("stop",))
|
||||||
|
self._running = False # immediately "gone" for deterministic tests
|
||||||
|
|
||||||
|
def kill(self):
|
||||||
|
self.calls.append(("kill",))
|
||||||
|
self._running = False
|
||||||
|
|
||||||
|
def is_running(self):
|
||||||
|
return self._running
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
self.calls.append(("reset",))
|
||||||
|
self._running = False
|
||||||
|
|
||||||
|
def call_types(self):
|
||||||
|
return [c[0] for c in self.calls]
|
||||||
|
|
||||||
|
|
||||||
|
def _make_node(mod, recorder=None, **kwargs):
|
||||||
|
node = mod.RosbagRecorderNode.__new__(mod.RosbagRecorderNode)
|
||||||
|
defaults = {
|
||||||
|
"trigger_topic": "/saltybot/record_trigger",
|
||||||
|
"status_topic": "/saltybot/recording_status",
|
||||||
|
"topics": "",
|
||||||
|
"bag_dir": "/tmp/test_bags",
|
||||||
|
"bag_prefix": "test",
|
||||||
|
"auto_stop_s": 60.0,
|
||||||
|
"stop_timeout_s": 5.0,
|
||||||
|
"compression": False,
|
||||||
|
"max_bag_size_mb": 0.0,
|
||||||
|
"poll_rate": 2.0,
|
||||||
|
}
|
||||||
|
defaults.update(kwargs)
|
||||||
|
node._params = dict(defaults)
|
||||||
|
mod.RosbagRecorderNode.__init__(node)
|
||||||
|
if recorder is not None:
|
||||||
|
node._recorder = recorder
|
||||||
|
return node
|
||||||
|
|
||||||
|
|
||||||
|
def _trigger(node, value: bool):
|
||||||
|
"""Deliver a Bool trigger message."""
|
||||||
|
msg = _Bool()
|
||||||
|
msg.data = value
|
||||||
|
node._subs["/saltybot/record_trigger"](msg)
|
||||||
|
|
||||||
|
|
||||||
|
def _pub(node):
|
||||||
|
return node._pubs["/saltybot/recording_status"]
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: pure helpers ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestMakeBagPath(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls): cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_contains_prefix(self):
|
||||||
|
p = self.mod.make_bag_path("/tmp/bags", "saltybot")
|
||||||
|
self.assertIn("saltybot", p)
|
||||||
|
|
||||||
|
def test_contains_bag_dir(self):
|
||||||
|
p = self.mod.make_bag_path("/tmp/bags", "saltybot")
|
||||||
|
self.assertTrue(p.startswith("/tmp/bags"))
|
||||||
|
|
||||||
|
def test_unique_per_call(self):
|
||||||
|
# Two calls in tight succession may share a second but that's fine;
|
||||||
|
# just ensure the function doesn't crash and returns a string.
|
||||||
|
p1 = self.mod.make_bag_path("/tmp", "t")
|
||||||
|
self.assertIsInstance(p1, str)
|
||||||
|
self.assertTrue(len(p1) > 0)
|
||||||
|
|
||||||
|
|
||||||
|
class TestParseTopics(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls): cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_empty_string_returns_empty_list(self):
|
||||||
|
self.assertEqual(self.mod.parse_topics(""), [])
|
||||||
|
|
||||||
|
def test_whitespace_only_returns_empty(self):
|
||||||
|
self.assertEqual(self.mod.parse_topics(" "), [])
|
||||||
|
|
||||||
|
def test_single_topic(self):
|
||||||
|
self.assertEqual(self.mod.parse_topics("/topic/foo"), ["/topic/foo"])
|
||||||
|
|
||||||
|
def test_multiple_topics(self):
|
||||||
|
r = self.mod.parse_topics("/a,/b,/c")
|
||||||
|
self.assertEqual(r, ["/a", "/b", "/c"])
|
||||||
|
|
||||||
|
def test_strips_whitespace(self):
|
||||||
|
r = self.mod.parse_topics(" /a , /b ")
|
||||||
|
self.assertEqual(r, ["/a", "/b"])
|
||||||
|
|
||||||
|
def test_ignores_empty_segments(self):
|
||||||
|
r = self.mod.parse_topics("/a,,/b")
|
||||||
|
self.assertEqual(r, ["/a", "/b"])
|
||||||
|
|
||||||
|
|
||||||
|
class TestStatusConstants(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls): cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_idle(self): self.assertEqual(self.mod.STATUS_IDLE, "idle")
|
||||||
|
def test_recording(self): self.assertEqual(self.mod.STATUS_RECORDING, "recording")
|
||||||
|
def test_stopping(self): self.assertEqual(self.mod.STATUS_STOPPING, "stopping")
|
||||||
|
def test_error(self): self.assertEqual(self.mod.STATUS_ERROR, "error")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: compute_recording_transition ──────────────────────────────────────
|
||||||
|
|
||||||
|
class TestComputeRecordingTransition(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls): cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def _tr(self, state, trigger=None, proc_running=True,
|
||||||
|
now=100.0, rec_t=0.0, stop_t=0.0,
|
||||||
|
auto_stop=60.0, stop_tmo=5.0):
|
||||||
|
return self.mod.compute_recording_transition(
|
||||||
|
state, trigger, proc_running, now,
|
||||||
|
rec_t, stop_t, auto_stop, stop_tmo,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── IDLE ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_idle_no_trigger_stays_idle(self):
|
||||||
|
s, fk = self._tr("idle", trigger=None)
|
||||||
|
self.assertEqual(s, "idle"); self.assertFalse(fk)
|
||||||
|
|
||||||
|
def test_idle_false_trigger_stays_idle(self):
|
||||||
|
s, fk = self._tr("idle", trigger=False)
|
||||||
|
self.assertEqual(s, "idle"); self.assertFalse(fk)
|
||||||
|
|
||||||
|
def test_idle_true_trigger_starts_recording(self):
|
||||||
|
s, fk = self._tr("idle", trigger=True)
|
||||||
|
self.assertEqual(s, "recording"); self.assertFalse(fk)
|
||||||
|
|
||||||
|
# ── RECORDING ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_recording_stable_no_change(self):
|
||||||
|
s, fk = self._tr("recording", proc_running=True)
|
||||||
|
self.assertEqual(s, "recording"); self.assertFalse(fk)
|
||||||
|
|
||||||
|
def test_recording_false_trigger_stops(self):
|
||||||
|
s, fk = self._tr("recording", trigger=False, proc_running=True)
|
||||||
|
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
||||||
|
|
||||||
|
def test_recording_proc_dies_error(self):
|
||||||
|
s, fk = self._tr("recording", proc_running=False)
|
||||||
|
self.assertEqual(s, "error"); self.assertFalse(fk)
|
||||||
|
|
||||||
|
def test_recording_auto_stop_fires(self):
|
||||||
|
# started at t=40, now=t=101 → 61 s elapsed > auto_stop=60
|
||||||
|
s, fk = self._tr("recording", proc_running=True,
|
||||||
|
now=101.0, rec_t=40.0, auto_stop=60.0)
|
||||||
|
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
||||||
|
|
||||||
|
def test_recording_auto_stop_not_yet(self):
|
||||||
|
# started at t=50, now=100 → 50 s < 60 s
|
||||||
|
s, fk = self._tr("recording", proc_running=True,
|
||||||
|
now=100.0, rec_t=50.0, auto_stop=60.0)
|
||||||
|
self.assertEqual(s, "recording")
|
||||||
|
|
||||||
|
def test_recording_auto_stop_at_exactly_timeout(self):
|
||||||
|
s, fk = self._tr("recording", proc_running=True,
|
||||||
|
now=110.0, rec_t=50.0, auto_stop=60.0)
|
||||||
|
self.assertEqual(s, "stopping")
|
||||||
|
|
||||||
|
def test_recording_auto_stop_disabled(self):
|
||||||
|
# auto_stop_s=0 → never auto-stops
|
||||||
|
s, fk = self._tr("recording", proc_running=True,
|
||||||
|
now=9999.0, rec_t=0.0, auto_stop=0.0)
|
||||||
|
self.assertEqual(s, "recording")
|
||||||
|
|
||||||
|
# ── STOPPING ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_stopping_proc_running_stays(self):
|
||||||
|
s, fk = self._tr("stopping", proc_running=True)
|
||||||
|
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
||||||
|
|
||||||
|
def test_stopping_proc_exits_idle(self):
|
||||||
|
s, fk = self._tr("stopping", proc_running=False)
|
||||||
|
self.assertEqual(s, "idle"); self.assertFalse(fk)
|
||||||
|
|
||||||
|
def test_stopping_force_kill_after_timeout(self):
|
||||||
|
# entered stopping at t=95, now=101 → 6 s > stop_tmo=5
|
||||||
|
s, fk = self._tr("stopping", proc_running=True,
|
||||||
|
now=101.0, stop_t=95.0, stop_tmo=5.0)
|
||||||
|
self.assertEqual(s, "idle"); self.assertTrue(fk)
|
||||||
|
|
||||||
|
def test_stopping_not_yet_force_kill(self):
|
||||||
|
# entered at t=98, now=100 → 2 s < 5 s
|
||||||
|
s, fk = self._tr("stopping", proc_running=True,
|
||||||
|
now=100.0, stop_t=98.0, stop_tmo=5.0)
|
||||||
|
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
||||||
|
|
||||||
|
def test_stopping_timeout_disabled(self):
|
||||||
|
# stop_tmo=0 → never force-kills
|
||||||
|
s, fk = self._tr("stopping", proc_running=True,
|
||||||
|
now=9999.0, stop_t=0.0, stop_tmo=0.0)
|
||||||
|
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
||||||
|
|
||||||
|
def test_stopping_force_kill_exactly_at_timeout(self):
|
||||||
|
s, fk = self._tr("stopping", proc_running=True,
|
||||||
|
now=100.0, stop_t=95.0, stop_tmo=5.0)
|
||||||
|
self.assertEqual(s, "idle"); self.assertTrue(fk)
|
||||||
|
|
||||||
|
# ── ERROR ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_error_stays_without_trigger(self):
|
||||||
|
s, fk = self._tr("error", trigger=None)
|
||||||
|
self.assertEqual(s, "error"); self.assertFalse(fk)
|
||||||
|
|
||||||
|
def test_error_false_trigger_stays(self):
|
||||||
|
s, fk = self._tr("error", trigger=False)
|
||||||
|
self.assertEqual(s, "error")
|
||||||
|
|
||||||
|
def test_error_true_trigger_retries(self):
|
||||||
|
s, fk = self._tr("error", trigger=True)
|
||||||
|
self.assertEqual(s, "recording"); self.assertFalse(fk)
|
||||||
|
|
||||||
|
# ── Return shape ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_returns_tuple_of_two(self):
|
||||||
|
result = self._tr("idle")
|
||||||
|
self.assertEqual(len(result), 2)
|
||||||
|
|
||||||
|
def test_force_kill_is_bool(self):
|
||||||
|
_, fk = self._tr("idle")
|
||||||
|
self.assertIsInstance(fk, bool)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: node init ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestNodeInit(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls): cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_instantiates(self):
|
||||||
|
self.assertIsNotNone(_make_node(self.mod))
|
||||||
|
|
||||||
|
def test_initial_state_idle(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertEqual(node.state, "idle")
|
||||||
|
|
||||||
|
def test_publishes_initial_idle(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
pub = _pub(node)
|
||||||
|
self.assertEqual(pub.msgs[0].data, "idle")
|
||||||
|
|
||||||
|
def test_publisher_registered(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertIn("/saltybot/recording_status", node._pubs)
|
||||||
|
|
||||||
|
def test_subscriber_registered(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertIn("/saltybot/record_trigger", node._subs)
|
||||||
|
|
||||||
|
def test_timer_registered(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertGreater(len(node._timers), 0)
|
||||||
|
|
||||||
|
def test_custom_topics(self):
|
||||||
|
node = _make_node(self.mod,
|
||||||
|
trigger_topic="/my/trigger",
|
||||||
|
status_topic="/my/status")
|
||||||
|
self.assertIn("/my/trigger", node._subs)
|
||||||
|
self.assertIn("/my/status", node._pubs)
|
||||||
|
|
||||||
|
def test_topics_parsed_correctly(self):
|
||||||
|
node = _make_node(self.mod, topics="/a,/b,/c")
|
||||||
|
self.assertEqual(node._topics, ["/a", "/b", "/c"])
|
||||||
|
|
||||||
|
def test_empty_topics_means_all(self):
|
||||||
|
node = _make_node(self.mod, topics="")
|
||||||
|
self.assertEqual(node._topics, [])
|
||||||
|
|
||||||
|
def test_auto_stop_s_stored(self):
|
||||||
|
node = _make_node(self.mod, auto_stop_s=30.0)
|
||||||
|
self.assertAlmostEqual(node._auto_stop_s, 30.0)
|
||||||
|
|
||||||
|
def test_current_bag_empty_initially(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
self.assertEqual(node.current_bag, "")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: _on_trigger ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestOnTrigger(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls): cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def test_stores_true_trigger(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
_trigger(node, True)
|
||||||
|
with node._lock:
|
||||||
|
self.assertTrue(node._trigger)
|
||||||
|
|
||||||
|
def test_stores_false_trigger(self):
|
||||||
|
node = _make_node(self.mod)
|
||||||
|
_trigger(node, False)
|
||||||
|
with node._lock:
|
||||||
|
self.assertFalse(node._trigger)
|
||||||
|
|
||||||
|
def test_trigger_consumed_after_poll(self):
|
||||||
|
rec = _MockRecorder()
|
||||||
|
node = _make_node(self.mod, recorder=rec)
|
||||||
|
_trigger(node, True)
|
||||||
|
node._poll_cb()
|
||||||
|
with node._lock:
|
||||||
|
self.assertIsNone(node._trigger)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: poll loop — full state machine ────────────────────────────────────
|
||||||
|
|
||||||
|
class TestPollCb(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls): cls.mod = _load_mod()
|
||||||
|
|
||||||
|
def _node(self, **kwargs):
|
||||||
|
rec = _MockRecorder()
|
||||||
|
node = _make_node(self.mod, recorder=rec, **kwargs)
|
||||||
|
return node, rec
|
||||||
|
|
||||||
|
def test_true_trigger_starts_recording(self):
|
||||||
|
node, rec = self._node()
|
||||||
|
_trigger(node, True)
|
||||||
|
node._poll_cb()
|
||||||
|
self.assertEqual(node.state, "recording")
|
||||||
|
self.assertIn("start", rec.call_types())
|
||||||
|
|
||||||
|
def test_recording_publishes_status(self):
|
||||||
|
node, rec = self._node()
|
||||||
|
_trigger(node, True)
|
||||||
|
node._poll_cb()
|
||||||
|
self.assertEqual(_pub(node).msgs[-1].data, "recording")
|
||||||
|
|
||||||
|
def test_false_trigger_stops_recording(self):
|
||||||
|
node, rec = self._node()
|
||||||
|
_trigger(node, True); node._poll_cb() # → recording
|
||||||
|
_trigger(node, False); node._poll_cb() # → stopping
|
||||||
|
self.assertEqual(node.state, "stopping")
|
||||||
|
self.assertIn("stop", rec.call_types())
|
||||||
|
|
||||||
|
def test_stopping_publishes_status(self):
|
||||||
|
node, rec = self._node()
|
||||||
|
_trigger(node, True); node._poll_cb()
|
||||||
|
_trigger(node, False); node._poll_cb()
|
||||||
|
self.assertEqual(_pub(node).msgs[-1].data, "stopping")
|
||||||
|
|
||||||
|
def test_after_stop_proc_exit_idle(self):
|
||||||
|
"""Once the mock recorder stops, next poll resolves to idle."""
|
||||||
|
node, rec = self._node()
|
||||||
|
_trigger(node, True); node._poll_cb() # recording
|
||||||
|
_trigger(node, False); node._poll_cb() # stopping (rec.stop() sets running=False)
|
||||||
|
node._poll_cb() # proc not running → idle
|
||||||
|
self.assertEqual(node.state, "idle")
|
||||||
|
|
||||||
|
def test_idle_publishes_after_stop(self):
|
||||||
|
node, rec = self._node()
|
||||||
|
_trigger(node, True); node._poll_cb()
|
||||||
|
_trigger(node, False); node._poll_cb()
|
||||||
|
node._poll_cb()
|
||||||
|
self.assertEqual(_pub(node).msgs[-1].data, "idle")
|
||||||
|
|
||||||
|
def test_auto_stop_triggers_stopping(self):
|
||||||
|
node, rec = self._node(auto_stop_s=1.0)
|
||||||
|
_trigger(node, True)
|
||||||
|
node._poll_cb() # → recording
|
||||||
|
# Back-date start time so auto-stop fires
|
||||||
|
with node._lock:
|
||||||
|
node._record_start_t = time.monotonic() - 10.0
|
||||||
|
node._poll_cb() # → stopping
|
||||||
|
self.assertEqual(node.state, "stopping")
|
||||||
|
|
||||||
|
def test_auto_stop_disabled(self):
|
||||||
|
node, rec = self._node(auto_stop_s=0.0)
|
||||||
|
_trigger(node, True)
|
||||||
|
node._poll_cb()
|
||||||
|
with node._lock:
|
||||||
|
node._record_start_t = time.monotonic() - 9999.0
|
||||||
|
node._poll_cb()
|
||||||
|
# Should still be recording (auto-stop disabled)
|
||||||
|
self.assertEqual(node.state, "recording")
|
||||||
|
|
||||||
|
def test_proc_dies_error(self):
|
||||||
|
node, rec = self._node()
|
||||||
|
_trigger(node, True); node._poll_cb() # → recording
|
||||||
|
rec._running = False # simulate unexpected exit
|
||||||
|
node._poll_cb()
|
||||||
|
self.assertEqual(node.state, "error")
|
||||||
|
|
||||||
|
def test_error_publishes_status(self):
|
||||||
|
node, rec = self._node()
|
||||||
|
_trigger(node, True); node._poll_cb()
|
||||||
|
rec._running = False
|
||||||
|
node._poll_cb()
|
||||||
|
self.assertEqual(_pub(node).msgs[-1].data, "error")
|
||||||
|
|
||||||
|
def test_error_retries_on_true_trigger(self):
|
||||||
|
node, rec = self._node()
|
||||||
|
_trigger(node, True); node._poll_cb() # recording
|
||||||
|
rec._running = False
|
||||||
|
node._poll_cb() # error
|
||||||
|
_trigger(node, True); node._poll_cb() # retry → recording
|
||||||
|
self.assertEqual(node.state, "recording")
|
||||||
|
|
||||||
|
def test_start_failure_enters_error(self):
|
||||||
|
rec = _MockRecorder(start_succeeds=False)
|
||||||
|
node = _make_node(self.mod, recorder=rec)
|
||||||
|
_trigger(node, True)
|
||||||
|
node._poll_cb()
|
||||||
|
self.assertEqual(node.state, "error")
|
||||||
|
|
||||||
|
def test_force_kill_on_stop_timeout(self):
|
||||||
|
"""Stubborn process that ignores SIGINT → force-killed after timeout."""
|
||||||
|
class _StubbornRecorder(_MockRecorder):
|
||||||
|
def stop(self):
|
||||||
|
self.calls.append(("stop",))
|
||||||
|
# Don't set _running = False — simulates process ignoring SIGINT
|
||||||
|
|
||||||
|
stubborn = _StubbornRecorder()
|
||||||
|
node = _make_node(self.mod, recorder=stubborn, stop_timeout_s=2.0)
|
||||||
|
_trigger(node, True); node._poll_cb() # → recording
|
||||||
|
_trigger(node, False); node._poll_cb() # → stopping (process stays alive)
|
||||||
|
self.assertEqual(node.state, "stopping")
|
||||||
|
# Expire the stop timeout
|
||||||
|
with node._lock:
|
||||||
|
node._stop_start_t = time.monotonic() - 10.0
|
||||||
|
node._poll_cb() # → force kill → idle
|
||||||
|
self.assertIn("kill", stubborn.call_types())
|
||||||
|
self.assertEqual(node.state, "idle")
|
||||||
|
|
||||||
|
def test_bag_path_set_when_recording(self):
|
||||||
|
node, rec = self._node(bag_prefix="mytest")
|
||||||
|
_trigger(node, True)
|
||||||
|
node._poll_cb()
|
||||||
|
self.assertIn("mytest", node.current_bag)
|
||||||
|
|
||||||
|
def test_bag_path_cleared_after_idle(self):
|
||||||
|
node, rec = self._node()
|
||||||
|
_trigger(node, True); node._poll_cb()
|
||||||
|
_trigger(node, False); node._poll_cb()
|
||||||
|
node._poll_cb()
|
||||||
|
self.assertEqual(node.current_bag, "")
|
||||||
|
|
||||||
|
def test_topics_passed_to_recorder(self):
|
||||||
|
rec = _MockRecorder()
|
||||||
|
node = _make_node(self.mod, recorder=rec, topics="/a,/b")
|
||||||
|
_trigger(node, True)
|
||||||
|
node._poll_cb()
|
||||||
|
start_calls = [c for c in rec.calls if c[0] == "start"]
|
||||||
|
self.assertEqual(len(start_calls), 1)
|
||||||
|
self.assertEqual(start_calls[0][1], ["/a", "/b"])
|
||||||
|
|
||||||
|
def test_empty_topics_passes_empty_list(self):
|
||||||
|
rec = _MockRecorder()
|
||||||
|
node = _make_node(self.mod, recorder=rec, topics="")
|
||||||
|
_trigger(node, True)
|
||||||
|
node._poll_cb()
|
||||||
|
start_calls = [c for c in rec.calls if c[0] == "start"]
|
||||||
|
self.assertEqual(start_calls[0][1], [])
|
||||||
|
|
||||||
|
def test_recorder_reset_on_idle(self):
|
||||||
|
node, rec = self._node()
|
||||||
|
_trigger(node, True); node._poll_cb()
|
||||||
|
_trigger(node, False); node._poll_cb()
|
||||||
|
node._poll_cb() # idle
|
||||||
|
self.assertIn("reset", rec.call_types())
|
||||||
|
|
||||||
|
def test_full_lifecycle_status_sequence(self):
|
||||||
|
"""idle → recording → stopping → idle."""
|
||||||
|
node, rec = self._node()
|
||||||
|
pub = _pub(node)
|
||||||
|
|
||||||
|
_trigger(node, True); node._poll_cb()
|
||||||
|
_trigger(node, False); node._poll_cb()
|
||||||
|
node._poll_cb()
|
||||||
|
|
||||||
|
statuses = [m.data for m in pub.msgs]
|
||||||
|
self.assertIn("idle", statuses)
|
||||||
|
self.assertIn("recording", statuses)
|
||||||
|
self.assertIn("stopping", statuses)
|
||||||
|
|
||||||
|
def test_logging_on_start(self):
|
||||||
|
node, rec = self._node()
|
||||||
|
_trigger(node, True)
|
||||||
|
node._poll_cb()
|
||||||
|
infos = [m for lvl, m in node._logs if lvl == "INFO"]
|
||||||
|
self.assertTrue(any("recording" in m.lower() for m in infos))
|
||||||
|
|
||||||
|
def test_logging_on_stop(self):
|
||||||
|
node, rec = self._node()
|
||||||
|
_trigger(node, True); node._poll_cb()
|
||||||
|
_trigger(node, False); node._poll_cb()
|
||||||
|
infos = [m for lvl, m in node._logs if lvl == "INFO"]
|
||||||
|
self.assertTrue(any("stop" in m.lower() for m in infos))
|
||||||
|
|
||||||
|
def test_logging_on_error(self):
|
||||||
|
node, rec = self._node()
|
||||||
|
_trigger(node, True); node._poll_cb()
|
||||||
|
rec._running = False
|
||||||
|
node._poll_cb()
|
||||||
|
errors = [m for lvl, m in node._logs if lvl == "ERROR"]
|
||||||
|
self.assertTrue(len(errors) > 0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: source content ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestNodeSrc(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
with open(_SRC) as f: cls.src = f.read()
|
||||||
|
|
||||||
|
def test_issue_tag(self): self.assertIn("#332", self.src)
|
||||||
|
def test_trigger_topic(self): self.assertIn("/saltybot/record_trigger", self.src)
|
||||||
|
def test_status_topic(self): self.assertIn("/saltybot/recording_status", self.src)
|
||||||
|
def test_status_idle(self): self.assertIn('"idle"', self.src)
|
||||||
|
def test_status_recording(self): self.assertIn('"recording"', self.src)
|
||||||
|
def test_status_stopping(self): self.assertIn('"stopping"', self.src)
|
||||||
|
def test_status_error(self): self.assertIn('"error"', self.src)
|
||||||
|
def test_compute_transition_fn(self): self.assertIn("compute_recording_transition", self.src)
|
||||||
|
def test_bag_recorder_process(self): self.assertIn("BagRecorderProcess", self.src)
|
||||||
|
def test_make_bag_path(self): self.assertIn("make_bag_path", self.src)
|
||||||
|
def test_parse_topics(self): self.assertIn("parse_topics", self.src)
|
||||||
|
def test_auto_stop_param(self): self.assertIn("auto_stop_s", self.src)
|
||||||
|
def test_stop_timeout_param(self): self.assertIn("stop_timeout_s", self.src)
|
||||||
|
def test_compression_param(self): self.assertIn("compression", self.src)
|
||||||
|
def test_subprocess_used(self): self.assertIn("subprocess", self.src)
|
||||||
|
def test_sigint_used(self): self.assertIn("SIGINT", self.src)
|
||||||
|
def test_threading_lock(self): self.assertIn("threading.Lock", self.src)
|
||||||
|
def test_recorder_injectable(self): self.assertIn("_recorder", self.src)
|
||||||
|
def test_main_defined(self): self.assertIn("def main", self.src)
|
||||||
|
def test_bool_subscription(self): self.assertIn("Bool", self.src)
|
||||||
|
def test_string_publication(self): self.assertIn("String", self.src)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests: config / launch / setup ────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestConfig(unittest.TestCase):
|
||||||
|
_CONFIG = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/config/rosbag_recorder_params.yaml"
|
||||||
|
)
|
||||||
|
_LAUNCH = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/launch/rosbag_recorder.launch.py"
|
||||||
|
)
|
||||||
|
_SETUP = (
|
||||||
|
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||||
|
"saltybot_social/setup.py"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_config_exists(self):
|
||||||
|
import os; self.assertTrue(os.path.exists(self._CONFIG))
|
||||||
|
|
||||||
|
def test_config_auto_stop(self):
|
||||||
|
with open(self._CONFIG) as f: c = f.read()
|
||||||
|
self.assertIn("auto_stop_s", c)
|
||||||
|
|
||||||
|
def test_config_bag_dir(self):
|
||||||
|
with open(self._CONFIG) as f: c = f.read()
|
||||||
|
self.assertIn("bag_dir", c)
|
||||||
|
|
||||||
|
def test_config_topics(self):
|
||||||
|
with open(self._CONFIG) as f: c = f.read()
|
||||||
|
self.assertIn("topics", c)
|
||||||
|
|
||||||
|
def test_config_compression(self):
|
||||||
|
with open(self._CONFIG) as f: c = f.read()
|
||||||
|
self.assertIn("compression", c)
|
||||||
|
|
||||||
|
def test_config_stop_timeout(self):
|
||||||
|
with open(self._CONFIG) as f: c = f.read()
|
||||||
|
self.assertIn("stop_timeout_s", c)
|
||||||
|
|
||||||
|
def test_launch_exists(self):
|
||||||
|
import os; self.assertTrue(os.path.exists(self._LAUNCH))
|
||||||
|
|
||||||
|
def test_launch_auto_stop_arg(self):
|
||||||
|
with open(self._LAUNCH) as f: c = f.read()
|
||||||
|
self.assertIn("auto_stop_s", c)
|
||||||
|
|
||||||
|
def test_launch_topics_arg(self):
|
||||||
|
with open(self._LAUNCH) as f: c = f.read()
|
||||||
|
self.assertIn("topics", c)
|
||||||
|
|
||||||
|
def test_entry_point_in_setup(self):
|
||||||
|
with open(self._SETUP) as f: c = f.read()
|
||||||
|
self.assertIn("rosbag_recorder_node", c)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Loading…
x
Reference in New Issue
Block a user