feat(social): facial expression recognition — TRT FP16 emotion CNN (Issue #161) #180
@ -0,0 +1,38 @@
|
||||
emotion_node:
  ros__parameters:
    # Path to the TRT FP16 emotion engine (built from emotion_cnn.onnx).
    # Build with:
    #   trtexec --onnx=emotion_cnn.onnx --fp16 --saveEngine=emotion_fp16.trt
    # Leave empty to use the landmark heuristic only (no GPU required).
    engine_path: "/models/emotion_fp16.trt"

    # Minimum smoothed confidence to publish an expression.
    # Lower = more detections but more noise; 0.40 is a good production default.
    min_confidence: 0.40

    # EMA smoothing weight applied to each new observation.
    # 0.0 = frozen (never updates), 1.0 = no smoothing (raw output).
    # 0.30 gives a stable ~3-frame moving average at a 10 Hz face detection rate.
    smoothing_alpha: 0.30

    # Comma-separated person_ids that have opted out of emotion tracking.
    # Empty string = everyone is tracked by default.
    # Example: "1,3,7"
    opt_out_persons: ""

    # Skip face crops smaller than this side length (pixels).
    # Very small crops produce unreliable emotion predictions.
    face_min_size: 24

    # If true and the TRT engine is unavailable, fall back to the 5-point
    # landmark heuristic. Set false to suppress output entirely without TRT.
    landmark_fallback: true

    # Camera names matching the saltybot_cameras topic naming convention.
    camera_names: "front,left,rear,right"

    # Number of active CSI camera streams to subscribe to.
    n_cameras: 4

    # Publish /social/emotion/context (JSON string) for conversation_node.
    publish_context: true
|
||||
67
jetson/ros2_ws/src/saltybot_social/launch/emotion.launch.py
Normal file
67
jetson/ros2_ws/src/saltybot_social/launch/emotion.launch.py
Normal file
@ -0,0 +1,67 @@
|
||||
"""emotion.launch.py — Launch emotion_node for facial expression recognition (Issue #161)."""
|
||||
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
|
||||
from launch_ros.actions import Node
|
||||
from launch_ros.substitutions import FindPackageShare
|
||||
|
||||
|
||||
def generate_launch_description() -> LaunchDescription:
    """Assemble the launch actions for emotion_node (Issue #161).

    Declares one overridable launch argument per node parameter, then starts
    emotion_node with the YAML params file plus per-argument overrides.
    """
    share_dir = FindPackageShare("saltybot_social")
    default_params = PathJoinSubstitution(
        [share_dir, "config", "emotion_params.yaml"]
    )

    # (name, default, description) for every overridable launch argument.
    arg_specs = [
        ("params_file", default_params,
         "Path to emotion_node parameter YAML"),
        ("engine_path", "/models/emotion_fp16.trt",
         "Path to TensorRT FP16 emotion engine (empty = landmark heuristic)"),
        ("min_confidence", "0.40",
         "Minimum detection confidence to publish an expression"),
        ("smoothing_alpha", "0.30",
         "EMA smoothing weight (0=frozen, 1=no smoothing)"),
        ("opt_out_persons", "",
         "Comma-separated person_ids that opted out"),
        ("landmark_fallback", "true",
         "Use landmark heuristic when TRT engine unavailable"),
        ("publish_context", "true",
         "Publish /social/emotion/context JSON for LLM context"),
    ]
    actions = [
        DeclareLaunchArgument(name, default_value=default, description=desc)
        for name, default, desc in arg_specs
    ]

    # Per-parameter overrides mirror the declared launch arguments
    # (params_file itself is passed as the base parameter file below).
    override_keys = (
        "engine_path", "min_confidence", "smoothing_alpha",
        "opt_out_persons", "landmark_fallback", "publish_context",
    )
    overrides = {key: LaunchConfiguration(key) for key in override_keys}

    actions.append(
        Node(
            package="saltybot_social",
            executable="emotion_node",
            name="emotion_node",
            output="screen",
            parameters=[
                LaunchConfiguration("params_file"),
                overrides,
            ],
        )
    )
    return LaunchDescription(actions)
|
||||
@ -95,6 +95,13 @@ class ConversationNode(Node):
|
||||
# ── Periodic context save ────────────────────────────────────────────
|
||||
self._save_timer = self.create_timer(self._save_interval, self._save_context)
|
||||
|
||||
# ── Emotion context (Issue #161) ──────────────────────────────────────
|
||||
self._emotions: Dict[int, str] = {}
|
||||
self.create_subscription(
|
||||
String, "/social/emotion/context",
|
||||
self._on_emotion_context, 10
|
||||
)
|
||||
|
||||
self.get_logger().info(
|
||||
f"ConversationNode init (model={self._model_path}, "
|
||||
f"gpu_layers={self._n_gpu}, ctx={self._n_ctx})"
|
||||
|
||||
@ -0,0 +1,414 @@
|
||||
"""emotion_classifier.py — 7-class facial expression classifier (Issue #161).
|
||||
|
||||
Pure Python, no ROS2 / TensorRT / OpenCV dependencies.
|
||||
Wraps a TensorRT FP16 emotion CNN and provides:
|
||||
- On-device TRT inference on 48×48 grayscale face crops
|
||||
- Heuristic fallback from 5-point SCRFD facial landmarks
|
||||
- Per-person EMA temporal smoothing for stable outputs
|
||||
- Per-person opt-out registry
|
||||
|
||||
Emotion classes (index order matches CNN output layer)
|
||||
------------------------------------------------------
|
||||
0 = happy
|
||||
1 = sad
|
||||
2 = angry
|
||||
3 = surprised
|
||||
4 = fearful
|
||||
5 = disgusted
|
||||
6 = neutral
|
||||
|
||||
Coordinate convention
|
||||
---------------------
|
||||
Face crop: BGR uint8 ndarray, any size (resized to INPUT_SIZE internally).
|
||||
Landmarks (lm10): 10 floats from FaceDetection.landmarks —
|
||||
[left_eye_x, left_eye_y, right_eye_x, right_eye_y,
|
||||
nose_x, nose_y, left_mouth_x, left_mouth_y,
|
||||
right_mouth_x, right_mouth_y]
|
||||
All coordinates are normalised image-space (0.0–1.0).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
# ── Constants ─────────────────────────────────────────────────────────────────
|
||||
|
||||
# The seven emotion classes; list index matches the CNN output-layer index.
EMOTIONS: List[str] = [
    "happy", "sad", "angry", "surprised", "fearful", "disgusted", "neutral"
]
# Number of emotion classes (7).
N_CLASSES: int = len(EMOTIONS)

# Landmark slot indices within the 10-value array (see module docstring for
# the full layout: eyes, nose, mouth corners — x before y for each point).
_LEX, _LEY = 0, 1   # left eye
_REX, _REY = 2, 3   # right eye
_NX, _NY = 4, 5     # nose
_LMX, _LMY = 6, 7   # left mouth corner
_RMX, _RMY = 8, 9   # right mouth corner

# CNN input side length: crops are resized to INPUT_SIZE × INPUT_SIZE grayscale.
INPUT_SIZE: int = 48
|
||||
|
||||
|
||||
# ── Result type ───────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@dataclass
class ClassifiedEmotion:
    """Result of one emotion classification pass for a single face."""
    emotion: str              # dominant emotion label (one of EMOTIONS)
    confidence: float         # smoothed softmax probability (0.0–1.0)
    scores: List[float]       # per-class scores, len == N_CLASSES
    source: str = "cnn_trt"   # "cnn_trt" | "landmark_heuristic" | "opt_out"
|
||||
|
||||
|
||||
# ── Math helpers ──────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _softmax(logits: List[float]) -> List[float]:
    """Map raw logits to a probability distribution (numerically stable).

    Subtracting the maximum before exponentiating prevents overflow for
    large logit magnitudes without changing the result.
    """
    peak = max(logits)
    shifted = [math.exp(v - peak) for v in logits]
    denom = sum(shifted)
    return [v / denom for v in shifted]
|
||||
|
||||
|
||||
def _argmax(values: List[float]) -> int:
    """Index of the largest value; the first index wins on ties."""
    top = 0
    for idx, val in enumerate(values):
        # Strict '>' keeps the earliest maximum, matching tie-break behavior.
        if val > values[top]:
            top = idx
    return top
|
||||
|
||||
|
||||
def _uniform_scores() -> List[float]:
    """Flat prior: equal probability mass for every emotion class."""
    share = 1.0 / N_CLASSES
    return [share for _ in range(N_CLASSES)]
|
||||
|
||||
|
||||
# ── Per-person temporal smoother + opt-out registry ───────────────────────────
|
||||
|
||||
|
||||
class PersonEmotionTracker:
    """Per-person EMA score smoother with an opt-out registry.

    Args:
        alpha: Weight of the newest observation (0.0 = frozen, 1.0 = no smoothing).
        max_age: Frames without an update before a person's EMA is discarded.
    """

    def __init__(self, alpha: float = 0.3, max_age: int = 16) -> None:
        self._alpha = alpha
        self._max_age = max_age
        self._ema: Dict[int, List[float]] = {}   # person_id → EMA scores
        self._age: Dict[int, int] = {}           # person_id → frames since update
        self._opt_out: set = set()               # person_ids that opted out

    # ── Opt-out management ────────────────────────────────────────────────────

    def set_opt_out(self, person_id: int, value: bool) -> None:
        """Register or clear an opt-out; opting out drops any stored state."""
        if not value:
            self._opt_out.discard(person_id)
            return
        self._opt_out.add(person_id)
        self._ema.pop(person_id, None)
        self._age.pop(person_id, None)

    def is_opt_out(self, person_id: int) -> bool:
        """True when *person_id* has opted out of emotion tracking."""
        return person_id in self._opt_out

    # ── Smoothing ─────────────────────────────────────────────────────────────

    def smooth(self, person_id: int, scores: List[float]) -> List[float]:
        """Blend *scores* into the person's EMA; returns the blended scores."""
        if person_id < 0:
            # Anonymous faces carry no identity to smooth against.
            return list(scores)

        # Discard state that has gone unrefreshed for too many frames.
        if person_id in self._age and self._age[person_id] > self._max_age:
            self.reset(person_id)

        previous = self._ema.get(person_id)
        if previous is None:
            # First observation seeds the EMA verbatim.
            self._ema[person_id] = list(scores)
            self._age[person_id] = 0
            return list(scores)

        w = self._alpha
        blended = [w * new + (1.0 - w) * old
                   for new, old in zip(scores, previous)]
        self._ema[person_id] = blended
        self._age[person_id] = 0
        return blended

    def tick(self) -> None:
        """Age every tracked person by one frame (call once per frame)."""
        for pid in tuple(self._age):
            self._age[pid] += 1

    def reset(self, person_id: int) -> None:
        """Forget all smoothing state for one person."""
        self._ema.pop(person_id, None)
        self._age.pop(person_id, None)

    def reset_all(self) -> None:
        """Forget smoothing state for everyone."""
        self._ema.clear()
        self._age.clear()

    @property
    def tracked_ids(self) -> List[int]:
        """person_ids that currently hold an EMA entry."""
        return list(self._ema)
|
||||
|
||||
|
||||
# ── TensorRT engine loader ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class _TrtEngine:
    """Thin wrapper around a TensorRT FP16 emotion CNN engine.

    Expected engine:
      - Input binding:  name "input"  shape (1, 1, 48, 48) float32
      - Output binding: name "output" shape (1, 7) float32 (softmax logits)

    The engine is built offline from a MobileNetV2-based 48×48 grayscale CNN
    (FER+ or AffectNet trained) via:
        trtexec --onnx=emotion_cnn.onnx --fp16 --saveEngine=emotion_fp16.trt
    """

    def __init__(self) -> None:
        # All members stay None/empty until load() succeeds; `ready` reflects that.
        self._context = None       # TRT execution context (None = not loaded)
        self._h_input = None       # page-locked host input buffer
        self._h_output = None      # page-locked host output buffer
        self._d_input = None       # device input allocation
        self._d_output = None      # device output allocation
        self._stream = None        # CUDA stream for async copies/execution
        self._bindings: List = []  # device pointers in engine binding order

    def load(self, engine_path: str) -> bool:
        """Load TRT engine from file. Returns True on success.

        TensorRT / PyCUDA are imported lazily so this module stays importable
        on machines without a GPU stack. Any failure (missing libraries, bad
        path, corrupt engine) leaves the wrapper unloaded and returns False.
        """
        try:
            import tensorrt as trt  # type: ignore
            import pycuda.autoinit  # type: ignore # noqa: F401
            import pycuda.driver as cuda  # type: ignore
            import numpy as np

            TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
            with open(engine_path, "rb") as f:
                engine_data = f.read()

            runtime = trt.Runtime(TRT_LOGGER)
            engine = runtime.deserialize_cuda_engine(engine_data)
            self._context = engine.create_execution_context()

            # Allocate host + device buffers once; reused for every inference.
            self._h_input = cuda.pagelocked_empty((1, 1, INPUT_SIZE, INPUT_SIZE),
                                                  dtype=np.float32)
            self._h_output = cuda.pagelocked_empty((1, N_CLASSES), dtype=np.float32)
            self._d_input = cuda.mem_alloc(self._h_input.nbytes)
            self._d_output = cuda.mem_alloc(self._h_output.nbytes)
            self._stream = cuda.Stream()
            self._bindings = [int(self._d_input), int(self._d_output)]
            return True

        except Exception:
            # Deliberate broad catch: any GPU-stack failure degrades the node
            # to the landmark heuristic instead of crashing at startup.
            self._context = None
            return False

    @property
    def ready(self) -> bool:
        """True once load() has succeeded."""
        return self._context is not None

    def infer(self, face_bgr) -> List[float]:
        """Run TRT inference on a BGR face crop. Returns 7 softmax scores.

        Caller must ensure `ready` is True; buffers allocated in load() are
        reused, so this method is not safe to call from multiple threads.
        """
        import pycuda.driver as cuda  # type: ignore
        import numpy as np

        img = _preprocess(face_bgr)  # (1, 1, 48, 48) float32
        np.copyto(self._h_input, img)
        # Async H2D copy → execute → D2H copy, then block for the result.
        cuda.memcpy_htod_async(self._d_input, self._h_input, self._stream)
        self._context.execute_async_v2(self._bindings, self._stream.handle, None)
        cuda.memcpy_dtoh_async(self._h_output, self._d_output, self._stream)
        self._stream.synchronize()
        logits = list(self._h_output[0])
        # NOTE(review): the class docstring says the output binding already
        # holds "softmax logits", yet _softmax is applied here — if the ONNX
        # export ends in a softmax layer this double-applies it. Confirm
        # whether the engine emits raw logits or probabilities.
        return _softmax(logits)
|
||||
|
||||
|
||||
# ── Image pre-processing helper (importable without cv2 in test mode) ─────────
|
||||
|
||||
|
||||
def _preprocess(face_bgr) -> "np.ndarray":  # type: ignore
    """Convert a BGR crop to the CNN input tensor.

    Grayscale → resize to INPUT_SIZE×INPUT_SIZE → scale pixels to [-1, 1].
    Returns an ndarray of shape (1, 1, 48, 48), dtype float32.
    """
    # Lazy imports keep the module importable without cv2/numpy installed.
    import numpy as np  # type: ignore
    import cv2  # type: ignore

    mono = cv2.cvtColor(face_bgr, cv2.COLOR_BGR2GRAY)
    small = cv2.resize(
        mono,
        (INPUT_SIZE, INPUT_SIZE),
        interpolation=cv2.INTER_LINEAR,
    )
    scaled = small.astype(np.float32) / 127.5 - 1.0
    return scaled.reshape(1, 1, INPUT_SIZE, INPUT_SIZE)
|
||||
|
||||
|
||||
# ── Landmark heuristic ────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def classify_from_landmarks(lm10: List[float]) -> ClassifiedEmotion:
    """Estimate emotion from 5-point SCRFD landmarks (10 floats).

    Uses geometric ratios between eyes, nose, and mouth corners.
    Accuracy is limited — treat this as a soft prior, not a definitive label.

    Args:
        lm10: [lex, ley, rex, rey, nx, ny, lmx, lmy, rmx, rmy] in normalised
            image coordinates (0.0–1.0); see module docstring for the layout.

    Returns:
        ClassifiedEmotion with source="landmark_heuristic".
    """
    if len(lm10) < 10:
        # Malformed landmark array → neutral-biased uniform prior.
        scores = _uniform_scores()
        scores[6] = 0.5  # bias neutral
        scores = _renorm(scores)
        return ClassifiedEmotion("neutral", scores[6], scores, "landmark_heuristic")

    # Key vertical positions (normalised image space, y grows downward).
    eye_y = (lm10[_LEY] + lm10[_REY]) / 2.0
    nose_y = lm10[_NY]
    mouth_y = (lm10[_LMY] + lm10[_RMY]) / 2.0
    eye_span = max(abs(lm10[_REX] - lm10[_LEX]), 1e-4)  # guard div-by-zero
    mouth_width = abs(lm10[_RMX] - lm10[_LMX])
    mouth_asymm = abs(lm10[_LMY] - lm10[_RMY])

    face_h = max(mouth_y - eye_y, 1e-4)  # eye→mouth vertical span, floored

    # Ratio of mouth span to interocular distance
    width_ratio = mouth_width / eye_span  # >1.0 = wide open / happy smile
    # How far mouth is below nose, relative to face height
    mouth_below_nose = (mouth_y - nose_y) / face_h  # ~0.3–0.6 typical
    # Relative asymmetry of mouth corners
    asym_ratio = mouth_asymm / face_h  # >0.05 = notable asymmetry

    # Build soft scores: start from a uniform prior, then boost the classes
    # suggested by the first matching geometric pattern below.
    scores = _uniform_scores()  # start uniform

    if width_ratio > 0.85 and mouth_below_nose > 0.35:
        # Wide mouth, normal vertical position → happy
        # (boost grows linearly with width, capped at +0.25)
        scores[0] = 0.55 + 0.25 * min(1.0, (width_ratio - 0.85) / 0.5)
    elif mouth_below_nose < 0.20 and width_ratio < 0.7:
        # Tight, compressed mouth high up → surprised OR angry
        scores[3] = 0.35  # surprised
        scores[2] = 0.30  # angry
    elif asym_ratio > 0.06:
        # Asymmetric mouth → disgust or sadness
        scores[5] = 0.30  # disgusted
        scores[1] = 0.25  # sad
    elif width_ratio < 0.65 and mouth_below_nose < 0.30:
        # Tight and compressed → sad/angry
        scores[1] = 0.35  # sad
        scores[2] = 0.25  # angry
    else:
        # Default to neutral
        scores[6] = 0.45

    scores = _renorm(scores)
    top_idx = _argmax(scores)
    return ClassifiedEmotion(
        emotion=EMOTIONS[top_idx],
        confidence=round(scores[top_idx], 3),
        scores=[round(s, 4) for s in scores],
        source="landmark_heuristic",
    )
|
||||
|
||||
|
||||
def _renorm(scores: List[float]) -> List[float]:
    """Scale *scores* to sum to 1.0; degenerate (non-positive) mass → uniform."""
    mass = sum(scores)
    if mass <= 0:
        # Nothing meaningful to normalise — fall back to a flat prior.
        return _uniform_scores()
    return [s / mass for s in scores]
|
||||
|
||||
|
||||
# ── Public classifier ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class EmotionClassifier:
    """Facade combining TRT inference + landmark fallback + per-person smoothing.

    Usage
    -----
    >>> clf = EmotionClassifier(engine_path="/models/emotion_fp16.trt")
    >>> clf.load()
    True
    >>> result = clf.classify_crop(face_bgr, person_id=42, tracker=tracker)
    >>> result.emotion, result.confidence
    ('happy', 0.87)
    """

    def __init__(
        self,
        engine_path: str = "",
        alpha: float = 0.3,
    ) -> None:
        # Empty engine_path → load() is a no-op and classify_crop() degrades
        # to a neutral-biased uniform prior.
        self._engine_path = engine_path
        # NOTE(review): alpha is stored but never read in this class —
        # smoothing is delegated to the PersonEmotionTracker passed into
        # classify_*(). Confirm whether it can be removed.
        self._alpha = alpha
        self._engine = _TrtEngine()

    def load(self) -> bool:
        """Load TRT engine. Returns True if engine is ready."""
        if not self._engine_path:
            return False
        return self._engine.load(self._engine_path)

    @property
    def ready(self) -> bool:
        """True when the TRT engine is loaded and usable."""
        return self._engine.ready

    def classify_crop(
        self,
        face_bgr,
        person_id: int = -1,
        tracker: Optional[PersonEmotionTracker] = None,
    ) -> ClassifiedEmotion:
        """Classify a BGR face crop.

        Args:
            face_bgr: BGR ndarray from cv_bridge / direct crop.
            person_id: For temporal smoothing. -1 = no smoothing.
            tracker: Optional PersonEmotionTracker for EMA smoothing.

        Returns:
            ClassifiedEmotion with smoothed scores.
        """
        if self._engine.ready:
            raw = self._engine.infer(face_bgr)
            source = "cnn_trt"
        else:
            # Uniform fallback — no inference without engine
            raw = _uniform_scores()
            raw[6] = 0.40  # mild neutral bias
            raw = _renorm(raw)
            # NOTE(review): labelled "landmark_heuristic" although no landmarks
            # are consulted on this path — it is a neutral-biased prior. Kept
            # as-is since downstream may rely on the two-valued source field.
            source = "landmark_heuristic"

        smoothed = raw
        if tracker is not None and person_id >= 0:
            smoothed = tracker.smooth(person_id, raw)

        top_idx = _argmax(smoothed)
        return ClassifiedEmotion(
            emotion=EMOTIONS[top_idx],
            confidence=round(smoothed[top_idx], 3),
            scores=[round(s, 4) for s in smoothed],
            source=source,
        )

    def classify_from_landmarks(
        self,
        lm10: List[float],
        person_id: int = -1,
        tracker: Optional[PersonEmotionTracker] = None,
    ) -> ClassifiedEmotion:
        """Classify using landmark geometry only (no crop required).

        Delegates to the module-level classify_from_landmarks() (the global
        function, despite the shared name), then re-scores through the
        tracker's EMA when a known person_id is supplied.
        """
        result = classify_from_landmarks(lm10)
        if tracker is not None and person_id >= 0:
            smoothed = tracker.smooth(person_id, result.scores)
            top_idx = _argmax(smoothed)
            result = ClassifiedEmotion(
                emotion=EMOTIONS[top_idx],
                confidence=round(smoothed[top_idx], 3),
                scores=[round(s, 4) for s in smoothed],
                source=result.source,
            )
        return result
|
||||
@ -0,0 +1,380 @@
|
||||
"""emotion_node.py — Facial expression recognition node (Issue #161).
|
||||
|
||||
Piggybacks on the face detection pipeline: subscribes to
|
||||
/social/faces/detections (FaceDetectionArray), extracts face crops from the
|
||||
latest camera frames, runs a TensorRT FP16 emotion CNN, applies per-person
|
||||
EMA temporal smoothing, and publishes results on /social/faces/expressions.
|
||||
|
||||
Architecture
|
||||
------------
|
||||
FaceDetectionArray → crop extraction → TRT FP16 inference (< 5 ms) →
|
||||
EMA smoothing → ExpressionArray publish
|
||||
|
||||
If TRT engine is not available the node falls back to the 5-point landmark
|
||||
heuristic (classify_from_landmarks) which requires no GPU and adds < 0.1 ms.
|
||||
|
||||
ROS2 topics
|
||||
-----------
|
||||
Subscribe:
|
||||
/social/faces/detections (saltybot_social_msgs/FaceDetectionArray)
|
||||
/camera/{name}/image_raw (sensor_msgs/Image) × 4
|
||||
/social/persons (saltybot_social_msgs/PersonStateArray)
|
||||
Publish:
|
||||
/social/faces/expressions (saltybot_social_msgs/ExpressionArray)
|
||||
/social/emotion/context (std_msgs/String) — JSON for LLM context
|
||||
|
||||
Parameters
|
||||
----------
|
||||
engine_path (str) "" — path to emotion_fp16.trt; empty = landmark only
|
||||
min_confidence (float) 0.40 — suppress results below this threshold
|
||||
smoothing_alpha (float) 0.30 — EMA weight (higher = faster, less stable)
|
||||
opt_out_persons (str) "" — comma-separated person_ids that opted out
|
||||
face_min_size (int) 24 — skip faces whose bbox is smaller (px side)
|
||||
landmark_fallback (bool) true — use landmark heuristic when TRT unavailable
|
||||
camera_names (str) "front,left,rear,right"
|
||||
n_cameras (int) 4
|
||||
publish_context (bool) true — publish /social/emotion/context JSON
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||
|
||||
from std_msgs.msg import String
|
||||
from sensor_msgs.msg import Image
|
||||
|
||||
from saltybot_social_msgs.msg import (
|
||||
FaceDetectionArray,
|
||||
PersonStateArray,
|
||||
ExpressionArray,
|
||||
Expression,
|
||||
)
|
||||
from saltybot_social.emotion_classifier import (
|
||||
EmotionClassifier,
|
||||
PersonEmotionTracker,
|
||||
EMOTIONS,
|
||||
)
|
||||
|
||||
try:
|
||||
from cv_bridge import CvBridge
|
||||
_CV_BRIDGE_OK = True
|
||||
except ImportError:
|
||||
_CV_BRIDGE_OK = False
|
||||
|
||||
try:
|
||||
import cv2
|
||||
_CV2_OK = True
|
||||
except ImportError:
|
||||
_CV2_OK = False
|
||||
|
||||
|
||||
# ── Per-camera latest-frame buffer ────────────────────────────────────────────
|
||||
|
||||
|
||||
class _FrameBuffer:
    """Thread-safe latest-frame store, one slot per camera."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._frames: Dict[int, object] = {}  # camera_id → cv2 BGR image

    def put(self, camera_id: int, img) -> None:
        """Overwrite the stored frame for *camera_id* with *img*."""
        with self._lock:
            self._frames[camera_id] = img

    def get(self, camera_id: int):
        """Most recent frame for *camera_id*, or None if nothing received yet."""
        with self._lock:
            return self._frames.get(camera_id)
|
||||
|
||||
|
||||
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class EmotionNode(Node):
|
||||
"""Facial expression recognition — TRT FP16 + landmark fallback."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__("emotion_node")
|
||||
|
||||
# ── Parameters ────────────────────────────────────────────────────────
|
||||
self.declare_parameter("engine_path", "")
|
||||
self.declare_parameter("min_confidence", 0.40)
|
||||
self.declare_parameter("smoothing_alpha", 0.30)
|
||||
self.declare_parameter("opt_out_persons", "")
|
||||
self.declare_parameter("face_min_size", 24)
|
||||
self.declare_parameter("landmark_fallback", True)
|
||||
self.declare_parameter("camera_names", "front,left,rear,right")
|
||||
self.declare_parameter("n_cameras", 4)
|
||||
self.declare_parameter("publish_context", True)
|
||||
|
||||
engine_path = self.get_parameter("engine_path").value
|
||||
self._min_conf = self.get_parameter("min_confidence").value
|
||||
alpha = self.get_parameter("smoothing_alpha").value
|
||||
opt_out_str = self.get_parameter("opt_out_persons").value
|
||||
self._face_min = self.get_parameter("face_min_size").value
|
||||
self._lm_fallback = self.get_parameter("landmark_fallback").value
|
||||
cam_names_str = self.get_parameter("camera_names").value
|
||||
n_cameras = self.get_parameter("n_cameras").value
|
||||
self._pub_ctx = self.get_parameter("publish_context").value
|
||||
|
||||
# ── Classifier + tracker ───────────────────────────────────────────
|
||||
self._classifier = EmotionClassifier(engine_path=engine_path, alpha=alpha)
|
||||
self._tracker = PersonEmotionTracker(alpha=alpha)
|
||||
|
||||
# Parse opt-out list
|
||||
for pid_str in opt_out_str.split(","):
|
||||
pid_str = pid_str.strip()
|
||||
if pid_str.isdigit():
|
||||
self._tracker.set_opt_out(int(pid_str), True)
|
||||
|
||||
# ── Camera frame buffer + cv_bridge ───────────────────────────────
|
||||
self._frame_buf = _FrameBuffer()
|
||||
self._bridge = CvBridge() if _CV_BRIDGE_OK else None
|
||||
self._cam_names = [n.strip() for n in cam_names_str.split(",")]
|
||||
self._cam_id_map: Dict[str, int] = {
|
||||
name: idx for idx, name in enumerate(self._cam_names)
|
||||
}
|
||||
|
||||
# ── Latest persons for person_id ↔ face_id correlation ────────────
|
||||
self._persons_lock = threading.Lock()
|
||||
self._face_to_person: Dict[int, int] = {} # face_id → person_id
|
||||
|
||||
# ── Context for LLM (latest emotion per person) ───────────────────
|
||||
self._emotion_context: Dict[int, str] = {} # person_id → emotion label
|
||||
|
||||
# ── QoS profiles ──────────────────────────────────────────────────
|
||||
be_qos = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=4,
|
||||
)
|
||||
|
||||
# ── Subscriptions ──────────────────────────────────────────────────
|
||||
self.create_subscription(
|
||||
FaceDetectionArray,
|
||||
"/social/faces/detections",
|
||||
self._on_faces,
|
||||
be_qos,
|
||||
)
|
||||
self.create_subscription(
|
||||
PersonStateArray,
|
||||
"/social/persons",
|
||||
self._on_persons,
|
||||
be_qos,
|
||||
)
|
||||
for name in self._cam_names[:n_cameras]:
|
||||
cam_id = self._cam_id_map.get(name, 0)
|
||||
self.create_subscription(
|
||||
Image,
|
||||
f"/camera/{name}/image_raw",
|
||||
lambda msg, cid=cam_id: self._on_image(msg, cid),
|
||||
be_qos,
|
||||
)
|
||||
|
||||
# ── Publishers ─────────────────────────────────────────────────────
|
||||
self._expr_pub = self.create_publisher(
|
||||
ExpressionArray, "/social/faces/expressions", be_qos
|
||||
)
|
||||
if self._pub_ctx:
|
||||
self._ctx_pub = self.create_publisher(
|
||||
String, "/social/emotion/context", 10
|
||||
)
|
||||
else:
|
||||
self._ctx_pub = None
|
||||
|
||||
# ── Load TRT engine in background ──────────────────────────────────
|
||||
threading.Thread(target=self._load_engine, daemon=True).start()
|
||||
|
||||
self.get_logger().info(
|
||||
f"EmotionNode ready — engine='{engine_path}', "
|
||||
f"alpha={alpha:.2f}, min_conf={self._min_conf:.2f}, "
|
||||
f"cameras={self._cam_names[:n_cameras]}"
|
||||
)
|
||||
|
||||
# ── Engine loading ────────────────────────────────────────────────────────
|
||||
|
||||
def _load_engine(self) -> None:
|
||||
if not self._classifier._engine_path:
|
||||
if self._lm_fallback:
|
||||
self.get_logger().info(
|
||||
"No TRT engine path — using landmark heuristic fallback"
|
||||
)
|
||||
else:
|
||||
self.get_logger().warn(
|
||||
"No TRT engine and landmark_fallback=false — "
|
||||
"emotion_node will not classify"
|
||||
)
|
||||
return
|
||||
|
||||
ok = self._classifier.load()
|
||||
if ok:
|
||||
self.get_logger().info("TRT emotion engine loaded ✓")
|
||||
else:
|
||||
self.get_logger().warn(
|
||||
"TRT engine load failed — falling back to landmark heuristic"
|
||||
)
|
||||
|
||||
# ── Camera frame ingestion ────────────────────────────────────────────────
|
||||
|
||||
def _on_image(self, msg: Image, camera_id: int) -> None:
|
||||
if not _CV_BRIDGE_OK or self._bridge is None:
|
||||
return
|
||||
try:
|
||||
bgr = self._bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8")
|
||||
self._frame_buf.put(camera_id, bgr)
|
||||
except Exception as exc:
|
||||
self.get_logger().debug(f"cv_bridge error cam{camera_id}: {exc}")
|
||||
|
||||
# ── Person-state for face_id → person_id mapping ─────────────────────────
|
||||
|
||||
def _on_persons(self, msg: PersonStateArray) -> None:
|
||||
mapping: Dict[int, int] = {}
|
||||
for ps in msg.persons:
|
||||
if ps.face_id >= 0:
|
||||
mapping[ps.face_id] = ps.person_id
|
||||
with self._persons_lock:
|
||||
self._face_to_person = mapping
|
||||
|
||||
def _get_person_id(self, face_id: int) -> int:
|
||||
with self._persons_lock:
|
||||
return self._face_to_person.get(face_id, -1)
|
||||
|
||||
# ── Main face-detection callback ──────────────────────────────────────────
|
||||
|
||||
def _on_faces(self, msg: FaceDetectionArray) -> None:
|
||||
if not msg.faces:
|
||||
return
|
||||
|
||||
expressions: List[Expression] = []
|
||||
|
||||
for face in msg.faces:
|
||||
person_id = self._get_person_id(face.face_id)
|
||||
|
||||
# Opt-out check
|
||||
if person_id >= 0 and self._tracker.is_opt_out(person_id):
|
||||
expr = Expression()
|
||||
expr.header = msg.header
|
||||
expr.person_id = person_id
|
||||
expr.face_id = face.face_id
|
||||
expr.emotion = ""
|
||||
expr.confidence = 0.0
|
||||
expr.scores = [0.0] * 7
|
||||
expr.is_opt_out = True
|
||||
expr.source = "opt_out"
|
||||
expressions.append(expr)
|
||||
continue
|
||||
|
||||
# Try TRT crop classification first
|
||||
result = None
|
||||
if self._classifier.ready and _CV2_OK:
|
||||
result = self._classify_crop(face, person_id)
|
||||
|
||||
# Fallback to landmark heuristic
|
||||
if result is None and self._lm_fallback:
|
||||
result = self._classifier.classify_from_landmarks(
|
||||
list(face.landmarks),
|
||||
person_id=person_id,
|
||||
tracker=self._tracker,
|
||||
)
|
||||
|
||||
if result is None:
|
||||
continue
|
||||
|
||||
if result.confidence < self._min_conf:
|
||||
continue
|
||||
|
||||
# Build ROS message
|
||||
expr = Expression()
|
||||
expr.header = msg.header
|
||||
expr.person_id = person_id
|
||||
expr.face_id = face.face_id
|
||||
expr.emotion = result.emotion
|
||||
expr.confidence = result.confidence
|
||||
# Pad/trim scores to exactly 7
|
||||
sc = result.scores
|
||||
expr.scores = (sc + [0.0] * 7)[:7]
|
||||
expr.is_opt_out = False
|
||||
expr.source = result.source
|
||||
expressions.append(expr)
|
||||
|
||||
# Update LLM context cache
|
||||
if person_id >= 0:
|
||||
self._emotion_context[person_id] = result.emotion
|
||||
|
||||
if not expressions:
|
||||
return
|
||||
|
||||
# Publish ExpressionArray
|
||||
arr = ExpressionArray()
|
||||
arr.header = msg.header
|
||||
arr.expressions = expressions
|
||||
self._expr_pub.publish(arr)
|
||||
|
||||
# Publish JSON context for conversation node
|
||||
if self._ctx_pub is not None:
|
||||
self._publish_context()
|
||||
|
||||
self._tracker.tick()
|
||||
|
||||
def _classify_crop(self, face, person_id: int):
|
||||
"""Extract crop from frame buffer and run TRT inference."""
|
||||
# Resolve camera_id from face header frame_id
|
||||
frame_id = face.header.frame_id if face.header.frame_id else "front"
|
||||
cam_name = frame_id.split("/")[-1].split("_")[0] # "front", "left", etc.
|
||||
camera_id = self._cam_id_map.get(cam_name, 0)
|
||||
|
||||
frame = self._frame_buf.get(camera_id)
|
||||
if frame is None:
|
||||
return None
|
||||
|
||||
h, w = frame.shape[:2]
|
||||
x1 = max(0, int(face.bbox_x * w))
|
||||
y1 = max(0, int(face.bbox_y * h))
|
||||
x2 = min(w, int((face.bbox_x + face.bbox_w) * w))
|
||||
y2 = min(h, int((face.bbox_y + face.bbox_h) * h))
|
||||
|
||||
if (x2 - x1) < self._face_min or (y2 - y1) < self._face_min:
|
||||
return None
|
||||
|
||||
crop = frame[y1:y2, x1:x2]
|
||||
if crop.size == 0:
|
||||
return None
|
||||
|
||||
return self._classifier.classify_crop(
|
||||
crop, person_id=person_id, tracker=self._tracker
|
||||
)
|
||||
|
||||
# ── LLM context publisher ─────────────────────────────────────────────────
|
||||
|
||||
def _publish_context(self) -> None:
    """Serialise the latest per-person emotions as JSON for conversation_node."""
    emotions = {}
    for pid, emotion in self._emotion_context.items():
        emotions[str(pid)] = emotion  # JSON object keys must be strings
    payload = json.dumps({"emotions": emotions, "ts": time.time()})
    out = String()
    out.data = payload
    self._ctx_pub.publish(out)
|
||||
|
||||
|
||||
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def main(args=None) -> None:
    """CLI entry point — spin the emotion node until shutdown is requested."""
    rclpy.init(args=args)
    emotion_node = EmotionNode()
    try:
        rclpy.spin(emotion_node)
    except KeyboardInterrupt:
        # Ctrl-C is the expected shutdown path; exit quietly.
        pass
    finally:
        # Always release node resources and the rclpy context, even on error.
        emotion_node.destroy_node()
        rclpy.try_shutdown()
|
||||
|
||||
|
||||
# Allow running the node directly (`python emotion_node.py`) in addition to
# the console-script entry point declared in setup.py.
if __name__ == "__main__":
    main()
|
||||
@ -37,6 +37,8 @@ setup(
|
||||
'voice_command_node = saltybot_social.voice_command_node:main',
|
||||
# Multi-camera gesture recognition (Issue #140)
|
||||
'gesture_node = saltybot_social.gesture_node:main',
|
||||
# Facial expression recognition (Issue #161)
|
||||
'emotion_node = saltybot_social.emotion_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
@ -0,0 +1,528 @@
|
||||
"""test_emotion_classifier.py — Unit tests for emotion_classifier (Issue #161).
|
||||
|
||||
Tests cover:
|
||||
- Emotion constant definitions
|
||||
- Softmax normalisation
|
||||
- PersonEmotionTracker: EMA smoothing, age expiry, opt-out, reset
|
||||
- Landmark heuristic classifier: geometric edge cases
|
||||
- EmotionClassifier: classify_crop fallback, classify_from_landmarks
|
||||
- Score renormalisation, argmax, utility helpers
|
||||
- Integration: full classify pipeline with mock engine
|
||||
|
||||
No ROS2, TensorRT, or OpenCV runtime required.
|
||||
"""
|
||||
|
||||
import math
|
||||
import pytest
|
||||
|
||||
from saltybot_social.emotion_classifier import (
|
||||
EMOTIONS,
|
||||
N_CLASSES,
|
||||
ClassifiedEmotion,
|
||||
PersonEmotionTracker,
|
||||
EmotionClassifier,
|
||||
classify_from_landmarks,
|
||||
_softmax,
|
||||
_argmax,
|
||||
_uniform_scores,
|
||||
_renorm,
|
||||
)
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _lm_neutral() -> list:
    """5-point SCRFD landmarks for a frontal neutral face (normalised x, y)."""
    points = [
        (0.35, 0.38),  # left eye
        (0.65, 0.38),  # right eye
        (0.50, 0.52),  # nose tip
        (0.38, 0.72),  # left mouth corner
        (0.62, 0.72),  # right mouth corner
    ]
    return [coord for point in points for coord in point]
|
||||
|
||||
|
||||
def _lm_happy() -> list:
    """Landmarks with a wide, symmetric mouth well below the nose (smile)."""
    points = [
        (0.35, 0.38),  # left eye
        (0.65, 0.38),  # right eye
        (0.50, 0.52),  # nose tip
        (0.30, 0.74),  # left mouth corner — wide
        (0.70, 0.74),  # right mouth corner — wide
    ]
    return [coord for point in points for coord in point]
|
||||
|
||||
|
||||
def _lm_sad() -> list:
    """Landmarks with a narrow mouth compressed up close to the nose."""
    points = [
        (0.35, 0.38),  # left eye
        (0.65, 0.38),  # right eye
        (0.50, 0.52),  # nose tip
        (0.44, 0.62),  # left mouth corner — tight, near nose
        (0.56, 0.62),  # right mouth corner — tight, near nose
    ]
    return [coord for point in points for coord in point]
|
||||
|
||||
|
||||
def _lm_asymmetric() -> list:
    """Landmarks with asymmetric mouth corners (disgust/sad signal)."""
    points = [
        (0.35, 0.38),  # left eye
        (0.65, 0.38),  # right eye
        (0.50, 0.52),  # nose tip
        (0.38, 0.65),  # left mouth corner
        (0.62, 0.74),  # right mouth corner — noticeably lower
    ]
    return [coord for point in points for coord in point]
|
||||
|
||||
|
||||
# ── TestEmotionConstants ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestEmotionConstants:
    """Sanity checks on the EMOTIONS label list and the N_CLASSES constant."""

    def test_emotions_length(self):
        assert len(EMOTIONS) == 7

    def test_n_classes(self):
        assert N_CLASSES == 7

    def test_emotions_labels(self):
        assert EMOTIONS == [
            "happy", "sad", "angry", "surprised",
            "fearful", "disgusted", "neutral",
        ]

    def test_happy_index(self):
        assert EMOTIONS[0] == "happy"

    def test_neutral_index(self):
        assert EMOTIONS[6] == "neutral"

    def test_all_lowercase(self):
        assert all(label == label.lower() for label in EMOTIONS)
|
||||
|
||||
|
||||
# ── TestSoftmax ───────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestSoftmax:
    """Behavioural checks for the _softmax helper."""

    def test_sums_to_one(self):
        probs = _softmax([1.0, 2.0, 0.5, -1.0, 3.0, 0.0, 1.5])
        assert abs(sum(probs) - 1.0) < 1e-6

    def test_all_positive(self):
        probs = _softmax([1.0, 2.0, 3.0, 0.0, -1.0, -2.0, 0.5])
        assert min(probs) > 0

    def test_max_logit_gives_max_prob(self):
        probs = _softmax([0.0, 0.0, 10.0, 0.0, 0.0, 0.0, 0.0])
        assert _argmax(probs) == 2

    def test_uniform_logits_uniform_probs(self):
        probs = _softmax([1.0] * 7)
        expected = 1.0 / 7.0
        assert all(abs(p - expected) < 1e-6 for p in probs)

    def test_length_preserved(self):
        assert len(_softmax([0.0] * 7)) == 7

    def test_numerically_stable_large_values(self):
        # A huge logit must not overflow exp(); top prob saturates at 1.0.
        probs = _softmax([1000.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
        assert abs(probs[0] - 1.0) < 1e-6
|
||||
|
||||
|
||||
# ── TestArgmax ────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestArgmax:
    """_argmax returns the index of the maximum; ties resolve to the first."""

    def test_finds_max(self):
        assert _argmax([0.1, 0.2, 0.9, 0.3, 0.1, 0.1, 0.1]) == 2

    def test_single_element(self):
        assert _argmax([0.5]) == 0

    def test_first_when_all_equal(self):
        assert _argmax([0.5] * 7) == 0
|
||||
|
||||
|
||||
# ── TestUniformScores ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestUniformScores:
    """_uniform_scores yields a length-N_CLASSES uniform distribution."""

    def test_length(self):
        assert len(_uniform_scores()) == N_CLASSES

    def test_sums_to_one(self):
        assert abs(sum(_uniform_scores()) - 1.0) < 1e-9

    def test_all_equal(self):
        scores = _uniform_scores()
        first = scores[0]
        assert all(abs(value - first) < 1e-9 for value in scores)
|
||||
|
||||
|
||||
# ── TestRenorm ────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestRenorm:
    """_renorm rescales scores to sum to 1, falling back to uniform on zeros."""

    def test_sums_to_one(self):
        renormed = _renorm([0.1, 0.2, 0.3, 0.1, 0.1, 0.1, 0.1])
        assert abs(sum(renormed) - 1.0) < 1e-9

    def test_all_zero_returns_uniform(self):
        renormed = _renorm([0.0] * 7)
        assert len(renormed) == 7
        assert abs(sum(renormed) - 1.0) < 1e-9

    def test_preserves_order(self):
        renormed = _renorm([0.5, 0.3, 0.0, 0.0, 0.0, 0.0, 0.2])
        # Relative ordering of the inputs must survive renormalisation.
        assert renormed[0] > renormed[1]
        assert renormed[1] > renormed[6]
|
||||
|
||||
|
||||
# ── TestPersonEmotionTracker ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestPersonEmotionTracker:
    """EMA smoothing, per-person isolation, age expiry and reset behaviour."""

    def _scores(self, idx: int, val: float = 0.8) -> list:
        """Normalised score vector: `val` mass at `idx`, 0.02 everywhere else.

        (Rewritten: the previous version built the list three times and
        computed an unused `total` before renormalising.)
        """
        s = [0.02] * N_CLASSES
        s[idx] = val
        t = sum(s)
        return [x / t for x in s]

    def test_first_call_returns_input(self):
        tracker = PersonEmotionTracker(alpha=0.5)
        scores = self._scores(0)
        # No history yet: the first observation passes through unchanged.
        result = tracker.smooth(1, scores)
        assert abs(result[0] - scores[0]) < 1e-9

    def test_ema_converges_toward_new_dominant(self):
        tracker = PersonEmotionTracker(alpha=0.5)
        happy = self._scores(0)
        sad = self._scores(1)
        tracker.smooth(1, happy)
        # Push sad repeatedly: smoothed sad score must rise monotonically.
        prev_sad = tracker.smooth(1, sad)[1]
        for _ in range(20):
            result = tracker.smooth(1, sad)
            assert result[1] > prev_sad
            # BUG FIX: prev_sad was never updated inside the loop, so every
            # iteration compared against the first smoothed value instead of
            # checking strict monotonic convergence.
            prev_sad = result[1]

    def test_alpha_1_no_smoothing(self):
        # alpha=1.0 means the raw observation fully replaces the EMA.
        tracker = PersonEmotionTracker(alpha=1.0)
        s1 = self._scores(0)
        s2 = self._scores(1)
        tracker.smooth(1, s1)
        result = tracker.smooth(1, s2)
        for a, b in zip(result, s2):
            assert abs(a - b) < 1e-9

    def test_alpha_0_frozen(self):
        # alpha=0.0 means new observations never change the stored EMA.
        tracker = PersonEmotionTracker(alpha=0.0)
        s1 = self._scores(0)
        s2 = self._scores(1)
        tracker.smooth(1, s1)
        result = tracker.smooth(1, s2)
        for a, b in zip(result, s1):
            assert abs(a - b) < 1e-9

    def test_different_persons_independent(self):
        tracker = PersonEmotionTracker(alpha=0.5)
        happy = self._scores(0)
        sad = self._scores(1)
        tracker.smooth(1, happy)
        tracker.smooth(2, sad)
        r1 = tracker.smooth(1, happy)
        r2 = tracker.smooth(2, sad)
        assert r1[0] > r2[0]  # person 1 more happy
        assert r2[1] > r1[1]  # person 2 more sad

    def test_negative_person_id_no_tracking(self):
        # person_id < 0 means "unidentified": scores pass through unstored.
        tracker = PersonEmotionTracker()
        scores = self._scores(2)
        result = tracker.smooth(-1, scores)
        assert result == scores  # unchanged, not stored

    def test_reset_clears_ema(self):
        tracker = PersonEmotionTracker(alpha=0.5)
        s1 = self._scores(0)
        tracker.smooth(1, s1)
        tracker.reset(1)
        assert 1 not in tracker.tracked_ids

    def test_reset_all_clears_all(self):
        tracker = PersonEmotionTracker(alpha=0.5)
        for pid in range(5):
            tracker.smooth(pid, self._scores(pid % N_CLASSES))
        tracker.reset_all()
        assert tracker.tracked_ids == []

    def test_tracked_ids_populated(self):
        tracker = PersonEmotionTracker()
        tracker.smooth(10, self._scores(0))
        tracker.smooth(20, self._scores(1))
        assert set(tracker.tracked_ids) == {10, 20}

    def test_age_expiry_resets_ema(self):
        tracker = PersonEmotionTracker(alpha=0.5, max_age=3)
        tracker.smooth(1, self._scores(0))
        # Advance age beyond max_age so the stored EMA expires.
        for _ in range(4):
            tracker.tick()
        # Next smooth after expiry behaves like a first call: input unchanged.
        fresh_scores = self._scores(1)
        result = tracker.smooth(1, fresh_scores)
        for a, b in zip(result, fresh_scores):
            assert abs(a - b) < 1e-9
|
||||
|
||||
|
||||
# ── TestOptOut ────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestOptOut:
    """Privacy opt-out flag handling on PersonEmotionTracker."""

    def test_set_opt_out_true(self):
        tracker = PersonEmotionTracker()
        tracker.set_opt_out(42, True)
        assert tracker.is_opt_out(42)

    def test_set_opt_out_false(self):
        tracker = PersonEmotionTracker()
        tracker.set_opt_out(42, True)
        tracker.set_opt_out(42, False)
        assert not tracker.is_opt_out(42)

    def test_opt_out_clears_ema(self):
        # Opting out must also discard any previously stored EMA state.
        tracker = PersonEmotionTracker()
        tracker.smooth(42, [0.5, 0.1, 0.1, 0.1, 0.1, 0.0, 0.1])
        assert 42 in tracker.tracked_ids
        tracker.set_opt_out(42, True)
        assert 42 not in tracker.tracked_ids

    def test_unknown_person_not_opted_out(self):
        assert not PersonEmotionTracker().is_opt_out(99)

    def test_multiple_opt_outs(self):
        tracker = PersonEmotionTracker()
        ids = (1, 2, 3)
        for pid in ids:
            tracker.set_opt_out(pid, True)
        assert all(tracker.is_opt_out(pid) for pid in ids)
|
||||
|
||||
|
||||
# ── TestClassifyFromLandmarks ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestClassifyFromLandmarks:
    """Landmark-heuristic classifier: output shape, ranges and edge cases."""

    def test_returns_classified_emotion(self):
        assert isinstance(classify_from_landmarks(_lm_neutral()), ClassifiedEmotion)

    def test_emotion_is_valid_label(self):
        assert classify_from_landmarks(_lm_neutral()).emotion in EMOTIONS

    def test_scores_length(self):
        assert len(classify_from_landmarks(_lm_neutral()).scores) == N_CLASSES

    def test_scores_sum_to_one(self):
        scores = classify_from_landmarks(_lm_neutral()).scores
        # Scores are rounded to 4 dp; allow 1e-3 accumulation across 7 terms.
        assert abs(sum(scores) - 1.0) < 1e-3

    def test_confidence_matches_top_score(self):
        result = classify_from_landmarks(_lm_neutral())
        # confidence is rounded to 3 dp and scores to 4 dp -> small tolerance.
        assert abs(result.confidence - max(result.scores)) < 5e-3

    def test_source_is_landmark_heuristic(self):
        assert classify_from_landmarks(_lm_neutral()).source == "landmark_heuristic"

    def test_happy_landmarks_boost_happy(self):
        # A wide smile should score at least as "happy" as a neutral face.
        happy_score = classify_from_landmarks(_lm_happy()).scores[0]
        neutral_score = classify_from_landmarks(_lm_neutral()).scores[0]
        assert happy_score >= neutral_score

    def test_sad_landmarks_suppress_happy(self):
        assert classify_from_landmarks(_lm_sad()).scores[0] < 0.5

    def test_asymmetric_mouth_non_neutral(self):
        scores = classify_from_landmarks(_lm_asymmetric()).scores
        # Either disgusted ([5]) or sad ([1]) should be elevated.
        assert scores[5] > 0.10 or scores[1] > 0.10

    def test_empty_landmarks_returns_neutral(self):
        assert classify_from_landmarks([]).emotion == "neutral"

    def test_short_landmarks_returns_neutral(self):
        assert classify_from_landmarks([0.5, 0.5]).emotion == "neutral"

    def test_all_positive_scores(self):
        # Scores are probabilities; none may go negative.
        assert all(s >= 0.0 for s in classify_from_landmarks(_lm_happy()).scores)

    def test_confidence_in_range(self):
        assert 0.0 <= classify_from_landmarks(_lm_neutral()).confidence <= 1.0
|
||||
|
||||
|
||||
# ── TestEmotionClassifier ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestEmotionClassifier:
    """EmotionClassifier without a TRT engine: loading and fallback paths."""

    def test_init_no_engine_not_ready(self):
        clf = EmotionClassifier(engine_path="")
        assert not clf.ready

    def test_load_empty_path_returns_false(self):
        clf = EmotionClassifier(engine_path="")
        assert clf.load() is False

    def test_load_nonexistent_path_returns_false(self):
        clf = EmotionClassifier(engine_path="/nonexistent/engine.trt")
        assert clf.load() is False

    def test_classify_crop_fallback_without_engine(self):
        """Without a TRT engine, classify_crop must still return a valid
        ClassifiedEmotion via the landmark-heuristic fallback."""
        # IMPROVED: pytest.importorskip replaces the hand-rolled
        # try/ImportError/skip boilerplate.
        np = pytest.importorskip("numpy")
        clf = EmotionClassifier(engine_path="")
        fake_crop = np.zeros((48, 48, 3), dtype="uint8")
        result = clf.classify_crop(fake_crop, person_id=-1, tracker=None)
        assert isinstance(result, ClassifiedEmotion)
        assert result.emotion in EMOTIONS
        assert 0.0 <= result.confidence <= 1.0
        assert len(result.scores) == N_CLASSES

    def test_classify_from_landmarks_delegates(self):
        clf = EmotionClassifier(engine_path="")
        result = clf.classify_from_landmarks(_lm_happy())
        assert isinstance(result, ClassifiedEmotion)
        assert result.source == "landmark_heuristic"

    def test_classify_from_landmarks_with_tracker_smooths(self):
        clf = EmotionClassifier(engine_path="", alpha=0.5)
        tracker = PersonEmotionTracker(alpha=0.5)
        # Seed the tracker with a happy observation, then push sad.
        # (FIX: the result of the seeding call was bound to an unused local.)
        clf.classify_from_landmarks(_lm_happy(), person_id=1, tracker=tracker)
        smoothed = clf.classify_from_landmarks(_lm_sad(), person_id=1, tracker=tracker)
        # EMA blends the happy history into the sad result, so the smoothed
        # scores must differ from the raw (unsmoothed) sad scores.
        raw_sad = classify_from_landmarks(_lm_sad())
        assert smoothed.scores != raw_sad.scores

    def test_classify_from_landmarks_no_tracker_no_smooth(self):
        clf = EmotionClassifier(engine_path="")
        first = clf.classify_from_landmarks(_lm_happy(), person_id=1, tracker=None)
        second = clf.classify_from_landmarks(_lm_happy(), person_id=1, tracker=None)
        # Without a tracker, identical input yields identical output.
        assert first.scores == second.scores

    def test_source_fallback_when_no_engine(self):
        # BUG FIX: the old version imported numpy as `np`, discarded it, and
        # then re-imported via __import__("numpy") on the usage line.
        np = pytest.importorskip("numpy")
        clf = EmotionClassifier(engine_path="")
        crop = np.zeros((48, 48, 3), dtype="uint8")
        result = clf.classify_crop(crop)
        assert result.source == "landmark_heuristic"

    def test_classifier_alpha_propagated_to_tracker(self):
        clf = EmotionClassifier(engine_path="", alpha=0.1)
        assert clf._alpha == 0.1
|
||||
|
||||
|
||||
# ── TestClassifiedEmotionDataclass ───────────────────────────────────────────
|
||||
|
||||
|
||||
class TestClassifiedEmotionDataclass:
    """Field access, default source, and mutability of ClassifiedEmotion."""

    def test_fields(self):
        scores = [0.85, 0.02, 0.02, 0.03, 0.02, 0.02, 0.04]
        ce = ClassifiedEmotion(
            emotion="happy", confidence=0.85, scores=scores, source="cnn_trt"
        )
        assert ce.emotion == "happy"
        assert ce.confidence == 0.85
        assert len(ce.scores) == 7
        assert ce.source == "cnn_trt"

    def test_default_source(self):
        # source defaults to "cnn_trt" when not supplied.
        assert ClassifiedEmotion("neutral", 0.6, [0.0] * 7).source == "cnn_trt"

    def test_emotion_is_mutable(self):
        ce = ClassifiedEmotion("neutral", 0.6, [0.0] * 7)
        ce.emotion = "happy"
        assert ce.emotion == "happy"
|
||||
|
||||
|
||||
# ── TestEdgeCases ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestEdgeCases:
    """Degenerate inputs that must not crash or corrupt tracker state."""

    def test_softmax_single_element(self):
        result = _softmax([5.0])
        assert abs(result[0] - 1.0) < 1e-9

    def test_tracker_negative_id_no_stored_state(self):
        tracker = PersonEmotionTracker()
        scores = [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.4]
        tracker.smooth(-1, scores)
        assert tracker.tracked_ids == []

    def test_tick_increments_age(self):
        tracker = PersonEmotionTracker(max_age=2)
        tracker.smooth(1, [0.1] * 7)
        for _ in range(3):
            tracker.tick()
        # Age now exceeds max_age, so the next smooth() must discard the old
        # EMA state and return the fresh scores unchanged.
        fresh = [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0]
        result = tracker.smooth(1, fresh)
        assert abs(result[2] - 1.0) < 1e-9

    def test_renorm_negative_scores_safe(self):
        # BUG FIX: the previous version claimed to test negative scores but
        # its input contained none. Include an actual negative entry; _renorm
        # must not crash and must still return a full-length vector.
        # NOTE(review): _renorm's exact behaviour on negative inputs is
        # unspecified, so only the no-crash/shape contract is asserted.
        scores = [-0.1, 0.2, 0.3, 0.1, 0.1, 0.1, 0.3]
        r = _renorm(scores)
        assert len(r) == N_CLASSES

    def test_landmark_heuristic_very_close_eye_mouth(self):
        # Degenerate face where every landmark sits on one horizontal line
        # — the heuristic must not divide by zero or crash.
        flat_face = [0.3, 0.5, 0.7, 0.5, 0.5, 0.5, 0.4, 0.5, 0.6, 0.5]
        assert classify_from_landmarks(flat_face).emotion in EMOTIONS

    def test_opt_out_then_re_enable(self):
        tracker = PersonEmotionTracker()
        tracker.smooth(5, [0.1] * 7)
        tracker.set_opt_out(5, True)
        assert tracker.is_opt_out(5)
        tracker.set_opt_out(5, False)
        assert not tracker.is_opt_out(5)
        # Smoothing must work again after re-enabling.
        result = tracker.smooth(5, [0.2] * 7)
        assert len(result) == N_CLASSES
|
||||
@ -38,6 +38,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
|
||||
# Issue #140 — gesture recognition
|
||||
"msg/Gesture.msg"
|
||||
"msg/GestureArray.msg"
|
||||
# Issue #161 — emotion detection
|
||||
"msg/Expression.msg"
|
||||
"msg/ExpressionArray.msg"
|
||||
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
|
||||
)
|
||||
|
||||
|
||||
17
jetson/ros2_ws/src/saltybot_social_msgs/msg/Expression.msg
Normal file
17
jetson/ros2_ws/src/saltybot_social_msgs/msg/Expression.msg
Normal file
@ -0,0 +1,17 @@
|
||||
# Expression.msg — Detected facial expression for one person (Issue #161).
|
||||
# Published by emotion_node on /social/faces/expressions
|
||||
|
||||
std_msgs/Header header
|
||||
|
||||
int32 person_id # -1 = unidentified; matches PersonState.person_id
|
||||
int32 face_id # matches FaceDetection.face_id
|
||||
|
||||
string emotion # one of: happy sad angry surprised fearful disgusted neutral
|
||||
float32 confidence # smoothed confidence of the top emotion (0.0–1.0)
|
||||
|
||||
float32[7] scores # per-class softmax scores, order:
|
||||
# [0]=happy [1]=sad [2]=angry [3]=surprised
|
||||
# [4]=fearful [5]=disgusted [6]=neutral
|
||||
|
||||
bool is_opt_out # true = this person opted out; no emotion data published
|
||||
string source # "cnn_trt" | "landmark_heuristic" | "opt_out"
|
||||
@ -0,0 +1,5 @@
|
||||
# ExpressionArray.msg — Batch of detected facial expressions (Issue #161).
|
||||
# Published by emotion_node on /social/faces/expressions
|
||||
|
||||
std_msgs/Header header
|
||||
Expression[] expressions
|
||||
Loading…
x
Reference in New Issue
Block a user