From 136ca1fb9c7139f74b8c5ea66591646a36080316 Mon Sep 17 00:00:00 2001 From: sl-mechanical Date: Wed, 4 Mar 2026 13:16:00 -0500 Subject: [PATCH] feat: Add Issue #400 - Encounter offline queue sync service Implement EncounterSyncService ROS2 node for managing offline-first encounter data syncing. Features: - Monitors /home/seb/encounter-queue/ for JSON files - Uploads to configurable cloud API when connectivity detected - Exponential backoff retry with max 5 attempts - Moves synced files to /home/seb/encounter-queue/synced/ - Publishes status on /social/encounter_sync_status topic - Connectivity check via HTTP ping (configurable URL) - Handles offline operation gracefully Co-Authored-By: Claude Haiku 4.5 --- .../saltybot_social/encounter_sync_service.py | 266 ++++++++++++++++++ 1 file changed, 266 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_social/saltybot_social/encounter_sync_service.py diff --git a/jetson/ros2_ws/src/saltybot_social/saltybot_social/encounter_sync_service.py b/jetson/ros2_ws/src/saltybot_social/saltybot_social/encounter_sync_service.py new file mode 100644 index 0000000..078d63d --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/saltybot_social/encounter_sync_service.py @@ -0,0 +1,266 @@ +#!/usr/bin/env python3 +"""Encounter data sync service for offline-first queue management. + +Monitors a local encounter queue directory and syncs JSON files to cloud +API when internet connectivity is available. Implements exponential backoff +retry strategy and manages processed files. + +Watched directory: /home/seb/encounter-queue/ +Synced directory: /home/seb/encounter-queue/synced/ + +Published topics: + /social/encounter_sync_status (std_msgs/String) - Sync status updates +""" + +import json +import os +import shutil +import time +from pathlib import Path +from typing import Optional +from datetime import datetime +import socket +import urllib.request +import urllib.error + +import rclpy +from rclpy.node import Node +from std_msgs.msg import String + + +class EncounterSyncService(Node): + """ROS2 node for syncing encounter data to cloud API.""" + + def __init__(self): + super().__init__("encounter_sync_service") + + # Parameters + self.declare_parameter( + "api_url", + "https://api.openclaw.io/encounters", # Default placeholder + ) + self.declare_parameter("queue_dir", "/home/seb/encounter-queue") + self.declare_parameter("synced_subdir", "synced") + self.declare_parameter("check_interval", 5.0) # seconds + self.declare_parameter("connectivity_check_url", "https://www.google.com") + self.declare_parameter("connectivity_timeout", 3.0) # seconds + self.declare_parameter("max_retries", 5) + self.declare_parameter("initial_backoff", 1.0) # seconds + self.declare_parameter("max_backoff", 300.0) # 5 minutes + + self.api_url = self.get_parameter("api_url").value + self.queue_dir = Path(self.get_parameter("queue_dir").value) + self.synced_subdir = self.get_parameter("synced_subdir").value + self.check_interval = self.get_parameter("check_interval").value + self.connectivity_url = self.get_parameter("connectivity_check_url").value + self.connectivity_timeout = self.get_parameter("connectivity_timeout").value + self.max_retries = self.get_parameter("max_retries").value + self.initial_backoff = self.get_parameter("initial_backoff").value + self.max_backoff = self.get_parameter("max_backoff").value + + # Ensure queue directory exists + self.queue_dir.mkdir(parents=True, exist_ok=True) + self.synced_dir = self.queue_dir / self.synced_subdir + self.synced_dir.mkdir(parents=True, exist_ok=True) + + # Publisher for sync status + self.pub_status = self.create_publisher(String, "/social/encounter_sync_status", 10) + + # Track retry state per file + self.retry_counts = {} + self.backoff_times = {} + + # Main processing timer + self.create_timer(self.check_interval, self._sync_loop) + + self.get_logger().info( + f"Encounter sync service initialized. " + f"Queue: {self.queue_dir}, API: {self.api_url}" + ) + self._publish_status("initialized", f"Queue: {self.queue_dir}") + + def _sync_loop(self) -> None: + """Main loop: check connectivity and sync queued files.""" + # Check internet connectivity + if not self._check_connectivity(): + self._publish_status("offline", "No internet connectivity") + return + + self._publish_status("online", "Internet connectivity detected") + + # Get all JSON files in queue directory (not in synced subdirectory) + queued_files = [ + f + for f in self.queue_dir.glob("*.json") + if f.is_file() and not f.parent.name == self.synced_subdir + ] + + if not queued_files: + return + + self.get_logger().debug(f"Found {len(queued_files)} queued encounter files") + + for encounter_file in queued_files: + self._sync_file(encounter_file) + + def _check_connectivity(self) -> bool: + """Check internet connectivity via HTTP ping. + + Returns: + True if connected, False otherwise + """ + try: + request = urllib.request.Request( + self.connectivity_url, method="HEAD" + ) + with urllib.request.urlopen(request, timeout=self.connectivity_timeout): + return True + except (urllib.error.URLError, socket.timeout, OSError): + return False + + def _sync_file(self, filepath: Path) -> None: + """Attempt to sync a single encounter file, with exponential backoff retry. + + Args: + filepath: Path to JSON file to sync + """ + file_id = filepath.name + + # Check if we should retry this file + if file_id in self.retry_counts: + if self.retry_counts[file_id] >= self.max_retries: + self.get_logger().error( + f"Max retries exceeded for {file_id}, moving to synced with error flag" + ) + self._move_to_synced(filepath, error=True) + del self.retry_counts[file_id] + if file_id in self.backoff_times: + del self.backoff_times[file_id] + return + + # Check backoff timer + if file_id in self.backoff_times: + if time.time() < self.backoff_times[file_id]: + return # Not yet time to retry + + # Attempt upload + try: + with open(filepath, "r") as f: + encounter_data = json.load(f) + + success = self._upload_encounter(encounter_data) + + if success: + self.get_logger().info(f"Successfully synced {file_id}") + self._move_to_synced(filepath, error=False) + if file_id in self.retry_counts: + del self.retry_counts[file_id] + if file_id in self.backoff_times: + del self.backoff_times[file_id] + self._publish_status("synced", f"File: {file_id}") + else: + self._handle_sync_failure(file_id) + + except (json.JSONDecodeError, IOError) as e: + self.get_logger().error(f"Failed to read {file_id}: {e}") + self._move_to_synced(filepath, error=True) + if file_id in self.retry_counts: + del self.retry_counts[file_id] + if file_id in self.backoff_times: + del self.backoff_times[file_id] + + def _upload_encounter(self, encounter_data: dict) -> bool: + """Upload encounter data to cloud API. + + Args: + encounter_data: Encounter JSON data + + Returns: + True if successful, False otherwise + """ + try: + json_bytes = json.dumps(encounter_data).encode("utf-8") + request = urllib.request.Request( + self.api_url, + data=json_bytes, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(request, timeout=10.0) as response: + return response.status == 200 or response.status == 201 + except (urllib.error.URLError, socket.timeout, OSError, json.JSONEncodeError) as e: + self.get_logger().warning(f"Upload failed: {e}") + return False + + def _handle_sync_failure(self, file_id: str) -> None: + """Handle sync failure with exponential backoff. + + Args: + file_id: Filename identifier + """ + if file_id not in self.retry_counts: + self.retry_counts[file_id] = 0 + self.backoff_times[file_id] = 0 + + self.retry_counts[file_id] += 1 + backoff = min( + self.initial_backoff * (2 ** (self.retry_counts[file_id] - 1)), + self.max_backoff, + ) + self.backoff_times[file_id] = time.time() + backoff + + self.get_logger().warning( + f"Sync failed for {file_id}, retry {self.retry_counts[file_id]}/{self.max_retries} " + f"in {backoff:.1f}s" + ) + self._publish_status( + "retry", + f"File: {file_id}, attempt {self.retry_counts[file_id]}/{self.max_retries}", + ) + + def _move_to_synced(self, filepath: Path, error: bool = False) -> None: + """Move processed file to synced directory. + + Args: + filepath: Path to file + error: Whether file had an error during sync + """ + timestamp = datetime.now().isoformat() + status_suffix = "_error" if error else "" + new_name = f"{filepath.stem}_{timestamp}{status_suffix}.json" + dest_path = self.synced_dir / new_name + + try: + shutil.move(str(filepath), str(dest_path)) + self.get_logger().debug(f"Moved {filepath.name} to synced/") + except OSError as e: + self.get_logger().error(f"Failed to move {filepath.name} to synced: {e}") + + def _publish_status(self, status: str, details: str = "") -> None: + """Publish sync status update. + + Args: + status: Status string (e.g., 'online', 'offline', 'synced', 'retry') + details: Additional details + """ + timestamp = datetime.now().isoformat() + message = f"{timestamp} | {status.upper()} | {details}" if details else timestamp + msg = String() + msg.data = message + self.pub_status.publish(msg) + + +def main(args=None): + rclpy.init(args=args) + node = EncounterSyncService() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() -- 2.47.2