diff --git a/jetson/ros2_ws/src/saltybot_bag_recorder/config/bag_recorder.yaml b/jetson/ros2_ws/src/saltybot_bag_recorder/config/bag_recorder.yaml index eaacc7e..4384791 100644 --- a/jetson/ros2_ws/src/saltybot_bag_recorder/config/bag_recorder.yaml +++ b/jetson/ros2_ws/src/saltybot_bag_recorder/config/bag_recorder.yaml @@ -1,24 +1,26 @@ bag_recorder: ros__parameters: - # Path where bags are stored - bag_dir: '/home/seb/rosbags' + # Path where bags are stored (Issue #488: mission logging) + bag_dir: '~/saltybot-data/bags' - # Topics to record (empty list = record all) - topics: [] - # topics: - # - '/camera/image_raw' - # - '/lidar/scan' - # - '/odom' + # Topics to record for mission logging (Issue #488) + topics: + - '/scan' + - '/cmd_vel' + - '/odom' + - '/tf' + - '/camera/color/image_raw/compressed' + - '/saltybot/diagnostics' - # Circular buffer duration (minutes) + # Rotation interval: save new bag every N minutes (Issue #488: 30 min) buffer_duration_minutes: 30 - # Storage management + # Storage management (Issue #488: FIFO 20GB limit) storage_ttl_days: 7 # Remove bags older than 7 days - max_storage_gb: 50 # Enforce 50GB quota + max_storage_gb: 20 # Enforce 20GB FIFO quota - # Compression - compression: 'zstd' # Options: zstd, zstandard + # Storage format (Issue #488: prefer MCAP) + storage_format: 'mcap' # Options: mcap, sqlite3 # NAS sync (optional) enable_rsync: false diff --git a/jetson/ros2_ws/src/saltybot_bag_recorder/package.xml b/jetson/ros2_ws/src/saltybot_bag_recorder/package.xml index bffbe26..28e1544 100644 --- a/jetson/ros2_ws/src/saltybot_bag_recorder/package.xml +++ b/jetson/ros2_ws/src/saltybot_bag_recorder/package.xml @@ -4,8 +4,9 @@ saltybot_bag_recorder 0.1.0 - ROS2 bag recording service with circular buffer, auto-save on crash, and storage management. - Configurable topics, 7-day TTL, 50GB cap, zstd compression, and optional NAS rsync. + ROS2 bag recording service for mission logging with circular buffer and storage management. + Records mission-critical topics to MCAP format with 30min rotation, 20GB FIFO cap, auto-start, + start/stop services, and optional NAS rsync for archival (Issue #488). seb MIT diff --git a/jetson/ros2_ws/src/saltybot_bag_recorder/saltybot_bag_recorder/bag_recorder_node.py b/jetson/ros2_ws/src/saltybot_bag_recorder/saltybot_bag_recorder/bag_recorder_node.py index a291188..9df26bc 100644 --- a/jetson/ros2_ws/src/saltybot_bag_recorder/saltybot_bag_recorder/bag_recorder_node.py +++ b/jetson/ros2_ws/src/saltybot_bag_recorder/saltybot_bag_recorder/bag_recorder_node.py @@ -17,20 +17,28 @@ from std_msgs.msg import String class BagRecorderNode(Node): - """ROS2 bag recording service with circular buffer and storage management.""" + """ROS2 bag recording service for mission logging (Issue #488).""" def __init__(self): super().__init__('saltybot_bag_recorder') - # Configuration - self.declare_parameter('bag_dir', '/home/seb/rosbags') - self.declare_parameter('topics', ['']) + # Configuration (Issue #488: mission logging) + default_bag_dir = str(Path.home() / 'saltybot-data' / 'bags') + self.declare_parameter('bag_dir', default_bag_dir) + self.declare_parameter('topics', [ + '/scan', + '/cmd_vel', + '/odom', + '/tf', + '/camera/color/image_raw/compressed', + '/saltybot/diagnostics' + ]) self.declare_parameter('buffer_duration_minutes', 30) self.declare_parameter('storage_ttl_days', 7) - self.declare_parameter('max_storage_gb', 50) + self.declare_parameter('max_storage_gb', 20) self.declare_parameter('enable_rsync', False) self.declare_parameter('rsync_destination', '') - self.declare_parameter('compression', 'zstd') + self.declare_parameter('storage_format', 'mcap') self.bag_dir = Path(self.get_parameter('bag_dir').value) self.topics = self.get_parameter('topics').value @@ -39,7 +47,7 @@ class BagRecorderNode(Node): self.max_storage_gb = self.get_parameter('max_storage_gb').value self.enable_rsync = self.get_parameter('enable_rsync').value self.rsync_destination = self.get_parameter('rsync_destination').value - self.compression = self.get_parameter('compression').value + self.storage_format = self.get_parameter('storage_format').value self.bag_dir.mkdir(parents=True, exist_ok=True) @@ -51,11 +59,9 @@ class BagRecorderNode(Node): self.recording_lock = threading.Lock() # Services - self.save_service = self.create_service( - Trigger, - '/saltybot/save_bag', - self.save_bag_callback - ) + self.save_service = self.create_service(Trigger, '/saltybot/save_bag', self.save_bag_callback) + self.start_service = self.create_service(Trigger, '/saltybot/start_recording', self.start_recording_callback) + self.stop_service = self.create_service(Trigger, '/saltybot/stop_recording', self.stop_recording_callback) # Watchdog to handle crash recovery self.setup_signal_handlers() @@ -67,9 +73,8 @@ class BagRecorderNode(Node): self.maintenance_timer = self.create_timer(300.0, self.maintenance_callback) self.get_logger().info( - f'Bag recorder initialized: {self.bag_dir}, ' - f'buffer={self.buffer_duration}s, ttl={self.storage_ttl_days}d, ' - f'max={self.max_storage_gb}GB' + f'Bag recorder initialized: {self.bag_dir}, format={self.storage_format}, ' + f'buffer={self.buffer_duration}s, max={self.max_storage_gb}GB, topics={len(self.topics)}' ) def setup_signal_handlers(self): @@ -95,27 +100,21 @@ class BagRecorderNode(Node): try: # Build rosbag2 record command - cmd = [ - 'ros2', 'bag', 'record', - f'--output', str(bag_path), - f'--compression-format', self.compression, - f'--compression-mode', 'file', - ] + cmd = ['ros2', 'bag', 'record', '--output', str(bag_path), '--storage', self.storage_format] - # Add topics or record all if empty + # Add compression for sqlite3 format + if self.storage_format == 'sqlite3': + cmd.extend(['--compression-format', 'zstd', '--compression-mode', 'file']) + + # Add topics (required for mission logging) if self.topics and self.topics[0]: cmd.extend(self.topics) else: - cmd.append('--all') + cmd.extend(['/scan', '/cmd_vel', '/odom', '/tf', '/camera/color/image_raw/compressed', '/saltybot/diagnostics']) - self.current_bag_process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) + self.current_bag_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) self.is_recording = True self.buffer_bags.append(self.current_bag_name) - self.get_logger().info(f'Started recording: {self.current_bag_name}') except Exception as e: @@ -133,7 +132,40 @@ class BagRecorderNode(Node): response.success = False response.message = f'Failed to save bag: {e}' self.get_logger().error(response.message) + return response + def start_recording_callback(self, request, response): + """Service callback to start recording.""" + if self.is_recording: + response.success = False + response.message = 'Recording already in progress' + return response + try: + self.start_recording() + response.success = True + response.message = f'Recording started: {self.current_bag_name}' + self.get_logger().info(response.message) + except Exception as e: + response.success = False + response.message = f'Failed to start recording: {e}' + self.get_logger().error(response.message) + return response + + def stop_recording_callback(self, request, response): + """Service callback to stop recording.""" + if not self.is_recording: + response.success = False + response.message = 'No recording in progress' + return response + try: + self.stop_recording(save=True) + response.success = True + response.message = f'Recording stopped and saved: {self.current_bag_name}' + self.get_logger().info(response.message) + except Exception as e: + response.success = False + response.message = f'Failed to stop recording: {e}' + self.get_logger().error(response.message) return response def stop_recording(self, save: bool = False): @@ -141,9 +173,7 @@ class BagRecorderNode(Node): with self.recording_lock: if not self.is_recording or not self.current_bag_process: return - try: - # Send SIGINT to gracefully close rosbag2 self.current_bag_process.send_signal(signal.SIGINT) self.current_bag_process.wait(timeout=5.0) except subprocess.TimeoutExpired: @@ -152,11 +182,8 @@ class BagRecorderNode(Node): self.current_bag_process.wait() except Exception as e: self.get_logger().error(f'Error stopping recording: {e}') - self.is_recording = False self.get_logger().info(f'Stopped recording: {self.current_bag_name}') - - # Apply compression if needed (rosbag2 does this by default with -compression-mode file) if save: self.apply_compression() @@ -164,13 +191,9 @@ class BagRecorderNode(Node): """Compress the current bag using zstd.""" if not self.current_bag_name: return - bag_path = self.bag_dir / self.current_bag_name try: - # rosbag2 with compression-mode file already compresses the sqlite db - # This is a secondary option to compress the entire directory tar_path = f'{bag_path}.tar.zst' - if bag_path.exists(): cmd = ['tar', '-I', 'zstd', '-cf', tar_path, '-C', str(self.bag_dir), self.current_bag_name] subprocess.run(cmd, check=True, capture_output=True, timeout=60) @@ -189,14 +212,11 @@ class BagRecorderNode(Node): """Remove bags older than TTL.""" try: cutoff_time = datetime.now() - timedelta(days=self.storage_ttl_days) - for item in self.bag_dir.iterdir(): if item.is_dir() and item.name.startswith('saltybot_'): try: - # Parse timestamp from directory name timestamp_str = item.name.replace('saltybot_', '') item_time = datetime.strptime(timestamp_str, '%Y%m%d_%H%M%S') - if item_time < cutoff_time: shutil.rmtree(item, ignore_errors=True) self.get_logger().info(f'Removed expired bag: {item.name}') @@ -206,51 +226,26 @@ class BagRecorderNode(Node): self.get_logger().error(f'Cleanup failed: {e}') def enforce_storage_quota(self): - """Remove oldest bags if total size exceeds quota.""" + """Remove oldest bags if total size exceeds quota (FIFO).""" try: - total_size = sum( - f.stat().st_size - for f in self.bag_dir.rglob('*') - if f.is_file() - ) / (1024 ** 3) # Convert to GB - + total_size = sum(f.stat().st_size for f in self.bag_dir.rglob('*') if f.is_file()) / (1024 ** 3) if total_size > self.max_storage_gb: - self.get_logger().warn( - f'Storage quota exceeded: {total_size:.2f}GB > {self.max_storage_gb}GB' - ) - - # Get bags sorted by modification time - bags = sorted( - [d for d in self.bag_dir.iterdir() if d.is_dir() and d.name.startswith('saltybot_')], - key=lambda x: x.stat().st_mtime - ) - - # Remove oldest bags until under quota + self.get_logger().warn(f'Storage quota exceeded: {total_size:.2f}GB > {self.max_storage_gb}GB') + bags = sorted([d for d in self.bag_dir.iterdir() if d.is_dir() and d.name.startswith('saltybot_')], key=lambda x: x.stat().st_mtime) for bag in bags: if total_size <= self.max_storage_gb: break - - bag_size = sum( - f.stat().st_size - for f in bag.rglob('*') - if f.is_file() - ) / (1024 ** 3) - + bag_size = sum(f.stat().st_size for f in bag.rglob('*') if f.is_file()) / (1024 ** 3) shutil.rmtree(bag, ignore_errors=True) total_size -= bag_size self.get_logger().info(f'Removed bag to enforce quota: {bag.name}') - except Exception as e: self.get_logger().error(f'Storage quota enforcement failed: {e}') def rsync_bags(self): """Optional: rsync bags to NAS.""" try: - cmd = [ - 'rsync', '-avz', '--delete', - f'{self.bag_dir}/', - self.rsync_destination - ] + cmd = ['rsync', '-avz', '--delete', f'{self.bag_dir}/', self.rsync_destination] subprocess.run(cmd, check=False, timeout=300) self.get_logger().info(f'Synced bags to NAS: {self.rsync_destination}') except subprocess.TimeoutExpired: @@ -267,7 +262,6 @@ class BagRecorderNode(Node): def main(args=None): rclpy.init(args=args) node = BagRecorderNode() - try: rclpy.spin(node) except KeyboardInterrupt: