feat: encounter offline queue sync service (Issue #400) #406

Merged
sl-jetson merged 1 commits from sl-mechanical/issue-400-encounter-queue into main 2026-03-04 13:31:04 -05:00

View File

@ -0,0 +1,266 @@
#!/usr/bin/env python3
"""Encounter data sync service for offline-first queue management.
Monitors a local encounter queue directory and syncs JSON files to cloud
API when internet connectivity is available. Implements exponential backoff
retry strategy and manages processed files.
Watched directory: /home/seb/encounter-queue/
Synced directory: /home/seb/encounter-queue/synced/
Published topics:
/social/encounter_sync_status (std_msgs/String) - Sync status updates
"""
import json
import os
import shutil
import time
from pathlib import Path
from typing import Optional
from datetime import datetime
import socket
import urllib.request
import urllib.error
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class EncounterSyncService(Node):
    """ROS2 node that syncs queued encounter JSON files to a cloud API.

    Watches a local queue directory for ``*.json`` files, uploads each file
    to the configured API endpoint whenever internet connectivity is
    available, and moves processed files into a ``synced/`` subdirectory
    (tagged ``_error`` when the file was unreadable or exhausted its
    retries). Failed uploads are retried with exponential backoff up to
    ``max_retries`` attempts per file.
    """

    def __init__(self):
        super().__init__("encounter_sync_service")
        # Parameters
        self.declare_parameter(
            "api_url",
            "https://api.openclaw.io/encounters",  # Default placeholder
        )
        self.declare_parameter("queue_dir", "/home/seb/encounter-queue")
        self.declare_parameter("synced_subdir", "synced")
        self.declare_parameter("check_interval", 5.0)  # seconds
        self.declare_parameter("connectivity_check_url", "https://www.google.com")
        self.declare_parameter("connectivity_timeout", 3.0)  # seconds
        self.declare_parameter("max_retries", 5)
        self.declare_parameter("initial_backoff", 1.0)  # seconds
        self.declare_parameter("max_backoff", 300.0)  # 5 minutes
        self.api_url = self.get_parameter("api_url").value
        self.queue_dir = Path(self.get_parameter("queue_dir").value)
        self.synced_subdir = self.get_parameter("synced_subdir").value
        self.check_interval = self.get_parameter("check_interval").value
        self.connectivity_url = self.get_parameter("connectivity_check_url").value
        self.connectivity_timeout = self.get_parameter("connectivity_timeout").value
        self.max_retries = self.get_parameter("max_retries").value
        self.initial_backoff = self.get_parameter("initial_backoff").value
        self.max_backoff = self.get_parameter("max_backoff").value
        # Ensure queue and synced directories exist before the timer fires
        self.queue_dir.mkdir(parents=True, exist_ok=True)
        self.synced_dir = self.queue_dir / self.synced_subdir
        self.synced_dir.mkdir(parents=True, exist_ok=True)
        # Publisher for sync status
        self.pub_status = self.create_publisher(String, "/social/encounter_sync_status", 10)
        # Per-file retry state, keyed by filename:
        #   retry_counts[name]  -> number of failed attempts so far
        #   backoff_times[name] -> earliest time.time() at which to retry
        self.retry_counts = {}
        self.backoff_times = {}
        # Main processing timer
        self.create_timer(self.check_interval, self._sync_loop)
        self.get_logger().info(
            f"Encounter sync service initialized. "
            f"Queue: {self.queue_dir}, API: {self.api_url}"
        )
        self._publish_status("initialized", f"Queue: {self.queue_dir}")

    def _sync_loop(self) -> None:
        """Main loop: check connectivity and sync queued files."""
        # Skip the whole cycle while offline; files simply stay queued.
        if not self._check_connectivity():
            self._publish_status("offline", "No internet connectivity")
            return
        self._publish_status("online", "Internet connectivity detected")
        # glob("*.json") matches only direct children of queue_dir, so files
        # already moved into the synced/ subdirectory are never picked up.
        queued_files = [f for f in self.queue_dir.glob("*.json") if f.is_file()]
        if not queued_files:
            return
        self.get_logger().debug(f"Found {len(queued_files)} queued encounter files")
        for encounter_file in queued_files:
            self._sync_file(encounter_file)

    def _check_connectivity(self) -> bool:
        """Check internet connectivity via an HTTP HEAD request.

        Returns:
            True if connected, False otherwise
        """
        try:
            request = urllib.request.Request(
                self.connectivity_url, method="HEAD"
            )
            with urllib.request.urlopen(request, timeout=self.connectivity_timeout):
                return True
        except (urllib.error.URLError, socket.timeout, OSError):
            return False

    def _clear_retry_state(self, file_id: str) -> None:
        """Forget any retry/backoff bookkeeping for a file.

        Args:
            file_id: Filename identifier
        """
        self.retry_counts.pop(file_id, None)
        self.backoff_times.pop(file_id, None)

    def _sync_file(self, filepath: Path) -> None:
        """Attempt to sync a single encounter file, with exponential backoff retry.

        Args:
            filepath: Path to JSON file to sync
        """
        file_id = filepath.name
        # Give up on files that have exhausted their retries: park them in
        # synced/ with an error suffix so they stop blocking the queue.
        if self.retry_counts.get(file_id, 0) >= self.max_retries:
            self.get_logger().error(
                f"Max retries exceeded for {file_id}, moving to synced with error flag"
            )
            self._move_to_synced(filepath, error=True)
            self._clear_retry_state(file_id)
            return
        # Respect the backoff window before retrying.
        if file_id in self.backoff_times and time.time() < self.backoff_times[file_id]:
            return  # Not yet time to retry
        try:
            with open(filepath, "r") as f:
                encounter_data = json.load(f)
        except (json.JSONDecodeError, IOError) as e:
            # Unreadable/corrupt files can never succeed: move them aside.
            self.get_logger().error(f"Failed to read {file_id}: {e}")
            self._move_to_synced(filepath, error=True)
            self._clear_retry_state(file_id)
            return
        if self._upload_encounter(encounter_data):
            self.get_logger().info(f"Successfully synced {file_id}")
            self._move_to_synced(filepath, error=False)
            self._clear_retry_state(file_id)
            self._publish_status("synced", f"File: {file_id}")
        else:
            self._handle_sync_failure(file_id)

    def _upload_encounter(self, encounter_data: dict) -> bool:
        """Upload encounter data to the cloud API via HTTP POST.

        Args:
            encounter_data: Encounter JSON data

        Returns:
            True if the server answered 200/201, False on any failure
        """
        try:
            json_bytes = json.dumps(encounter_data).encode("utf-8")
            request = urllib.request.Request(
                self.api_url,
                data=json_bytes,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(request, timeout=10.0) as response:
                return response.status in (200, 201)
        # BUG FIX: json.JSONEncodeError does not exist; referencing it in the
        # except tuple raised AttributeError at handling time. json.dumps
        # raises TypeError (unserializable object) or ValueError (e.g.
        # circular reference), so catch those instead.
        except (urllib.error.URLError, socket.timeout, OSError, TypeError, ValueError) as e:
            self.get_logger().warning(f"Upload failed: {e}")
            return False

    def _handle_sync_failure(self, file_id: str) -> None:
        """Handle sync failure with exponential backoff.

        Args:
            file_id: Filename identifier
        """
        if file_id not in self.retry_counts:
            self.retry_counts[file_id] = 0
            self.backoff_times[file_id] = 0
        self.retry_counts[file_id] += 1
        # Delay doubles each attempt: initial, 2x, 4x, ... capped at max_backoff.
        backoff = min(
            self.initial_backoff * (2 ** (self.retry_counts[file_id] - 1)),
            self.max_backoff,
        )
        self.backoff_times[file_id] = time.time() + backoff
        self.get_logger().warning(
            f"Sync failed for {file_id}, retry {self.retry_counts[file_id]}/{self.max_retries} "
            f"in {backoff:.1f}s"
        )
        self._publish_status(
            "retry",
            f"File: {file_id}, attempt {self.retry_counts[file_id]}/{self.max_retries}",
        )

    def _move_to_synced(self, filepath: Path, error: bool = False) -> None:
        """Move a processed file to the synced directory.

        The destination name embeds an ISO timestamp (and ``_error`` when the
        sync failed) so repeated files never collide.

        Args:
            filepath: Path to file
            error: Whether file had an error during sync
        """
        timestamp = datetime.now().isoformat()
        status_suffix = "_error" if error else ""
        new_name = f"{filepath.stem}_{timestamp}{status_suffix}.json"
        dest_path = self.synced_dir / new_name
        try:
            shutil.move(str(filepath), str(dest_path))
            self.get_logger().debug(f"Moved {filepath.name} to synced/")
        except OSError as e:
            self.get_logger().error(f"Failed to move {filepath.name} to synced: {e}")

    def _publish_status(self, status: str, details: str = "") -> None:
        """Publish a sync status update on /social/encounter_sync_status.

        Args:
            status: Status string (e.g., 'online', 'offline', 'synced', 'retry')
            details: Additional details
        """
        timestamp = datetime.now().isoformat()
        # BUG FIX: previously, an empty `details` dropped the status keyword
        # entirely and published only the bare timestamp. Always include the
        # status so subscribers can parse every message the same way.
        if details:
            message = f"{timestamp} | {status.upper()} | {details}"
        else:
            message = f"{timestamp} | {status.upper()}"
        msg = String()
        msg.data = message
        self.pub_status.publish(msg)
def main(args=None):
    """Entry point: initialize rclpy, spin the sync node, then clean up.

    Args:
        args: Optional command-line arguments forwarded to rclpy.init().
    """
    rclpy.init(args=args)
    sync_node = EncounterSyncService()
    try:
        rclpy.spin(sync_node)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the service; not an error.
        pass
    finally:
        # Always release the node and shut rclpy down, even on interrupt.
        sync_node.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
    main()