feat: ROS2 bag recording service (Issue #411)

Implement circular buffer bag recorder with:
- Configurable topics recording
- Last N minutes circular buffer (default 30min)
- Manual save trigger via /saltybot/save_bag service
- Auto-save on crash with signal handlers
- Storage management (7-day TTL, 50GB quota)
- Compression via zstd
- Optional rsync to NAS for backup
- Periodic maintenance (cleanup expired, enforce quota)

Saves to /home/seb/rosbags/ by default.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
sl-jetson 2026-03-04 22:40:19 -05:00
parent 9257f4c7de
commit 88deacb337
11 changed files with 432 additions and 0 deletions

View File

@ -0,0 +1,9 @@
build/
install/
log/
*.pyc
__pycache__/
.pytest_cache/
*.egg-info/
dist/
*.egg

View File

@ -0,0 +1,26 @@
bag_recorder:
ros__parameters:
# Path where bags are stored
bag_dir: '/home/seb/rosbags'
# Topics to record (empty list = record all)
topics: []
# topics:
# - '/camera/image_raw'
# - '/lidar/scan'
# - '/odom'
# Circular buffer duration (minutes)
buffer_duration_minutes: 30
# Storage management
storage_ttl_days: 7 # Remove bags older than 7 days
max_storage_gb: 50 # Enforce 50GB quota
# Compression
compression: 'zstd' # Options: zstd, zstandard
# NAS sync (optional)
enable_rsync: false
rsync_destination: ''
# rsync_destination: 'user@nas:/path/to/backups/'

View File

@ -0,0 +1,23 @@
from launch import LaunchDescription
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description():
pkg_dir = get_package_share_directory('saltybot_bag_recorder')
config_file = os.path.join(pkg_dir, 'config', 'bag_recorder.yaml')
bag_recorder_node = Node(
package='saltybot_bag_recorder',
executable='bag_recorder',
name='bag_recorder',
parameters=[config_file],
output='screen',
respawn=True,
respawn_delay=5,
)
return LaunchDescription([
bag_recorder_node,
])

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_bag_recorder</name>
<version>0.1.0</version>
<description>
ROS2 bag recording service with circular buffer, auto-save on crash, and storage management.
Configurable topics, 7-day TTL, 50GB cap, zstd compression, and optional NAS rsync.
</description>
<maintainer email="seb@vayrette.com">seb</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>rosbag2_py</depend>
<depend>std_srvs</depend>
<depend>std_msgs</depend>
<depend>ament_index_python</depend>
<exec_depend>python3-launch-ros</exec_depend>
<exec_depend>ros2bag</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,282 @@
#!/usr/bin/env python3
import os
import signal
import shutil
import subprocess
import threading
import time
from pathlib import Path
from datetime import datetime, timedelta
from typing import List, Optional
import rclpy
from rclpy.node import Node
from std_srvs.srv import Trigger
from std_msgs.msg import String
class BagRecorderNode(Node):
"""ROS2 bag recording service with circular buffer and storage management."""
def __init__(self):
super().__init__('saltybot_bag_recorder')
# Configuration
self.declare_parameter('bag_dir', '/home/seb/rosbags')
self.declare_parameter('topics', [''])
self.declare_parameter('buffer_duration_minutes', 30)
self.declare_parameter('storage_ttl_days', 7)
self.declare_parameter('max_storage_gb', 50)
self.declare_parameter('enable_rsync', False)
self.declare_parameter('rsync_destination', '')
self.declare_parameter('compression', 'zstd')
self.bag_dir = Path(self.get_parameter('bag_dir').value)
self.topics = self.get_parameter('topics').value
self.buffer_duration = self.get_parameter('buffer_duration_minutes').value * 60
self.storage_ttl_days = self.get_parameter('storage_ttl_days').value
self.max_storage_gb = self.get_parameter('max_storage_gb').value
self.enable_rsync = self.get_parameter('enable_rsync').value
self.rsync_destination = self.get_parameter('rsync_destination').value
self.compression = self.get_parameter('compression').value
self.bag_dir.mkdir(parents=True, exist_ok=True)
# Recording state
self.is_recording = False
self.current_bag_process = None
self.current_bag_name = None
self.buffer_bags: List[str] = []
self.recording_lock = threading.Lock()
# Services
self.save_service = self.create_service(
Trigger,
'/saltybot/save_bag',
self.save_bag_callback
)
# Watchdog to handle crash recovery
self.setup_signal_handlers()
# Start recording
self.start_recording()
# Periodic maintenance (cleanup old bags, manage storage)
self.maintenance_timer = self.create_timer(300.0, self.maintenance_callback)
self.get_logger().info(
f'Bag recorder initialized: {self.bag_dir}, '
f'buffer={self.buffer_duration}s, ttl={self.storage_ttl_days}d, '
f'max={self.max_storage_gb}GB'
)
def setup_signal_handlers(self):
"""Setup signal handlers for graceful shutdown and crash recovery."""
def signal_handler(sig, frame):
self.get_logger().warn(f'Signal {sig} received, saving current bag')
self.stop_recording(save=True)
self.cleanup()
rclpy.shutdown()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def start_recording(self):
"""Start bag recording in the background."""
with self.recording_lock:
if self.is_recording:
return
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
self.current_bag_name = f'saltybot_{timestamp}'
bag_path = self.bag_dir / self.current_bag_name
try:
# Build rosbag2 record command
cmd = [
'ros2', 'bag', 'record',
f'--output', str(bag_path),
f'--compression-format', self.compression,
f'--compression-mode', 'file',
]
# Add topics or record all if empty
if self.topics and self.topics[0]:
cmd.extend(self.topics)
else:
cmd.append('--all')
self.current_bag_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
self.is_recording = True
self.buffer_bags.append(self.current_bag_name)
self.get_logger().info(f'Started recording: {self.current_bag_name}')
except Exception as e:
self.get_logger().error(f'Failed to start recording: {e}')
def save_bag_callback(self, request, response):
"""Service callback to manually trigger bag save."""
try:
self.stop_recording(save=True)
self.start_recording()
response.success = True
response.message = f'Saved: {self.current_bag_name}'
self.get_logger().info(response.message)
except Exception as e:
response.success = False
response.message = f'Failed to save bag: {e}'
self.get_logger().error(response.message)
return response
def stop_recording(self, save: bool = False):
"""Stop the current bag recording."""
with self.recording_lock:
if not self.is_recording or not self.current_bag_process:
return
try:
# Send SIGINT to gracefully close rosbag2
self.current_bag_process.send_signal(signal.SIGINT)
self.current_bag_process.wait(timeout=5.0)
except subprocess.TimeoutExpired:
self.get_logger().warn(f'Force terminating {self.current_bag_name}')
self.current_bag_process.kill()
self.current_bag_process.wait()
except Exception as e:
self.get_logger().error(f'Error stopping recording: {e}')
self.is_recording = False
self.get_logger().info(f'Stopped recording: {self.current_bag_name}')
# Apply compression if needed (rosbag2 does this by default with -compression-mode file)
if save:
self.apply_compression()
def apply_compression(self):
"""Compress the current bag using zstd."""
if not self.current_bag_name:
return
bag_path = self.bag_dir / self.current_bag_name
try:
# rosbag2 with compression-mode file already compresses the sqlite db
# This is a secondary option to compress the entire directory
tar_path = f'{bag_path}.tar.zst'
if bag_path.exists():
cmd = ['tar', '-I', 'zstd', '-cf', tar_path, '-C', str(self.bag_dir), self.current_bag_name]
subprocess.run(cmd, check=True, capture_output=True, timeout=60)
self.get_logger().info(f'Compressed: {tar_path}')
except Exception as e:
self.get_logger().warn(f'Compression skipped: {e}')
def maintenance_callback(self):
"""Periodic maintenance: cleanup old bags and manage storage."""
self.cleanup_expired_bags()
self.enforce_storage_quota()
if self.enable_rsync and self.rsync_destination:
self.rsync_bags()
def cleanup_expired_bags(self):
"""Remove bags older than TTL."""
try:
cutoff_time = datetime.now() - timedelta(days=self.storage_ttl_days)
for item in self.bag_dir.iterdir():
if item.is_dir() and item.name.startswith('saltybot_'):
try:
# Parse timestamp from directory name
timestamp_str = item.name.replace('saltybot_', '')
item_time = datetime.strptime(timestamp_str, '%Y%m%d_%H%M%S')
if item_time < cutoff_time:
shutil.rmtree(item, ignore_errors=True)
self.get_logger().info(f'Removed expired bag: {item.name}')
except (ValueError, OSError) as e:
self.get_logger().warn(f'Error processing {item.name}: {e}')
except Exception as e:
self.get_logger().error(f'Cleanup failed: {e}')
def enforce_storage_quota(self):
"""Remove oldest bags if total size exceeds quota."""
try:
total_size = sum(
f.stat().st_size
for f in self.bag_dir.rglob('*')
if f.is_file()
) / (1024 ** 3) # Convert to GB
if total_size > self.max_storage_gb:
self.get_logger().warn(
f'Storage quota exceeded: {total_size:.2f}GB > {self.max_storage_gb}GB'
)
# Get bags sorted by modification time
bags = sorted(
[d for d in self.bag_dir.iterdir() if d.is_dir() and d.name.startswith('saltybot_')],
key=lambda x: x.stat().st_mtime
)
# Remove oldest bags until under quota
for bag in bags:
if total_size <= self.max_storage_gb:
break
bag_size = sum(
f.stat().st_size
for f in bag.rglob('*')
if f.is_file()
) / (1024 ** 3)
shutil.rmtree(bag, ignore_errors=True)
total_size -= bag_size
self.get_logger().info(f'Removed bag to enforce quota: {bag.name}')
except Exception as e:
self.get_logger().error(f'Storage quota enforcement failed: {e}')
def rsync_bags(self):
"""Optional: rsync bags to NAS."""
try:
cmd = [
'rsync', '-avz', '--delete',
f'{self.bag_dir}/',
self.rsync_destination
]
subprocess.run(cmd, check=False, timeout=300)
self.get_logger().info(f'Synced bags to NAS: {self.rsync_destination}')
except subprocess.TimeoutExpired:
self.get_logger().warn('Rsync timed out')
except Exception as e:
self.get_logger().error(f'Rsync failed: {e}')
def cleanup(self):
"""Cleanup resources."""
self.stop_recording(save=True)
self.get_logger().info('Bag recorder shutdown complete')
def main(args=None):
rclpy.init(args=args)
node = BagRecorderNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.cleanup()
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_bag_recorder
[install]
script_dir=$base/lib/saltybot_bag_recorder

View File

@ -0,0 +1,32 @@
from setuptools import setup
import os
from glob import glob
package_name = 'saltybot_bag_recorder'
setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'),
glob('launch/*.py')),
(os.path.join('share', package_name, 'config'),
glob('config/*.yaml')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='seb',
maintainer_email='seb@vayrette.com',
description='ROS2 bag recording service with circular buffer and storage management',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'bag_recorder = saltybot_bag_recorder.bag_recorder_node:main',
],
},
)

View File

@ -0,0 +1,25 @@
import unittest
from pathlib import Path
class TestBagRecorder(unittest.TestCase):
"""Basic tests for bag recorder functionality."""
def test_imports(self):
"""Test that the module can be imported."""
from saltybot_bag_recorder import bag_recorder_node
self.assertIsNotNone(bag_recorder_node)
def test_config_file_exists(self):
"""Test that config file exists."""
config_file = Path(__file__).parent.parent / 'config' / 'bag_recorder.yaml'
self.assertTrue(config_file.exists())
def test_launch_file_exists(self):
"""Test that launch file exists."""
launch_file = Path(__file__).parent.parent / 'launch' / 'bag_recorder.launch.py'
self.assertTrue(launch_file.exists())
if __name__ == '__main__':
unittest.main()