feat: encounter offline queue sync service (Issue #400) #406
@ -0,0 +1,266 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Encounter data sync service for offline-first queue management.
|
||||
|
||||
Monitors a local encounter queue directory and syncs JSON files to cloud
|
||||
API when internet connectivity is available. Implements exponential backoff
|
||||
retry strategy and manages processed files.
|
||||
|
||||
Watched directory: /home/seb/encounter-queue/
|
||||
Synced directory: /home/seb/encounter-queue/synced/
|
||||
|
||||
Published topics:
|
||||
/social/encounter_sync_status (std_msgs/String) - Sync status updates
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from datetime import datetime
|
||||
import socket
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from std_msgs.msg import String
|
||||
|
||||
|
||||
class EncounterSyncService(Node):
    """ROS2 node that syncs queued encounter JSON files to a cloud API.

    Watches ``queue_dir`` for ``*.json`` files. When internet connectivity is
    detected, each file is POSTed to ``api_url``; successfully uploaded (or
    permanently failed) files are moved into the ``synced`` subdirectory with
    a timestamped name. Failed uploads are retried with exponential backoff,
    capped at ``max_backoff`` seconds, for up to ``max_retries`` attempts.
    """

    def __init__(self):
        super().__init__("encounter_sync_service")

        # Parameters (all overridable via ROS parameter machinery)
        self.declare_parameter(
            "api_url",
            "https://api.openclaw.io/encounters",  # Default placeholder
        )
        self.declare_parameter("queue_dir", "/home/seb/encounter-queue")
        self.declare_parameter("synced_subdir", "synced")
        self.declare_parameter("check_interval", 5.0)  # seconds
        self.declare_parameter("connectivity_check_url", "https://www.google.com")
        self.declare_parameter("connectivity_timeout", 3.0)  # seconds
        self.declare_parameter("max_retries", 5)
        self.declare_parameter("initial_backoff", 1.0)  # seconds
        self.declare_parameter("max_backoff", 300.0)  # 5 minutes

        self.api_url = self.get_parameter("api_url").value
        self.queue_dir = Path(self.get_parameter("queue_dir").value)
        self.synced_subdir = self.get_parameter("synced_subdir").value
        self.check_interval = self.get_parameter("check_interval").value
        self.connectivity_url = self.get_parameter("connectivity_check_url").value
        self.connectivity_timeout = self.get_parameter("connectivity_timeout").value
        self.max_retries = self.get_parameter("max_retries").value
        self.initial_backoff = self.get_parameter("initial_backoff").value
        self.max_backoff = self.get_parameter("max_backoff").value

        # Ensure queue and synced directories exist before the timer fires
        self.queue_dir.mkdir(parents=True, exist_ok=True)
        self.synced_dir = self.queue_dir / self.synced_subdir
        self.synced_dir.mkdir(parents=True, exist_ok=True)

        # Publisher for sync status updates
        self.pub_status = self.create_publisher(String, "/social/encounter_sync_status", 10)

        # Per-file retry state, keyed by filename:
        #   retry_counts: attempts made so far
        #   backoff_times: epoch time before which the file must not be retried
        self.retry_counts = {}
        self.backoff_times = {}

        # Main processing timer
        self.create_timer(self.check_interval, self._sync_loop)

        self.get_logger().info(
            f"Encounter sync service initialized. "
            f"Queue: {self.queue_dir}, API: {self.api_url}"
        )
        self._publish_status("initialized", f"Queue: {self.queue_dir}")

    def _sync_loop(self) -> None:
        """Timer callback: check connectivity and sync all queued files."""
        # Skip the whole cycle while offline; files simply stay queued.
        if not self._check_connectivity():
            self._publish_status("offline", "No internet connectivity")
            return

        self._publish_status("online", "Internet connectivity detected")

        # Non-recursive glob: only direct children of queue_dir match, so
        # files already moved into synced/ are never picked up.
        queued_files = [f for f in self.queue_dir.glob("*.json") if f.is_file()]

        if not queued_files:
            return

        self.get_logger().debug(f"Found {len(queued_files)} queued encounter files")

        for encounter_file in queued_files:
            self._sync_file(encounter_file)

    def _check_connectivity(self) -> bool:
        """Check internet connectivity via a HEAD request to a known URL.

        Returns:
            True if connected, False otherwise
        """
        try:
            request = urllib.request.Request(
                self.connectivity_url, method="HEAD"
            )
            with urllib.request.urlopen(request, timeout=self.connectivity_timeout):
                return True
        except (urllib.error.URLError, socket.timeout, OSError):
            # Any network-level failure is treated as "offline".
            return False

    def _sync_file(self, filepath: Path) -> None:
        """Attempt to sync a single encounter file, with exponential backoff retry.

        Files that exceed ``max_retries`` or that cannot be parsed are moved
        to the synced directory with an ``_error`` suffix so they stop
        blocking the queue.

        Args:
            filepath: Path to JSON file to sync
        """
        file_id = filepath.name

        # Give up on files that have exhausted their retry budget.
        if file_id in self.retry_counts:
            if self.retry_counts[file_id] >= self.max_retries:
                self.get_logger().error(
                    f"Max retries exceeded for {file_id}, moving to synced with error flag"
                )
                self._move_to_synced(filepath, error=True)
                del self.retry_counts[file_id]
                if file_id in self.backoff_times:
                    del self.backoff_times[file_id]
                return

        # Respect the backoff timer: skip files whose retry window hasn't opened.
        if file_id in self.backoff_times:
            if time.time() < self.backoff_times[file_id]:
                return  # Not yet time to retry

        try:
            with open(filepath, "r") as f:
                encounter_data = json.load(f)

            success = self._upload_encounter(encounter_data)

            if success:
                self.get_logger().info(f"Successfully synced {file_id}")
                self._move_to_synced(filepath, error=False)
                # Clear any lingering retry state for this file.
                if file_id in self.retry_counts:
                    del self.retry_counts[file_id]
                if file_id in self.backoff_times:
                    del self.backoff_times[file_id]
                self._publish_status("synced", f"File: {file_id}")
            else:
                self._handle_sync_failure(file_id)

        except (json.JSONDecodeError, IOError) as e:
            # Unreadable/corrupt file: no point retrying, park it with an error flag.
            self.get_logger().error(f"Failed to read {file_id}: {e}")
            self._move_to_synced(filepath, error=True)
            if file_id in self.retry_counts:
                del self.retry_counts[file_id]
            if file_id in self.backoff_times:
                del self.backoff_times[file_id]

    def _upload_encounter(self, encounter_data: dict) -> bool:
        """Upload encounter data to the cloud API via HTTP POST.

        Args:
            encounter_data: Encounter JSON data

        Returns:
            True if the API responded 200/201, False on any failure
        """
        try:
            json_bytes = json.dumps(encounter_data).encode("utf-8")
            request = urllib.request.Request(
                self.api_url,
                data=json_bytes,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(request, timeout=10.0) as response:
                return response.status == 200 or response.status == 201
        # BUGFIX: the json module has no JSONEncodeError attribute — referencing
        # it in the except tuple raised AttributeError at exception-handling
        # time instead of returning False. json.dumps raises TypeError (and
        # ValueError for e.g. circular references), so catch those instead.
        except (urllib.error.URLError, socket.timeout, OSError, TypeError, ValueError) as e:
            self.get_logger().warning(f"Upload failed: {e}")
            return False

    def _handle_sync_failure(self, file_id: str) -> None:
        """Record a sync failure and schedule the next retry with exponential backoff.

        Args:
            file_id: Filename identifier
        """
        if file_id not in self.retry_counts:
            self.retry_counts[file_id] = 0
            self.backoff_times[file_id] = 0

        self.retry_counts[file_id] += 1
        # Exponential backoff: initial * 2^(attempt-1), capped at max_backoff.
        backoff = min(
            self.initial_backoff * (2 ** (self.retry_counts[file_id] - 1)),
            self.max_backoff,
        )
        self.backoff_times[file_id] = time.time() + backoff

        self.get_logger().warning(
            f"Sync failed for {file_id}, retry {self.retry_counts[file_id]}/{self.max_retries} "
            f"in {backoff:.1f}s"
        )
        self._publish_status(
            "retry",
            f"File: {file_id}, attempt {self.retry_counts[file_id]}/{self.max_retries}",
        )

    def _move_to_synced(self, filepath: Path, error: bool = False) -> None:
        """Move a processed file into the synced directory with a timestamped name.

        Args:
            filepath: Path to file
            error: Whether file had an error during sync
        """
        # NOTE(review): isoformat() timestamps contain ':' characters, which
        # are invalid in filenames on some filesystems (e.g. Windows/FAT) —
        # fine for the intended Linux deployment, but confirm if ported.
        timestamp = datetime.now().isoformat()
        status_suffix = "_error" if error else ""
        new_name = f"{filepath.stem}_{timestamp}{status_suffix}.json"
        dest_path = self.synced_dir / new_name

        try:
            shutil.move(str(filepath), str(dest_path))
            self.get_logger().debug(f"Moved {filepath.name} to synced/")
        except OSError as e:
            # Best-effort: a failed move leaves the file queued; it will be
            # retried (or re-flagged) on the next cycle.
            self.get_logger().error(f"Failed to move {filepath.name} to synced: {e}")

    def _publish_status(self, status: str, details: str = "") -> None:
        """Publish a sync status update on /social/encounter_sync_status.

        Args:
            status: Status string (e.g., 'online', 'offline', 'synced', 'retry')
            details: Additional details
        """
        timestamp = datetime.now().isoformat()
        message = f"{timestamp} | {status.upper()} | {details}" if details else timestamp
        msg = String()
        msg.data = message
        self.pub_status.publish(msg)
|
||||
|
||||
|
||||
def main(args=None):
    """Entry point: initialize rclpy, spin the sync node until interrupted."""
    rclpy.init(args=args)
    sync_node = EncounterSyncService()
    try:
        rclpy.spin(sync_node)
    except KeyboardInterrupt:
        pass  # Clean Ctrl+C exit; teardown happens in finally
    finally:
        sync_node.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
    main()
|
||||
Loading…
x
Reference in New Issue
Block a user