feat: ROS2 bag recording manager (Issue #615)

Upgrades saltybot_bag_recorder (Issue #488) with:

- Motion-triggered auto-record: subscribes /cmd_vel, starts on non-zero
  velocity, stops after 30s idle timeout (configurable)
- Auto-split at 1 GB or 10 min via subprocess restart
- USB/NVMe storage selection: ordered priority list, picks first path
  with >= 2 GB free (/media/usb0 -> /media/usb1 -> /mnt/nvme -> ~/bags)
- Disk monitoring: warns at 70%, triggers cleanup of bags >7 days at 80%
- JSON status on /saltybot/bag_recorder/status at 1 Hz
- Services: /saltybot/bag_recorder/{start,stop,split}
  (legacy /saltybot/{start,stop}_recording kept for compatibility)
- bag_policy.py: pure-Python MotionState, DiskInfo, StorageSelector,
  BagPolicy — ROS2-free, fully unit-testable
- 76 unit tests passing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-jetson 2026-03-15 10:12:17 -04:00
parent c0bb4f6276
commit a4da93de7e
4 changed files with 1047 additions and 256 deletions

View File

@ -1,28 +1,33 @@
# bag_recorder.yaml — Bag recording manager config (Issue #615).
bag_recorder: bag_recorder:
ros__parameters: ros__parameters:
# Path where bags are stored (Issue #488: mission logging) storage_paths:
bag_dir: '~/saltybot-data/bags' - '/media/usb0'
- '/media/usb1'
# Topics to record for mission logging (Issue #488) - '/mnt/nvme/saltybot-bags'
- '~/saltybot-data/bags'
topics: topics:
- '/scan' - '/scan'
- '/cmd_vel' - '/cmd_vel'
- '/odom' - '/odom'
- '/tf' - '/tf'
- '/camera/color/image_raw/compressed' - '/tf_static'
- '/saltybot/pose/fused_cov'
- '/saltybot/diagnostics' - '/saltybot/diagnostics'
- '/saltybot/sensor_health'
# Rotation interval: save new bag every N minutes (Issue #488: 30 min) - '/saltybot/battery'
buffer_duration_minutes: 30 - '/saltybot/motor_daemon/status'
- '/camera/color/image_raw/compressed'
# Storage management (Issue #488: FIFO 20GB limit) - '/camera/depth/image_rect_raw/compressed'
storage_ttl_days: 7 # Remove bags older than 7 days - '/imu/data'
max_storage_gb: 20 # Enforce 20GB FIFO quota motion_trigger: true
idle_timeout_s: 30.0
# Storage format (Issue #488: prefer MCAP) split_size_gb: 1.0
storage_format: 'mcap' # Options: mcap, sqlite3 split_duration_min: 10.0
storage_format: 'mcap'
# NAS sync (optional) warn_disk_pct: 70.0
enable_rsync: false cleanup_disk_pct: 80.0
rsync_destination: '' cleanup_age_days: 7
# rsync_destination: 'user@nas:/path/to/backups/' min_free_gb: 2.0
status_hz: 1.0
check_hz: 0.033

View File

@ -0,0 +1,293 @@
"""bag_policy.py — Pure-Python recording policy logic (Issue #615).
No ROS2 dependencies importable and fully unit-testable without a live
ROS2 install.
Classes
MotionState tracks /cmd_vel activity and idle timeout
DiskInfo disk-usage snapshot with threshold helpers
StorageSelector picks the best storage path from a priority list
BagPolicy decides when to start/stop/split/clean up
"""
from __future__ import annotations
import math
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple
# ── Motion tracking ───────────────────────────────────────────────────────────
class MotionState:
"""Tracks /cmd_vel activity and determines idle timeout.
Usage::
ms = MotionState(idle_timeout_s=30.0)
ms.update(linear_x=0.5, angular_z=0.0, now=time.monotonic())
if ms.should_start_recording(now):
...
if ms.should_stop_recording(now):
...
"""
def __init__(self, idle_timeout_s: float = 30.0) -> None:
if idle_timeout_s <= 0:
raise ValueError(f"idle_timeout_s must be > 0, got {idle_timeout_s!r}")
self._idle_timeout = idle_timeout_s
self._last_motion_t: Optional[float] = None
self._ever_moved: bool = False
# ── Mutators ───────────────────────────────────────────────────────────
def update(self, linear_x: float, angular_z: float, now: float) -> None:
"""Feed the latest cmd_vel values. Non-zero → reset idle timer."""
if abs(linear_x) > 1e-3 or abs(angular_z) > 1e-3:
self._last_motion_t = now
self._ever_moved = True
def reset(self) -> None:
"""Forget all motion history (e.g. after a recording session ends)."""
self._last_motion_t = None
self._ever_moved = False
# ── Queries ────────────────────────────────────────────────────────────
def idle_seconds(self, now: float) -> float:
"""Seconds since last non-zero cmd_vel (inf if never moved)."""
if self._last_motion_t is None:
return math.inf
return now - self._last_motion_t
def is_idle(self, now: float) -> bool:
"""True if idle_timeout has elapsed since last non-zero cmd_vel."""
return self.idle_seconds(now) >= self._idle_timeout
def is_moving(self, now: float) -> bool:
"""True if non-zero cmd_vel was received within the idle window."""
return self._ever_moved and not self.is_idle(now)
def should_start_recording(self, now: float) -> bool:
"""True when motion is detected (used to trigger auto-record start)."""
return self._ever_moved and not self.is_idle(now)
def should_stop_recording(self, now: float) -> bool:
"""True when idle timeout has elapsed after motion (trigger auto-stop)."""
return self._ever_moved and self.is_idle(now)
@property
def ever_moved(self) -> bool:
return self._ever_moved
@property
def idle_timeout_s(self) -> float:
return self._idle_timeout
# ── Disk information ──────────────────────────────────────────────────────────
@dataclass
class DiskInfo:
"""Snapshot of disk usage for a single mount point.
Construct with :meth:`DiskInfo.from_path` for live data or build
directly from known values for unit tests.
"""
path: str
total_bytes: int
used_bytes: int
free_bytes: int
@classmethod
def from_path(cls, path: str) -> "DiskInfo":
"""Read live disk usage from the OS."""
u = shutil.disk_usage(path)
return cls(path=path, total_bytes=u.total,
used_bytes=u.used, free_bytes=u.free)
# ── Derived properties ─────────────────────────────────────────────────
@property
def used_pct(self) -> float:
"""Percentage of disk space used (0100)."""
if self.total_bytes == 0:
return 100.0
return self.used_bytes / self.total_bytes * 100.0
@property
def free_pct(self) -> float:
return 100.0 - self.used_pct
@property
def free_gb(self) -> float:
return self.free_bytes / (1024 ** 3)
@property
def used_gb(self) -> float:
return self.used_bytes / (1024 ** 3)
@property
def total_gb(self) -> float:
return self.total_bytes / (1024 ** 3)
# ── Threshold helpers ──────────────────────────────────────────────────
def exceeds_pct(self, threshold_pct: float) -> bool:
return self.used_pct >= threshold_pct
def has_min_free(self, min_free_gb: float) -> bool:
return self.free_gb >= min_free_gb
def summary(self) -> str:
return (f"{self.path}: {self.used_gb:.1f}/{self.total_gb:.1f} GB "
f"({self.used_pct:.1f}% used, {self.free_gb:.1f} GB free)")
# ── Storage path selection ────────────────────────────────────────────────────
class StorageSelector:
"""Selects the best storage path from a priority-ordered list.
Paths are tried in order; the first accessible path with at least
``min_free_gb`` of free space is returned. Typical priority order:
USB NVMe internal home directory.
Example::
sel = StorageSelector(['/media/usb0', '/mnt/nvme', '~/bags'])
path = sel.select() # e.g. '/media/usb0'
"""
DEFAULT_MIN_FREE_GB = 2.0
def __init__(self,
paths: List[str],
min_free_gb: float = DEFAULT_MIN_FREE_GB) -> None:
if not paths:
raise ValueError("paths list must not be empty")
self._paths = [str(Path(p).expanduser()) for p in paths]
self._min_free_gb = min_free_gb
def select(self) -> Optional[str]:
"""Return the best available storage path, or None if none qualify."""
for path in self._paths:
p = Path(path)
if not p.exists():
continue
try:
info = DiskInfo.from_path(path)
if info.has_min_free(self._min_free_gb):
return path
except OSError:
continue
return None
def select_or_default(self, default: str) -> str:
"""Return the best path, falling back to ``default`` if none qualify."""
return self.select() or default
def disk_infos(self) -> List[DiskInfo]:
"""Return DiskInfo for all accessible paths (for status publishing)."""
result = []
for path in self._paths:
if Path(path).exists():
try:
result.append(DiskInfo.from_path(path))
except OSError:
pass
return result
# ── Recording policy ──────────────────────────────────────────────────────────
class BagPolicy:
"""Encapsulates all recording policy thresholds (Issue #615).
Separates business logic from ROS2 so it can be unit-tested without a
live ROS2 install.
Parameters
split_size_gb float Split bag when it reaches this size. Default 1.0
split_duration_min float Split bag after this many minutes. Default 10.0
cleanup_age_days int Bags older than this are cleanup candidates. Default 7
cleanup_disk_pct float Trigger cleanup when disk exceeds this %. Default 80.0
warn_disk_pct float Log a warning when disk exceeds this %. Default 70.0
min_free_gb float Minimum free space to start recording. Default 2.0
"""
def __init__(
self,
split_size_gb: float = 1.0,
split_duration_min: float = 10.0,
cleanup_age_days: int = 7,
cleanup_disk_pct: float = 80.0,
warn_disk_pct: float = 70.0,
min_free_gb: float = 2.0,
) -> None:
if split_size_gb <= 0:
raise ValueError(f"split_size_gb must be > 0, got {split_size_gb!r}")
if split_duration_min <= 0:
raise ValueError(f"split_duration_min must be > 0, got {split_duration_min!r}")
if cleanup_age_days < 1:
raise ValueError(f"cleanup_age_days must be >= 1, got {cleanup_age_days!r}")
if not (0 < warn_disk_pct < cleanup_disk_pct <= 100):
raise ValueError(
f"Must have 0 < warn_disk_pct({warn_disk_pct}) "
f"< cleanup_disk_pct({cleanup_disk_pct}) <= 100"
)
self.split_size_bytes = int(split_size_gb * 1024 ** 3)
self.split_duration_s = split_duration_min * 60.0
self.cleanup_age_days = cleanup_age_days
self.cleanup_disk_pct = cleanup_disk_pct
self.warn_disk_pct = warn_disk_pct
self.min_free_gb = min_free_gb
# ── Split decision ─────────────────────────────────────────────────────
def needs_split(
self,
bag_size_bytes: int,
bag_duration_s: float,
) -> Tuple[bool, str]:
"""Return (True, reason) if the active bag segment should be split."""
if bag_size_bytes >= self.split_size_bytes:
size_gb = bag_size_bytes / 1024 ** 3
lim_gb = self.split_size_bytes / 1024 ** 3
return True, f"size {size_gb:.2f} GB ≥ {lim_gb:.1f} GB"
if bag_duration_s >= self.split_duration_s:
return True, (
f"duration {bag_duration_s / 60:.1f} min "
f"{self.split_duration_s / 60:.0f} min"
)
return False, ""
# ── Disk thresholds ────────────────────────────────────────────────────
def needs_cleanup(self, disk: DiskInfo) -> bool:
"""True if disk is full enough to warrant deleting old bags."""
return disk.exceeds_pct(self.cleanup_disk_pct)
def should_warn_disk(self, disk: DiskInfo) -> bool:
"""True if disk usage deserves a warning log."""
return disk.exceeds_pct(self.warn_disk_pct)
def can_record(self, disk: DiskInfo) -> bool:
"""True if there is enough free space to start/continue recording."""
return disk.has_min_free(self.min_free_gb)
# ── Age-based cleanup ──────────────────────────────────────────────────
def bag_is_expired(self, bag_age_days: float) -> bool:
"""True if a bag is old enough to be a cleanup candidate."""
return bag_age_days >= self.cleanup_age_days
def bag_age_days(self, bag_mtime: float, now: float) -> float:
"""Compute bag age in fractional days from mtime and current time."""
return (now - bag_mtime) / 86400.0

View File

@ -1,265 +1,383 @@
#!/usr/bin/env python3 """bag_recorder_node.py — ROS2 bag recording manager (Issue #615).
import os Upgrades the Issue #488 basic recorder with:
import signal Motion-triggered auto-record (/cmd_vel non-zero start, 30s idle stop)
import shutil Auto-split at 1 GB or 10 min
import subprocess USB/NVMe storage path selection with disk monitoring
import threading Disk-usage-triggered cleanup (>80% delete bags >7 days old)
import time JSON status topic at 1 Hz
Services (Trigger):
/saltybot/bag_recorder/start /saltybot/bag_recorder/stop
/saltybot/bag_recorder/split (plus legacy /saltybot/start_recording etc.)
Topics:
SUB /cmd_vel (geometry_msgs/Twist)
PUB /saltybot/bag_recorder/status (std_msgs/String, JSON)
"""
from __future__ import annotations
import json, os, shutil, signal, subprocess, threading, time
from datetime import datetime
from pathlib import Path from pathlib import Path
from datetime import datetime, timedelta
from typing import List, Optional from typing import List, Optional
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from std_srvs.srv import Trigger from geometry_msgs.msg import Twist
from std_msgs.msg import String from std_msgs.msg import String
from std_srvs.srv import Trigger
from saltybot_bag_recorder.bag_policy import BagPolicy, DiskInfo, MotionState, StorageSelector
class _BagSession:
"""Tracks one recording session (possibly multiple split segments)."""
def __init__(self, bag_dir: Path, session_name: str) -> None:
self.bag_dir = bag_dir
self.session_name = session_name
self.start_time = time.monotonic()
self.split_count = 0
self.proc: Optional[subprocess.Popen] = None
self.segment_start = time.monotonic()
self._lock = threading.Lock()
def segment_name(self) -> str:
return f"{self.session_name}_s{self.split_count:03d}"
def segment_path(self) -> Path:
return self.bag_dir / self.segment_name()
def segment_size_bytes(self) -> int:
p = self.segment_path()
if not p.exists():
return 0
return sum(f.stat().st_size for f in p.rglob("*") if f.is_file())
def segment_duration_s(self) -> float:
return time.monotonic() - self.segment_start
def session_duration_s(self) -> float:
return time.monotonic() - self.start_time
def is_alive(self) -> bool:
with self._lock:
return self.proc is not None and self.proc.poll() is None
def stop_proc(self, timeout: float = 5.0) -> None:
with self._lock:
if self.proc is None:
return
try:
self.proc.send_signal(signal.SIGINT)
self.proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
self.proc.kill()
self.proc.wait()
except OSError:
pass
self.proc = None
class BagRecorderNode(Node): class BagRecorderNode(Node):
"""ROS2 bag recording service for mission logging (Issue #488).""" """ROS2 bag recording manager with motion trigger and disk monitoring (Issue #615)."""
def __init__(self): DEFAULT_TOPICS: List[str] = [
super().__init__('saltybot_bag_recorder') "/scan", "/cmd_vel", "/odom", "/tf", "/tf_static",
"/saltybot/pose/fused_cov", "/saltybot/diagnostics",
"/saltybot/sensor_health", "/saltybot/battery",
"/saltybot/motor_daemon/status",
"/camera/color/image_raw/compressed",
"/camera/depth/image_rect_raw/compressed",
"/imu/data",
]
# Configuration (Issue #488: mission logging) DEFAULT_STORAGE_PATHS: List[str] = [
default_bag_dir = str(Path.home() / 'saltybot-data' / 'bags') "/media/usb0", "/media/usb1",
self.declare_parameter('bag_dir', default_bag_dir) "/mnt/nvme/saltybot-bags",
self.declare_parameter('topics', [ "~/saltybot-data/bags",
'/scan', ]
'/cmd_vel',
'/odom',
'/tf',
'/camera/color/image_raw/compressed',
'/saltybot/diagnostics'
])
self.declare_parameter('buffer_duration_minutes', 30)
self.declare_parameter('storage_ttl_days', 7)
self.declare_parameter('max_storage_gb', 20)
self.declare_parameter('enable_rsync', False)
self.declare_parameter('rsync_destination', '')
self.declare_parameter('storage_format', 'mcap')
self.bag_dir = Path(self.get_parameter('bag_dir').value) def __init__(self) -> None:
self.topics = self.get_parameter('topics').value super().__init__("bag_recorder")
self.buffer_duration = self.get_parameter('buffer_duration_minutes').value * 60
self.storage_ttl_days = self.get_parameter('storage_ttl_days').value
self.max_storage_gb = self.get_parameter('max_storage_gb').value
self.enable_rsync = self.get_parameter('enable_rsync').value
self.rsync_destination = self.get_parameter('rsync_destination').value
self.storage_format = self.get_parameter('storage_format').value
self.bag_dir.mkdir(parents=True, exist_ok=True) self.declare_parameter("storage_paths", self.DEFAULT_STORAGE_PATHS)
self.declare_parameter("topics", self.DEFAULT_TOPICS)
self.declare_parameter("motion_trigger", True)
self.declare_parameter("idle_timeout_s", 30.0)
self.declare_parameter("split_size_gb", 1.0)
self.declare_parameter("split_duration_min", 10.0)
self.declare_parameter("storage_format", "mcap")
self.declare_parameter("warn_disk_pct", 70.0)
self.declare_parameter("cleanup_disk_pct", 80.0)
self.declare_parameter("cleanup_age_days", 7)
self.declare_parameter("min_free_gb", 2.0)
self.declare_parameter("status_hz", 1.0)
self.declare_parameter("check_hz", 0.033)
# Recording state self._topics = list(self.get_parameter("topics").value)
self.is_recording = False self._motion_trigger = self.get_parameter("motion_trigger").value
self.current_bag_process = None self._storage_fmt = self.get_parameter("storage_format").value
self.current_bag_name = None
self.buffer_bags: List[str] = []
self.recording_lock = threading.Lock()
# Services self._policy = BagPolicy(
self.save_service = self.create_service(Trigger, '/saltybot/save_bag', self.save_bag_callback) split_size_gb = self.get_parameter("split_size_gb").value,
self.start_service = self.create_service(Trigger, '/saltybot/start_recording', self.start_recording_callback) split_duration_min = self.get_parameter("split_duration_min").value,
self.stop_service = self.create_service(Trigger, '/saltybot/stop_recording', self.stop_recording_callback) cleanup_age_days = int(self.get_parameter("cleanup_age_days").value),
cleanup_disk_pct = self.get_parameter("cleanup_disk_pct").value,
# Watchdog to handle crash recovery warn_disk_pct = self.get_parameter("warn_disk_pct").value,
self.setup_signal_handlers() min_free_gb = self.get_parameter("min_free_gb").value,
)
# Start recording self._motion = MotionState(
self.start_recording() idle_timeout_s = self.get_parameter("idle_timeout_s").value,
)
# Periodic maintenance (cleanup old bags, manage storage) self._selector = StorageSelector(
self.maintenance_timer = self.create_timer(300.0, self.maintenance_callback) paths = list(self.get_parameter("storage_paths").value),
min_free_gb = self.get_parameter("min_free_gb").value,
self.get_logger().info(
f'Bag recorder initialized: {self.bag_dir}, format={self.storage_format}, '
f'buffer={self.buffer_duration}s, max={self.max_storage_gb}GB, topics={len(self.topics)}'
) )
def setup_signal_handlers(self): self._bag_dir: Optional[Path] = None
"""Setup signal handlers for graceful shutdown and crash recovery.""" self._session: Optional[_BagSession] = None
def signal_handler(sig, frame): self._lock: threading.Lock = threading.Lock()
self.get_logger().warn(f'Signal {sig} received, saving current bag') self._recording: bool = False
self.stop_recording(save=True)
self.cleanup()
rclpy.shutdown()
signal.signal(signal.SIGINT, signal_handler) self._status_pub = self.create_publisher(
signal.signal(signal.SIGTERM, signal_handler) String, "/saltybot/bag_recorder/status", 10)
def start_recording(self): self.create_service(Trigger, "/saltybot/bag_recorder/start", self._svc_start)
"""Start bag recording in the background.""" self.create_service(Trigger, "/saltybot/bag_recorder/stop", self._svc_stop)
with self.recording_lock: self.create_service(Trigger, "/saltybot/bag_recorder/split", self._svc_split)
if self.is_recording: # Legacy (Issue #488 compatibility)
self.create_service(Trigger, "/saltybot/start_recording", self._svc_start)
self.create_service(Trigger, "/saltybot/stop_recording", self._svc_stop)
self.create_service(Trigger, "/saltybot/save_bag", self._svc_split)
if self._motion_trigger:
self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10)
hz = self.get_parameter("status_hz").value
self.create_timer(1.0 / hz, self._publish_status)
chz = self.get_parameter("check_hz").value
self.create_timer(1.0 / chz, self._periodic_check)
self.create_timer(60.0, self._disk_maintenance)
self.get_logger().info(
f"BagRecorder ready | motion={self._motion_trigger} "
f"idle={self._motion.idle_timeout_s:.0f}s "
f"split={self._policy.split_size_bytes//1024**3}GB/"
f"{int(self._policy.split_duration_s//60)}min "
f"fmt={self._storage_fmt} topics={len(self._topics)}"
)
def _on_cmd_vel(self, msg: Twist) -> None:
now = time.monotonic()
self._motion.update(msg.linear.x, msg.angular.z, now)
with self._lock:
if not self._recording and self._motion.should_start_recording(now):
self.get_logger().info("Motion detected — auto-starting bag recording")
self._start_recording_locked()
def _svc_start(self, _req, resp):
with self._lock:
if self._recording:
resp.success = False
resp.message = f"Already recording: {self._session.session_name}"
return resp
ok, msg = self._start_recording_locked()
resp.success = ok
resp.message = msg
return resp
def _svc_stop(self, _req, resp):
with self._lock:
if not self._recording:
resp.success = False
resp.message = "Not recording"
return resp
_, ok, msg = self._stop_recording_locked()
resp.success = ok
resp.message = msg
return resp
def _svc_split(self, _req, resp):
with self._lock:
if not self._recording:
resp.success = False
resp.message = "Not recording — nothing to split"
return resp
ok, msg = self._split_locked()
resp.success = ok
resp.message = msg
return resp
def _periodic_check(self) -> None:
now = time.monotonic()
with self._lock:
if self._recording and self._motion_trigger:
if self._motion.should_stop_recording(now):
self.get_logger().info(
f"Idle {self._motion.idle_seconds(now):.0f}s — auto-stopping")
self._stop_recording_locked()
return return
if self._recording and self._session is not None:
size = self._session.segment_size_bytes()
dur = self._session.segment_duration_s()
split, reason = self._policy.needs_split(size, dur)
if split:
self.get_logger().info(f"Auto-split: {reason}")
self._split_locked()
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') def _disk_maintenance(self) -> None:
self.current_bag_name = f'saltybot_{timestamp}' if self._bag_dir is None:
bag_path = self.bag_dir / self.current_bag_name sel = self._selector.select()
if sel is None:
try:
# Build rosbag2 record command
cmd = ['ros2', 'bag', 'record', '--output', str(bag_path), '--storage', self.storage_format]
# Add compression for sqlite3 format
if self.storage_format == 'sqlite3':
cmd.extend(['--compression-format', 'zstd', '--compression-mode', 'file'])
# Add topics (required for mission logging)
if self.topics and self.topics[0]:
cmd.extend(self.topics)
else:
cmd.extend(['/scan', '/cmd_vel', '/odom', '/tf', '/camera/color/image_raw/compressed', '/saltybot/diagnostics'])
self.current_bag_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.is_recording = True
self.buffer_bags.append(self.current_bag_name)
self.get_logger().info(f'Started recording: {self.current_bag_name}')
except Exception as e:
self.get_logger().error(f'Failed to start recording: {e}')
def save_bag_callback(self, request, response):
"""Service callback to manually trigger bag save."""
try:
self.stop_recording(save=True)
self.start_recording()
response.success = True
response.message = f'Saved: {self.current_bag_name}'
self.get_logger().info(response.message)
except Exception as e:
response.success = False
response.message = f'Failed to save bag: {e}'
self.get_logger().error(response.message)
return response
def start_recording_callback(self, request, response):
"""Service callback to start recording."""
if self.is_recording:
response.success = False
response.message = 'Recording already in progress'
return response
try:
self.start_recording()
response.success = True
response.message = f'Recording started: {self.current_bag_name}'
self.get_logger().info(response.message)
except Exception as e:
response.success = False
response.message = f'Failed to start recording: {e}'
self.get_logger().error(response.message)
return response
def stop_recording_callback(self, request, response):
"""Service callback to stop recording."""
if not self.is_recording:
response.success = False
response.message = 'No recording in progress'
return response
try:
self.stop_recording(save=True)
response.success = True
response.message = f'Recording stopped and saved: {self.current_bag_name}'
self.get_logger().info(response.message)
except Exception as e:
response.success = False
response.message = f'Failed to stop recording: {e}'
self.get_logger().error(response.message)
return response
def stop_recording(self, save: bool = False):
"""Stop the current bag recording."""
with self.recording_lock:
if not self.is_recording or not self.current_bag_process:
return return
self._bag_dir = Path(sel)
try: try:
self.current_bag_process.send_signal(signal.SIGINT) disk = DiskInfo.from_path(str(self._bag_dir))
self.current_bag_process.wait(timeout=5.0) except OSError as exc:
except subprocess.TimeoutExpired: self.get_logger().warn(f"Disk check failed: {exc}")
self.get_logger().warn(f'Force terminating {self.current_bag_name}')
self.current_bag_process.kill()
self.current_bag_process.wait()
except Exception as e:
self.get_logger().error(f'Error stopping recording: {e}')
self.is_recording = False
self.get_logger().info(f'Stopped recording: {self.current_bag_name}')
if save:
self.apply_compression()
def apply_compression(self):
"""Compress the current bag using zstd."""
if not self.current_bag_name:
return return
bag_path = self.bag_dir / self.current_bag_name if self._policy.should_warn_disk(disk):
try: self.get_logger().warn(
tar_path = f'{bag_path}.tar.zst' f"Disk {disk.used_pct:.1f}% > {self._policy.warn_disk_pct:.0f}%")
if bag_path.exists(): if self._policy.needs_cleanup(disk):
cmd = ['tar', '-I', 'zstd', '-cf', tar_path, '-C', str(self.bag_dir), self.current_bag_name] self.get_logger().warn(
subprocess.run(cmd, check=True, capture_output=True, timeout=60) f"Disk {disk.used_pct:.1f}% >= {self._policy.cleanup_disk_pct:.0f}% — cleaning")
self.get_logger().info(f'Compressed: {tar_path}') self._cleanup_expired_bags()
except Exception as e:
self.get_logger().warn(f'Compression skipped: {e}')
def maintenance_callback(self): def _start_recording_locked(self):
"""Periodic maintenance: cleanup old bags and manage storage.""" sel = self._selector.select()
self.cleanup_expired_bags() if sel is None:
self.enforce_storage_quota() msg = f"No storage path with >={self._policy.min_free_gb:.1f}GB free"
if self.enable_rsync and self.rsync_destination: self.get_logger().error(msg)
self.rsync_bags() return False, msg
self._bag_dir = Path(sel)
self._bag_dir.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
self._session = _BagSession(self._bag_dir, f"saltybot_{ts}")
ok, msg = self._launch_ros2bag_locked()
if ok:
self._recording = True
self.get_logger().info(
f"Recording: {self._session.segment_name()} -> {self._bag_dir}")
return ok, msg
def cleanup_expired_bags(self): def _stop_recording_locked(self):
"""Remove bags older than TTL.""" if self._session is None:
try: return None, False, "No active session"
cutoff_time = datetime.now() - timedelta(days=self.storage_ttl_days) name = self._session.session_name
for item in self.bag_dir.iterdir(): self._session.stop_proc()
if item.is_dir() and item.name.startswith('saltybot_'): self._recording = False
try: self.get_logger().info(f"Stopped: {name}")
timestamp_str = item.name.replace('saltybot_', '') return name, True, f"Stopped: {name}"
item_time = datetime.strptime(timestamp_str, '%Y%m%d_%H%M%S')
if item_time < cutoff_time:
shutil.rmtree(item, ignore_errors=True)
self.get_logger().info(f'Removed expired bag: {item.name}')
except (ValueError, OSError) as e:
self.get_logger().warn(f'Error processing {item.name}: {e}')
except Exception as e:
self.get_logger().error(f'Cleanup failed: {e}')
def enforce_storage_quota(self): def _split_locked(self):
"""Remove oldest bags if total size exceeds quota (FIFO).""" if self._session is None:
return False, "No active session"
old = self._session.segment_name()
self._session.stop_proc()
self._session.split_count += 1
self._session.segment_start = time.monotonic()
ok, msg = self._launch_ros2bag_locked()
if ok:
self.get_logger().info(f"Split: {old} -> {self._session.segment_name()}")
return True, f"Split: {old} -> {self._session.segment_name()}"
return ok, msg
def _launch_ros2bag_locked(self):
seg_path = self._session.segment_path()
cmd = ["ros2", "bag", "record",
"--output", str(seg_path),
"--storage", self._storage_fmt]
if self._storage_fmt == "sqlite3":
cmd += ["--compression-format", "zstd", "--compression-mode", "file"]
cmd += self._topics
try: try:
total_size = sum(f.stat().st_size for f in self.bag_dir.rglob('*') if f.is_file()) / (1024 ** 3) self._session.proc = subprocess.Popen(
if total_size > self.max_storage_gb: cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
self.get_logger().warn(f'Storage quota exceeded: {total_size:.2f}GB > {self.max_storage_gb}GB') preexec_fn=os.setsid)
bags = sorted([d for d in self.bag_dir.iterdir() if d.is_dir() and d.name.startswith('saltybot_')], key=lambda x: x.stat().st_mtime) return True, f"Recording: {self._session.segment_name()}"
for bag in bags: except Exception as exc:
if total_size <= self.max_storage_gb: msg = f"Failed to launch ros2 bag record: {exc}"
self.get_logger().error(msg)
return False, msg
def _cleanup_expired_bags(self) -> None:
if self._bag_dir is None or not self._bag_dir.exists():
return
now = time.time()
for item in sorted(self._bag_dir.iterdir(),
key=lambda p: p.stat().st_mtime):
if not item.is_dir() or not item.name.startswith("saltybot_"):
continue
if (self._session and
item.name.startswith(self._session.session_name)):
continue
age = self._policy.bag_age_days(item.stat().st_mtime, now)
if self._policy.bag_is_expired(age):
try:
shutil.rmtree(item)
self.get_logger().info(
f"Cleanup: removed {item.name} (age {age:.1f}d)")
try:
disk = DiskInfo.from_path(str(self._bag_dir))
if not self._policy.needs_cleanup(disk):
break break
bag_size = sum(f.stat().st_size for f in bag.rglob('*') if f.is_file()) / (1024 ** 3) except OSError:
shutil.rmtree(bag, ignore_errors=True) break
total_size -= bag_size except OSError as exc:
self.get_logger().info(f'Removed bag to enforce quota: {bag.name}') self.get_logger().warn(f"Cleanup failed {item.name}: {exc}")
except Exception as e:
self.get_logger().error(f'Storage quota enforcement failed: {e}')
def rsync_bags(self): def _publish_status(self) -> None:
"""Optional: rsync bags to NAS.""" with self._lock:
s_name = self._session.session_name if self._session else None
seg = self._session.segment_name() if self._session else None
splits = self._session.split_count if self._session else 0
seg_sz = self._session.segment_size_bytes() if self._session else 0
s_dur = self._session.session_duration_s() if self._session else 0.0
seg_dur = self._session.segment_duration_s() if self._session else 0.0
rec = self._recording
bdir = str(self._bag_dir) if self._bag_dir else None
now = time.monotonic()
idle = self._motion.idle_seconds(now)
disk_status = {}
if self._bag_dir and self._bag_dir.exists():
try: try:
cmd = ['rsync', '-avz', '--delete', f'{self.bag_dir}/', self.rsync_destination] di = DiskInfo.from_path(str(self._bag_dir))
subprocess.run(cmd, check=False, timeout=300) disk_status = {
self.get_logger().info(f'Synced bags to NAS: {self.rsync_destination}') "path": bdir,
except subprocess.TimeoutExpired: "used_pct": round(di.used_pct, 1),
self.get_logger().warn('Rsync timed out') "free_gb": round(di.free_gb, 2),
except Exception as e: "total_gb": round(di.total_gb, 2),
self.get_logger().error(f'Rsync failed: {e}') "warn": self._policy.should_warn_disk(di),
"cleanup": self._policy.needs_cleanup(di),
}
except OSError:
pass
payload = {
"recording": rec,
"session": s_name,
"segment": seg,
"split_count": splits,
"segment_size_mb": round(seg_sz / 1e6, 1),
"session_dur_s": round(s_dur, 1),
"segment_dur_s": round(seg_dur, 1),
"motion_idle_s": round(idle, 1) if idle != float("inf") else None,
"motion_trigger": self._motion_trigger,
"bag_dir": bdir,
"disk": disk_status,
}
self._status_pub.publish(String(data=json.dumps(payload)))
def cleanup(self): def _shutdown(self) -> None:
"""Cleanup resources.""" with self._lock:
self.stop_recording(save=True) if self._recording:
self.get_logger().info('Bag recorder shutdown complete') self._stop_recording_locked()
def main(args=None): def main(args=None) -> None:
rclpy.init(args=args) rclpy.init(args=args)
node = BagRecorderNode() node = BagRecorderNode()
try: try:
@ -267,10 +385,6 @@ def main(args=None):
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
finally: finally:
node.cleanup() node._shutdown()
node.destroy_node() node.destroy_node()
rclpy.shutdown() rclpy.try_shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,379 @@
"""test_bag_policy.py — Unit tests for bag recording policy (Issue #615).
ROS2-free. Run with:
python3 -m pytest \
jetson/ros2_ws/src/saltybot_bag_recorder/test/test_bag_policy.py -v
"""
from __future__ import annotations
import math
import os
import sys
from unittest.mock import MagicMock, patch
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..",
"saltybot_bag_recorder"))
import pytest
from bag_policy import BagPolicy, DiskInfo, MotionState, StorageSelector
# ─── MotionState ──────────────────────────────────────────────────────────────
class TestMotionStateInit:
def test_default_idle_timeout(self):
assert MotionState().idle_timeout_s == 30.0
def test_custom_idle_timeout(self):
assert MotionState(idle_timeout_s=60.0).idle_timeout_s == 60.0
def test_zero_timeout_raises(self):
with pytest.raises(ValueError, match="idle_timeout_s"):
MotionState(idle_timeout_s=0.0)
def test_negative_timeout_raises(self):
with pytest.raises(ValueError):
MotionState(idle_timeout_s=-5.0)
def test_initial_not_ever_moved(self):
assert not MotionState().ever_moved
def test_initial_idle_seconds_inf(self):
assert MotionState().idle_seconds(1000.0) == math.inf
def test_initial_is_idle(self):
assert MotionState().is_idle(1000.0)
def test_initial_not_moving(self):
assert not MotionState().is_moving(1000.0)
class TestMotionStateUpdate:
def setup_method(self):
self.ms = MotionState(idle_timeout_s=30.0)
def test_nonzero_linear_sets_moved(self):
self.ms.update(0.5, 0.0, 100.0)
assert self.ms.ever_moved
def test_nonzero_angular_sets_moved(self):
self.ms.update(0.0, 0.3, 100.0)
assert self.ms.ever_moved
def test_zero_cmd_vel_does_not_set_moved(self):
self.ms.update(0.0, 0.0, 100.0)
assert not self.ms.ever_moved
def test_noise_below_threshold_ignored(self):
self.ms.update(0.0005, 0.0005, 100.0)
assert not self.ms.ever_moved
def test_idle_seconds_after_motion(self):
self.ms.update(0.5, 0.0, 100.0)
assert abs(self.ms.idle_seconds(105.0) - 5.0) < 1e-9
def test_not_idle_immediately(self):
self.ms.update(0.5, 0.0, 100.0)
assert not self.ms.is_idle(100.0)
def test_not_idle_within_timeout(self):
self.ms.update(0.5, 0.0, 100.0)
assert not self.ms.is_idle(129.9)
def test_idle_at_boundary(self):
self.ms.update(0.5, 0.0, 100.0)
assert self.ms.is_idle(130.0)
def test_idle_after_timeout(self):
self.ms.update(0.5, 0.0, 100.0)
assert self.ms.is_idle(200.0)
def test_moving_within_window(self):
self.ms.update(0.5, 0.0, 100.0)
assert self.ms.is_moving(115.0)
def test_not_moving_after_timeout(self):
self.ms.update(0.5, 0.0, 100.0)
assert not self.ms.is_moving(200.0)
def test_second_motion_resets_timer(self):
self.ms.update(0.5, 0.0, 100.0)
self.ms.update(0.5, 0.0, 125.0)
assert not self.ms.is_idle(150.0) # 25s since last motion
assert self.ms.is_idle(160.0) # 35s since last motion
class TestMotionStateAutoDecisions:
def setup_method(self):
self.ms = MotionState(idle_timeout_s=30.0)
def test_no_start_before_motion(self):
assert not self.ms.should_start_recording(1000.0)
def test_start_after_motion(self):
self.ms.update(0.5, 0.0, 1000.0)
assert self.ms.should_start_recording(1001.0)
def test_no_start_after_idle(self):
self.ms.update(0.5, 0.0, 1000.0)
assert not self.ms.should_start_recording(1040.0)
def test_no_stop_before_motion(self):
assert not self.ms.should_stop_recording(1000.0)
def test_no_stop_while_active(self):
self.ms.update(0.5, 0.0, 1000.0)
assert not self.ms.should_stop_recording(1010.0)
def test_stop_after_idle(self):
self.ms.update(0.5, 0.0, 1000.0)
assert self.ms.should_stop_recording(1035.0)
class TestMotionStateReset:
def test_reset_clears_motion(self):
ms = MotionState()
ms.update(1.0, 0.0, 100.0)
ms.reset()
assert not ms.ever_moved
assert ms.idle_seconds(200.0) == math.inf
# ─── DiskInfo ─────────────────────────────────────────────────────────────────
def _disk(total_gb=100.0, free_gb=50.0, path="/fake"):
total = int(total_gb * 1024**3)
free = int(free_gb * 1024**3)
used = total - free
return DiskInfo(path=path, total_bytes=total, used_bytes=used, free_bytes=free)
class TestDiskInfo:
def test_used_pct_50(self):
assert abs(_disk(100, 50).used_pct - 50.0) < 0.01
def test_used_pct_100(self):
assert _disk(100, 0).used_pct == 100.0
def test_used_pct_0(self):
assert abs(_disk(100, 100).used_pct) < 0.01
def test_free_pct(self):
assert abs(_disk(100, 30).free_pct - 30.0) < 0.01
def test_free_gb(self):
assert abs(_disk(100, 20).free_gb - 20.0) < 0.01
def test_total_gb(self):
assert abs(_disk(250, 100).total_gb - 250.0) < 0.01
def test_used_gb(self):
assert abs(_disk(100, 30).used_gb - 70.0) < 0.01
def test_exceeds_pct_true(self):
assert _disk(100, 10).exceeds_pct(80.0) # 90% used
def test_exceeds_pct_false(self):
assert not _disk(100, 50).exceeds_pct(80.0) # 50% used
def test_exceeds_pct_boundary(self):
assert _disk(100, 20).exceeds_pct(80.0) # exactly 80%
def test_has_min_free_true(self):
assert _disk(100, 10).has_min_free(2.0)
def test_has_min_free_false(self):
assert not _disk(100, 1).has_min_free(2.0)
def test_zero_total_is_100pct(self):
di = DiskInfo(path="/", total_bytes=0, used_bytes=0, free_bytes=0)
assert di.used_pct == 100.0
def test_summary_has_path(self):
di = _disk(path="/media/usb0")
assert "/media/usb0" in di.summary()
# ─── StorageSelector ─────────────────────────────────────────────────────────
class TestStorageSelector:
def test_empty_paths_raises(self):
with pytest.raises(ValueError, match="paths"):
StorageSelector([])
def test_select_none_when_no_paths_exist(self):
sel = StorageSelector(["/nonexistent/a", "/nonexistent/b"])
assert sel.select() is None
def test_select_first_accessible(self, tmp_path):
p1 = tmp_path / "p1"; p1.mkdir()
p2 = tmp_path / "p2"; p2.mkdir()
sel = StorageSelector([str(p1), str(p2)], min_free_gb=0.0)
assert sel.select() == str(p1)
def test_select_skips_insufficient_free(self, tmp_path):
small = tmp_path / "small"; small.mkdir()
large = tmp_path / "large"; large.mkdir()
def fake_from(path):
if "small" in path:
return DiskInfo(path=path, total_bytes=10*1024**3,
used_bytes=9*1024**3, free_bytes=1*1024**3)
return DiskInfo(path=path, total_bytes=100*1024**3,
used_bytes=10*1024**3, free_bytes=90*1024**3)
with patch("bag_policy.DiskInfo.from_path", side_effect=fake_from):
sel = StorageSelector([str(small), str(large)], min_free_gb=2.0)
assert sel.select() == str(large)
def test_select_or_default_fallback(self):
sel = StorageSelector(["/nonexistent"], min_free_gb=1.0)
assert sel.select_or_default("/fb") == "/fb"
def test_disk_infos_empty_nonexistent(self):
sel = StorageSelector(["/nonexistent"])
assert sel.disk_infos() == []
# ─── BagPolicy constructor ────────────────────────────────────────────────────
class TestBagPolicyConstructor:
def test_defaults(self):
p = BagPolicy()
assert p.split_size_bytes == 1 * 1024**3
assert p.split_duration_s == 600.0
assert p.cleanup_age_days == 7
assert p.cleanup_disk_pct == 80.0
assert p.warn_disk_pct == 70.0
def test_size_stored_as_bytes(self):
assert BagPolicy(split_size_gb=2.0).split_size_bytes == 2 * 1024**3
def test_duration_stored_as_seconds(self):
assert BagPolicy(split_duration_min=5.0).split_duration_s == 300.0
def test_zero_size_raises(self):
with pytest.raises(ValueError, match="split_size_gb"):
BagPolicy(split_size_gb=0.0)
def test_negative_size_raises(self):
with pytest.raises(ValueError):
BagPolicy(split_size_gb=-1.0)
def test_zero_duration_raises(self):
with pytest.raises(ValueError, match="split_duration_min"):
BagPolicy(split_duration_min=0.0)
def test_cleanup_age_zero_raises(self):
with pytest.raises(ValueError, match="cleanup_age_days"):
BagPolicy(cleanup_age_days=0)
def test_warn_above_cleanup_raises(self):
with pytest.raises(ValueError):
BagPolicy(warn_disk_pct=85.0, cleanup_disk_pct=80.0)
def test_equal_warn_cleanup_raises(self):
with pytest.raises(ValueError):
BagPolicy(warn_disk_pct=80.0, cleanup_disk_pct=80.0)
# ─── BagPolicy split ─────────────────────────────────────────────────────────
class TestBagPolicySplit:
def setup_method(self):
self.p = BagPolicy(split_size_gb=1.0, split_duration_min=10.0)
def test_no_split_small_short(self):
split, reason = self.p.needs_split(100*1024**2, 60)
assert not split and reason == ""
def test_split_at_size_boundary(self):
split, reason = self.p.needs_split(1024**3, 60)
assert split
assert "size" in reason.lower() or "gb" in reason.lower()
def test_split_over_size(self):
split, _ = self.p.needs_split(int(1.5*1024**3), 60)
assert split
def test_split_at_duration_boundary(self):
split, reason = self.p.needs_split(100*1024**2, 600)
assert split
assert "min" in reason.lower() or "duration" in reason.lower()
def test_split_over_duration(self):
split, _ = self.p.needs_split(100*1024**2, 700)
assert split
def test_just_below_both(self):
split, _ = self.p.needs_split(1024**3 - 1, 599)
assert not split
def test_both_exceeded_still_splits(self):
split, _ = self.p.needs_split(2*1024**3, 700)
assert split
# ─── BagPolicy disk thresholds ────────────────────────────────────────────────
class TestBagPolicyDisk:
def setup_method(self):
self.p = BagPolicy(warn_disk_pct=70.0, cleanup_disk_pct=80.0,
min_free_gb=2.0)
def test_no_warn_below(self):
assert not self.p.should_warn_disk(_disk(100, 40)) # 60% used
def test_warn_at_boundary(self):
assert self.p.should_warn_disk(_disk(100, 30)) # 70% used
def test_warn_above(self):
assert self.p.should_warn_disk(_disk(100, 25)) # 75% used
def test_no_cleanup_at_warn(self):
assert not self.p.needs_cleanup(_disk(100, 30)) # 70% used
def test_cleanup_at_boundary(self):
assert self.p.needs_cleanup(_disk(100, 20)) # 80% used
def test_cleanup_above(self):
assert self.p.needs_cleanup(_disk(100, 5)) # 95% used
def test_can_record_exact_min_free(self):
total = 100 * 1024**3
free = int(2.0 * 1024**3)
di = DiskInfo(path="/", total_bytes=total,
used_bytes=total-free, free_bytes=free)
assert self.p.can_record(di)
def test_cannot_record_below_min_free(self):
total = 100 * 1024**3
free = int(1.0 * 1024**3)
di = DiskInfo(path="/", total_bytes=total,
used_bytes=total-free, free_bytes=free)
assert not self.p.can_record(di)
# ─── BagPolicy age ────────────────────────────────────────────────────────────
class TestBagPolicyAge:
def setup_method(self):
self.p = BagPolicy(cleanup_age_days=7)
def test_fresh_not_expired(self):
assert not self.p.bag_is_expired(1.0)
def test_at_boundary_expired(self):
assert self.p.bag_is_expired(7.0)
def test_old_expired(self):
assert self.p.bag_is_expired(30.0)
def test_age_computation(self):
mtime = 1000.0
age = self.p.bag_age_days(mtime, mtime + 8 * 86400)
assert abs(age - 8.0) < 1e-9
def test_age_zero(self):
assert self.p.bag_age_days(5000.0, 5000.0) == 0.0