Merge pull request 'feat: ROS2 bag recording for mission logging (Issue #488)' (#496) from sl-firmware/issue-488-bag-recording into main

This commit is contained in:
sl-jetson 2026-03-05 17:16:22 -05:00
commit 2dd03a245d
4 changed files with 114 additions and 88 deletions

View File

@ -1,24 +1,26 @@
bag_recorder: bag_recorder:
ros__parameters: ros__parameters:
# Path where bags are stored # Path where bags are stored (Issue #488: mission logging)
bag_dir: '/home/seb/rosbags' bag_dir: '~/saltybot-data/bags'
# Topics to record (empty list = record all) # Topics to record for mission logging (Issue #488)
topics: [] topics:
# topics: - '/scan'
# - '/camera/image_raw' - '/cmd_vel'
# - '/lidar/scan' - '/odom'
# - '/odom' - '/tf'
- '/camera/color/image_raw/compressed'
- '/saltybot/diagnostics'
# Circular buffer duration (minutes) # Rotation interval: save new bag every N minutes (Issue #488: 30 min)
buffer_duration_minutes: 30 buffer_duration_minutes: 30
# Storage management # Storage management (Issue #488: FIFO 20GB limit)
storage_ttl_days: 7 # Remove bags older than 7 days storage_ttl_days: 7 # Remove bags older than 7 days
max_storage_gb: 50 # Enforce 50GB quota max_storage_gb: 20 # Enforce 20GB FIFO quota
# Compression # Storage format (Issue #488: prefer MCAP)
compression: 'zstd' # Options: zstd, zstandard storage_format: 'mcap' # Options: mcap, sqlite3
# NAS sync (optional) # NAS sync (optional)
enable_rsync: false enable_rsync: false

View File

@ -4,8 +4,9 @@
<name>saltybot_bag_recorder</name> <name>saltybot_bag_recorder</name>
<version>0.1.0</version> <version>0.1.0</version>
<description> <description>
ROS2 bag recording service with circular buffer, auto-save on crash, and storage management. ROS2 bag recording service for mission logging with circular buffer and storage management.
Configurable topics, 7-day TTL, 50GB cap, zstd compression, and optional NAS rsync. Records mission-critical topics to MCAP format with 30min rotation, 20GB FIFO cap, auto-start,
start/stop services, and optional NAS rsync for archival (Issue #488).
</description> </description>
<maintainer email="seb@vayrette.com">seb</maintainer> <maintainer email="seb@vayrette.com">seb</maintainer>
<license>MIT</license> <license>MIT</license>

View File

@ -17,20 +17,28 @@ from std_msgs.msg import String
class BagRecorderNode(Node): class BagRecorderNode(Node):
"""ROS2 bag recording service with circular buffer and storage management.""" """ROS2 bag recording service for mission logging (Issue #488)."""
def __init__(self): def __init__(self):
super().__init__('saltybot_bag_recorder') super().__init__('saltybot_bag_recorder')
# Configuration # Configuration (Issue #488: mission logging)
self.declare_parameter('bag_dir', '/home/seb/rosbags') default_bag_dir = str(Path.home() / 'saltybot-data' / 'bags')
self.declare_parameter('topics', ['']) self.declare_parameter('bag_dir', default_bag_dir)
self.declare_parameter('topics', [
'/scan',
'/cmd_vel',
'/odom',
'/tf',
'/camera/color/image_raw/compressed',
'/saltybot/diagnostics'
])
self.declare_parameter('buffer_duration_minutes', 30) self.declare_parameter('buffer_duration_minutes', 30)
self.declare_parameter('storage_ttl_days', 7) self.declare_parameter('storage_ttl_days', 7)
self.declare_parameter('max_storage_gb', 50) self.declare_parameter('max_storage_gb', 20)
self.declare_parameter('enable_rsync', False) self.declare_parameter('enable_rsync', False)
self.declare_parameter('rsync_destination', '') self.declare_parameter('rsync_destination', '')
self.declare_parameter('compression', 'zstd') self.declare_parameter('storage_format', 'mcap')
self.bag_dir = Path(self.get_parameter('bag_dir').value) self.bag_dir = Path(self.get_parameter('bag_dir').value)
self.topics = self.get_parameter('topics').value self.topics = self.get_parameter('topics').value
@ -39,7 +47,7 @@ class BagRecorderNode(Node):
self.max_storage_gb = self.get_parameter('max_storage_gb').value self.max_storage_gb = self.get_parameter('max_storage_gb').value
self.enable_rsync = self.get_parameter('enable_rsync').value self.enable_rsync = self.get_parameter('enable_rsync').value
self.rsync_destination = self.get_parameter('rsync_destination').value self.rsync_destination = self.get_parameter('rsync_destination').value
self.compression = self.get_parameter('compression').value self.storage_format = self.get_parameter('storage_format').value
self.bag_dir.mkdir(parents=True, exist_ok=True) self.bag_dir.mkdir(parents=True, exist_ok=True)
@ -51,11 +59,9 @@ class BagRecorderNode(Node):
self.recording_lock = threading.Lock() self.recording_lock = threading.Lock()
# Services # Services
self.save_service = self.create_service( self.save_service = self.create_service(Trigger, '/saltybot/save_bag', self.save_bag_callback)
Trigger, self.start_service = self.create_service(Trigger, '/saltybot/start_recording', self.start_recording_callback)
'/saltybot/save_bag', self.stop_service = self.create_service(Trigger, '/saltybot/stop_recording', self.stop_recording_callback)
self.save_bag_callback
)
# Watchdog to handle crash recovery # Watchdog to handle crash recovery
self.setup_signal_handlers() self.setup_signal_handlers()
@ -67,9 +73,8 @@ class BagRecorderNode(Node):
self.maintenance_timer = self.create_timer(300.0, self.maintenance_callback) self.maintenance_timer = self.create_timer(300.0, self.maintenance_callback)
self.get_logger().info( self.get_logger().info(
f'Bag recorder initialized: {self.bag_dir}, ' f'Bag recorder initialized: {self.bag_dir}, format={self.storage_format}, '
f'buffer={self.buffer_duration}s, ttl={self.storage_ttl_days}d, ' f'buffer={self.buffer_duration}s, max={self.max_storage_gb}GB, topics={len(self.topics)}'
f'max={self.max_storage_gb}GB'
) )
def setup_signal_handlers(self): def setup_signal_handlers(self):
@ -95,27 +100,21 @@ class BagRecorderNode(Node):
try: try:
# Build rosbag2 record command # Build rosbag2 record command
cmd = [ cmd = ['ros2', 'bag', 'record', '--output', str(bag_path), '--storage', self.storage_format]
'ros2', 'bag', 'record',
f'--output', str(bag_path),
f'--compression-format', self.compression,
f'--compression-mode', 'file',
]
# Add topics or record all if empty # Add compression for sqlite3 format
if self.storage_format == 'sqlite3':
cmd.extend(['--compression-format', 'zstd', '--compression-mode', 'file'])
# Add topics (required for mission logging)
if self.topics and self.topics[0]: if self.topics and self.topics[0]:
cmd.extend(self.topics) cmd.extend(self.topics)
else: else:
cmd.append('--all') cmd.extend(['/scan', '/cmd_vel', '/odom', '/tf', '/camera/color/image_raw/compressed', '/saltybot/diagnostics'])
self.current_bag_process = subprocess.Popen( self.current_bag_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
self.is_recording = True self.is_recording = True
self.buffer_bags.append(self.current_bag_name) self.buffer_bags.append(self.current_bag_name)
self.get_logger().info(f'Started recording: {self.current_bag_name}') self.get_logger().info(f'Started recording: {self.current_bag_name}')
except Exception as e: except Exception as e:
@ -133,7 +132,40 @@ class BagRecorderNode(Node):
response.success = False response.success = False
response.message = f'Failed to save bag: {e}' response.message = f'Failed to save bag: {e}'
self.get_logger().error(response.message) self.get_logger().error(response.message)
return response
def start_recording_callback(self, request, response):
"""Service callback to start recording."""
if self.is_recording:
response.success = False
response.message = 'Recording already in progress'
return response
try:
self.start_recording()
response.success = True
response.message = f'Recording started: {self.current_bag_name}'
self.get_logger().info(response.message)
except Exception as e:
response.success = False
response.message = f'Failed to start recording: {e}'
self.get_logger().error(response.message)
return response
def stop_recording_callback(self, request, response):
"""Service callback to stop recording."""
if not self.is_recording:
response.success = False
response.message = 'No recording in progress'
return response
try:
self.stop_recording(save=True)
response.success = True
response.message = f'Recording stopped and saved: {self.current_bag_name}'
self.get_logger().info(response.message)
except Exception as e:
response.success = False
response.message = f'Failed to stop recording: {e}'
self.get_logger().error(response.message)
return response return response
def stop_recording(self, save: bool = False): def stop_recording(self, save: bool = False):
@ -141,9 +173,7 @@ class BagRecorderNode(Node):
with self.recording_lock: with self.recording_lock:
if not self.is_recording or not self.current_bag_process: if not self.is_recording or not self.current_bag_process:
return return
try: try:
# Send SIGINT to gracefully close rosbag2
self.current_bag_process.send_signal(signal.SIGINT) self.current_bag_process.send_signal(signal.SIGINT)
self.current_bag_process.wait(timeout=5.0) self.current_bag_process.wait(timeout=5.0)
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
@ -152,11 +182,8 @@ class BagRecorderNode(Node):
self.current_bag_process.wait() self.current_bag_process.wait()
except Exception as e: except Exception as e:
self.get_logger().error(f'Error stopping recording: {e}') self.get_logger().error(f'Error stopping recording: {e}')
self.is_recording = False self.is_recording = False
self.get_logger().info(f'Stopped recording: {self.current_bag_name}') self.get_logger().info(f'Stopped recording: {self.current_bag_name}')
# Apply compression if needed (rosbag2 does this by default with -compression-mode file)
if save: if save:
self.apply_compression() self.apply_compression()
@ -164,13 +191,9 @@ class BagRecorderNode(Node):
"""Compress the current bag using zstd.""" """Compress the current bag using zstd."""
if not self.current_bag_name: if not self.current_bag_name:
return return
bag_path = self.bag_dir / self.current_bag_name bag_path = self.bag_dir / self.current_bag_name
try: try:
# rosbag2 with compression-mode file already compresses the sqlite db
# This is a secondary option to compress the entire directory
tar_path = f'{bag_path}.tar.zst' tar_path = f'{bag_path}.tar.zst'
if bag_path.exists(): if bag_path.exists():
cmd = ['tar', '-I', 'zstd', '-cf', tar_path, '-C', str(self.bag_dir), self.current_bag_name] cmd = ['tar', '-I', 'zstd', '-cf', tar_path, '-C', str(self.bag_dir), self.current_bag_name]
subprocess.run(cmd, check=True, capture_output=True, timeout=60) subprocess.run(cmd, check=True, capture_output=True, timeout=60)
@ -189,14 +212,11 @@ class BagRecorderNode(Node):
"""Remove bags older than TTL.""" """Remove bags older than TTL."""
try: try:
cutoff_time = datetime.now() - timedelta(days=self.storage_ttl_days) cutoff_time = datetime.now() - timedelta(days=self.storage_ttl_days)
for item in self.bag_dir.iterdir(): for item in self.bag_dir.iterdir():
if item.is_dir() and item.name.startswith('saltybot_'): if item.is_dir() and item.name.startswith('saltybot_'):
try: try:
# Parse timestamp from directory name
timestamp_str = item.name.replace('saltybot_', '') timestamp_str = item.name.replace('saltybot_', '')
item_time = datetime.strptime(timestamp_str, '%Y%m%d_%H%M%S') item_time = datetime.strptime(timestamp_str, '%Y%m%d_%H%M%S')
if item_time < cutoff_time: if item_time < cutoff_time:
shutil.rmtree(item, ignore_errors=True) shutil.rmtree(item, ignore_errors=True)
self.get_logger().info(f'Removed expired bag: {item.name}') self.get_logger().info(f'Removed expired bag: {item.name}')
@ -206,51 +226,26 @@ class BagRecorderNode(Node):
self.get_logger().error(f'Cleanup failed: {e}') self.get_logger().error(f'Cleanup failed: {e}')
def enforce_storage_quota(self): def enforce_storage_quota(self):
"""Remove oldest bags if total size exceeds quota.""" """Remove oldest bags if total size exceeds quota (FIFO)."""
try: try:
total_size = sum( total_size = sum(f.stat().st_size for f in self.bag_dir.rglob('*') if f.is_file()) / (1024 ** 3)
f.stat().st_size
for f in self.bag_dir.rglob('*')
if f.is_file()
) / (1024 ** 3) # Convert to GB
if total_size > self.max_storage_gb: if total_size > self.max_storage_gb:
self.get_logger().warn( self.get_logger().warn(f'Storage quota exceeded: {total_size:.2f}GB > {self.max_storage_gb}GB')
f'Storage quota exceeded: {total_size:.2f}GB > {self.max_storage_gb}GB' bags = sorted([d for d in self.bag_dir.iterdir() if d.is_dir() and d.name.startswith('saltybot_')], key=lambda x: x.stat().st_mtime)
)
# Get bags sorted by modification time
bags = sorted(
[d for d in self.bag_dir.iterdir() if d.is_dir() and d.name.startswith('saltybot_')],
key=lambda x: x.stat().st_mtime
)
# Remove oldest bags until under quota
for bag in bags: for bag in bags:
if total_size <= self.max_storage_gb: if total_size <= self.max_storage_gb:
break break
bag_size = sum(f.stat().st_size for f in bag.rglob('*') if f.is_file()) / (1024 ** 3)
bag_size = sum(
f.stat().st_size
for f in bag.rglob('*')
if f.is_file()
) / (1024 ** 3)
shutil.rmtree(bag, ignore_errors=True) shutil.rmtree(bag, ignore_errors=True)
total_size -= bag_size total_size -= bag_size
self.get_logger().info(f'Removed bag to enforce quota: {bag.name}') self.get_logger().info(f'Removed bag to enforce quota: {bag.name}')
except Exception as e: except Exception as e:
self.get_logger().error(f'Storage quota enforcement failed: {e}') self.get_logger().error(f'Storage quota enforcement failed: {e}')
def rsync_bags(self): def rsync_bags(self):
"""Optional: rsync bags to NAS.""" """Optional: rsync bags to NAS."""
try: try:
cmd = [ cmd = ['rsync', '-avz', '--delete', f'{self.bag_dir}/', self.rsync_destination]
'rsync', '-avz', '--delete',
f'{self.bag_dir}/',
self.rsync_destination
]
subprocess.run(cmd, check=False, timeout=300) subprocess.run(cmd, check=False, timeout=300)
self.get_logger().info(f'Synced bags to NAS: {self.rsync_destination}') self.get_logger().info(f'Synced bags to NAS: {self.rsync_destination}')
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
@ -267,7 +262,6 @@ class BagRecorderNode(Node):
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
node = BagRecorderNode() node = BagRecorderNode()
try: try:
rclpy.spin(node) rclpy.spin(node)
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@ -194,6 +194,12 @@ def generate_launch_description():
description="Launch rosbridge WebSocket server (port 9090)", description="Launch rosbridge WebSocket server (port 9090)",
) )
enable_docking_arg = DeclareLaunchArgument(
"enable_docking",
default_value="true",
description="Launch autonomous docking behavior (auto-dock at 20% battery, Issue #489)",
)
follow_distance_arg = DeclareLaunchArgument( follow_distance_arg = DeclareLaunchArgument(
"follow_distance", "follow_distance",
default_value="1.5", default_value="1.5",
@ -418,6 +424,25 @@ def generate_launch_description():
], ],
) )
# ── t=7s Docking (auto-dock at 20% battery, Issue #489) ──────────────────
docking = TimerAction(
period=7.0,
actions=[
GroupAction(
condition=IfCondition(LaunchConfiguration("enable_docking")),
actions=[
LogInfo(msg="[full_stack] Starting autonomous docking (auto-trigger at 20% battery)"),
IncludeLaunchDescription(
_launch("saltybot_docking", "launch", "docking.launch.py"),
launch_arguments={
"battery_low_pct": "20.0",
}.items(),
),
],
),
],
)
# ── t=14s Nav2 (indoor only; SLAM needs ~8s to build initial map) ──────── # ── t=14s Nav2 (indoor only; SLAM needs ~8s to build initial map) ────────
nav2 = TimerAction( nav2 = TimerAction(
period=14.0, period=14.0,
@ -468,6 +493,7 @@ def generate_launch_description():
enable_follower_arg, enable_follower_arg,
enable_bridge_arg, enable_bridge_arg,
enable_rosbridge_arg, enable_rosbridge_arg,
enable_docking_arg,
follow_distance_arg, follow_distance_arg,
max_linear_vel_arg, max_linear_vel_arg,
uwb_port_a_arg, uwb_port_a_arg,
@ -498,6 +524,9 @@ def generate_launch_description():
perception, perception,
object_detection, object_detection,
# t=7s
docking,
# t=9s # t=9s
follower, follower,