diff --git a/jetson/ros2_ws/src/saltybot_bag_recorder/.gitignore b/jetson/ros2_ws/src/saltybot_bag_recorder/.gitignore new file mode 100644 index 0000000..4549348 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bag_recorder/.gitignore @@ -0,0 +1,9 @@ +build/ +install/ +log/ +*.pyc +__pycache__/ +.pytest_cache/ +*.egg-info/ +dist/ +*.egg diff --git a/jetson/ros2_ws/src/saltybot_bag_recorder/config/bag_recorder.yaml b/jetson/ros2_ws/src/saltybot_bag_recorder/config/bag_recorder.yaml new file mode 100644 index 0000000..eaacc7e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bag_recorder/config/bag_recorder.yaml @@ -0,0 +1,26 @@ +bag_recorder: + ros__parameters: + # Path where bags are stored + bag_dir: '/home/seb/rosbags' + + # Topics to record (empty list = record all) + topics: [] + # topics: + # - '/camera/image_raw' + # - '/lidar/scan' + # - '/odom' + + # Circular buffer duration (minutes) + buffer_duration_minutes: 30 + + # Storage management + storage_ttl_days: 7 # Remove bags older than 7 days + max_storage_gb: 50 # Enforce 50GB quota + + # Compression + compression: 'zstd' # Options: zstd, zstandard + + # NAS sync (optional) + enable_rsync: false + rsync_destination: '' + # rsync_destination: 'user@nas:/path/to/backups/' diff --git a/jetson/ros2_ws/src/saltybot_bag_recorder/launch/bag_recorder.launch.py b/jetson/ros2_ws/src/saltybot_bag_recorder/launch/bag_recorder.launch.py new file mode 100644 index 0000000..659e0f8 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bag_recorder/launch/bag_recorder.launch.py @@ -0,0 +1,23 @@ +from launch import LaunchDescription +from launch_ros.actions import Node +from ament_index_python.packages import get_package_share_directory +import os + + +def generate_launch_description(): + pkg_dir = get_package_share_directory('saltybot_bag_recorder') + config_file = os.path.join(pkg_dir, 'config', 'bag_recorder.yaml') + + bag_recorder_node = Node( + package='saltybot_bag_recorder', + executable='bag_recorder', + name='bag_recorder', + parameters=[config_file], + output='screen', + respawn=True, + respawn_delay=5, + ) + + return LaunchDescription([ + bag_recorder_node, + ]) diff --git a/jetson/ros2_ws/src/saltybot_bag_recorder/package.xml b/jetson/ros2_ws/src/saltybot_bag_recorder/package.xml new file mode 100644 index 0000000..bffbe26 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bag_recorder/package.xml @@ -0,0 +1,30 @@ + + + + saltybot_bag_recorder + 0.1.0 + + ROS2 bag recording service with circular buffer, auto-save on crash, and storage management. + Configurable topics, 7-day TTL, 50GB cap, zstd compression, and optional NAS rsync. + + seb + MIT + + rclpy + rosbag2_py + std_srvs + std_msgs + ament_index_python + + python3-launch-ros + ros2bag + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_bag_recorder/resource/saltybot_bag_recorder b/jetson/ros2_ws/src/saltybot_bag_recorder/resource/saltybot_bag_recorder new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_bag_recorder/saltybot_bag_recorder/__init__.py b/jetson/ros2_ws/src/saltybot_bag_recorder/saltybot_bag_recorder/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_bag_recorder/saltybot_bag_recorder/bag_recorder_node.py b/jetson/ros2_ws/src/saltybot_bag_recorder/saltybot_bag_recorder/bag_recorder_node.py new file mode 100644 index 0000000..a291188 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bag_recorder/saltybot_bag_recorder/bag_recorder_node.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python3 + +import os +import signal +import shutil +import subprocess +import threading +import time +from pathlib import Path +from datetime import datetime, timedelta +from typing import List, Optional + +import rclpy +from rclpy.node import Node +from std_srvs.srv import Trigger +from std_msgs.msg import String + + +class BagRecorderNode(Node): + """ROS2 bag recording service with circular buffer and storage management.""" + + def __init__(self): + super().__init__('saltybot_bag_recorder') + + # Configuration + self.declare_parameter('bag_dir', '/home/seb/rosbags') + self.declare_parameter('topics', ['']) + self.declare_parameter('buffer_duration_minutes', 30) + self.declare_parameter('storage_ttl_days', 7) + self.declare_parameter('max_storage_gb', 50) + self.declare_parameter('enable_rsync', False) + self.declare_parameter('rsync_destination', '') + self.declare_parameter('compression', 'zstd') + + self.bag_dir = Path(self.get_parameter('bag_dir').value) + self.topics = self.get_parameter('topics').value + self.buffer_duration = self.get_parameter('buffer_duration_minutes').value * 60 + self.storage_ttl_days = self.get_parameter('storage_ttl_days').value + self.max_storage_gb = self.get_parameter('max_storage_gb').value + self.enable_rsync = self.get_parameter('enable_rsync').value + self.rsync_destination = self.get_parameter('rsync_destination').value + self.compression = self.get_parameter('compression').value + + self.bag_dir.mkdir(parents=True, exist_ok=True) + + # Recording state + self.is_recording = False + self.current_bag_process = None + self.current_bag_name = None + self.buffer_bags: List[str] = [] + self.recording_lock = threading.Lock() + + # Services + self.save_service = self.create_service( + Trigger, + '/saltybot/save_bag', + self.save_bag_callback + ) + + # Watchdog to handle crash recovery + self.setup_signal_handlers() + + # Start recording + self.start_recording() + + # Periodic maintenance (cleanup old bags, manage storage) + self.maintenance_timer = self.create_timer(300.0, self.maintenance_callback) + + self.get_logger().info( + f'Bag recorder initialized: {self.bag_dir}, ' + f'buffer={self.buffer_duration}s, ttl={self.storage_ttl_days}d, ' + f'max={self.max_storage_gb}GB' + ) + + def setup_signal_handlers(self): + """Setup signal handlers for graceful shutdown and crash recovery.""" + def signal_handler(sig, frame): + self.get_logger().warn(f'Signal {sig} received, saving current bag') + self.stop_recording(save=True) + self.cleanup() + rclpy.shutdown() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + def start_recording(self): + """Start bag recording in the background.""" + with self.recording_lock: + if self.is_recording: + return + + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + self.current_bag_name = f'saltybot_{timestamp}' + bag_path = self.bag_dir / self.current_bag_name + + try: + # Build rosbag2 record command + cmd = [ + 'ros2', 'bag', 'record', + f'--output', str(bag_path), + f'--compression-format', self.compression, + f'--compression-mode', 'file', + ] + + # Add topics or record all if empty + if self.topics and self.topics[0]: + cmd.extend(self.topics) + else: + cmd.append('--all') + + self.current_bag_process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + self.is_recording = True + self.buffer_bags.append(self.current_bag_name) + + self.get_logger().info(f'Started recording: {self.current_bag_name}') + + except Exception as e: + self.get_logger().error(f'Failed to start recording: {e}') + + def save_bag_callback(self, request, response): + """Service callback to manually trigger bag save.""" + try: + self.stop_recording(save=True) + self.start_recording() + response.success = True + response.message = f'Saved: {self.current_bag_name}' + self.get_logger().info(response.message) + except Exception as e: + response.success = False + response.message = f'Failed to save bag: {e}' + self.get_logger().error(response.message) + + return response + + def stop_recording(self, save: bool = False): + """Stop the current bag recording.""" + with self.recording_lock: + if not self.is_recording or not self.current_bag_process: + return + + try: + # Send SIGINT to gracefully close rosbag2 + self.current_bag_process.send_signal(signal.SIGINT) + self.current_bag_process.wait(timeout=5.0) + except subprocess.TimeoutExpired: + self.get_logger().warn(f'Force terminating {self.current_bag_name}') + self.current_bag_process.kill() + self.current_bag_process.wait() + except Exception as e: + self.get_logger().error(f'Error stopping recording: {e}') + + self.is_recording = False + self.get_logger().info(f'Stopped recording: {self.current_bag_name}') + + # Apply compression if needed (rosbag2 does this by default with -compression-mode file) + if save: + self.apply_compression() + + def apply_compression(self): + """Compress the current bag using zstd.""" + if not self.current_bag_name: + return + + bag_path = self.bag_dir / self.current_bag_name + try: + # rosbag2 with compression-mode file already compresses the sqlite db + # This is a secondary option to compress the entire directory + tar_path = f'{bag_path}.tar.zst' + + if bag_path.exists(): + cmd = ['tar', '-I', 'zstd', '-cf', tar_path, '-C', str(self.bag_dir), self.current_bag_name] + subprocess.run(cmd, check=True, capture_output=True, timeout=60) + self.get_logger().info(f'Compressed: {tar_path}') + except Exception as e: + self.get_logger().warn(f'Compression skipped: {e}') + + def maintenance_callback(self): + """Periodic maintenance: cleanup old bags and manage storage.""" + self.cleanup_expired_bags() + self.enforce_storage_quota() + if self.enable_rsync and self.rsync_destination: + self.rsync_bags() + + def cleanup_expired_bags(self): + """Remove bags older than TTL.""" + try: + cutoff_time = datetime.now() - timedelta(days=self.storage_ttl_days) + + for item in self.bag_dir.iterdir(): + if item.is_dir() and item.name.startswith('saltybot_'): + try: + # Parse timestamp from directory name + timestamp_str = item.name.replace('saltybot_', '') + item_time = datetime.strptime(timestamp_str, '%Y%m%d_%H%M%S') + + if item_time < cutoff_time: + shutil.rmtree(item, ignore_errors=True) + self.get_logger().info(f'Removed expired bag: {item.name}') + except (ValueError, OSError) as e: + self.get_logger().warn(f'Error processing {item.name}: {e}') + except Exception as e: + self.get_logger().error(f'Cleanup failed: {e}') + + def enforce_storage_quota(self): + """Remove oldest bags if total size exceeds quota.""" + try: + total_size = sum( + f.stat().st_size + for f in self.bag_dir.rglob('*') + if f.is_file() + ) / (1024 ** 3) # Convert to GB + + if total_size > self.max_storage_gb: + self.get_logger().warn( + f'Storage quota exceeded: {total_size:.2f}GB > {self.max_storage_gb}GB' + ) + + # Get bags sorted by modification time + bags = sorted( + [d for d in self.bag_dir.iterdir() if d.is_dir() and d.name.startswith('saltybot_')], + key=lambda x: x.stat().st_mtime + ) + + # Remove oldest bags until under quota + for bag in bags: + if total_size <= self.max_storage_gb: + break + + bag_size = sum( + f.stat().st_size + for f in bag.rglob('*') + if f.is_file() + ) / (1024 ** 3) + + shutil.rmtree(bag, ignore_errors=True) + total_size -= bag_size + self.get_logger().info(f'Removed bag to enforce quota: {bag.name}') + + except Exception as e: + self.get_logger().error(f'Storage quota enforcement failed: {e}') + + def rsync_bags(self): + """Optional: rsync bags to NAS.""" + try: + cmd = [ + 'rsync', '-avz', '--delete', + f'{self.bag_dir}/', + self.rsync_destination + ] + subprocess.run(cmd, check=False, timeout=300) + self.get_logger().info(f'Synced bags to NAS: {self.rsync_destination}') + except subprocess.TimeoutExpired: + self.get_logger().warn('Rsync timed out') + except Exception as e: + self.get_logger().error(f'Rsync failed: {e}') + + def cleanup(self): + """Cleanup resources.""" + self.stop_recording(save=True) + self.get_logger().info('Bag recorder shutdown complete') + + +def main(args=None): + rclpy.init(args=args) + node = BagRecorderNode() + + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.cleanup() + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/jetson/ros2_ws/src/saltybot_bag_recorder/setup.cfg b/jetson/ros2_ws/src/saltybot_bag_recorder/setup.cfg new file mode 100644 index 0000000..4d24108 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bag_recorder/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/saltybot_bag_recorder + +[install] +script_dir=$base/lib/saltybot_bag_recorder diff --git a/jetson/ros2_ws/src/saltybot_bag_recorder/setup.py b/jetson/ros2_ws/src/saltybot_bag_recorder/setup.py new file mode 100644 index 0000000..8c36500 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bag_recorder/setup.py @@ -0,0 +1,32 @@ +from setuptools import setup +import os +from glob import glob + +package_name = 'saltybot_bag_recorder' + +setup( + name=package_name, + version='0.1.0', + packages=[package_name], + data_files=[ + ('share/ament_index/resource_index/packages', + ['resource/' + package_name]), + ('share/' + package_name, ['package.xml']), + (os.path.join('share', package_name, 'launch'), + glob('launch/*.py')), + (os.path.join('share', package_name, 'config'), + glob('config/*.yaml')), + ], + install_requires=['setuptools'], + zip_safe=True, + maintainer='seb', + maintainer_email='seb@vayrette.com', + description='ROS2 bag recording service with circular buffer and storage management', + license='MIT', + tests_require=['pytest'], + entry_points={ + 'console_scripts': [ + 'bag_recorder = saltybot_bag_recorder.bag_recorder_node:main', + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_bag_recorder/test/__init__.py b/jetson/ros2_ws/src/saltybot_bag_recorder/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_bag_recorder/test/test_bag_recorder.py b/jetson/ros2_ws/src/saltybot_bag_recorder/test/test_bag_recorder.py new file mode 100644 index 0000000..76e09f4 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bag_recorder/test/test_bag_recorder.py @@ -0,0 +1,25 @@ +import unittest +from pathlib import Path + + +class TestBagRecorder(unittest.TestCase): + """Basic tests for bag recorder functionality.""" + + def test_imports(self): + """Test that the module can be imported.""" + from saltybot_bag_recorder import bag_recorder_node + self.assertIsNotNone(bag_recorder_node) + + def test_config_file_exists(self): + """Test that config file exists.""" + config_file = Path(__file__).parent.parent / 'config' / 'bag_recorder.yaml' + self.assertTrue(config_file.exists()) + + def test_launch_file_exists(self): + """Test that launch file exists.""" + launch_file = Path(__file__).parent.parent / 'launch' / 'bag_recorder.launch.py' + self.assertTrue(launch_file.exists()) + + +if __name__ == '__main__': + unittest.main()