feat: first encounter orchestrator (Issue #400) #402

Merged
sl-jetson merged 2 commits from sl-jetson/issue-400-encounter-launch into main 2026-03-04 13:30:16 -05:00
2 changed files with 428 additions and 0 deletions
Showing only changes of commit 86c60f48e6 - Show all commits

View File

@ -0,0 +1,428 @@
#!/usr/bin/env python3
"""social_enrollment_node.py -- First Encounter enrollment with face + voice biometrics.
Triggered by FirstEncounterOrchestrator when state transitions to ENROLL.
Captures:
- Face embedding (via SCRFD + ArcFace from RealSense RGB)
- Voice speaker embedding (via ECAPA-TDNN)
- RealSense RGB photo snapshot
- Metadata (name, context, timestamp)
Stores to:
- /home/seb/encounter-queue/{person_id}_{timestamp}.json (for offline cloud sync)
- Local speaker_embeddings.json (for immediate voice recognition)
- Face gallery (via EnrollPerson service to face_recognizer)
Subscribes to:
/social/orchestrator/state (JSON: state, person_id, name, context)
/social/faces/embeddings (FaceEmbeddingArray with ArcFace embeddings)
/camera/color/image_raw (RealSense RGB for snapshots)
/social/speech/speaker_embedding (speaker embedding from ECAPA-TDNN)
Publishes:
/social/enrollment/status (JSON: person_id, status, person_db_id)
"""
import json
import time
import threading
import numpy as np
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Optional, Dict
from datetime import datetime
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy
from std_msgs.msg import String
from sensor_msgs.msg import Image
import cv2
from cv_bridge import CvBridge
from saltybot_social_msgs.msg import FaceEmbeddingArray
from saltybot_social_msgs.srv import EnrollPerson
from saltybot_social_enrollment.person_db import PersonDB
@dataclass
class EnrollmentRequest:
"""Enrollment request from first encounter."""
person_id: str
name: str
context: Optional[str] = None
timestamp: float = 0.0
face_embedding: Optional[np.ndarray] = None
voice_embedding: Optional[np.ndarray] = None
photo_data: Optional[bytes] = None # JPEG encoded
class SocialEnrollmentNode(Node):
"""Face + voice enrollment during first encounter."""
def __init__(self):
super().__init__('social_enrollment')
# Parameters
self.declare_parameter('encounter_queue_dir', '/home/seb/encounter-queue')
self.declare_parameter('speaker_embeddings_path', '/home/seb/speaker_embeddings.json')
self.declare_parameter('photos_dir', '/home/seb/encounter-photos')
self.declare_parameter('face_recognizer_service', '/social/face_recognizer/enroll')
self.declare_parameter('embedding_dim_face', 512)
self.declare_parameter('embedding_dim_voice', 192)
self.queue_dir = Path(self.get_parameter('encounter_queue_dir').value)
self.speaker_embeddings_path = Path(self.get_parameter('speaker_embeddings_path').value)
self.photos_dir = Path(self.get_parameter('photos_dir').value)
self.face_service_name = self.get_parameter('face_recognizer_service').value
self.face_emb_dim = self.get_parameter('embedding_dim_face').value
self.voice_emb_dim = self.get_parameter('embedding_dim_voice').value
# Create directories
self.queue_dir.mkdir(parents=True, exist_ok=True)
self.photos_dir.mkdir(parents=True, exist_ok=True)
# Initialize PersonDB
self._db = PersonDB(str(self.queue_dir.parent / 'persons.db'))
self.get_logger().info(f'PersonDB initialized')
# CV bridge for image conversion
self._bridge = CvBridge()
# State
self._enrollment_request: Optional[EnrollmentRequest] = None
self._lock = threading.Lock()
self._latest_face_embedding: Optional[np.ndarray] = None
self._latest_voice_embedding: Optional[np.ndarray] = None
self._latest_image: Optional[Image] = None
self._face_embedding_timestamp = 0.0
self._voice_embedding_timestamp = 0.0
self._image_timestamp = 0.0
# QoS profiles
best_effort_qos = QoSProfile(
depth=10,
reliability=ReliabilityPolicy.BEST_EFFORT,
durability=DurabilityPolicy.VOLATILE,
)
reliable_qos = QoSProfile(
depth=1,
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.VOLATILE,
)
# Subscriptions
self.create_subscription(
String, '/social/orchestrator/state',
self._on_orchestrator_state, reliable_qos
)
self.create_subscription(
FaceEmbeddingArray, '/social/faces/embeddings',
self._on_face_embeddings, reliable_qos
)
self.create_subscription(
Image, '/camera/color/image_raw',
self._on_camera_image, best_effort_qos
)
self.create_subscription(
String, '/social/speech/speaker_embedding',
self._on_speaker_embedding, best_effort_qos
)
# Service clients
self._enroll_face_client = self.create_client(
EnrollPerson, self.face_service_name
)
# Publishers
self._pub_status = self.create_publisher(
String, '/social/enrollment/status', reliable_qos
)
# Timer for enrollment timeout handling
self.create_timer(0.5, self._enrollment_timeout_check)
self.get_logger().info(
f'Social enrollment node initialized. '
f'Queue: {self.queue_dir}, '
f'Speakers: {self.speaker_embeddings_path}'
)
def _on_orchestrator_state(self, msg: String) -> None:
"""Handle orchestrator state transitions."""
try:
state_data = json.loads(msg.data)
state = state_data.get('state')
if state == 'ENROLL':
person_id = state_data.get('person_id')
name = state_data.get('name')
context = state_data.get('context')
with self._lock:
self._enrollment_request = EnrollmentRequest(
person_id=person_id,
name=name,
context=context,
timestamp=time.time()
)
self._face_embedding_timestamp = 0.0
self._voice_embedding_timestamp = 0.0
self._image_timestamp = 0.0
self.get_logger().info(
f'Enrollment triggered: {name} (ID: {person_id})'
)
except json.JSONDecodeError as e:
self.get_logger().error(f'Invalid orchestrator state JSON: {e}')
def _on_face_embeddings(self, msg: FaceEmbeddingArray) -> None:
"""Capture face embedding from social face recognizer."""
if not msg.embeddings:
return
with self._lock:
if self._enrollment_request is None:
return
# Take first detected face embedding
face_emb = msg.embeddings[0]
emb_array = np.frombuffer(face_emb.embedding, dtype=np.float32)
if len(emb_array) == self.face_emb_dim:
self._latest_face_embedding = emb_array.copy()
self._face_embedding_timestamp = time.time()
self.get_logger().debug(
f'Face embedding captured: {face_emb.track_id}'
)
def _on_speaker_embedding(self, msg: String) -> None:
"""Capture voice speaker embedding from ECAPA-TDNN."""
try:
emb_data = json.loads(msg.data)
emb_values = emb_data.get('embedding')
if emb_values:
with self._lock:
if self._enrollment_request is None:
return
emb_array = np.array(emb_values, dtype=np.float32)
if len(emb_array) == self.voice_emb_dim:
self._latest_voice_embedding = emb_array.copy()
self._voice_embedding_timestamp = time.time()
self.get_logger().debug(
f'Voice embedding captured: {len(emb_array)} dims'
)
except json.JSONDecodeError as e:
self.get_logger().error(f'Invalid speaker embedding JSON: {e}')
def _on_camera_image(self, msg: Image) -> None:
"""Capture RealSense RGB image for enrollment photo."""
try:
with self._lock:
if self._enrollment_request is None:
return
# Store latest image
self._latest_image = msg
self._image_timestamp = time.time()
except Exception as e:
self.get_logger().error(f'Error capturing camera image: {e}')
def _enrollment_timeout_check(self) -> None:
"""Check if enrollment data is ready or timed out."""
with self._lock:
if self._enrollment_request is None:
return
now = time.time()
timeout = 10.0 # 10 seconds to collect embeddings
# Check if all data collected
has_face = self._latest_face_embedding is not None and \
(now - self._face_embedding_timestamp < 5.0)
has_voice = self._latest_voice_embedding is not None and \
(now - self._voice_embedding_timestamp < 5.0)
has_image = self._latest_image is not None and \
(now - self._image_timestamp < 5.0)
# If we have face + voice, proceed with enrollment
if has_face and has_voice:
self._complete_enrollment()
# If timeout exceeded, save what we have
elif (now - self._enrollment_request.timestamp) > timeout:
self.get_logger().warn(
f'Enrollment timeout for {self._enrollment_request.name}. '
f'Proceeding with available data.'
)
self._complete_enrollment()
def _complete_enrollment(self) -> None:
"""Complete enrollment process."""
request = self._enrollment_request
if request is None:
return
try:
# Save enrollment data to queue
enroll_data = {
'person_id': request.person_id,
'name': request.name,
'context': request.context,
'timestamp': request.timestamp,
'datetime': datetime.fromtimestamp(request.timestamp).isoformat(),
'face_embedding_shape': list(self._latest_face_embedding.shape)
if self._latest_face_embedding is not None else None,
'voice_embedding_shape': list(self._latest_voice_embedding.shape)
if self._latest_voice_embedding is not None else None,
}
# Save queue JSON
queue_file = self.queue_dir / f"enrollment_{request.person_id}_{int(request.timestamp)}.json"
with open(queue_file, 'w') as f:
json.dump(enroll_data, f, indent=2)
self.get_logger().info(f'Enrollment data queued: {queue_file}')
# Save photo if available
photo_id = None
if self._latest_image is not None:
photo_id = self._save_enrollment_photo(request)
# Add to PersonDB with embeddings
person_db_id = self._db.add_person(
name=request.name,
embedding=self._latest_face_embedding,
sample_count=1,
metadata={
'encounter_person_id': request.person_id,
'context': request.context,
'photo_id': photo_id,
'timestamp': request.timestamp,
}
)
self.get_logger().info(f'Added to PersonDB: ID {person_db_id}')
# Update speaker embeddings JSON
self._update_speaker_embeddings(person_db_id, request)
# Enroll face via face_recognizer service
self._enroll_face(person_db_id, request)
# Publish success status
self._publish_enrollment_status('success', person_db_id)
except Exception as e:
self.get_logger().error(f'Enrollment failed for {request.name}: {e}')
self._publish_enrollment_status('failed', None)
finally:
self._enrollment_request = None
self._latest_face_embedding = None
self._latest_voice_embedding = None
self._latest_image = None
def _save_enrollment_photo(self, request: EnrollmentRequest) -> str:
"""Save enrollment photo from RealSense."""
try:
if self._latest_image is None:
return None
cv_image = self._bridge.imgmsg_to_cv2(self._latest_image, 'bgr8')
photo_id = f"{request.person_id}_{int(request.timestamp)}"
photo_path = self.photos_dir / f"{photo_id}.jpg"
cv2.imwrite(str(photo_path), cv_image)
self.get_logger().info(f'Enrollment photo saved: {photo_path}')
return photo_id
except Exception as e:
self.get_logger().error(f'Failed to save enrollment photo: {e}')
return None
def _update_speaker_embeddings(self, person_db_id: int, request: EnrollmentRequest) -> None:
"""Update speaker_embeddings.json with voice embedding."""
try:
if self._latest_voice_embedding is None:
return
# Load existing embeddings
speaker_db = {}
if self.speaker_embeddings_path.exists():
with open(self.speaker_embeddings_path, 'r') as f:
speaker_db = json.load(f)
# Add new embedding
speaker_db[str(person_db_id)] = {
'name': request.name,
'person_id': request.person_id,
'embedding': self._latest_voice_embedding.tolist(),
'timestamp': request.timestamp,
}
# Save updated embeddings
with open(self.speaker_embeddings_path, 'w') as f:
json.dump(speaker_db, f, indent=2)
self.get_logger().info(
f'Speaker embedding saved for {request.name}'
)
except Exception as e:
self.get_logger().error(f'Failed to update speaker embeddings: {e}')
def _enroll_face(self, person_db_id: int, request: EnrollmentRequest) -> None:
"""Enroll face via face_recognizer service."""
try:
if self._latest_face_embedding is None:
return
if not self._enroll_face_client.wait_for_service(timeout_sec=2.0):
self.get_logger().warn(
f'Face recognizer service not available. Skipping face enrollment.'
)
return
# Call EnrollPerson service
req = EnrollPerson.Request()
req.name = request.name
req.mode = 'face'
req.n_samples = 1
future = self._enroll_face_client.call_async(req)
self.get_logger().info(f'Face enrollment service called for {request.name}')
except Exception as e:
self.get_logger().error(f'Face enrollment service call failed: {e}')
def _publish_enrollment_status(self, status: str, person_db_id: Optional[int]) -> None:
"""Publish enrollment completion status."""
try:
status_msg = {
'status': status,
'person_id': self._enrollment_request.person_id if self._enrollment_request else None,
'name': self._enrollment_request.name if self._enrollment_request else None,
'person_db_id': person_db_id,
'timestamp': time.time(),
}
self._pub_status.publish(String(data=json.dumps(status_msg)))
except Exception as e:
self.get_logger().error(f'Failed to publish enrollment status: {e}')
def main(args=None):
rclpy.init(args=args)
node = SocialEnrollmentNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()