Compare commits
No commits in common. "7a4930e8d28956a30588bf35c390131e55349fb8" and "285178b2f90af5fb52eece58568add99ea4772ba" have entirely different histories.
7a4930e8d2
...
285178b2f9
@ -1,26 +1,24 @@
|
|||||||
bag_recorder:
|
bag_recorder:
|
||||||
ros__parameters:
|
ros__parameters:
|
||||||
# Path where bags are stored (Issue #488: mission logging)
|
# Path where bags are stored
|
||||||
bag_dir: '~/saltybot-data/bags'
|
bag_dir: '/home/seb/rosbags'
|
||||||
|
|
||||||
# Topics to record for mission logging (Issue #488)
|
# Topics to record (empty list = record all)
|
||||||
topics:
|
topics: []
|
||||||
- '/scan'
|
# topics:
|
||||||
- '/cmd_vel'
|
# - '/camera/image_raw'
|
||||||
- '/odom'
|
# - '/lidar/scan'
|
||||||
- '/tf'
|
# - '/odom'
|
||||||
- '/camera/color/image_raw/compressed'
|
|
||||||
- '/saltybot/diagnostics'
|
|
||||||
|
|
||||||
# Rotation interval: save new bag every N minutes (Issue #488: 30 min)
|
# Circular buffer duration (minutes)
|
||||||
buffer_duration_minutes: 30
|
buffer_duration_minutes: 30
|
||||||
|
|
||||||
# Storage management (Issue #488: FIFO 20GB limit)
|
# Storage management
|
||||||
storage_ttl_days: 7 # Remove bags older than 7 days
|
storage_ttl_days: 7 # Remove bags older than 7 days
|
||||||
max_storage_gb: 20 # Enforce 20GB FIFO quota
|
max_storage_gb: 50 # Enforce 50GB quota
|
||||||
|
|
||||||
# Storage format (Issue #488: prefer MCAP)
|
# Compression
|
||||||
storage_format: 'mcap' # Options: mcap, sqlite3
|
compression: 'zstd' # Options: zstd, zstandard
|
||||||
|
|
||||||
# NAS sync (optional)
|
# NAS sync (optional)
|
||||||
enable_rsync: false
|
enable_rsync: false
|
||||||
|
|||||||
@ -4,9 +4,8 @@
|
|||||||
<name>saltybot_bag_recorder</name>
|
<name>saltybot_bag_recorder</name>
|
||||||
<version>0.1.0</version>
|
<version>0.1.0</version>
|
||||||
<description>
|
<description>
|
||||||
ROS2 bag recording service for mission logging with circular buffer and storage management.
|
ROS2 bag recording service with circular buffer, auto-save on crash, and storage management.
|
||||||
Records mission-critical topics to MCAP format with 30min rotation, 20GB FIFO cap, auto-start,
|
Configurable topics, 7-day TTL, 50GB cap, zstd compression, and optional NAS rsync.
|
||||||
start/stop services, and optional NAS rsync for archival (Issue #488).
|
|
||||||
</description>
|
</description>
|
||||||
<maintainer email="seb@vayrette.com">seb</maintainer>
|
<maintainer email="seb@vayrette.com">seb</maintainer>
|
||||||
<license>MIT</license>
|
<license>MIT</license>
|
||||||
|
|||||||
@ -17,28 +17,20 @@ from std_msgs.msg import String
|
|||||||
|
|
||||||
|
|
||||||
class BagRecorderNode(Node):
|
class BagRecorderNode(Node):
|
||||||
"""ROS2 bag recording service for mission logging (Issue #488)."""
|
"""ROS2 bag recording service with circular buffer and storage management."""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__('saltybot_bag_recorder')
|
super().__init__('saltybot_bag_recorder')
|
||||||
|
|
||||||
# Configuration (Issue #488: mission logging)
|
# Configuration
|
||||||
default_bag_dir = str(Path.home() / 'saltybot-data' / 'bags')
|
self.declare_parameter('bag_dir', '/home/seb/rosbags')
|
||||||
self.declare_parameter('bag_dir', default_bag_dir)
|
self.declare_parameter('topics', [''])
|
||||||
self.declare_parameter('topics', [
|
|
||||||
'/scan',
|
|
||||||
'/cmd_vel',
|
|
||||||
'/odom',
|
|
||||||
'/tf',
|
|
||||||
'/camera/color/image_raw/compressed',
|
|
||||||
'/saltybot/diagnostics'
|
|
||||||
])
|
|
||||||
self.declare_parameter('buffer_duration_minutes', 30)
|
self.declare_parameter('buffer_duration_minutes', 30)
|
||||||
self.declare_parameter('storage_ttl_days', 7)
|
self.declare_parameter('storage_ttl_days', 7)
|
||||||
self.declare_parameter('max_storage_gb', 20)
|
self.declare_parameter('max_storage_gb', 50)
|
||||||
self.declare_parameter('enable_rsync', False)
|
self.declare_parameter('enable_rsync', False)
|
||||||
self.declare_parameter('rsync_destination', '')
|
self.declare_parameter('rsync_destination', '')
|
||||||
self.declare_parameter('storage_format', 'mcap')
|
self.declare_parameter('compression', 'zstd')
|
||||||
|
|
||||||
self.bag_dir = Path(self.get_parameter('bag_dir').value)
|
self.bag_dir = Path(self.get_parameter('bag_dir').value)
|
||||||
self.topics = self.get_parameter('topics').value
|
self.topics = self.get_parameter('topics').value
|
||||||
@ -47,7 +39,7 @@ class BagRecorderNode(Node):
|
|||||||
self.max_storage_gb = self.get_parameter('max_storage_gb').value
|
self.max_storage_gb = self.get_parameter('max_storage_gb').value
|
||||||
self.enable_rsync = self.get_parameter('enable_rsync').value
|
self.enable_rsync = self.get_parameter('enable_rsync').value
|
||||||
self.rsync_destination = self.get_parameter('rsync_destination').value
|
self.rsync_destination = self.get_parameter('rsync_destination').value
|
||||||
self.storage_format = self.get_parameter('storage_format').value
|
self.compression = self.get_parameter('compression').value
|
||||||
|
|
||||||
self.bag_dir.mkdir(parents=True, exist_ok=True)
|
self.bag_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
@ -59,9 +51,11 @@ class BagRecorderNode(Node):
|
|||||||
self.recording_lock = threading.Lock()
|
self.recording_lock = threading.Lock()
|
||||||
|
|
||||||
# Services
|
# Services
|
||||||
self.save_service = self.create_service(Trigger, '/saltybot/save_bag', self.save_bag_callback)
|
self.save_service = self.create_service(
|
||||||
self.start_service = self.create_service(Trigger, '/saltybot/start_recording', self.start_recording_callback)
|
Trigger,
|
||||||
self.stop_service = self.create_service(Trigger, '/saltybot/stop_recording', self.stop_recording_callback)
|
'/saltybot/save_bag',
|
||||||
|
self.save_bag_callback
|
||||||
|
)
|
||||||
|
|
||||||
# Watchdog to handle crash recovery
|
# Watchdog to handle crash recovery
|
||||||
self.setup_signal_handlers()
|
self.setup_signal_handlers()
|
||||||
@ -73,8 +67,9 @@ class BagRecorderNode(Node):
|
|||||||
self.maintenance_timer = self.create_timer(300.0, self.maintenance_callback)
|
self.maintenance_timer = self.create_timer(300.0, self.maintenance_callback)
|
||||||
|
|
||||||
self.get_logger().info(
|
self.get_logger().info(
|
||||||
f'Bag recorder initialized: {self.bag_dir}, format={self.storage_format}, '
|
f'Bag recorder initialized: {self.bag_dir}, '
|
||||||
f'buffer={self.buffer_duration}s, max={self.max_storage_gb}GB, topics={len(self.topics)}'
|
f'buffer={self.buffer_duration}s, ttl={self.storage_ttl_days}d, '
|
||||||
|
f'max={self.max_storage_gb}GB'
|
||||||
)
|
)
|
||||||
|
|
||||||
def setup_signal_handlers(self):
|
def setup_signal_handlers(self):
|
||||||
@ -100,21 +95,27 @@ class BagRecorderNode(Node):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
# Build rosbag2 record command
|
# Build rosbag2 record command
|
||||||
cmd = ['ros2', 'bag', 'record', '--output', str(bag_path), '--storage', self.storage_format]
|
cmd = [
|
||||||
|
'ros2', 'bag', 'record',
|
||||||
|
f'--output', str(bag_path),
|
||||||
|
f'--compression-format', self.compression,
|
||||||
|
f'--compression-mode', 'file',
|
||||||
|
]
|
||||||
|
|
||||||
# Add compression for sqlite3 format
|
# Add topics or record all if empty
|
||||||
if self.storage_format == 'sqlite3':
|
|
||||||
cmd.extend(['--compression-format', 'zstd', '--compression-mode', 'file'])
|
|
||||||
|
|
||||||
# Add topics (required for mission logging)
|
|
||||||
if self.topics and self.topics[0]:
|
if self.topics and self.topics[0]:
|
||||||
cmd.extend(self.topics)
|
cmd.extend(self.topics)
|
||||||
else:
|
else:
|
||||||
cmd.extend(['/scan', '/cmd_vel', '/odom', '/tf', '/camera/color/image_raw/compressed', '/saltybot/diagnostics'])
|
cmd.append('--all')
|
||||||
|
|
||||||
self.current_bag_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
self.current_bag_process = subprocess.Popen(
|
||||||
|
cmd,
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE
|
||||||
|
)
|
||||||
self.is_recording = True
|
self.is_recording = True
|
||||||
self.buffer_bags.append(self.current_bag_name)
|
self.buffer_bags.append(self.current_bag_name)
|
||||||
|
|
||||||
self.get_logger().info(f'Started recording: {self.current_bag_name}')
|
self.get_logger().info(f'Started recording: {self.current_bag_name}')
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -132,40 +133,7 @@ class BagRecorderNode(Node):
|
|||||||
response.success = False
|
response.success = False
|
||||||
response.message = f'Failed to save bag: {e}'
|
response.message = f'Failed to save bag: {e}'
|
||||||
self.get_logger().error(response.message)
|
self.get_logger().error(response.message)
|
||||||
return response
|
|
||||||
|
|
||||||
def start_recording_callback(self, request, response):
|
|
||||||
"""Service callback to start recording."""
|
|
||||||
if self.is_recording:
|
|
||||||
response.success = False
|
|
||||||
response.message = 'Recording already in progress'
|
|
||||||
return response
|
|
||||||
try:
|
|
||||||
self.start_recording()
|
|
||||||
response.success = True
|
|
||||||
response.message = f'Recording started: {self.current_bag_name}'
|
|
||||||
self.get_logger().info(response.message)
|
|
||||||
except Exception as e:
|
|
||||||
response.success = False
|
|
||||||
response.message = f'Failed to start recording: {e}'
|
|
||||||
self.get_logger().error(response.message)
|
|
||||||
return response
|
|
||||||
|
|
||||||
def stop_recording_callback(self, request, response):
|
|
||||||
"""Service callback to stop recording."""
|
|
||||||
if not self.is_recording:
|
|
||||||
response.success = False
|
|
||||||
response.message = 'No recording in progress'
|
|
||||||
return response
|
|
||||||
try:
|
|
||||||
self.stop_recording(save=True)
|
|
||||||
response.success = True
|
|
||||||
response.message = f'Recording stopped and saved: {self.current_bag_name}'
|
|
||||||
self.get_logger().info(response.message)
|
|
||||||
except Exception as e:
|
|
||||||
response.success = False
|
|
||||||
response.message = f'Failed to stop recording: {e}'
|
|
||||||
self.get_logger().error(response.message)
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
def stop_recording(self, save: bool = False):
|
def stop_recording(self, save: bool = False):
|
||||||
@ -173,7 +141,9 @@ class BagRecorderNode(Node):
|
|||||||
with self.recording_lock:
|
with self.recording_lock:
|
||||||
if not self.is_recording or not self.current_bag_process:
|
if not self.is_recording or not self.current_bag_process:
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# Send SIGINT to gracefully close rosbag2
|
||||||
self.current_bag_process.send_signal(signal.SIGINT)
|
self.current_bag_process.send_signal(signal.SIGINT)
|
||||||
self.current_bag_process.wait(timeout=5.0)
|
self.current_bag_process.wait(timeout=5.0)
|
||||||
except subprocess.TimeoutExpired:
|
except subprocess.TimeoutExpired:
|
||||||
@ -182,8 +152,11 @@ class BagRecorderNode(Node):
|
|||||||
self.current_bag_process.wait()
|
self.current_bag_process.wait()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.get_logger().error(f'Error stopping recording: {e}')
|
self.get_logger().error(f'Error stopping recording: {e}')
|
||||||
|
|
||||||
self.is_recording = False
|
self.is_recording = False
|
||||||
self.get_logger().info(f'Stopped recording: {self.current_bag_name}')
|
self.get_logger().info(f'Stopped recording: {self.current_bag_name}')
|
||||||
|
|
||||||
|
# Apply compression if needed (rosbag2 does this by default with -compression-mode file)
|
||||||
if save:
|
if save:
|
||||||
self.apply_compression()
|
self.apply_compression()
|
||||||
|
|
||||||
@ -191,9 +164,13 @@ class BagRecorderNode(Node):
|
|||||||
"""Compress the current bag using zstd."""
|
"""Compress the current bag using zstd."""
|
||||||
if not self.current_bag_name:
|
if not self.current_bag_name:
|
||||||
return
|
return
|
||||||
|
|
||||||
bag_path = self.bag_dir / self.current_bag_name
|
bag_path = self.bag_dir / self.current_bag_name
|
||||||
try:
|
try:
|
||||||
|
# rosbag2 with compression-mode file already compresses the sqlite db
|
||||||
|
# This is a secondary option to compress the entire directory
|
||||||
tar_path = f'{bag_path}.tar.zst'
|
tar_path = f'{bag_path}.tar.zst'
|
||||||
|
|
||||||
if bag_path.exists():
|
if bag_path.exists():
|
||||||
cmd = ['tar', '-I', 'zstd', '-cf', tar_path, '-C', str(self.bag_dir), self.current_bag_name]
|
cmd = ['tar', '-I', 'zstd', '-cf', tar_path, '-C', str(self.bag_dir), self.current_bag_name]
|
||||||
subprocess.run(cmd, check=True, capture_output=True, timeout=60)
|
subprocess.run(cmd, check=True, capture_output=True, timeout=60)
|
||||||
@ -212,11 +189,14 @@ class BagRecorderNode(Node):
|
|||||||
"""Remove bags older than TTL."""
|
"""Remove bags older than TTL."""
|
||||||
try:
|
try:
|
||||||
cutoff_time = datetime.now() - timedelta(days=self.storage_ttl_days)
|
cutoff_time = datetime.now() - timedelta(days=self.storage_ttl_days)
|
||||||
|
|
||||||
for item in self.bag_dir.iterdir():
|
for item in self.bag_dir.iterdir():
|
||||||
if item.is_dir() and item.name.startswith('saltybot_'):
|
if item.is_dir() and item.name.startswith('saltybot_'):
|
||||||
try:
|
try:
|
||||||
|
# Parse timestamp from directory name
|
||||||
timestamp_str = item.name.replace('saltybot_', '')
|
timestamp_str = item.name.replace('saltybot_', '')
|
||||||
item_time = datetime.strptime(timestamp_str, '%Y%m%d_%H%M%S')
|
item_time = datetime.strptime(timestamp_str, '%Y%m%d_%H%M%S')
|
||||||
|
|
||||||
if item_time < cutoff_time:
|
if item_time < cutoff_time:
|
||||||
shutil.rmtree(item, ignore_errors=True)
|
shutil.rmtree(item, ignore_errors=True)
|
||||||
self.get_logger().info(f'Removed expired bag: {item.name}')
|
self.get_logger().info(f'Removed expired bag: {item.name}')
|
||||||
@ -226,26 +206,51 @@ class BagRecorderNode(Node):
|
|||||||
self.get_logger().error(f'Cleanup failed: {e}')
|
self.get_logger().error(f'Cleanup failed: {e}')
|
||||||
|
|
||||||
def enforce_storage_quota(self):
|
def enforce_storage_quota(self):
|
||||||
"""Remove oldest bags if total size exceeds quota (FIFO)."""
|
"""Remove oldest bags if total size exceeds quota."""
|
||||||
try:
|
try:
|
||||||
total_size = sum(f.stat().st_size for f in self.bag_dir.rglob('*') if f.is_file()) / (1024 ** 3)
|
total_size = sum(
|
||||||
|
f.stat().st_size
|
||||||
|
for f in self.bag_dir.rglob('*')
|
||||||
|
if f.is_file()
|
||||||
|
) / (1024 ** 3) # Convert to GB
|
||||||
|
|
||||||
if total_size > self.max_storage_gb:
|
if total_size > self.max_storage_gb:
|
||||||
self.get_logger().warn(f'Storage quota exceeded: {total_size:.2f}GB > {self.max_storage_gb}GB')
|
self.get_logger().warn(
|
||||||
bags = sorted([d for d in self.bag_dir.iterdir() if d.is_dir() and d.name.startswith('saltybot_')], key=lambda x: x.stat().st_mtime)
|
f'Storage quota exceeded: {total_size:.2f}GB > {self.max_storage_gb}GB'
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get bags sorted by modification time
|
||||||
|
bags = sorted(
|
||||||
|
[d for d in self.bag_dir.iterdir() if d.is_dir() and d.name.startswith('saltybot_')],
|
||||||
|
key=lambda x: x.stat().st_mtime
|
||||||
|
)
|
||||||
|
|
||||||
|
# Remove oldest bags until under quota
|
||||||
for bag in bags:
|
for bag in bags:
|
||||||
if total_size <= self.max_storage_gb:
|
if total_size <= self.max_storage_gb:
|
||||||
break
|
break
|
||||||
bag_size = sum(f.stat().st_size for f in bag.rglob('*') if f.is_file()) / (1024 ** 3)
|
|
||||||
|
bag_size = sum(
|
||||||
|
f.stat().st_size
|
||||||
|
for f in bag.rglob('*')
|
||||||
|
if f.is_file()
|
||||||
|
) / (1024 ** 3)
|
||||||
|
|
||||||
shutil.rmtree(bag, ignore_errors=True)
|
shutil.rmtree(bag, ignore_errors=True)
|
||||||
total_size -= bag_size
|
total_size -= bag_size
|
||||||
self.get_logger().info(f'Removed bag to enforce quota: {bag.name}')
|
self.get_logger().info(f'Removed bag to enforce quota: {bag.name}')
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.get_logger().error(f'Storage quota enforcement failed: {e}')
|
self.get_logger().error(f'Storage quota enforcement failed: {e}')
|
||||||
|
|
||||||
def rsync_bags(self):
|
def rsync_bags(self):
|
||||||
"""Optional: rsync bags to NAS."""
|
"""Optional: rsync bags to NAS."""
|
||||||
try:
|
try:
|
||||||
cmd = ['rsync', '-avz', '--delete', f'{self.bag_dir}/', self.rsync_destination]
|
cmd = [
|
||||||
|
'rsync', '-avz', '--delete',
|
||||||
|
f'{self.bag_dir}/',
|
||||||
|
self.rsync_destination
|
||||||
|
]
|
||||||
subprocess.run(cmd, check=False, timeout=300)
|
subprocess.run(cmd, check=False, timeout=300)
|
||||||
self.get_logger().info(f'Synced bags to NAS: {self.rsync_destination}')
|
self.get_logger().info(f'Synced bags to NAS: {self.rsync_destination}')
|
||||||
except subprocess.TimeoutExpired:
|
except subprocess.TimeoutExpired:
|
||||||
@ -262,6 +267,7 @@ class BagRecorderNode(Node):
|
|||||||
def main(args=None):
|
def main(args=None):
|
||||||
rclpy.init(args=args)
|
rclpy.init(args=args)
|
||||||
node = BagRecorderNode()
|
node = BagRecorderNode()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
rclpy.spin(node)
|
rclpy.spin(node)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
|
|||||||
@ -194,12 +194,6 @@ def generate_launch_description():
|
|||||||
description="Launch rosbridge WebSocket server (port 9090)",
|
description="Launch rosbridge WebSocket server (port 9090)",
|
||||||
)
|
)
|
||||||
|
|
||||||
enable_docking_arg = DeclareLaunchArgument(
|
|
||||||
"enable_docking",
|
|
||||||
default_value="true",
|
|
||||||
description="Launch autonomous docking behavior (auto-dock at 20% battery, Issue #489)",
|
|
||||||
)
|
|
||||||
|
|
||||||
follow_distance_arg = DeclareLaunchArgument(
|
follow_distance_arg = DeclareLaunchArgument(
|
||||||
"follow_distance",
|
"follow_distance",
|
||||||
default_value="1.5",
|
default_value="1.5",
|
||||||
@ -424,25 +418,6 @@ def generate_launch_description():
|
|||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
# ── t=7s Docking (auto-dock at 20% battery, Issue #489) ──────────────────
|
|
||||||
docking = TimerAction(
|
|
||||||
period=7.0,
|
|
||||||
actions=[
|
|
||||||
GroupAction(
|
|
||||||
condition=IfCondition(LaunchConfiguration("enable_docking")),
|
|
||||||
actions=[
|
|
||||||
LogInfo(msg="[full_stack] Starting autonomous docking (auto-trigger at 20% battery)"),
|
|
||||||
IncludeLaunchDescription(
|
|
||||||
_launch("saltybot_docking", "launch", "docking.launch.py"),
|
|
||||||
launch_arguments={
|
|
||||||
"battery_low_pct": "20.0",
|
|
||||||
}.items(),
|
|
||||||
),
|
|
||||||
],
|
|
||||||
),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
# ── t=14s Nav2 (indoor only; SLAM needs ~8s to build initial map) ────────
|
# ── t=14s Nav2 (indoor only; SLAM needs ~8s to build initial map) ────────
|
||||||
nav2 = TimerAction(
|
nav2 = TimerAction(
|
||||||
period=14.0,
|
period=14.0,
|
||||||
@ -493,7 +468,6 @@ def generate_launch_description():
|
|||||||
enable_follower_arg,
|
enable_follower_arg,
|
||||||
enable_bridge_arg,
|
enable_bridge_arg,
|
||||||
enable_rosbridge_arg,
|
enable_rosbridge_arg,
|
||||||
enable_docking_arg,
|
|
||||||
follow_distance_arg,
|
follow_distance_arg,
|
||||||
max_linear_vel_arg,
|
max_linear_vel_arg,
|
||||||
uwb_port_a_arg,
|
uwb_port_a_arg,
|
||||||
@ -524,9 +498,6 @@ def generate_launch_description():
|
|||||||
perception,
|
perception,
|
||||||
object_detection,
|
object_detection,
|
||||||
|
|
||||||
# t=7s
|
|
||||||
docking,
|
|
||||||
|
|
||||||
# t=9s
|
# t=9s
|
||||||
follower,
|
follower,
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user