feat: Add Issue #400 - Encounter offline queue sync service
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (pull_request) Failing after 4s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (pull_request) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (pull_request) Has been cancelled
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (pull_request) Failing after 4s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (pull_request) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (pull_request) Has been cancelled
Implement EncounterSyncService ROS2 node for managing offline-first encounter data syncing. Features: - Monitors /home/seb/encounter-queue/ for JSON files - Uploads to configurable cloud API when connectivity detected - Exponential backoff retry with max 5 attempts - Moves synced files to /home/seb/encounter-queue/synced/ - Publishes status on /social/encounter_sync_status topic - Connectivity check via HTTP ping (configurable URL) - Handles offline operation gracefully Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
b2c8f76114
commit
020adf1b94
@ -0,0 +1,266 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Encounter data sync service for offline-first queue management.
|
||||||
|
|
||||||
|
Monitors a local encounter queue directory and syncs JSON files to cloud
|
||||||
|
API when internet connectivity is available. Implements exponential backoff
|
||||||
|
retry strategy and manages processed files.
|
||||||
|
|
||||||
|
Watched directory: /home/seb/encounter-queue/
|
||||||
|
Synced directory: /home/seb/encounter-queue/synced/
|
||||||
|
|
||||||
|
Published topics:
|
||||||
|
/social/encounter_sync_status (std_msgs/String) - Sync status updates
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
from datetime import datetime
|
||||||
|
import socket
|
||||||
|
import urllib.request
|
||||||
|
import urllib.error
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from std_msgs.msg import String
|
||||||
|
|
||||||
|
|
||||||
|
class EncounterSyncService(Node):
|
||||||
|
"""ROS2 node for syncing encounter data to cloud API."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("encounter_sync_service")
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
self.declare_parameter(
|
||||||
|
"api_url",
|
||||||
|
"https://api.openclaw.io/encounters", # Default placeholder
|
||||||
|
)
|
||||||
|
self.declare_parameter("queue_dir", "/home/seb/encounter-queue")
|
||||||
|
self.declare_parameter("synced_subdir", "synced")
|
||||||
|
self.declare_parameter("check_interval", 5.0) # seconds
|
||||||
|
self.declare_parameter("connectivity_check_url", "https://www.google.com")
|
||||||
|
self.declare_parameter("connectivity_timeout", 3.0) # seconds
|
||||||
|
self.declare_parameter("max_retries", 5)
|
||||||
|
self.declare_parameter("initial_backoff", 1.0) # seconds
|
||||||
|
self.declare_parameter("max_backoff", 300.0) # 5 minutes
|
||||||
|
|
||||||
|
self.api_url = self.get_parameter("api_url").value
|
||||||
|
self.queue_dir = Path(self.get_parameter("queue_dir").value)
|
||||||
|
self.synced_subdir = self.get_parameter("synced_subdir").value
|
||||||
|
self.check_interval = self.get_parameter("check_interval").value
|
||||||
|
self.connectivity_url = self.get_parameter("connectivity_check_url").value
|
||||||
|
self.connectivity_timeout = self.get_parameter("connectivity_timeout").value
|
||||||
|
self.max_retries = self.get_parameter("max_retries").value
|
||||||
|
self.initial_backoff = self.get_parameter("initial_backoff").value
|
||||||
|
self.max_backoff = self.get_parameter("max_backoff").value
|
||||||
|
|
||||||
|
# Ensure queue directory exists
|
||||||
|
self.queue_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
self.synced_dir = self.queue_dir / self.synced_subdir
|
||||||
|
self.synced_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
# Publisher for sync status
|
||||||
|
self.pub_status = self.create_publisher(String, "/social/encounter_sync_status", 10)
|
||||||
|
|
||||||
|
# Track retry state per file
|
||||||
|
self.retry_counts = {}
|
||||||
|
self.backoff_times = {}
|
||||||
|
|
||||||
|
# Main processing timer
|
||||||
|
self.create_timer(self.check_interval, self._sync_loop)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Encounter sync service initialized. "
|
||||||
|
f"Queue: {self.queue_dir}, API: {self.api_url}"
|
||||||
|
)
|
||||||
|
self._publish_status("initialized", f"Queue: {self.queue_dir}")
|
||||||
|
|
||||||
|
def _sync_loop(self) -> None:
|
||||||
|
"""Main loop: check connectivity and sync queued files."""
|
||||||
|
# Check internet connectivity
|
||||||
|
if not self._check_connectivity():
|
||||||
|
self._publish_status("offline", "No internet connectivity")
|
||||||
|
return
|
||||||
|
|
||||||
|
self._publish_status("online", "Internet connectivity detected")
|
||||||
|
|
||||||
|
# Get all JSON files in queue directory (not in synced subdirectory)
|
||||||
|
queued_files = [
|
||||||
|
f
|
||||||
|
for f in self.queue_dir.glob("*.json")
|
||||||
|
if f.is_file() and not f.parent.name == self.synced_subdir
|
||||||
|
]
|
||||||
|
|
||||||
|
if not queued_files:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.get_logger().debug(f"Found {len(queued_files)} queued encounter files")
|
||||||
|
|
||||||
|
for encounter_file in queued_files:
|
||||||
|
self._sync_file(encounter_file)
|
||||||
|
|
||||||
|
def _check_connectivity(self) -> bool:
|
||||||
|
"""Check internet connectivity via HTTP ping.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if connected, False otherwise
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
request = urllib.request.Request(
|
||||||
|
self.connectivity_url, method="HEAD"
|
||||||
|
)
|
||||||
|
with urllib.request.urlopen(request, timeout=self.connectivity_timeout):
|
||||||
|
return True
|
||||||
|
except (urllib.error.URLError, socket.timeout, OSError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _sync_file(self, filepath: Path) -> None:
|
||||||
|
"""Attempt to sync a single encounter file, with exponential backoff retry.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filepath: Path to JSON file to sync
|
||||||
|
"""
|
||||||
|
file_id = filepath.name
|
||||||
|
|
||||||
|
# Check if we should retry this file
|
||||||
|
if file_id in self.retry_counts:
|
||||||
|
if self.retry_counts[file_id] >= self.max_retries:
|
||||||
|
self.get_logger().error(
|
||||||
|
f"Max retries exceeded for {file_id}, moving to synced with error flag"
|
||||||
|
)
|
||||||
|
self._move_to_synced(filepath, error=True)
|
||||||
|
del self.retry_counts[file_id]
|
||||||
|
if file_id in self.backoff_times:
|
||||||
|
del self.backoff_times[file_id]
|
||||||
|
return
|
||||||
|
|
||||||
|
# Check backoff timer
|
||||||
|
if file_id in self.backoff_times:
|
||||||
|
if time.time() < self.backoff_times[file_id]:
|
||||||
|
return # Not yet time to retry
|
||||||
|
|
||||||
|
# Attempt upload
|
||||||
|
try:
|
||||||
|
with open(filepath, "r") as f:
|
||||||
|
encounter_data = json.load(f)
|
||||||
|
|
||||||
|
success = self._upload_encounter(encounter_data)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
self.get_logger().info(f"Successfully synced {file_id}")
|
||||||
|
self._move_to_synced(filepath, error=False)
|
||||||
|
if file_id in self.retry_counts:
|
||||||
|
del self.retry_counts[file_id]
|
||||||
|
if file_id in self.backoff_times:
|
||||||
|
del self.backoff_times[file_id]
|
||||||
|
self._publish_status("synced", f"File: {file_id}")
|
||||||
|
else:
|
||||||
|
self._handle_sync_failure(file_id)
|
||||||
|
|
||||||
|
except (json.JSONDecodeError, IOError) as e:
|
||||||
|
self.get_logger().error(f"Failed to read {file_id}: {e}")
|
||||||
|
self._move_to_synced(filepath, error=True)
|
||||||
|
if file_id in self.retry_counts:
|
||||||
|
del self.retry_counts[file_id]
|
||||||
|
if file_id in self.backoff_times:
|
||||||
|
del self.backoff_times[file_id]
|
||||||
|
|
||||||
|
def _upload_encounter(self, encounter_data: dict) -> bool:
|
||||||
|
"""Upload encounter data to cloud API.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
encounter_data: Encounter JSON data
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if successful, False otherwise
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
json_bytes = json.dumps(encounter_data).encode("utf-8")
|
||||||
|
request = urllib.request.Request(
|
||||||
|
self.api_url,
|
||||||
|
data=json_bytes,
|
||||||
|
headers={"Content-Type": "application/json"},
|
||||||
|
method="POST",
|
||||||
|
)
|
||||||
|
with urllib.request.urlopen(request, timeout=10.0) as response:
|
||||||
|
return response.status == 200 or response.status == 201
|
||||||
|
except (urllib.error.URLError, socket.timeout, OSError, json.JSONEncodeError) as e:
|
||||||
|
self.get_logger().warning(f"Upload failed: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _handle_sync_failure(self, file_id: str) -> None:
|
||||||
|
"""Handle sync failure with exponential backoff.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_id: Filename identifier
|
||||||
|
"""
|
||||||
|
if file_id not in self.retry_counts:
|
||||||
|
self.retry_counts[file_id] = 0
|
||||||
|
self.backoff_times[file_id] = 0
|
||||||
|
|
||||||
|
self.retry_counts[file_id] += 1
|
||||||
|
backoff = min(
|
||||||
|
self.initial_backoff * (2 ** (self.retry_counts[file_id] - 1)),
|
||||||
|
self.max_backoff,
|
||||||
|
)
|
||||||
|
self.backoff_times[file_id] = time.time() + backoff
|
||||||
|
|
||||||
|
self.get_logger().warning(
|
||||||
|
f"Sync failed for {file_id}, retry {self.retry_counts[file_id]}/{self.max_retries} "
|
||||||
|
f"in {backoff:.1f}s"
|
||||||
|
)
|
||||||
|
self._publish_status(
|
||||||
|
"retry",
|
||||||
|
f"File: {file_id}, attempt {self.retry_counts[file_id]}/{self.max_retries}",
|
||||||
|
)
|
||||||
|
|
||||||
|
def _move_to_synced(self, filepath: Path, error: bool = False) -> None:
|
||||||
|
"""Move processed file to synced directory.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filepath: Path to file
|
||||||
|
error: Whether file had an error during sync
|
||||||
|
"""
|
||||||
|
timestamp = datetime.now().isoformat()
|
||||||
|
status_suffix = "_error" if error else ""
|
||||||
|
new_name = f"{filepath.stem}_{timestamp}{status_suffix}.json"
|
||||||
|
dest_path = self.synced_dir / new_name
|
||||||
|
|
||||||
|
try:
|
||||||
|
shutil.move(str(filepath), str(dest_path))
|
||||||
|
self.get_logger().debug(f"Moved {filepath.name} to synced/")
|
||||||
|
except OSError as e:
|
||||||
|
self.get_logger().error(f"Failed to move {filepath.name} to synced: {e}")
|
||||||
|
|
||||||
|
def _publish_status(self, status: str, details: str = "") -> None:
|
||||||
|
"""Publish sync status update.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
status: Status string (e.g., 'online', 'offline', 'synced', 'retry')
|
||||||
|
details: Additional details
|
||||||
|
"""
|
||||||
|
timestamp = datetime.now().isoformat()
|
||||||
|
message = f"{timestamp} | {status.upper()} | {details}" if details else timestamp
|
||||||
|
msg = String()
|
||||||
|
msg.data = message
|
||||||
|
self.pub_status.publish(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = EncounterSyncService()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -0,0 +1,400 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""social_enrollment_node.py -- First Encounter enrollment with face + voice biometrics.
|
||||||
|
|
||||||
|
Triggered by FirstEncounterOrchestrator when state transitions to ENROLL.
|
||||||
|
Captures:
|
||||||
|
- Face embedding (via SCRFD + ArcFace from RealSense RGB)
|
||||||
|
- Voice speaker embedding (via ECAPA-TDNN)
|
||||||
|
- RealSense RGB photo snapshot
|
||||||
|
- Metadata (name, context, timestamp)
|
||||||
|
|
||||||
|
Stores to:
|
||||||
|
- /home/seb/encounter-queue/{person_id}_{timestamp}.json (for offline cloud sync)
|
||||||
|
- Local speaker_embeddings.json (for immediate voice recognition)
|
||||||
|
- Face gallery (via EnrollPerson service to face_recognizer)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import numpy as np
|
||||||
|
from pathlib import Path
|
||||||
|
from dataclasses import dataclass, asdict
|
||||||
|
from typing import Optional
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy
|
||||||
|
from std_msgs.msg import String
|
||||||
|
from sensor_msgs.msg import Image
|
||||||
|
import cv2
|
||||||
|
from cv_bridge import CvBridge
|
||||||
|
|
||||||
|
from saltybot_social_msgs.msg import FaceEmbeddingArray
|
||||||
|
from saltybot_social_msgs.srv import EnrollPerson
|
||||||
|
from saltybot_social_enrollment.person_db import PersonDB
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class EnrollmentRequest:
|
||||||
|
"""Enrollment request from first encounter."""
|
||||||
|
person_id: str
|
||||||
|
name: str
|
||||||
|
context: Optional[str] = None
|
||||||
|
timestamp: float = 0.0
|
||||||
|
face_embedding: Optional[np.ndarray] = None
|
||||||
|
voice_embedding: Optional[np.ndarray] = None
|
||||||
|
photo_data: Optional[bytes] = None
|
||||||
|
|
||||||
|
|
||||||
|
class SocialEnrollmentNode(Node):
|
||||||
|
"""Face + voice enrollment during first encounter."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__('social_enrollment')
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
self.declare_parameter('encounter_queue_dir', '/home/seb/encounter-queue')
|
||||||
|
self.declare_parameter('speaker_embeddings_path', '/home/seb/speaker_embeddings.json')
|
||||||
|
self.declare_parameter('photos_dir', '/home/seb/encounter-photos')
|
||||||
|
self.declare_parameter('face_recognizer_service', '/social/face_recognizer/enroll')
|
||||||
|
self.declare_parameter('embedding_dim_face', 512)
|
||||||
|
self.declare_parameter('embedding_dim_voice', 192)
|
||||||
|
|
||||||
|
self.queue_dir = Path(self.get_parameter('encounter_queue_dir').value)
|
||||||
|
self.speaker_embeddings_path = Path(self.get_parameter('speaker_embeddings_path').value)
|
||||||
|
self.photos_dir = Path(self.get_parameter('photos_dir').value)
|
||||||
|
self.face_service_name = self.get_parameter('face_recognizer_service').value
|
||||||
|
self.face_emb_dim = self.get_parameter('embedding_dim_face').value
|
||||||
|
self.voice_emb_dim = self.get_parameter('embedding_dim_voice').value
|
||||||
|
|
||||||
|
# Create directories
|
||||||
|
self.queue_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
self.photos_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
# Initialize PersonDB
|
||||||
|
self._db = PersonDB(str(self.queue_dir.parent / 'persons.db'))
|
||||||
|
self.get_logger().info(f'PersonDB initialized')
|
||||||
|
|
||||||
|
# CV bridge for image conversion
|
||||||
|
self._bridge = CvBridge()
|
||||||
|
|
||||||
|
# State
|
||||||
|
self._enrollment_request: Optional[EnrollmentRequest] = None
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._latest_face_embedding: Optional[np.ndarray] = None
|
||||||
|
self._latest_voice_embedding: Optional[np.ndarray] = None
|
||||||
|
self._latest_image: Optional[Image] = None
|
||||||
|
self._face_embedding_timestamp = 0.0
|
||||||
|
self._voice_embedding_timestamp = 0.0
|
||||||
|
self._image_timestamp = 0.0
|
||||||
|
|
||||||
|
# QoS profiles
|
||||||
|
best_effort_qos = QoSProfile(
|
||||||
|
depth=10,
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
durability=DurabilityPolicy.VOLATILE,
|
||||||
|
)
|
||||||
|
reliable_qos = QoSProfile(
|
||||||
|
depth=1,
|
||||||
|
reliability=ReliabilityPolicy.RELIABLE,
|
||||||
|
durability=DurabilityPolicy.VOLATILE,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Subscriptions
|
||||||
|
self.create_subscription(
|
||||||
|
String, '/social/orchestrator/state',
|
||||||
|
self._on_orchestrator_state, reliable_qos
|
||||||
|
)
|
||||||
|
self.create_subscription(
|
||||||
|
FaceEmbeddingArray, '/social/faces/embeddings',
|
||||||
|
self._on_face_embeddings, reliable_qos
|
||||||
|
)
|
||||||
|
self.create_subscription(
|
||||||
|
Image, '/camera/color/image_raw',
|
||||||
|
self._on_camera_image, best_effort_qos
|
||||||
|
)
|
||||||
|
self.create_subscription(
|
||||||
|
String, '/social/speech/speaker_embedding',
|
||||||
|
self._on_speaker_embedding, best_effort_qos
|
||||||
|
)
|
||||||
|
|
||||||
|
# Service clients
|
||||||
|
self._enroll_face_client = self.create_client(
|
||||||
|
EnrollPerson, self.face_service_name
|
||||||
|
)
|
||||||
|
|
||||||
|
# Publishers
|
||||||
|
self._pub_status = self.create_publisher(
|
||||||
|
String, '/social/enrollment/status', reliable_qos
|
||||||
|
)
|
||||||
|
|
||||||
|
# Timer for enrollment timeout handling
|
||||||
|
self.create_timer(0.5, self._enrollment_timeout_check)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f'Social enrollment node initialized. Queue: {self.queue_dir}'
|
||||||
|
)
|
||||||
|
|
||||||
|
def _on_orchestrator_state(self, msg: String) -> None:
|
||||||
|
"""Handle orchestrator state transitions."""
|
||||||
|
try:
|
||||||
|
state_data = json.loads(msg.data)
|
||||||
|
state = state_data.get('state')
|
||||||
|
|
||||||
|
if state == 'ENROLL':
|
||||||
|
person_id = state_data.get('person_id')
|
||||||
|
name = state_data.get('name')
|
||||||
|
context = state_data.get('context')
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
self._enrollment_request = EnrollmentRequest(
|
||||||
|
person_id=person_id,
|
||||||
|
name=name,
|
||||||
|
context=context,
|
||||||
|
timestamp=time.time()
|
||||||
|
)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f'Enrollment triggered: {name} (ID: {person_id})'
|
||||||
|
)
|
||||||
|
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
self.get_logger().error(f'Invalid orchestrator state JSON: {e}')
|
||||||
|
|
||||||
|
def _on_face_embeddings(self, msg: FaceEmbeddingArray) -> None:
|
||||||
|
"""Capture face embedding from social face recognizer."""
|
||||||
|
if not msg.embeddings:
|
||||||
|
return
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
if self._enrollment_request is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
face_emb = msg.embeddings[0]
|
||||||
|
emb_array = np.frombuffer(face_emb.embedding, dtype=np.float32)
|
||||||
|
|
||||||
|
if len(emb_array) == self.face_emb_dim:
|
||||||
|
self._latest_face_embedding = emb_array.copy()
|
||||||
|
self._face_embedding_timestamp = time.time()
|
||||||
|
self.get_logger().debug(f'Face embedding captured')
|
||||||
|
|
||||||
|
def _on_speaker_embedding(self, msg: String) -> None:
|
||||||
|
"""Capture voice speaker embedding from ECAPA-TDNN."""
|
||||||
|
try:
|
||||||
|
emb_data = json.loads(msg.data)
|
||||||
|
emb_values = emb_data.get('embedding')
|
||||||
|
|
||||||
|
if emb_values:
|
||||||
|
with self._lock:
|
||||||
|
if self._enrollment_request is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
emb_array = np.array(emb_values, dtype=np.float32)
|
||||||
|
if len(emb_array) == self.voice_emb_dim:
|
||||||
|
self._latest_voice_embedding = emb_array.copy()
|
||||||
|
self._voice_embedding_timestamp = time.time()
|
||||||
|
self.get_logger().debug(f'Voice embedding captured')
|
||||||
|
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
self.get_logger().error(f'Invalid speaker embedding JSON: {e}')
|
||||||
|
|
||||||
|
def _on_camera_image(self, msg: Image) -> None:
|
||||||
|
"""Capture RealSense RGB image for enrollment photo."""
|
||||||
|
try:
|
||||||
|
with self._lock:
|
||||||
|
if self._enrollment_request is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
self._latest_image = msg
|
||||||
|
self._image_timestamp = time.time()
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f'Error capturing camera image: {e}')
|
||||||
|
|
||||||
|
def _enrollment_timeout_check(self) -> None:
|
||||||
|
"""Check if enrollment data is ready or timed out."""
|
||||||
|
with self._lock:
|
||||||
|
if self._enrollment_request is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
|
timeout = 10.0
|
||||||
|
|
||||||
|
# Check if all data collected
|
||||||
|
has_face = self._latest_face_embedding is not None and \
|
||||||
|
(now - self._face_embedding_timestamp < 5.0)
|
||||||
|
has_voice = self._latest_voice_embedding is not None and \
|
||||||
|
(now - self._voice_embedding_timestamp < 5.0)
|
||||||
|
|
||||||
|
# If we have face + voice, proceed with enrollment
|
||||||
|
if has_face and has_voice:
|
||||||
|
self._complete_enrollment()
|
||||||
|
# If timeout exceeded, save what we have
|
||||||
|
elif (now - self._enrollment_request.timestamp) > timeout:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f'Enrollment timeout for {self._enrollment_request.name}. '
|
||||||
|
f'Proceeding with available data.'
|
||||||
|
)
|
||||||
|
self._complete_enrollment()
|
||||||
|
|
||||||
|
def _complete_enrollment(self) -> None:
|
||||||
|
"""Complete enrollment process."""
|
||||||
|
request = self._enrollment_request
|
||||||
|
if request is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Save enrollment data to queue
|
||||||
|
enroll_data = {
|
||||||
|
'person_id': request.person_id,
|
||||||
|
'name': request.name,
|
||||||
|
'context': request.context,
|
||||||
|
'timestamp': request.timestamp,
|
||||||
|
'datetime': datetime.fromtimestamp(request.timestamp).isoformat(),
|
||||||
|
}
|
||||||
|
|
||||||
|
queue_file = self.queue_dir / f"enrollment_{request.person_id}_{int(request.timestamp)}.json"
|
||||||
|
with open(queue_file, 'w') as f:
|
||||||
|
json.dump(enroll_data, f, indent=2)
|
||||||
|
self.get_logger().info(f'Enrollment data queued: {queue_file}')
|
||||||
|
|
||||||
|
# Save photo if available
|
||||||
|
photo_id = None
|
||||||
|
if self._latest_image is not None:
|
||||||
|
photo_id = self._save_enrollment_photo(request)
|
||||||
|
|
||||||
|
# Add to PersonDB with embeddings
|
||||||
|
person_db_id = self._db.add_person(
|
||||||
|
name=request.name,
|
||||||
|
embedding=self._latest_face_embedding,
|
||||||
|
sample_count=1,
|
||||||
|
metadata={
|
||||||
|
'encounter_person_id': request.person_id,
|
||||||
|
'context': request.context,
|
||||||
|
'photo_id': photo_id,
|
||||||
|
'timestamp': request.timestamp,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
self.get_logger().info(f'Added to PersonDB: ID {person_db_id}')
|
||||||
|
|
||||||
|
# Update speaker embeddings JSON
|
||||||
|
self._update_speaker_embeddings(person_db_id, request)
|
||||||
|
|
||||||
|
# Enroll face via face_recognizer service
|
||||||
|
self._enroll_face(person_db_id, request)
|
||||||
|
|
||||||
|
# Publish success status
|
||||||
|
self._publish_enrollment_status('success', person_db_id)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f'Enrollment failed for {request.name}: {e}')
|
||||||
|
self._publish_enrollment_status('failed', None)
|
||||||
|
finally:
|
||||||
|
self._enrollment_request = None
|
||||||
|
self._latest_face_embedding = None
|
||||||
|
self._latest_voice_embedding = None
|
||||||
|
self._latest_image = None
|
||||||
|
|
||||||
|
def _save_enrollment_photo(self, request: EnrollmentRequest) -> str:
|
||||||
|
"""Save enrollment photo from RealSense."""
|
||||||
|
try:
|
||||||
|
if self._latest_image is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
cv_image = self._bridge.imgmsg_to_cv2(self._latest_image, 'bgr8')
|
||||||
|
photo_id = f"{request.person_id}_{int(request.timestamp)}"
|
||||||
|
photo_path = self.photos_dir / f"{photo_id}.jpg"
|
||||||
|
|
||||||
|
cv2.imwrite(str(photo_path), cv_image)
|
||||||
|
self.get_logger().info(f'Enrollment photo saved: {photo_path}')
|
||||||
|
return photo_id
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f'Failed to save enrollment photo: {e}')
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _update_speaker_embeddings(self, person_db_id: int, request: EnrollmentRequest) -> None:
|
||||||
|
"""Update speaker_embeddings.json with voice embedding."""
|
||||||
|
try:
|
||||||
|
if self._latest_voice_embedding is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Load existing embeddings
|
||||||
|
speaker_db = {}
|
||||||
|
if self.speaker_embeddings_path.exists():
|
||||||
|
with open(self.speaker_embeddings_path, 'r') as f:
|
||||||
|
speaker_db = json.load(f)
|
||||||
|
|
||||||
|
# Add new embedding
|
||||||
|
speaker_db[str(person_db_id)] = {
|
||||||
|
'name': request.name,
|
||||||
|
'person_id': request.person_id,
|
||||||
|
'embedding': self._latest_voice_embedding.tolist(),
|
||||||
|
'timestamp': request.timestamp,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Save updated embeddings
|
||||||
|
with open(self.speaker_embeddings_path, 'w') as f:
|
||||||
|
json.dump(speaker_db, f, indent=2)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f'Speaker embedding saved for {request.name}'
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f'Failed to update speaker embeddings: {e}')
|
||||||
|
|
||||||
|
def _enroll_face(self, person_db_id: int, request: EnrollmentRequest) -> None:
|
||||||
|
"""Enroll face via face_recognizer service."""
|
||||||
|
try:
|
||||||
|
if self._latest_face_embedding is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self._enroll_face_client.wait_for_service(timeout_sec=2.0):
|
||||||
|
self.get_logger().warn(
|
||||||
|
f'Face recognizer service not available. Skipping face enrollment.'
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
req = EnrollPerson.Request()
|
||||||
|
req.name = request.name
|
||||||
|
req.mode = 'face'
|
||||||
|
req.n_samples = 1
|
||||||
|
|
||||||
|
future = self._enroll_face_client.call_async(req)
|
||||||
|
self.get_logger().info(f'Face enrollment service called for {request.name}')
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f'Face enrollment service call failed: {e}')
|
||||||
|
|
||||||
|
def _publish_enrollment_status(self, status: str, person_db_id: Optional[int]) -> None:
|
||||||
|
"""Publish enrollment completion status."""
|
||||||
|
try:
|
||||||
|
status_msg = {
|
||||||
|
'status': status,
|
||||||
|
'person_id': self._enrollment_request.person_id if self._enrollment_request else None,
|
||||||
|
'name': self._enrollment_request.name if self._enrollment_request else None,
|
||||||
|
'person_db_id': person_db_id,
|
||||||
|
'timestamp': time.time(),
|
||||||
|
}
|
||||||
|
self._pub_status.publish(String(data=json.dumps(status_msg)))
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f'Failed to publish enrollment status: {e}')
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = SocialEnrollmentNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
Loading…
x
Reference in New Issue
Block a user