Compare commits
No commits in common. "3bee8f3cb4472d935a9d26c960a5229657a2dc06" and "9b538395c007b0ce9330f8034ed7101ab9c92ee5" have entirely different histories.
3bee8f3cb4
...
9b538395c0
@ -1,20 +0,0 @@
|
|||||||
rosbag_recorder_node:
|
|
||||||
ros__parameters:
|
|
||||||
trigger_topic: "/saltybot/record_trigger"
|
|
||||||
status_topic: "/saltybot/recording_status"
|
|
||||||
|
|
||||||
# Comma-separated topic list to record.
|
|
||||||
# Empty string = record all topics (ros2 bag record --all).
|
|
||||||
# Example: "/saltybot/camera_status,/saltybot/wake_word_detected,/cmd_vel"
|
|
||||||
topics: ""
|
|
||||||
|
|
||||||
bag_dir: "/tmp/saltybot_bags" # output directory (created if absent)
|
|
||||||
bag_prefix: "saltybot" # filename prefix; timestamp appended
|
|
||||||
|
|
||||||
auto_stop_s: 60.0 # auto-stop after N seconds; 0 = disabled
|
|
||||||
stop_timeout_s: 5.0 # force-kill if subprocess won't stop within N s
|
|
||||||
|
|
||||||
compression: false # enable zstd file-level compression
|
|
||||||
max_bag_size_mb: 0.0 # split bags at this size (MiB); 0 = no limit
|
|
||||||
|
|
||||||
poll_rate: 2.0 # state-machine check frequency (Hz)
|
|
||||||
@ -1,47 +0,0 @@
|
|||||||
"""rosbag_recorder.launch.py — Launch trigger-based bag recorder (Issue #332).
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
ros2 launch saltybot_social rosbag_recorder.launch.py
|
|
||||||
ros2 launch saltybot_social rosbag_recorder.launch.py auto_stop_s:=120.0
|
|
||||||
ros2 launch saltybot_social rosbag_recorder.launch.py \\
|
|
||||||
topics:="/saltybot/camera_status,/cmd_vel" bag_dir:=/data/bags
|
|
||||||
"""
|
|
||||||
|
|
||||||
import os
|
|
||||||
from ament_index_python.packages import get_package_share_directory
|
|
||||||
from launch import LaunchDescription
|
|
||||||
from launch.actions import DeclareLaunchArgument
|
|
||||||
from launch.substitutions import LaunchConfiguration
|
|
||||||
from launch_ros.actions import Node
|
|
||||||
|
|
||||||
|
|
||||||
def generate_launch_description():
|
|
||||||
pkg = get_package_share_directory("saltybot_social")
|
|
||||||
cfg = os.path.join(pkg, "config", "rosbag_recorder_params.yaml")
|
|
||||||
|
|
||||||
return LaunchDescription([
|
|
||||||
DeclareLaunchArgument("topics", default_value="",
|
|
||||||
description="Comma-separated topics (empty=all)"),
|
|
||||||
DeclareLaunchArgument("bag_dir", default_value="/tmp/saltybot_bags",
|
|
||||||
description="Output directory for bag files"),
|
|
||||||
DeclareLaunchArgument("auto_stop_s", default_value="60.0",
|
|
||||||
description="Auto-stop timeout in seconds (0=off)"),
|
|
||||||
DeclareLaunchArgument("compression", default_value="false",
|
|
||||||
description="Enable zstd compression"),
|
|
||||||
|
|
||||||
Node(
|
|
||||||
package="saltybot_social",
|
|
||||||
executable="rosbag_recorder_node",
|
|
||||||
name="rosbag_recorder_node",
|
|
||||||
output="screen",
|
|
||||||
parameters=[
|
|
||||||
cfg,
|
|
||||||
{
|
|
||||||
"topics": LaunchConfiguration("topics"),
|
|
||||||
"bag_dir": LaunchConfiguration("bag_dir"),
|
|
||||||
"auto_stop_s": LaunchConfiguration("auto_stop_s"),
|
|
||||||
"compression": LaunchConfiguration("compression"),
|
|
||||||
},
|
|
||||||
],
|
|
||||||
),
|
|
||||||
])
|
|
||||||
@ -1,373 +0,0 @@
|
|||||||
"""rosbag_recorder_node.py — Trigger-based ROS2 bag recorder.
|
|
||||||
Issue #332
|
|
||||||
|
|
||||||
Subscribes to /saltybot/record_trigger (Bool). True starts recording;
|
|
||||||
False stops it. Auto-stop fires after ``auto_stop_s`` seconds if still
|
|
||||||
running. Recording is performed by spawning a ``ros2 bag record``
|
|
||||||
subprocess which is sent SIGINT for graceful shutdown and SIGKILL if it
|
|
||||||
does not exit within ``stop_timeout_s``.
|
|
||||||
|
|
||||||
Status values
|
|
||||||
─────────────
|
|
||||||
"idle" — not recording
|
|
||||||
"recording" — subprocess active, writing to bag file
|
|
||||||
"stopping" — SIGINT sent, waiting for subprocess to exit
|
|
||||||
"error" — subprocess died unexpectedly; new trigger retries
|
|
||||||
|
|
||||||
Subscriptions
|
|
||||||
─────────────
|
|
||||||
/saltybot/record_trigger std_msgs/Bool — True = start, False = stop
|
|
||||||
|
|
||||||
Publications
|
|
||||||
────────────
|
|
||||||
/saltybot/recording_status std_msgs/String — status value (see above)
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
──────────
|
|
||||||
trigger_topic (str, "/saltybot/record_trigger")
|
|
||||||
status_topic (str, "/saltybot/recording_status")
|
|
||||||
topics (str, "") comma-separated topic list;
|
|
||||||
empty string → record all topics (-a)
|
|
||||||
bag_dir (str, "/tmp/saltybot_bags")
|
|
||||||
bag_prefix (str, "saltybot")
|
|
||||||
auto_stop_s (float, 60.0) 0 = no auto-stop
|
|
||||||
stop_timeout_s (float, 5.0) force-kill after this many seconds
|
|
||||||
compression (bool, False) enable zstd file compression
|
|
||||||
max_bag_size_mb (float, 0.0) 0 = unlimited
|
|
||||||
poll_rate (float, 2.0) state-machine check frequency (Hz)
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import datetime
|
|
||||||
import os
|
|
||||||
import signal
|
|
||||||
import subprocess
|
|
||||||
import threading
|
|
||||||
import time
|
|
||||||
from typing import List, Optional, Tuple
|
|
||||||
|
|
||||||
import rclpy
|
|
||||||
from rclpy.node import Node
|
|
||||||
from rclpy.qos import QoSProfile
|
|
||||||
from std_msgs.msg import Bool, String
|
|
||||||
|
|
||||||
|
|
||||||
# ── Status constants ───────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
STATUS_IDLE = "idle"
|
|
||||||
STATUS_RECORDING = "recording"
|
|
||||||
STATUS_STOPPING = "stopping"
|
|
||||||
STATUS_ERROR = "error"
|
|
||||||
|
|
||||||
|
|
||||||
# ── Pure helpers ───────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def make_bag_path(bag_dir: str, prefix: str) -> str:
|
|
||||||
"""Return a timestamped output path for a new bag (no file created)."""
|
|
||||||
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
||||||
return os.path.join(bag_dir, f"{prefix}_{ts}")
|
|
||||||
|
|
||||||
|
|
||||||
def parse_topics(topics_str: str) -> List[str]:
|
|
||||||
"""Parse a comma-separated topic string into a clean list.
|
|
||||||
|
|
||||||
Returns an empty list when *topics_str* is blank (meaning record-all).
|
|
||||||
"""
|
|
||||||
if not topics_str or not topics_str.strip():
|
|
||||||
return []
|
|
||||||
return [t.strip() for t in topics_str.split(",") if t.strip()]
|
|
||||||
|
|
||||||
|
|
||||||
def compute_recording_transition(
|
|
||||||
state: str,
|
|
||||||
trigger: Optional[bool],
|
|
||||||
proc_running: bool,
|
|
||||||
now: float,
|
|
||||||
record_start_t: float,
|
|
||||||
stop_start_t: float,
|
|
||||||
auto_stop_s: float,
|
|
||||||
stop_timeout_s: float,
|
|
||||||
) -> Tuple[str, bool]:
|
|
||||||
"""Pure state-machine step — no I/O, no ROS.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
state : current status string
|
|
||||||
trigger : latest trigger value (True=start, False=stop, None=none)
|
|
||||||
proc_running : whether the recorder subprocess is alive
|
|
||||||
now : current monotonic time (s)
|
|
||||||
record_start_t : monotonic time recording began (0 if not recording)
|
|
||||||
stop_start_t : monotonic time STOPPING began (0 if not stopping)
|
|
||||||
auto_stop_s : auto-stop after this many seconds (0 = disabled)
|
|
||||||
stop_timeout_s : force-kill if stopping > this long (0 = disabled)
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
(new_state, force_kill)
|
|
||||||
force_kill=True signals the caller to SIGKILL the process.
|
|
||||||
"""
|
|
||||||
if state == STATUS_IDLE:
|
|
||||||
if trigger is True:
|
|
||||||
return STATUS_RECORDING, False
|
|
||||||
return STATUS_IDLE, False
|
|
||||||
|
|
||||||
if state == STATUS_RECORDING:
|
|
||||||
if not proc_running:
|
|
||||||
return STATUS_ERROR, False
|
|
||||||
if trigger is False:
|
|
||||||
return STATUS_STOPPING, False
|
|
||||||
if (auto_stop_s > 0 and record_start_t > 0
|
|
||||||
and (now - record_start_t) >= auto_stop_s):
|
|
||||||
return STATUS_STOPPING, False
|
|
||||||
return STATUS_RECORDING, False
|
|
||||||
|
|
||||||
if state == STATUS_STOPPING:
|
|
||||||
if not proc_running:
|
|
||||||
return STATUS_IDLE, False
|
|
||||||
if (stop_timeout_s > 0 and stop_start_t > 0
|
|
||||||
and (now - stop_start_t) >= stop_timeout_s):
|
|
||||||
return STATUS_IDLE, True # force-kill
|
|
||||||
return STATUS_STOPPING, False
|
|
||||||
|
|
||||||
# STATUS_ERROR
|
|
||||||
if trigger is True:
|
|
||||||
return STATUS_RECORDING, False
|
|
||||||
return STATUS_ERROR, False
|
|
||||||
|
|
||||||
|
|
||||||
# ── Subprocess wrapper ─────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class BagRecorderProcess:
|
|
||||||
"""Thin wrapper around a ``ros2 bag record`` subprocess."""
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
|
||||||
self._proc: Optional[subprocess.Popen] = None
|
|
||||||
|
|
||||||
def start(self, topics: List[str], output_path: str,
|
|
||||||
compression: bool = False,
|
|
||||||
max_size_mb: float = 0.0) -> bool:
|
|
||||||
"""Launch the recorder. Returns False if already running or on error."""
|
|
||||||
if self.is_running():
|
|
||||||
return False
|
|
||||||
|
|
||||||
cmd = ["ros2", "bag", "record", "--output", output_path]
|
|
||||||
if topics:
|
|
||||||
cmd += topics
|
|
||||||
else:
|
|
||||||
cmd += ["--all"]
|
|
||||||
if compression:
|
|
||||||
cmd += ["--compression-mode", "file",
|
|
||||||
"--compression-format", "zstd"]
|
|
||||||
if max_size_mb > 0:
|
|
||||||
cmd += ["--max-bag-size",
|
|
||||||
str(int(max_size_mb * 1024 * 1024))]
|
|
||||||
|
|
||||||
try:
|
|
||||||
self._proc = subprocess.Popen(
|
|
||||||
cmd,
|
|
||||||
stdout=subprocess.DEVNULL,
|
|
||||||
stderr=subprocess.PIPE,
|
|
||||||
preexec_fn=os.setsid,
|
|
||||||
)
|
|
||||||
return True
|
|
||||||
except Exception:
|
|
||||||
self._proc = None
|
|
||||||
return False
|
|
||||||
|
|
||||||
def stop(self) -> None:
|
|
||||||
"""Send SIGINT to the process group for graceful shutdown."""
|
|
||||||
if self._proc is not None and self._proc.poll() is None:
|
|
||||||
try:
|
|
||||||
os.killpg(os.getpgid(self._proc.pid), signal.SIGINT)
|
|
||||||
except (ProcessLookupError, PermissionError, OSError):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def kill(self) -> None:
|
|
||||||
"""Send SIGKILL to the process group for forced shutdown."""
|
|
||||||
if self._proc is not None:
|
|
||||||
try:
|
|
||||||
os.killpg(os.getpgid(self._proc.pid), signal.SIGKILL)
|
|
||||||
except (ProcessLookupError, PermissionError, OSError):
|
|
||||||
pass
|
|
||||||
self._proc = None
|
|
||||||
|
|
||||||
def is_running(self) -> bool:
|
|
||||||
return self._proc is not None and self._proc.poll() is None
|
|
||||||
|
|
||||||
def reset(self) -> None:
|
|
||||||
"""Clear internal proc reference (call after graceful exit)."""
|
|
||||||
self._proc = None
|
|
||||||
|
|
||||||
|
|
||||||
# ── ROS2 node ──────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class RosbagRecorderNode(Node):
|
|
||||||
"""Trigger-based ROS bag recorder with auto-stop and status reporting."""
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
|
||||||
super().__init__("rosbag_recorder_node")
|
|
||||||
|
|
||||||
self.declare_parameter("trigger_topic", "/saltybot/record_trigger")
|
|
||||||
self.declare_parameter("status_topic", "/saltybot/recording_status")
|
|
||||||
self.declare_parameter("topics", "")
|
|
||||||
self.declare_parameter("bag_dir", "/tmp/saltybot_bags")
|
|
||||||
self.declare_parameter("bag_prefix", "saltybot")
|
|
||||||
self.declare_parameter("auto_stop_s", 60.0)
|
|
||||||
self.declare_parameter("stop_timeout_s", 5.0)
|
|
||||||
self.declare_parameter("compression", False)
|
|
||||||
self.declare_parameter("max_bag_size_mb", 0.0)
|
|
||||||
self.declare_parameter("poll_rate", 2.0)
|
|
||||||
|
|
||||||
trigger_topic = self.get_parameter("trigger_topic").value
|
|
||||||
status_topic = self.get_parameter("status_topic").value
|
|
||||||
topics_str = self.get_parameter("topics").value
|
|
||||||
self._bag_dir = str(self.get_parameter("bag_dir").value)
|
|
||||||
self._bag_prefix = str(self.get_parameter("bag_prefix").value)
|
|
||||||
self._auto_stop_s = float(self.get_parameter("auto_stop_s").value)
|
|
||||||
self._stop_tmo_s = float(self.get_parameter("stop_timeout_s").value)
|
|
||||||
self._compression = bool(self.get_parameter("compression").value)
|
|
||||||
self._max_mb = float(self.get_parameter("max_bag_size_mb").value)
|
|
||||||
poll_rate = float(self.get_parameter("poll_rate").value)
|
|
||||||
|
|
||||||
self._topics: List[str] = parse_topics(str(topics_str))
|
|
||||||
|
|
||||||
# Recorder process — injectable for tests
|
|
||||||
self._recorder: BagRecorderProcess = BagRecorderProcess()
|
|
||||||
|
|
||||||
# State
|
|
||||||
self._state = STATUS_IDLE
|
|
||||||
self._trigger: Optional[bool] = None
|
|
||||||
self._record_start_t: float = 0.0
|
|
||||||
self._stop_start_t: float = 0.0
|
|
||||||
self._current_bag: str = ""
|
|
||||||
self._lock = threading.Lock()
|
|
||||||
|
|
||||||
qos = QoSProfile(depth=10)
|
|
||||||
self._status_pub = self.create_publisher(String, status_topic, qos)
|
|
||||||
self._trigger_sub = self.create_subscription(
|
|
||||||
Bool, trigger_topic, self._on_trigger, qos
|
|
||||||
)
|
|
||||||
self._timer = self.create_timer(1.0 / poll_rate, self._poll_cb)
|
|
||||||
|
|
||||||
# Publish initial state
|
|
||||||
self._publish(STATUS_IDLE)
|
|
||||||
|
|
||||||
topic_desc = ",".join(self._topics) if self._topics else "<all>"
|
|
||||||
self.get_logger().info(
|
|
||||||
f"RosbagRecorderNode ready — topics={topic_desc}, "
|
|
||||||
f"bag_dir={self._bag_dir}, auto_stop={self._auto_stop_s}s"
|
|
||||||
)
|
|
||||||
|
|
||||||
# ── Subscription ───────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _on_trigger(self, msg) -> None:
|
|
||||||
with self._lock:
|
|
||||||
self._trigger = bool(msg.data)
|
|
||||||
|
|
||||||
# ── Poll / state machine ────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _poll_cb(self) -> None:
|
|
||||||
now = time.monotonic()
|
|
||||||
|
|
||||||
with self._lock:
|
|
||||||
trigger = self._trigger
|
|
||||||
self._trigger = None # consume
|
|
||||||
state = self._state
|
|
||||||
rec_t = self._record_start_t
|
|
||||||
stop_t = self._stop_start_t
|
|
||||||
|
|
||||||
proc_running = self._recorder.is_running()
|
|
||||||
|
|
||||||
new_state, force_kill = compute_recording_transition(
|
|
||||||
state, trigger, proc_running, now,
|
|
||||||
rec_t, stop_t, self._auto_stop_s, self._stop_tmo_s,
|
|
||||||
)
|
|
||||||
|
|
||||||
if force_kill:
|
|
||||||
self._recorder.kill()
|
|
||||||
self.get_logger().warn(
|
|
||||||
f"RosbagRecorder: force-killed (stop_timeout={self._stop_tmo_s}s)"
|
|
||||||
)
|
|
||||||
|
|
||||||
if new_state != state:
|
|
||||||
self._enter_state(new_state, now)
|
|
||||||
|
|
||||||
def _enter_state(self, new_state: str, now: float) -> None:
|
|
||||||
if new_state == STATUS_RECORDING:
|
|
||||||
bag_path = make_bag_path(self._bag_dir, self._bag_prefix)
|
|
||||||
started = self._recorder.start(
|
|
||||||
self._topics, bag_path,
|
|
||||||
compression=self._compression,
|
|
||||||
max_size_mb=self._max_mb,
|
|
||||||
)
|
|
||||||
if not started:
|
|
||||||
new_state = STATUS_ERROR
|
|
||||||
self.get_logger().error(
|
|
||||||
"RosbagRecorder: failed to start recorder subprocess"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
with self._lock:
|
|
||||||
self._record_start_t = now
|
|
||||||
self._current_bag = bag_path
|
|
||||||
self.get_logger().info(
|
|
||||||
f"RosbagRecorder: recording started → {bag_path}"
|
|
||||||
)
|
|
||||||
|
|
||||||
elif new_state == STATUS_STOPPING:
|
|
||||||
self._recorder.stop()
|
|
||||||
with self._lock:
|
|
||||||
self._stop_start_t = now
|
|
||||||
self.get_logger().info("RosbagRecorder: stopping (SIGINT sent)")
|
|
||||||
|
|
||||||
elif new_state == STATUS_IDLE:
|
|
||||||
bag = self._current_bag
|
|
||||||
with self._lock:
|
|
||||||
self._record_start_t = 0.0
|
|
||||||
self._stop_start_t = 0.0
|
|
||||||
self._current_bag = ""
|
|
||||||
self._recorder.reset()
|
|
||||||
self.get_logger().info(
|
|
||||||
f"RosbagRecorder: recording complete → {bag}"
|
|
||||||
)
|
|
||||||
|
|
||||||
elif new_state == STATUS_ERROR:
|
|
||||||
self.get_logger().error(
|
|
||||||
"RosbagRecorder: subprocess exited unexpectedly"
|
|
||||||
)
|
|
||||||
|
|
||||||
with self._lock:
|
|
||||||
self._state = new_state
|
|
||||||
|
|
||||||
self._publish(new_state)
|
|
||||||
|
|
||||||
# ── Publish ─────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _publish(self, status: str) -> None:
|
|
||||||
msg = String()
|
|
||||||
msg.data = status
|
|
||||||
self._status_pub.publish(msg)
|
|
||||||
|
|
||||||
# ── Public accessors ─────────────────────────────────────────────────
|
|
||||||
|
|
||||||
@property
|
|
||||||
def state(self) -> str:
|
|
||||||
with self._lock:
|
|
||||||
return self._state
|
|
||||||
|
|
||||||
@property
|
|
||||||
def current_bag(self) -> str:
|
|
||||||
with self._lock:
|
|
||||||
return self._current_bag
|
|
||||||
|
|
||||||
|
|
||||||
def main(args=None) -> None:
|
|
||||||
rclpy.init(args=args)
|
|
||||||
node = RosbagRecorderNode()
|
|
||||||
try:
|
|
||||||
rclpy.spin(node)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
finally:
|
|
||||||
node.destroy_node()
|
|
||||||
rclpy.shutdown()
|
|
||||||
@ -61,8 +61,6 @@ setup(
|
|||||||
'wake_word_node = saltybot_social.wake_word_node:main',
|
'wake_word_node = saltybot_social.wake_word_node:main',
|
||||||
# USB camera hot-plug monitor (Issue #320)
|
# USB camera hot-plug monitor (Issue #320)
|
||||||
'camera_hotplug_node = saltybot_social.camera_hotplug_node:main',
|
'camera_hotplug_node = saltybot_social.camera_hotplug_node:main',
|
||||||
# Trigger-based ROS2 bag recorder (Issue #332)
|
|
||||||
'rosbag_recorder_node = saltybot_social.rosbag_recorder_node:main',
|
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@ -1,719 +0,0 @@
|
|||||||
"""test_rosbag_recorder.py — Offline tests for rosbag_recorder_node (Issue #332).
|
|
||||||
|
|
||||||
Stubs out rclpy and ROS message types.
|
|
||||||
BagRecorderProcess is replaced with MockRecorder — no subprocesses are spawned.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import importlib.util
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
import types
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
|
|
||||||
# ── ROS2 stubs ────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _make_ros_stubs():
|
|
||||||
for mod_name in ("rclpy", "rclpy.node", "rclpy.qos",
|
|
||||||
"std_msgs", "std_msgs.msg"):
|
|
||||||
if mod_name not in sys.modules:
|
|
||||||
sys.modules[mod_name] = types.ModuleType(mod_name)
|
|
||||||
|
|
||||||
class _Node:
|
|
||||||
def __init__(self, name="node"):
|
|
||||||
self._name = name
|
|
||||||
if not hasattr(self, "_params"):
|
|
||||||
self._params = {}
|
|
||||||
self._pubs = {}
|
|
||||||
self._subs = {}
|
|
||||||
self._logs = []
|
|
||||||
self._timers = []
|
|
||||||
|
|
||||||
def declare_parameter(self, name, default):
|
|
||||||
if name not in self._params:
|
|
||||||
self._params[name] = default
|
|
||||||
|
|
||||||
def get_parameter(self, name):
|
|
||||||
class _P:
|
|
||||||
def __init__(self, v): self.value = v
|
|
||||||
return _P(self._params.get(name))
|
|
||||||
|
|
||||||
def create_publisher(self, msg_type, topic, qos):
|
|
||||||
pub = _FakePub()
|
|
||||||
self._pubs[topic] = pub
|
|
||||||
return pub
|
|
||||||
|
|
||||||
def create_subscription(self, msg_type, topic, cb, qos):
|
|
||||||
self._subs[topic] = cb
|
|
||||||
return object()
|
|
||||||
|
|
||||||
def create_timer(self, period, cb):
|
|
||||||
self._timers.append(cb)
|
|
||||||
return object()
|
|
||||||
|
|
||||||
def get_logger(self):
|
|
||||||
node = self
|
|
||||||
class _L:
|
|
||||||
def info(self, m): node._logs.append(("INFO", m))
|
|
||||||
def warn(self, m): node._logs.append(("WARN", m))
|
|
||||||
def error(self, m): node._logs.append(("ERROR", m))
|
|
||||||
return _L()
|
|
||||||
|
|
||||||
def destroy_node(self): pass
|
|
||||||
|
|
||||||
class _FakePub:
|
|
||||||
def __init__(self):
|
|
||||||
self.msgs = []
|
|
||||||
def publish(self, msg):
|
|
||||||
self.msgs.append(msg)
|
|
||||||
|
|
||||||
class _QoSProfile:
|
|
||||||
def __init__(self, depth=10): self.depth = depth
|
|
||||||
|
|
||||||
class _Bool:
|
|
||||||
def __init__(self): self.data = False
|
|
||||||
|
|
||||||
class _String:
|
|
||||||
def __init__(self): self.data = ""
|
|
||||||
|
|
||||||
rclpy_mod = sys.modules["rclpy"]
|
|
||||||
rclpy_mod.init = lambda args=None: None
|
|
||||||
rclpy_mod.spin = lambda node: None
|
|
||||||
rclpy_mod.shutdown = lambda: None
|
|
||||||
|
|
||||||
sys.modules["rclpy.node"].Node = _Node
|
|
||||||
sys.modules["rclpy.qos"].QoSProfile = _QoSProfile
|
|
||||||
sys.modules["std_msgs.msg"].Bool = _Bool
|
|
||||||
sys.modules["std_msgs.msg"].String = _String
|
|
||||||
|
|
||||||
return _Node, _FakePub, _Bool, _String
|
|
||||||
|
|
||||||
|
|
||||||
_Node, _FakePub, _Bool, _String = _make_ros_stubs()
|
|
||||||
|
|
||||||
|
|
||||||
# ── Module loader ─────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
_SRC = (
|
|
||||||
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
|
||||||
"saltybot_social/saltybot_social/rosbag_recorder_node.py"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _load_mod():
|
|
||||||
spec = importlib.util.spec_from_file_location("rosbag_recorder_testmod", _SRC)
|
|
||||||
mod = importlib.util.module_from_spec(spec)
|
|
||||||
spec.loader.exec_module(mod)
|
|
||||||
return mod
|
|
||||||
|
|
||||||
|
|
||||||
class _MockRecorder:
|
|
||||||
"""Injectable replacement for BagRecorderProcess."""
|
|
||||||
|
|
||||||
def __init__(self, start_succeeds: bool = True) -> None:
|
|
||||||
self.start_succeeds = start_succeeds
|
|
||||||
self._running = False
|
|
||||||
self.calls: list = []
|
|
||||||
|
|
||||||
def start(self, topics, output_path, compression=False, max_size_mb=0.0):
|
|
||||||
self.calls.append(("start", list(topics), output_path))
|
|
||||||
if self.start_succeeds:
|
|
||||||
self._running = True
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self.calls.append(("stop",))
|
|
||||||
self._running = False # immediately "gone" for deterministic tests
|
|
||||||
|
|
||||||
def kill(self):
|
|
||||||
self.calls.append(("kill",))
|
|
||||||
self._running = False
|
|
||||||
|
|
||||||
def is_running(self):
|
|
||||||
return self._running
|
|
||||||
|
|
||||||
def reset(self):
|
|
||||||
self.calls.append(("reset",))
|
|
||||||
self._running = False
|
|
||||||
|
|
||||||
def call_types(self):
|
|
||||||
return [c[0] for c in self.calls]
|
|
||||||
|
|
||||||
|
|
||||||
def _make_node(mod, recorder=None, **kwargs):
|
|
||||||
node = mod.RosbagRecorderNode.__new__(mod.RosbagRecorderNode)
|
|
||||||
defaults = {
|
|
||||||
"trigger_topic": "/saltybot/record_trigger",
|
|
||||||
"status_topic": "/saltybot/recording_status",
|
|
||||||
"topics": "",
|
|
||||||
"bag_dir": "/tmp/test_bags",
|
|
||||||
"bag_prefix": "test",
|
|
||||||
"auto_stop_s": 60.0,
|
|
||||||
"stop_timeout_s": 5.0,
|
|
||||||
"compression": False,
|
|
||||||
"max_bag_size_mb": 0.0,
|
|
||||||
"poll_rate": 2.0,
|
|
||||||
}
|
|
||||||
defaults.update(kwargs)
|
|
||||||
node._params = dict(defaults)
|
|
||||||
mod.RosbagRecorderNode.__init__(node)
|
|
||||||
if recorder is not None:
|
|
||||||
node._recorder = recorder
|
|
||||||
return node
|
|
||||||
|
|
||||||
|
|
||||||
def _trigger(node, value: bool):
|
|
||||||
"""Deliver a Bool trigger message."""
|
|
||||||
msg = _Bool()
|
|
||||||
msg.data = value
|
|
||||||
node._subs["/saltybot/record_trigger"](msg)
|
|
||||||
|
|
||||||
|
|
||||||
def _pub(node):
|
|
||||||
return node._pubs["/saltybot/recording_status"]
|
|
||||||
|
|
||||||
|
|
||||||
# ── Tests: pure helpers ────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class TestMakeBagPath(unittest.TestCase):
|
|
||||||
@classmethod
|
|
||||||
def setUpClass(cls): cls.mod = _load_mod()
|
|
||||||
|
|
||||||
def test_contains_prefix(self):
|
|
||||||
p = self.mod.make_bag_path("/tmp/bags", "saltybot")
|
|
||||||
self.assertIn("saltybot", p)
|
|
||||||
|
|
||||||
def test_contains_bag_dir(self):
|
|
||||||
p = self.mod.make_bag_path("/tmp/bags", "saltybot")
|
|
||||||
self.assertTrue(p.startswith("/tmp/bags"))
|
|
||||||
|
|
||||||
def test_unique_per_call(self):
|
|
||||||
# Two calls in tight succession may share a second but that's fine;
|
|
||||||
# just ensure the function doesn't crash and returns a string.
|
|
||||||
p1 = self.mod.make_bag_path("/tmp", "t")
|
|
||||||
self.assertIsInstance(p1, str)
|
|
||||||
self.assertTrue(len(p1) > 0)
|
|
||||||
|
|
||||||
|
|
||||||
class TestParseTopics(unittest.TestCase):
|
|
||||||
@classmethod
|
|
||||||
def setUpClass(cls): cls.mod = _load_mod()
|
|
||||||
|
|
||||||
def test_empty_string_returns_empty_list(self):
|
|
||||||
self.assertEqual(self.mod.parse_topics(""), [])
|
|
||||||
|
|
||||||
def test_whitespace_only_returns_empty(self):
|
|
||||||
self.assertEqual(self.mod.parse_topics(" "), [])
|
|
||||||
|
|
||||||
def test_single_topic(self):
|
|
||||||
self.assertEqual(self.mod.parse_topics("/topic/foo"), ["/topic/foo"])
|
|
||||||
|
|
||||||
def test_multiple_topics(self):
|
|
||||||
r = self.mod.parse_topics("/a,/b,/c")
|
|
||||||
self.assertEqual(r, ["/a", "/b", "/c"])
|
|
||||||
|
|
||||||
def test_strips_whitespace(self):
|
|
||||||
r = self.mod.parse_topics(" /a , /b ")
|
|
||||||
self.assertEqual(r, ["/a", "/b"])
|
|
||||||
|
|
||||||
def test_ignores_empty_segments(self):
|
|
||||||
r = self.mod.parse_topics("/a,,/b")
|
|
||||||
self.assertEqual(r, ["/a", "/b"])
|
|
||||||
|
|
||||||
|
|
||||||
class TestStatusConstants(unittest.TestCase):
|
|
||||||
@classmethod
|
|
||||||
def setUpClass(cls): cls.mod = _load_mod()
|
|
||||||
|
|
||||||
def test_idle(self): self.assertEqual(self.mod.STATUS_IDLE, "idle")
|
|
||||||
def test_recording(self): self.assertEqual(self.mod.STATUS_RECORDING, "recording")
|
|
||||||
def test_stopping(self): self.assertEqual(self.mod.STATUS_STOPPING, "stopping")
|
|
||||||
def test_error(self): self.assertEqual(self.mod.STATUS_ERROR, "error")
|
|
||||||
|
|
||||||
|
|
||||||
# ── Tests: compute_recording_transition ──────────────────────────────────────
|
|
||||||
|
|
||||||
class TestComputeRecordingTransition(unittest.TestCase):
|
|
||||||
@classmethod
|
|
||||||
def setUpClass(cls): cls.mod = _load_mod()
|
|
||||||
|
|
||||||
def _tr(self, state, trigger=None, proc_running=True,
|
|
||||||
now=100.0, rec_t=0.0, stop_t=0.0,
|
|
||||||
auto_stop=60.0, stop_tmo=5.0):
|
|
||||||
return self.mod.compute_recording_transition(
|
|
||||||
state, trigger, proc_running, now,
|
|
||||||
rec_t, stop_t, auto_stop, stop_tmo,
|
|
||||||
)
|
|
||||||
|
|
||||||
# ── IDLE ──────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_idle_no_trigger_stays_idle(self):
|
|
||||||
s, fk = self._tr("idle", trigger=None)
|
|
||||||
self.assertEqual(s, "idle"); self.assertFalse(fk)
|
|
||||||
|
|
||||||
def test_idle_false_trigger_stays_idle(self):
|
|
||||||
s, fk = self._tr("idle", trigger=False)
|
|
||||||
self.assertEqual(s, "idle"); self.assertFalse(fk)
|
|
||||||
|
|
||||||
def test_idle_true_trigger_starts_recording(self):
|
|
||||||
s, fk = self._tr("idle", trigger=True)
|
|
||||||
self.assertEqual(s, "recording"); self.assertFalse(fk)
|
|
||||||
|
|
||||||
# ── RECORDING ─────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_recording_stable_no_change(self):
|
|
||||||
s, fk = self._tr("recording", proc_running=True)
|
|
||||||
self.assertEqual(s, "recording"); self.assertFalse(fk)
|
|
||||||
|
|
||||||
def test_recording_false_trigger_stops(self):
|
|
||||||
s, fk = self._tr("recording", trigger=False, proc_running=True)
|
|
||||||
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
|
||||||
|
|
||||||
def test_recording_proc_dies_error(self):
|
|
||||||
s, fk = self._tr("recording", proc_running=False)
|
|
||||||
self.assertEqual(s, "error"); self.assertFalse(fk)
|
|
||||||
|
|
||||||
def test_recording_auto_stop_fires(self):
|
|
||||||
# started at t=40, now=t=101 → 61 s elapsed > auto_stop=60
|
|
||||||
s, fk = self._tr("recording", proc_running=True,
|
|
||||||
now=101.0, rec_t=40.0, auto_stop=60.0)
|
|
||||||
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
|
||||||
|
|
||||||
def test_recording_auto_stop_not_yet(self):
|
|
||||||
# started at t=50, now=100 → 50 s < 60 s
|
|
||||||
s, fk = self._tr("recording", proc_running=True,
|
|
||||||
now=100.0, rec_t=50.0, auto_stop=60.0)
|
|
||||||
self.assertEqual(s, "recording")
|
|
||||||
|
|
||||||
def test_recording_auto_stop_at_exactly_timeout(self):
|
|
||||||
s, fk = self._tr("recording", proc_running=True,
|
|
||||||
now=110.0, rec_t=50.0, auto_stop=60.0)
|
|
||||||
self.assertEqual(s, "stopping")
|
|
||||||
|
|
||||||
def test_recording_auto_stop_disabled(self):
|
|
||||||
# auto_stop_s=0 → never auto-stops
|
|
||||||
s, fk = self._tr("recording", proc_running=True,
|
|
||||||
now=9999.0, rec_t=0.0, auto_stop=0.0)
|
|
||||||
self.assertEqual(s, "recording")
|
|
||||||
|
|
||||||
# ── STOPPING ──────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_stopping_proc_running_stays(self):
|
|
||||||
s, fk = self._tr("stopping", proc_running=True)
|
|
||||||
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
|
||||||
|
|
||||||
def test_stopping_proc_exits_idle(self):
|
|
||||||
s, fk = self._tr("stopping", proc_running=False)
|
|
||||||
self.assertEqual(s, "idle"); self.assertFalse(fk)
|
|
||||||
|
|
||||||
def test_stopping_force_kill_after_timeout(self):
|
|
||||||
# entered stopping at t=95, now=101 → 6 s > stop_tmo=5
|
|
||||||
s, fk = self._tr("stopping", proc_running=True,
|
|
||||||
now=101.0, stop_t=95.0, stop_tmo=5.0)
|
|
||||||
self.assertEqual(s, "idle"); self.assertTrue(fk)
|
|
||||||
|
|
||||||
def test_stopping_not_yet_force_kill(self):
|
|
||||||
# entered at t=98, now=100 → 2 s < 5 s
|
|
||||||
s, fk = self._tr("stopping", proc_running=True,
|
|
||||||
now=100.0, stop_t=98.0, stop_tmo=5.0)
|
|
||||||
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
|
||||||
|
|
||||||
def test_stopping_timeout_disabled(self):
|
|
||||||
# stop_tmo=0 → never force-kills
|
|
||||||
s, fk = self._tr("stopping", proc_running=True,
|
|
||||||
now=9999.0, stop_t=0.0, stop_tmo=0.0)
|
|
||||||
self.assertEqual(s, "stopping"); self.assertFalse(fk)
|
|
||||||
|
|
||||||
def test_stopping_force_kill_exactly_at_timeout(self):
|
|
||||||
s, fk = self._tr("stopping", proc_running=True,
|
|
||||||
now=100.0, stop_t=95.0, stop_tmo=5.0)
|
|
||||||
self.assertEqual(s, "idle"); self.assertTrue(fk)
|
|
||||||
|
|
||||||
# ── ERROR ─────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_error_stays_without_trigger(self):
|
|
||||||
s, fk = self._tr("error", trigger=None)
|
|
||||||
self.assertEqual(s, "error"); self.assertFalse(fk)
|
|
||||||
|
|
||||||
def test_error_false_trigger_stays(self):
|
|
||||||
s, fk = self._tr("error", trigger=False)
|
|
||||||
self.assertEqual(s, "error")
|
|
||||||
|
|
||||||
def test_error_true_trigger_retries(self):
|
|
||||||
s, fk = self._tr("error", trigger=True)
|
|
||||||
self.assertEqual(s, "recording"); self.assertFalse(fk)
|
|
||||||
|
|
||||||
# ── Return shape ──────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def test_returns_tuple_of_two(self):
|
|
||||||
result = self._tr("idle")
|
|
||||||
self.assertEqual(len(result), 2)
|
|
||||||
|
|
||||||
def test_force_kill_is_bool(self):
|
|
||||||
_, fk = self._tr("idle")
|
|
||||||
self.assertIsInstance(fk, bool)
|
|
||||||
|
|
||||||
|
|
||||||
# ── Tests: node init ──────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class TestNodeInit(unittest.TestCase):
|
|
||||||
@classmethod
|
|
||||||
def setUpClass(cls): cls.mod = _load_mod()
|
|
||||||
|
|
||||||
def test_instantiates(self):
|
|
||||||
self.assertIsNotNone(_make_node(self.mod))
|
|
||||||
|
|
||||||
def test_initial_state_idle(self):
|
|
||||||
node = _make_node(self.mod)
|
|
||||||
self.assertEqual(node.state, "idle")
|
|
||||||
|
|
||||||
def test_publishes_initial_idle(self):
|
|
||||||
node = _make_node(self.mod)
|
|
||||||
pub = _pub(node)
|
|
||||||
self.assertEqual(pub.msgs[0].data, "idle")
|
|
||||||
|
|
||||||
def test_publisher_registered(self):
|
|
||||||
node = _make_node(self.mod)
|
|
||||||
self.assertIn("/saltybot/recording_status", node._pubs)
|
|
||||||
|
|
||||||
def test_subscriber_registered(self):
|
|
||||||
node = _make_node(self.mod)
|
|
||||||
self.assertIn("/saltybot/record_trigger", node._subs)
|
|
||||||
|
|
||||||
def test_timer_registered(self):
|
|
||||||
node = _make_node(self.mod)
|
|
||||||
self.assertGreater(len(node._timers), 0)
|
|
||||||
|
|
||||||
def test_custom_topics(self):
|
|
||||||
node = _make_node(self.mod,
|
|
||||||
trigger_topic="/my/trigger",
|
|
||||||
status_topic="/my/status")
|
|
||||||
self.assertIn("/my/trigger", node._subs)
|
|
||||||
self.assertIn("/my/status", node._pubs)
|
|
||||||
|
|
||||||
def test_topics_parsed_correctly(self):
|
|
||||||
node = _make_node(self.mod, topics="/a,/b,/c")
|
|
||||||
self.assertEqual(node._topics, ["/a", "/b", "/c"])
|
|
||||||
|
|
||||||
def test_empty_topics_means_all(self):
|
|
||||||
node = _make_node(self.mod, topics="")
|
|
||||||
self.assertEqual(node._topics, [])
|
|
||||||
|
|
||||||
def test_auto_stop_s_stored(self):
|
|
||||||
node = _make_node(self.mod, auto_stop_s=30.0)
|
|
||||||
self.assertAlmostEqual(node._auto_stop_s, 30.0)
|
|
||||||
|
|
||||||
def test_current_bag_empty_initially(self):
|
|
||||||
node = _make_node(self.mod)
|
|
||||||
self.assertEqual(node.current_bag, "")
|
|
||||||
|
|
||||||
|
|
||||||
# ── Tests: _on_trigger ────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class TestOnTrigger(unittest.TestCase):
|
|
||||||
@classmethod
|
|
||||||
def setUpClass(cls): cls.mod = _load_mod()
|
|
||||||
|
|
||||||
def test_stores_true_trigger(self):
|
|
||||||
node = _make_node(self.mod)
|
|
||||||
_trigger(node, True)
|
|
||||||
with node._lock:
|
|
||||||
self.assertTrue(node._trigger)
|
|
||||||
|
|
||||||
def test_stores_false_trigger(self):
|
|
||||||
node = _make_node(self.mod)
|
|
||||||
_trigger(node, False)
|
|
||||||
with node._lock:
|
|
||||||
self.assertFalse(node._trigger)
|
|
||||||
|
|
||||||
def test_trigger_consumed_after_poll(self):
|
|
||||||
rec = _MockRecorder()
|
|
||||||
node = _make_node(self.mod, recorder=rec)
|
|
||||||
_trigger(node, True)
|
|
||||||
node._poll_cb()
|
|
||||||
with node._lock:
|
|
||||||
self.assertIsNone(node._trigger)
|
|
||||||
|
|
||||||
|
|
||||||
# ── Tests: poll loop — full state machine ────────────────────────────────────
|
|
||||||
|
|
||||||
class TestPollCb(unittest.TestCase):
|
|
||||||
@classmethod
|
|
||||||
def setUpClass(cls): cls.mod = _load_mod()
|
|
||||||
|
|
||||||
def _node(self, **kwargs):
|
|
||||||
rec = _MockRecorder()
|
|
||||||
node = _make_node(self.mod, recorder=rec, **kwargs)
|
|
||||||
return node, rec
|
|
||||||
|
|
||||||
def test_true_trigger_starts_recording(self):
|
|
||||||
node, rec = self._node()
|
|
||||||
_trigger(node, True)
|
|
||||||
node._poll_cb()
|
|
||||||
self.assertEqual(node.state, "recording")
|
|
||||||
self.assertIn("start", rec.call_types())
|
|
||||||
|
|
||||||
def test_recording_publishes_status(self):
|
|
||||||
node, rec = self._node()
|
|
||||||
_trigger(node, True)
|
|
||||||
node._poll_cb()
|
|
||||||
self.assertEqual(_pub(node).msgs[-1].data, "recording")
|
|
||||||
|
|
||||||
def test_false_trigger_stops_recording(self):
|
|
||||||
node, rec = self._node()
|
|
||||||
_trigger(node, True); node._poll_cb() # → recording
|
|
||||||
_trigger(node, False); node._poll_cb() # → stopping
|
|
||||||
self.assertEqual(node.state, "stopping")
|
|
||||||
self.assertIn("stop", rec.call_types())
|
|
||||||
|
|
||||||
def test_stopping_publishes_status(self):
|
|
||||||
node, rec = self._node()
|
|
||||||
_trigger(node, True); node._poll_cb()
|
|
||||||
_trigger(node, False); node._poll_cb()
|
|
||||||
self.assertEqual(_pub(node).msgs[-1].data, "stopping")
|
|
||||||
|
|
||||||
def test_after_stop_proc_exit_idle(self):
|
|
||||||
"""Once the mock recorder stops, next poll resolves to idle."""
|
|
||||||
node, rec = self._node()
|
|
||||||
_trigger(node, True); node._poll_cb() # recording
|
|
||||||
_trigger(node, False); node._poll_cb() # stopping (rec.stop() sets running=False)
|
|
||||||
node._poll_cb() # proc not running → idle
|
|
||||||
self.assertEqual(node.state, "idle")
|
|
||||||
|
|
||||||
def test_idle_publishes_after_stop(self):
|
|
||||||
node, rec = self._node()
|
|
||||||
_trigger(node, True); node._poll_cb()
|
|
||||||
_trigger(node, False); node._poll_cb()
|
|
||||||
node._poll_cb()
|
|
||||||
self.assertEqual(_pub(node).msgs[-1].data, "idle")
|
|
||||||
|
|
||||||
def test_auto_stop_triggers_stopping(self):
|
|
||||||
node, rec = self._node(auto_stop_s=1.0)
|
|
||||||
_trigger(node, True)
|
|
||||||
node._poll_cb() # → recording
|
|
||||||
# Back-date start time so auto-stop fires
|
|
||||||
with node._lock:
|
|
||||||
node._record_start_t = time.monotonic() - 10.0
|
|
||||||
node._poll_cb() # → stopping
|
|
||||||
self.assertEqual(node.state, "stopping")
|
|
||||||
|
|
||||||
def test_auto_stop_disabled(self):
|
|
||||||
node, rec = self._node(auto_stop_s=0.0)
|
|
||||||
_trigger(node, True)
|
|
||||||
node._poll_cb()
|
|
||||||
with node._lock:
|
|
||||||
node._record_start_t = time.monotonic() - 9999.0
|
|
||||||
node._poll_cb()
|
|
||||||
# Should still be recording (auto-stop disabled)
|
|
||||||
self.assertEqual(node.state, "recording")
|
|
||||||
|
|
||||||
def test_proc_dies_error(self):
|
|
||||||
node, rec = self._node()
|
|
||||||
_trigger(node, True); node._poll_cb() # → recording
|
|
||||||
rec._running = False # simulate unexpected exit
|
|
||||||
node._poll_cb()
|
|
||||||
self.assertEqual(node.state, "error")
|
|
||||||
|
|
||||||
def test_error_publishes_status(self):
|
|
||||||
node, rec = self._node()
|
|
||||||
_trigger(node, True); node._poll_cb()
|
|
||||||
rec._running = False
|
|
||||||
node._poll_cb()
|
|
||||||
self.assertEqual(_pub(node).msgs[-1].data, "error")
|
|
||||||
|
|
||||||
def test_error_retries_on_true_trigger(self):
|
|
||||||
node, rec = self._node()
|
|
||||||
_trigger(node, True); node._poll_cb() # recording
|
|
||||||
rec._running = False
|
|
||||||
node._poll_cb() # error
|
|
||||||
_trigger(node, True); node._poll_cb() # retry → recording
|
|
||||||
self.assertEqual(node.state, "recording")
|
|
||||||
|
|
||||||
def test_start_failure_enters_error(self):
|
|
||||||
rec = _MockRecorder(start_succeeds=False)
|
|
||||||
node = _make_node(self.mod, recorder=rec)
|
|
||||||
_trigger(node, True)
|
|
||||||
node._poll_cb()
|
|
||||||
self.assertEqual(node.state, "error")
|
|
||||||
|
|
||||||
def test_force_kill_on_stop_timeout(self):
|
|
||||||
"""Stubborn process that ignores SIGINT → force-killed after timeout."""
|
|
||||||
class _StubbornRecorder(_MockRecorder):
|
|
||||||
def stop(self):
|
|
||||||
self.calls.append(("stop",))
|
|
||||||
# Don't set _running = False — simulates process ignoring SIGINT
|
|
||||||
|
|
||||||
stubborn = _StubbornRecorder()
|
|
||||||
node = _make_node(self.mod, recorder=stubborn, stop_timeout_s=2.0)
|
|
||||||
_trigger(node, True); node._poll_cb() # → recording
|
|
||||||
_trigger(node, False); node._poll_cb() # → stopping (process stays alive)
|
|
||||||
self.assertEqual(node.state, "stopping")
|
|
||||||
# Expire the stop timeout
|
|
||||||
with node._lock:
|
|
||||||
node._stop_start_t = time.monotonic() - 10.0
|
|
||||||
node._poll_cb() # → force kill → idle
|
|
||||||
self.assertIn("kill", stubborn.call_types())
|
|
||||||
self.assertEqual(node.state, "idle")
|
|
||||||
|
|
||||||
def test_bag_path_set_when_recording(self):
|
|
||||||
node, rec = self._node(bag_prefix="mytest")
|
|
||||||
_trigger(node, True)
|
|
||||||
node._poll_cb()
|
|
||||||
self.assertIn("mytest", node.current_bag)
|
|
||||||
|
|
||||||
def test_bag_path_cleared_after_idle(self):
|
|
||||||
node, rec = self._node()
|
|
||||||
_trigger(node, True); node._poll_cb()
|
|
||||||
_trigger(node, False); node._poll_cb()
|
|
||||||
node._poll_cb()
|
|
||||||
self.assertEqual(node.current_bag, "")
|
|
||||||
|
|
||||||
def test_topics_passed_to_recorder(self):
|
|
||||||
rec = _MockRecorder()
|
|
||||||
node = _make_node(self.mod, recorder=rec, topics="/a,/b")
|
|
||||||
_trigger(node, True)
|
|
||||||
node._poll_cb()
|
|
||||||
start_calls = [c for c in rec.calls if c[0] == "start"]
|
|
||||||
self.assertEqual(len(start_calls), 1)
|
|
||||||
self.assertEqual(start_calls[0][1], ["/a", "/b"])
|
|
||||||
|
|
||||||
def test_empty_topics_passes_empty_list(self):
|
|
||||||
rec = _MockRecorder()
|
|
||||||
node = _make_node(self.mod, recorder=rec, topics="")
|
|
||||||
_trigger(node, True)
|
|
||||||
node._poll_cb()
|
|
||||||
start_calls = [c for c in rec.calls if c[0] == "start"]
|
|
||||||
self.assertEqual(start_calls[0][1], [])
|
|
||||||
|
|
||||||
def test_recorder_reset_on_idle(self):
|
|
||||||
node, rec = self._node()
|
|
||||||
_trigger(node, True); node._poll_cb()
|
|
||||||
_trigger(node, False); node._poll_cb()
|
|
||||||
node._poll_cb() # idle
|
|
||||||
self.assertIn("reset", rec.call_types())
|
|
||||||
|
|
||||||
def test_full_lifecycle_status_sequence(self):
|
|
||||||
"""idle → recording → stopping → idle."""
|
|
||||||
node, rec = self._node()
|
|
||||||
pub = _pub(node)
|
|
||||||
|
|
||||||
_trigger(node, True); node._poll_cb()
|
|
||||||
_trigger(node, False); node._poll_cb()
|
|
||||||
node._poll_cb()
|
|
||||||
|
|
||||||
statuses = [m.data for m in pub.msgs]
|
|
||||||
self.assertIn("idle", statuses)
|
|
||||||
self.assertIn("recording", statuses)
|
|
||||||
self.assertIn("stopping", statuses)
|
|
||||||
|
|
||||||
def test_logging_on_start(self):
|
|
||||||
node, rec = self._node()
|
|
||||||
_trigger(node, True)
|
|
||||||
node._poll_cb()
|
|
||||||
infos = [m for lvl, m in node._logs if lvl == "INFO"]
|
|
||||||
self.assertTrue(any("recording" in m.lower() for m in infos))
|
|
||||||
|
|
||||||
def test_logging_on_stop(self):
|
|
||||||
node, rec = self._node()
|
|
||||||
_trigger(node, True); node._poll_cb()
|
|
||||||
_trigger(node, False); node._poll_cb()
|
|
||||||
infos = [m for lvl, m in node._logs if lvl == "INFO"]
|
|
||||||
self.assertTrue(any("stop" in m.lower() for m in infos))
|
|
||||||
|
|
||||||
def test_logging_on_error(self):
|
|
||||||
node, rec = self._node()
|
|
||||||
_trigger(node, True); node._poll_cb()
|
|
||||||
rec._running = False
|
|
||||||
node._poll_cb()
|
|
||||||
errors = [m for lvl, m in node._logs if lvl == "ERROR"]
|
|
||||||
self.assertTrue(len(errors) > 0)
|
|
||||||
|
|
||||||
|
|
||||||
# ── Tests: source content ─────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class TestNodeSrc(unittest.TestCase):
|
|
||||||
@classmethod
|
|
||||||
def setUpClass(cls):
|
|
||||||
with open(_SRC) as f: cls.src = f.read()
|
|
||||||
|
|
||||||
def test_issue_tag(self): self.assertIn("#332", self.src)
|
|
||||||
def test_trigger_topic(self): self.assertIn("/saltybot/record_trigger", self.src)
|
|
||||||
def test_status_topic(self): self.assertIn("/saltybot/recording_status", self.src)
|
|
||||||
def test_status_idle(self): self.assertIn('"idle"', self.src)
|
|
||||||
def test_status_recording(self): self.assertIn('"recording"', self.src)
|
|
||||||
def test_status_stopping(self): self.assertIn('"stopping"', self.src)
|
|
||||||
def test_status_error(self): self.assertIn('"error"', self.src)
|
|
||||||
def test_compute_transition_fn(self): self.assertIn("compute_recording_transition", self.src)
|
|
||||||
def test_bag_recorder_process(self): self.assertIn("BagRecorderProcess", self.src)
|
|
||||||
def test_make_bag_path(self): self.assertIn("make_bag_path", self.src)
|
|
||||||
def test_parse_topics(self): self.assertIn("parse_topics", self.src)
|
|
||||||
def test_auto_stop_param(self): self.assertIn("auto_stop_s", self.src)
|
|
||||||
def test_stop_timeout_param(self): self.assertIn("stop_timeout_s", self.src)
|
|
||||||
def test_compression_param(self): self.assertIn("compression", self.src)
|
|
||||||
def test_subprocess_used(self): self.assertIn("subprocess", self.src)
|
|
||||||
def test_sigint_used(self): self.assertIn("SIGINT", self.src)
|
|
||||||
def test_threading_lock(self): self.assertIn("threading.Lock", self.src)
|
|
||||||
def test_recorder_injectable(self): self.assertIn("_recorder", self.src)
|
|
||||||
def test_main_defined(self): self.assertIn("def main", self.src)
|
|
||||||
def test_bool_subscription(self): self.assertIn("Bool", self.src)
|
|
||||||
def test_string_publication(self): self.assertIn("String", self.src)
|
|
||||||
|
|
||||||
|
|
||||||
# ── Tests: config / launch / setup ────────────────────────────────────────────
|
|
||||||
|
|
||||||
class TestConfig(unittest.TestCase):
|
|
||||||
_CONFIG = (
|
|
||||||
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
|
||||||
"saltybot_social/config/rosbag_recorder_params.yaml"
|
|
||||||
)
|
|
||||||
_LAUNCH = (
|
|
||||||
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
|
||||||
"saltybot_social/launch/rosbag_recorder.launch.py"
|
|
||||||
)
|
|
||||||
_SETUP = (
|
|
||||||
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
|
||||||
"saltybot_social/setup.py"
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_config_exists(self):
|
|
||||||
import os; self.assertTrue(os.path.exists(self._CONFIG))
|
|
||||||
|
|
||||||
def test_config_auto_stop(self):
|
|
||||||
with open(self._CONFIG) as f: c = f.read()
|
|
||||||
self.assertIn("auto_stop_s", c)
|
|
||||||
|
|
||||||
def test_config_bag_dir(self):
|
|
||||||
with open(self._CONFIG) as f: c = f.read()
|
|
||||||
self.assertIn("bag_dir", c)
|
|
||||||
|
|
||||||
def test_config_topics(self):
|
|
||||||
with open(self._CONFIG) as f: c = f.read()
|
|
||||||
self.assertIn("topics", c)
|
|
||||||
|
|
||||||
def test_config_compression(self):
|
|
||||||
with open(self._CONFIG) as f: c = f.read()
|
|
||||||
self.assertIn("compression", c)
|
|
||||||
|
|
||||||
def test_config_stop_timeout(self):
|
|
||||||
with open(self._CONFIG) as f: c = f.read()
|
|
||||||
self.assertIn("stop_timeout_s", c)
|
|
||||||
|
|
||||||
def test_launch_exists(self):
|
|
||||||
import os; self.assertTrue(os.path.exists(self._LAUNCH))
|
|
||||||
|
|
||||||
def test_launch_auto_stop_arg(self):
|
|
||||||
with open(self._LAUNCH) as f: c = f.read()
|
|
||||||
self.assertIn("auto_stop_s", c)
|
|
||||||
|
|
||||||
def test_launch_topics_arg(self):
|
|
||||||
with open(self._LAUNCH) as f: c = f.read()
|
|
||||||
self.assertIn("topics", c)
|
|
||||||
|
|
||||||
def test_entry_point_in_setup(self):
|
|
||||||
with open(self._SETUP) as f: c = f.read()
|
|
||||||
self.assertIn("rosbag_recorder_node", c)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
||||||
Loading…
x
Reference in New Issue
Block a user