Merge pull request 'feat(social): facial expression recognition — TRT FP16 emotion CNN (Issue #161)' (#180) from sl-jetson/issue-161-emotion into main
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (push) Failing after 10s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (push) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (push) Has been cancelled

This commit is contained in:
sl-jetson 2026-03-02 10:46:21 -05:00
commit 9ca0e0844c
10 changed files with 1461 additions and 0 deletions

View File

@ -0,0 +1,38 @@
emotion_node:
  ros__parameters:
    # Path to the TRT FP16 emotion engine (built from emotion_cnn.onnx).
    # Build with:
    #   trtexec --onnx=emotion_cnn.onnx --fp16 --saveEngine=emotion_fp16.trt
    # Leave empty to use the landmark heuristic only (no GPU required).
    engine_path: "/models/emotion_fp16.trt"
    # Minimum smoothed confidence to publish an expression.
    # Lower = more detections but more noise; 0.40 is a good production default.
    min_confidence: 0.40
    # EMA smoothing weight applied to each new observation.
    # 0.0 = frozen (never updates), 1.0 = no smoothing (raw output).
    # 0.30 gives a stable ~3-frame moving average at a 10 Hz face detection rate.
    smoothing_alpha: 0.30
    # Comma-separated person_ids that have opted out of emotion tracking.
    # Empty string = everyone is tracked by default.
    # Example: "1,3,7"
    opt_out_persons: ""
    # Skip face crops smaller than this side length (pixels).
    # Very small crops produce unreliable emotion predictions.
    face_min_size: 24
    # If true and the TRT engine is unavailable, fall back to the 5-point
    # landmark heuristic. Set false to suppress output entirely without TRT.
    landmark_fallback: true
    # Camera names used to build the /camera/{name}/image_raw topic names
    # (must match the saltybot_cameras topic naming convention).
    camera_names: "front,left,rear,right"
    # Number of active CSI camera streams to subscribe to (the first
    # n_cameras entries of camera_names are used).
    n_cameras: 4
    # Publish /social/emotion/context (JSON string) for conversation_node.
    publish_context: true

View File

@ -0,0 +1,67 @@
"""emotion.launch.py — Launch emotion_node for facial expression recognition (Issue #161)."""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.parameter_descriptions import ParameterValue
from launch_ros.substitutions import FindPackageShare
def generate_launch_description() -> LaunchDescription:
    """Build the launch description for emotion_node (Issue #161).

    Defaults are loaded from config/emotion_params.yaml; each tunable can
    then be overridden individually through the launch arguments below.
    """
    pkg = FindPackageShare("saltybot_social")
    params_file = PathJoinSubstitution([pkg, "config", "emotion_params.yaml"])
    return LaunchDescription([
        DeclareLaunchArgument(
            "params_file",
            default_value=params_file,
            description="Path to emotion_node parameter YAML",
        ),
        DeclareLaunchArgument(
            "engine_path",
            default_value="/models/emotion_fp16.trt",
            description="Path to TensorRT FP16 emotion engine (empty = landmark heuristic)",
        ),
        DeclareLaunchArgument(
            "min_confidence",
            default_value="0.40",
            description="Minimum detection confidence to publish an expression",
        ),
        DeclareLaunchArgument(
            "smoothing_alpha",
            default_value="0.30",
            description="EMA smoothing weight (0=frozen, 1=no smoothing)",
        ),
        DeclareLaunchArgument(
            "opt_out_persons",
            default_value="",
            description="Comma-separated person_ids that opted out",
        ),
        DeclareLaunchArgument(
            "landmark_fallback",
            default_value="true",
            description="Use landmark heuristic when TRT engine unavailable",
        ),
        DeclareLaunchArgument(
            "publish_context",
            default_value="true",
            description="Publish /social/emotion/context JSON for LLM context",
        ),
        Node(
            package="saltybot_social",
            executable="emotion_node",
            name="emotion_node",
            output="screen",
            parameters=[
                LaunchConfiguration("params_file"),
                {
                    # LaunchConfiguration substitutions always resolve to
                    # *strings*, but emotion_node declares min_confidence /
                    # smoothing_alpha as doubles and landmark_fallback /
                    # publish_context as bools. Wrap those in ParameterValue
                    # with an explicit value_type so rclpy coerces them —
                    # otherwise the node rejects the overrides with
                    # InvalidParameterTypeException at startup.
                    "engine_path": LaunchConfiguration("engine_path"),
                    "min_confidence": ParameterValue(
                        LaunchConfiguration("min_confidence"), value_type=float
                    ),
                    "smoothing_alpha": ParameterValue(
                        LaunchConfiguration("smoothing_alpha"), value_type=float
                    ),
                    "opt_out_persons": LaunchConfiguration("opt_out_persons"),
                    "landmark_fallback": ParameterValue(
                        LaunchConfiguration("landmark_fallback"), value_type=bool
                    ),
                    "publish_context": ParameterValue(
                        LaunchConfiguration("publish_context"), value_type=bool
                    ),
                },
            ],
        ),
    ])

View File

@ -95,6 +95,13 @@ class ConversationNode(Node):
# ── Periodic context save ──────────────────────────────────────────── # ── Periodic context save ────────────────────────────────────────────
self._save_timer = self.create_timer(self._save_interval, self._save_context) self._save_timer = self.create_timer(self._save_interval, self._save_context)
# ── Emotion context (Issue #161) ──────────────────────────────────────
self._emotions: Dict[int, str] = {}
self.create_subscription(
String, "/social/emotion/context",
self._on_emotion_context, 10
)
self.get_logger().info( self.get_logger().info(
f"ConversationNode init (model={self._model_path}, " f"ConversationNode init (model={self._model_path}, "
f"gpu_layers={self._n_gpu}, ctx={self._n_ctx})" f"gpu_layers={self._n_gpu}, ctx={self._n_ctx})"

View File

@ -0,0 +1,414 @@
"""emotion_classifier.py — 7-class facial expression classifier (Issue #161).
Pure Python, no ROS2 / TensorRT / OpenCV dependencies.
Wraps a TensorRT FP16 emotion CNN and provides:
- On-device TRT inference on 48×48 grayscale face crops
- Heuristic fallback from 5-point SCRFD facial landmarks
- Per-person EMA temporal smoothing for stable outputs
- Per-person opt-out registry
Emotion classes (index order matches CNN output layer)
------------------------------------------------------
0 = happy
1 = sad
2 = angry
3 = surprised
4 = fearful
5 = disgusted
6 = neutral
Coordinate convention
---------------------
Face crop: BGR uint8 ndarray, any size (resized to INPUT_SIZE internally).
Landmarks (lm10): 10 floats from FaceDetection.landmarks
[left_eye_x, left_eye_y, right_eye_x, right_eye_y,
nose_x, nose_y, left_mouth_x, left_mouth_y,
right_mouth_x, right_mouth_y]
All coordinates are normalised image-space (0.01.0).
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
# ── Constants ─────────────────────────────────────────────────────────────────
EMOTIONS: List[str] = [
"happy", "sad", "angry", "surprised", "fearful", "disgusted", "neutral"
]
N_CLASSES: int = len(EMOTIONS)
# Landmark slot indices within the 10-value array
_LEX, _LEY = 0, 1 # left eye
_REX, _REY = 2, 3 # right eye
_NX, _NY = 4, 5 # nose
_LMX, _LMY = 6, 7 # left mouth corner
_RMX, _RMY = 8, 9 # right mouth corner
# CNN input size
INPUT_SIZE: int = 48
# ── Result type ───────────────────────────────────────────────────────────────
@dataclass
class ClassifiedEmotion:
    """Single classification result for one face (one frame)."""

    emotion: str           # dominant emotion label, one of EMOTIONS
    confidence: float      # smoothed softmax probability (0.0–1.0)
    scores: List[float]    # per-class scores, len == N_CLASSES, index order = EMOTIONS
    source: str = "cnn_trt"  # "cnn_trt" | "landmark_heuristic" | "opt_out"
# ── Math helpers ──────────────────────────────────────────────────────────────
def _softmax(logits: List[float]) -> List[float]:
"""Numerically stable softmax."""
m = max(logits)
exps = [math.exp(x - m) for x in logits]
total = sum(exps)
return [e / total for e in exps]
def _argmax(values: List[float]) -> int:
best = 0
for i in range(1, len(values)):
if values[i] > values[best]:
best = i
return best
def _uniform_scores() -> List[float]:
    """Return a flat prior: equal probability for each of the N_CLASSES emotions."""
    return [1.0 / N_CLASSES for _ in range(N_CLASSES)]
# ── Per-person temporal smoother + opt-out registry ───────────────────────────
class PersonEmotionTracker:
    """Per-person exponential-moving-average smoother plus opt-out registry.

    Args:
        alpha: Weight given to the newest observation
            (0.0 = frozen, 1.0 = no smoothing at all).
        max_age: Number of frames without an update after which a person's
            EMA state is considered stale and reset.
    """

    def __init__(self, alpha: float = 0.3, max_age: int = 16) -> None:
        self._alpha = alpha
        self._max_age = max_age
        self._ema: Dict[int, List[float]] = {}   # person_id → EMA score vector
        self._age: Dict[int, int] = {}           # person_id → frames since update
        self._opt_out: set = set()               # person_ids excluded from tracking

    # ── Opt-out management ────────────────────────────────────────────────
    def set_opt_out(self, person_id: int, value: bool) -> None:
        """Add/remove a person from the opt-out set; opting out drops their state."""
        if not value:
            self._opt_out.discard(person_id)
            return
        self._opt_out.add(person_id)
        # Forget any accumulated emotion history for an opted-out person.
        self._ema.pop(person_id, None)
        self._age.pop(person_id, None)

    def is_opt_out(self, person_id: int) -> bool:
        """True if this person has opted out of emotion tracking."""
        return person_id in self._opt_out

    # ── Smoothing ─────────────────────────────────────────────────────────
    def smooth(self, person_id: int, scores: List[float]) -> List[float]:
        """Blend *scores* into the person's EMA and return the blend.

        A negative person_id means "anonymous": the input is returned
        (copied) without being stored.
        """
        if person_id < 0:
            return list(scores)
        # Drop state that has gone stale between sightings.
        if self._age.get(person_id, 0) > self._max_age:
            self.reset(person_id)
        previous = self._ema.get(person_id)
        if previous is None:
            # First observation seeds the EMA as-is.
            self._ema[person_id] = list(scores)
            self._age[person_id] = 0
            return list(scores)
        w = self._alpha
        blended = [w * new + (1.0 - w) * old
                   for new, old in zip(scores, previous)]
        self._ema[person_id] = blended
        self._age[person_id] = 0
        return blended

    def tick(self) -> None:
        """Advance the age counter for every tracked person (once per frame)."""
        for pid in list(self._age):
            self._age[pid] += 1

    def reset(self, person_id: int) -> None:
        """Forget all smoothing state for a single person."""
        self._ema.pop(person_id, None)
        self._age.pop(person_id, None)

    def reset_all(self) -> None:
        """Forget smoothing state for everyone (opt-out set is kept)."""
        self._ema.clear()
        self._age.clear()

    @property
    def tracked_ids(self) -> List[int]:
        """person_ids that currently have an EMA entry."""
        return list(self._ema.keys())
# ── TensorRT engine loader ────────────────────────────────────────────────────
class _TrtEngine:
    """Thin wrapper around a TensorRT FP16 emotion CNN engine.

    Expected engine:
      - Input binding:  name "input"  shape (1, 1, 48, 48) float32
      - Output binding: name "output" shape (1, 7) float32 (softmax logits)

    The engine is built offline from a MobileNetV2-based 48×48 grayscale CNN
    (FER+ or AffectNet trained) via:
        trtexec --onnx=emotion_cnn.onnx --fp16 --saveEngine=emotion_fp16.trt

    NOTE(review): the bindings list is built as [input, output]; this assumes
    the engine's binding order matches (input first) — confirm when changing
    the ONNX export.
    """
    def __init__(self) -> None:
        # All resources stay None until load() succeeds; ready() keys off
        # _context alone.
        self._context = None          # TRT execution context
        self._h_input = None          # page-locked host input buffer
        self._h_output = None         # page-locked host output buffer
        self._d_input = None          # device input buffer
        self._d_output = None         # device output buffer
        self._stream = None           # CUDA stream for async copies/exec
        self._bindings: List = []     # device pointers in binding order
    def load(self, engine_path: str) -> bool:
        """Load TRT engine from file. Returns True on success.

        Imports tensorrt/pycuda lazily so the module stays importable on
        machines without a GPU stack. Any failure (missing libs, bad file,
        deserialization error) is swallowed and reported as False.
        """
        try:
            import tensorrt as trt  # type: ignore
            # pycuda.autoinit creates the CUDA context as an import side
            # effect — the import itself is the point, hence noqa.
            import pycuda.autoinit  # type: ignore # noqa: F401
            import pycuda.driver as cuda  # type: ignore
            import numpy as np
            TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
            with open(engine_path, "rb") as f:
                engine_data = f.read()
            runtime = trt.Runtime(TRT_LOGGER)
            engine = runtime.deserialize_cuda_engine(engine_data)
            self._context = engine.create_execution_context()
            # Allocate host + device buffers once; infer() reuses them to
            # avoid per-frame allocation.
            self._h_input = cuda.pagelocked_empty((1, 1, INPUT_SIZE, INPUT_SIZE),
                                                  dtype=np.float32)
            self._h_output = cuda.pagelocked_empty((1, N_CLASSES), dtype=np.float32)
            self._d_input = cuda.mem_alloc(self._h_input.nbytes)
            self._d_output = cuda.mem_alloc(self._h_output.nbytes)
            self._stream = cuda.Stream()
            self._bindings = [int(self._d_input), int(self._d_output)]
            return True
        except Exception:
            # Best-effort loader: caller falls back to the landmark heuristic.
            self._context = None
            return False
    @property
    def ready(self) -> bool:
        """True once load() has completed successfully."""
        return self._context is not None
    def infer(self, face_bgr) -> List[float]:
        """Run TRT inference on a BGR face crop. Returns 7 softmax scores.

        Must only be called when `ready` is True — buffers and stream are
        created in load(). The call is synchronous: it blocks on the CUDA
        stream until the output copy completes.
        """
        import pycuda.driver as cuda  # type: ignore
        import numpy as np
        img = _preprocess(face_bgr)  # (1, 1, 48, 48) float32
        np.copyto(self._h_input, img)
        # host → device, execute, device → host — all queued on one stream,
        # then synchronized so _h_output is valid to read.
        cuda.memcpy_htod_async(self._d_input, self._h_input, self._stream)
        self._context.execute_async_v2(self._bindings, self._stream.handle, None)
        cuda.memcpy_dtoh_async(self._h_output, self._d_output, self._stream)
        self._stream.synchronize()
        logits = list(self._h_output[0])
        # Engine emits raw logits; normalise to probabilities here.
        return _softmax(logits)
# ── Image pre-processing helper (importable without cv2 in test mode) ─────────
def _preprocess(face_bgr) -> "np.ndarray":  # type: ignore
    """Resize BGR crop to 48×48 grayscale, normalise to [-1, 1].

    Returns ndarray shape (1, 1, 48, 48) float32 — the TRT engine's input
    layout (batch, channel, H, W).
    """
    # Lazy imports keep the module importable in test environments that
    # have neither numpy nor cv2 installed.
    import numpy as np  # type: ignore
    import cv2  # type: ignore
    gray = cv2.cvtColor(face_bgr, cv2.COLOR_BGR2GRAY)
    resized = cv2.resize(gray, (INPUT_SIZE, INPUT_SIZE),
                         interpolation=cv2.INTER_LINEAR)
    # uint8 [0, 255] → float32 [-1, 1]
    norm = resized.astype(np.float32) / 127.5 - 1.0
    return norm.reshape(1, 1, INPUT_SIZE, INPUT_SIZE)
# ── Landmark heuristic ────────────────────────────────────────────────────────
def classify_from_landmarks(lm10: List[float]) -> ClassifiedEmotion:
    """Estimate emotion from 5-point SCRFD landmarks (10 floats).

    Uses geometric ratios between eyes, nose, and mouth corners. Accuracy
    is limited — treat this as a soft prior, not a definitive label.

    Returns ClassifiedEmotion with source="landmark_heuristic".
    """
    if len(lm10) < 10:
        # Incomplete landmarks: fall back to a neutral-biased flat prior.
        prior = _uniform_scores()
        prior[6] = 0.5  # bias neutral
        prior = _renorm(prior)
        return ClassifiedEmotion("neutral", prior[6], prior, "landmark_heuristic")

    # ── Geometry ──────────────────────────────────────────────────────────
    eye_line_y = (lm10[_LEY] + lm10[_REY]) / 2.0
    nose_line_y = lm10[_NY]
    mouth_line_y = (lm10[_LMY] + lm10[_RMY]) / 2.0
    interocular = max(abs(lm10[_REX] - lm10[_LEX]), 1e-4)
    mouth_span = abs(lm10[_RMX] - lm10[_LMX])
    corner_offset = abs(lm10[_LMY] - lm10[_RMY])
    face_height = max(mouth_line_y - eye_line_y, 1e-4)

    # Mouth span relative to interocular distance (>1.0 = wide smile).
    width_ratio = mouth_span / interocular
    # Mouth drop below nose, relative to face height (~0.3–0.6 typical).
    mouth_below_nose = (mouth_line_y - nose_line_y) / face_height
    # Mouth-corner asymmetry relative to face height (>0.05 = notable).
    asym_ratio = corner_offset / face_height

    # ── Soft scores ───────────────────────────────────────────────────────
    scores = _uniform_scores()
    if width_ratio > 0.85 and mouth_below_nose > 0.35:
        # Wide mouth at a normal vertical position → happy.
        scores[0] = 0.55 + 0.25 * min(1.0, (width_ratio - 0.85) / 0.5)
    elif mouth_below_nose < 0.20 and width_ratio < 0.7:
        # Tight, compressed mouth high up → surprised OR angry.
        scores[3] = 0.35  # surprised
        scores[2] = 0.30  # angry
    elif asym_ratio > 0.06:
        # Asymmetric mouth → disgust or sadness.
        scores[5] = 0.30  # disgusted
        scores[1] = 0.25  # sad
    elif width_ratio < 0.65 and mouth_below_nose < 0.30:
        # Tight and compressed → sad/angry.
        scores[1] = 0.35  # sad
        scores[2] = 0.25  # angry
    else:
        # Nothing distinctive → neutral.
        scores[6] = 0.45
    scores = _renorm(scores)

    winner = _argmax(scores)
    return ClassifiedEmotion(
        emotion=EMOTIONS[winner],
        confidence=round(scores[winner], 3),
        scores=[round(s, 4) for s in scores],
        source="landmark_heuristic",
    )
def _renorm(scores: List[float]) -> List[float]:
"""Re-normalise scores so they sum to 1.0."""
total = sum(scores)
if total <= 0:
return _uniform_scores()
return [s / total for s in scores]
# ── Public classifier ─────────────────────────────────────────────────────────
class EmotionClassifier:
    """Facade combining TRT inference + landmark fallback + per-person smoothing.

    Usage
    -----
    >>> clf = EmotionClassifier(engine_path="/models/emotion_fp16.trt")
    >>> clf.load()
    True
    >>> result = clf.classify_crop(face_bgr, person_id=42, tracker=tracker)
    >>> result.emotion, result.confidence
    ('happy', 0.87)
    """

    def __init__(
        self,
        engine_path: str = "",
        alpha: float = 0.3,
    ) -> None:
        # NOTE: _engine_path is read by emotion_node; keep the attribute name.
        self._engine_path = engine_path
        self._alpha = alpha
        self._engine = _TrtEngine()

    def load(self) -> bool:
        """Deserialise the TRT engine. Returns True if the engine is ready."""
        if not self._engine_path:
            return False
        return self._engine.load(self._engine_path)

    @property
    def ready(self) -> bool:
        """True once a TRT engine has been loaded successfully."""
        return self._engine.ready

    def classify_crop(
        self,
        face_bgr,
        person_id: int = -1,
        tracker: Optional[PersonEmotionTracker] = None,
    ) -> ClassifiedEmotion:
        """Classify a BGR face crop.

        Args:
            face_bgr: BGR ndarray from cv_bridge / direct crop.
            person_id: For temporal smoothing. -1 = no smoothing.
            tracker: Optional PersonEmotionTracker for EMA smoothing.

        Returns:
            ClassifiedEmotion with smoothed scores.
        """
        if self._engine.ready:
            source = "cnn_trt"
            raw = self._engine.infer(face_bgr)
        else:
            # No engine: emit a mildly neutral-biased flat prior.
            source = "landmark_heuristic"
            raw = _uniform_scores()
            raw[6] = 0.40
            raw = _renorm(raw)
        return self._finalize(raw, source, person_id, tracker)

    def classify_from_landmarks(
        self,
        lm10: List[float],
        person_id: int = -1,
        tracker: Optional[PersonEmotionTracker] = None,
    ) -> ClassifiedEmotion:
        """Classify using landmark geometry only (no crop required)."""
        result = classify_from_landmarks(lm10)
        if tracker is None or person_id < 0:
            return result
        return self._finalize(result.scores, result.source, person_id, tracker)

    @staticmethod
    def _finalize(
        scores: List[float],
        source: str,
        person_id: int,
        tracker: Optional[PersonEmotionTracker],
    ) -> ClassifiedEmotion:
        """Apply optional EMA smoothing, then package a ClassifiedEmotion."""
        if tracker is not None and person_id >= 0:
            final = tracker.smooth(person_id, scores)
        else:
            final = scores
        winner = _argmax(final)
        return ClassifiedEmotion(
            emotion=EMOTIONS[winner],
            confidence=round(final[winner], 3),
            scores=[round(v, 4) for v in final],
            source=source,
        )

View File

@ -0,0 +1,380 @@
"""emotion_node.py — Facial expression recognition node (Issue #161).
Piggybacks on the face detection pipeline: subscribes to
/social/faces/detections (FaceDetectionArray), extracts face crops from the
latest camera frames, runs a TensorRT FP16 emotion CNN, applies per-person
EMA temporal smoothing, and publishes results on /social/faces/expressions.
Architecture
------------
FaceDetectionArray crop extraction TRT FP16 inference (< 5 ms)
EMA smoothing ExpressionArray publish
If TRT engine is not available the node falls back to the 5-point landmark
heuristic (classify_from_landmarks) which requires no GPU and adds < 0.1 ms.
ROS2 topics
-----------
Subscribe:
/social/faces/detections (saltybot_social_msgs/FaceDetectionArray)
/camera/{name}/image_raw (sensor_msgs/Image) × 4
/social/persons (saltybot_social_msgs/PersonStateArray)
Publish:
/social/faces/expressions (saltybot_social_msgs/ExpressionArray)
/social/emotion/context (std_msgs/String) JSON for LLM context
Parameters
----------
engine_path (str) "" path to emotion_fp16.trt; empty = landmark only
min_confidence (float) 0.40 suppress results below this threshold
smoothing_alpha (float) 0.30 EMA weight (higher = faster, less stable)
opt_out_persons (str) "" comma-separated person_ids that opted out
face_min_size (int) 24 skip faces whose bbox is smaller (px side)
landmark_fallback (bool) true use landmark heuristic when TRT unavailable
camera_names (str) "front,left,rear,right"
n_cameras (int) 4
publish_context (bool) true publish /social/emotion/context JSON
"""
from __future__ import annotations
import json
import threading
import time
from typing import Dict, List, Optional, Tuple
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from std_msgs.msg import String
from sensor_msgs.msg import Image
from saltybot_social_msgs.msg import (
FaceDetectionArray,
PersonStateArray,
ExpressionArray,
Expression,
)
from saltybot_social.emotion_classifier import (
EmotionClassifier,
PersonEmotionTracker,
EMOTIONS,
)
try:
from cv_bridge import CvBridge
_CV_BRIDGE_OK = True
except ImportError:
_CV_BRIDGE_OK = False
try:
import cv2
_CV2_OK = True
except ImportError:
_CV2_OK = False
# ── Per-camera latest-frame buffer ────────────────────────────────────────────
class _FrameBuffer:
"""Thread-safe store of the most recent image per camera."""
def __init__(self) -> None:
self._lock = threading.Lock()
self._frames: Dict[int, object] = {} # camera_id → cv2 BGR image
def put(self, camera_id: int, img) -> None:
with self._lock:
self._frames[camera_id] = img
def get(self, camera_id: int):
with self._lock:
return self._frames.get(camera_id)
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
class EmotionNode(Node):
    """Facial expression recognition — TRT FP16 + landmark fallback.

    Subscribes to face detections, person state, and raw camera frames;
    classifies each detected face (TRT crop inference when the engine is
    loaded, otherwise the 5-point landmark heuristic), applies per-person
    EMA smoothing, and publishes ExpressionArray plus an optional JSON
    context string for the conversation node.
    """

    def __init__(self) -> None:
        super().__init__("emotion_node")
        # ── Parameters ────────────────────────────────────────────────────────
        self.declare_parameter("engine_path", "")
        self.declare_parameter("min_confidence", 0.40)
        self.declare_parameter("smoothing_alpha", 0.30)
        self.declare_parameter("opt_out_persons", "")
        self.declare_parameter("face_min_size", 24)
        self.declare_parameter("landmark_fallback", True)
        self.declare_parameter("camera_names", "front,left,rear,right")
        self.declare_parameter("n_cameras", 4)
        self.declare_parameter("publish_context", True)
        engine_path = self.get_parameter("engine_path").value
        self._min_conf = self.get_parameter("min_confidence").value
        alpha = self.get_parameter("smoothing_alpha").value
        opt_out_str = self.get_parameter("opt_out_persons").value
        self._face_min = self.get_parameter("face_min_size").value
        self._lm_fallback = self.get_parameter("landmark_fallback").value
        cam_names_str = self.get_parameter("camera_names").value
        n_cameras = self.get_parameter("n_cameras").value
        self._pub_ctx = self.get_parameter("publish_context").value
        # ── Classifier + tracker ───────────────────────────────────────────
        self._classifier = EmotionClassifier(engine_path=engine_path, alpha=alpha)
        self._tracker = PersonEmotionTracker(alpha=alpha)
        # Parse opt-out list; non-numeric entries are silently ignored.
        for pid_str in opt_out_str.split(","):
            pid_str = pid_str.strip()
            if pid_str.isdigit():
                self._tracker.set_opt_out(int(pid_str), True)
        # ── Camera frame buffer + cv_bridge ───────────────────────────────
        self._frame_buf = _FrameBuffer()
        self._bridge = CvBridge() if _CV_BRIDGE_OK else None
        self._cam_names = [n.strip() for n in cam_names_str.split(",")]
        self._cam_id_map: Dict[str, int] = {
            name: idx for idx, name in enumerate(self._cam_names)
        }
        # ── Latest persons for person_id ↔ face_id correlation ────────────
        self._persons_lock = threading.Lock()
        self._face_to_person: Dict[int, int] = {}  # face_id → person_id
        # ── Context for LLM (latest emotion per person) ───────────────────
        self._emotion_context: Dict[int, str] = {}  # person_id → emotion label
        # ── QoS profiles ──────────────────────────────────────────────────
        # Best-effort, shallow queue: stale frames/detections are worthless.
        be_qos = QoSProfile(
            reliability=ReliabilityPolicy.BEST_EFFORT,
            history=HistoryPolicy.KEEP_LAST,
            depth=4,
        )
        # ── Subscriptions ──────────────────────────────────────────────────
        self.create_subscription(
            FaceDetectionArray,
            "/social/faces/detections",
            self._on_faces,
            be_qos,
        )
        self.create_subscription(
            PersonStateArray,
            "/social/persons",
            self._on_persons,
            be_qos,
        )
        for name in self._cam_names[:n_cameras]:
            cam_id = self._cam_id_map.get(name, 0)
            # Bind cam_id as a default arg so each lambda keeps its own id
            # (avoids the late-binding closure pitfall).
            self.create_subscription(
                Image,
                f"/camera/{name}/image_raw",
                lambda msg, cid=cam_id: self._on_image(msg, cid),
                be_qos,
            )
        # ── Publishers ─────────────────────────────────────────────────────
        self._expr_pub = self.create_publisher(
            ExpressionArray, "/social/faces/expressions", be_qos
        )
        if self._pub_ctx:
            self._ctx_pub = self.create_publisher(
                String, "/social/emotion/context", 10
            )
        else:
            self._ctx_pub = None
        # ── Load TRT engine in background (deserialisation can take seconds)
        threading.Thread(target=self._load_engine, daemon=True).start()
        self.get_logger().info(
            f"EmotionNode ready — engine='{engine_path}', "
            f"alpha={alpha:.2f}, min_conf={self._min_conf:.2f}, "
            f"cameras={self._cam_names[:n_cameras]}"
        )

    # ── Engine loading ────────────────────────────────────────────────────────
    def _load_engine(self) -> None:
        """Load the TRT engine off the main thread; log the resulting mode."""
        if not self._classifier._engine_path:
            if self._lm_fallback:
                self.get_logger().info(
                    "No TRT engine path — using landmark heuristic fallback"
                )
            else:
                self.get_logger().warn(
                    "No TRT engine and landmark_fallback=false — "
                    "emotion_node will not classify"
                )
            return
        ok = self._classifier.load()
        if ok:
            self.get_logger().info("TRT emotion engine loaded ✓")
        else:
            self.get_logger().warn(
                "TRT engine load failed — falling back to landmark heuristic"
            )

    # ── Camera frame ingestion ────────────────────────────────────────────────
    def _on_image(self, msg: Image, camera_id: int) -> None:
        """Convert an incoming Image to BGR and cache it as the camera's latest."""
        if not _CV_BRIDGE_OK or self._bridge is None:
            return
        try:
            bgr = self._bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8")
            self._frame_buf.put(camera_id, bgr)
        except Exception as exc:
            # Conversion errors are per-frame noise; debug-level only.
            self.get_logger().debug(f"cv_bridge error cam{camera_id}: {exc}")

    # ── Person-state for face_id → person_id mapping ─────────────────────────
    def _on_persons(self, msg: PersonStateArray) -> None:
        """Rebuild the face_id → person_id map from the latest person states."""
        mapping: Dict[int, int] = {}
        for ps in msg.persons:
            if ps.face_id >= 0:
                mapping[ps.face_id] = ps.person_id
        with self._persons_lock:
            self._face_to_person = mapping

    def _get_person_id(self, face_id: int) -> int:
        """Resolve a face_id to a person_id; -1 when unknown."""
        with self._persons_lock:
            return self._face_to_person.get(face_id, -1)

    # ── Main face-detection callback ──────────────────────────────────────────
    def _on_faces(self, msg: FaceDetectionArray) -> None:
        """Classify every detected face and publish the results.

        Runs once per FaceDetectionArray message (one detection frame).
        """
        # FIX: age the tracker exactly once per detection frame, *before*
        # any early return. Previously tick() ran only after a successful
        # publish, so frames with no faces (or none above min_confidence)
        # never aged the EMAs and stale per-person state could not expire
        # once a person left the scene. Persons seen this frame get their
        # age reset to 0 inside tracker.smooth().
        self._tracker.tick()
        if not msg.faces:
            return
        expressions: List[Expression] = []
        for face in msg.faces:
            person_id = self._get_person_id(face.face_id)
            # Opt-out check: publish an explicit empty marker, never classify.
            if person_id >= 0 and self._tracker.is_opt_out(person_id):
                expr = Expression()
                expr.header = msg.header
                expr.person_id = person_id
                expr.face_id = face.face_id
                expr.emotion = ""
                expr.confidence = 0.0
                expr.scores = [0.0] * 7
                expr.is_opt_out = True
                expr.source = "opt_out"
                expressions.append(expr)
                continue
            # Try TRT crop classification first
            result = None
            if self._classifier.ready and _CV2_OK:
                result = self._classify_crop(face, person_id)
            # Fallback to landmark heuristic
            if result is None and self._lm_fallback:
                result = self._classifier.classify_from_landmarks(
                    list(face.landmarks),
                    person_id=person_id,
                    tracker=self._tracker,
                )
            if result is None:
                continue
            if result.confidence < self._min_conf:
                continue
            # Build ROS message
            expr = Expression()
            expr.header = msg.header
            expr.person_id = person_id
            expr.face_id = face.face_id
            expr.emotion = result.emotion
            expr.confidence = result.confidence
            # Pad/trim scores to exactly 7
            sc = result.scores
            expr.scores = (sc + [0.0] * 7)[:7]
            expr.is_opt_out = False
            expr.source = result.source
            expressions.append(expr)
            # Update LLM context cache
            if person_id >= 0:
                self._emotion_context[person_id] = result.emotion
        if not expressions:
            return
        # Publish ExpressionArray
        arr = ExpressionArray()
        arr.header = msg.header
        arr.expressions = expressions
        self._expr_pub.publish(arr)
        # Publish JSON context for conversation node
        if self._ctx_pub is not None:
            self._publish_context()

    def _classify_crop(self, face, person_id: int):
        """Extract crop from frame buffer and run TRT inference.

        Returns a ClassifiedEmotion, or None when no frame is buffered,
        the crop is too small, or the crop is empty.
        """
        # Resolve camera_id from face header frame_id
        # (e.g. "camera/front_optical" → "front"); unknown names map to 0.
        frame_id = face.header.frame_id if face.header.frame_id else "front"
        cam_name = frame_id.split("/")[-1].split("_")[0]  # "front", "left", etc.
        camera_id = self._cam_id_map.get(cam_name, 0)
        frame = self._frame_buf.get(camera_id)
        if frame is None:
            return None
        h, w = frame.shape[:2]
        # bbox fields are normalised [0, 1]; clamp to frame bounds.
        x1 = max(0, int(face.bbox_x * w))
        y1 = max(0, int(face.bbox_y * h))
        x2 = min(w, int((face.bbox_x + face.bbox_w) * w))
        y2 = min(h, int((face.bbox_y + face.bbox_h) * h))
        if (x2 - x1) < self._face_min or (y2 - y1) < self._face_min:
            return None
        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            return None
        return self._classifier.classify_crop(
            crop, person_id=person_id, tracker=self._tracker
        )

    # ── LLM context publisher ─────────────────────────────────────────────────
    def _publish_context(self) -> None:
        """Publish latest-emotions dict as JSON for conversation_node.

        Caller guarantees self._ctx_pub is not None.
        """
        ctx = {str(pid): emo for pid, emo in self._emotion_context.items()}
        msg = String()
        msg.data = json.dumps({"emotions": ctx, "ts": time.time()})
        self._ctx_pub.publish(msg)
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None) -> None:
    """CLI entry point: initialise rclpy, spin EmotionNode, always clean up.

    Args:
        args: Optional argv list forwarded to rclpy.init (None = sys.argv).
    """
    rclpy.init(args=args)
    node = EmotionNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        # Ctrl-C is the normal shutdown path — not an error.
        pass
    finally:
        node.destroy_node()
        # try_shutdown tolerates rclpy already being shut down (e.g. SIGINT).
        rclpy.try_shutdown()


if __name__ == "__main__":
    main()

View File

@ -37,6 +37,8 @@ setup(
'voice_command_node = saltybot_social.voice_command_node:main', 'voice_command_node = saltybot_social.voice_command_node:main',
# Multi-camera gesture recognition (Issue #140) # Multi-camera gesture recognition (Issue #140)
'gesture_node = saltybot_social.gesture_node:main', 'gesture_node = saltybot_social.gesture_node:main',
# Facial expression recognition (Issue #161)
'emotion_node = saltybot_social.emotion_node:main',
], ],
}, },
) )

View File

@ -0,0 +1,528 @@
"""test_emotion_classifier.py — Unit tests for emotion_classifier (Issue #161).
Tests cover:
- Emotion constant definitions
- Softmax normalisation
- PersonEmotionTracker: EMA smoothing, age expiry, opt-out, reset
- Landmark heuristic classifier: geometric edge cases
- EmotionClassifier: classify_crop fallback, classify_from_landmarks
- Score renormalisation, argmax, utility helpers
- Integration: full classify pipeline with mock engine
No ROS2, TensorRT, or OpenCV runtime required.
"""
import math
import pytest
from saltybot_social.emotion_classifier import (
EMOTIONS,
N_CLASSES,
ClassifiedEmotion,
PersonEmotionTracker,
EmotionClassifier,
classify_from_landmarks,
_softmax,
_argmax,
_uniform_scores,
_renorm,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _lm_neutral() -> list:
"""5-point SCRFD landmarks for a frontal neutral face (normalised)."""
return [
0.35, 0.38, # left_eye
0.65, 0.38, # right_eye
0.50, 0.52, # nose
0.38, 0.72, # left_mouth
0.62, 0.72, # right_mouth
]
def _lm_happy() -> list:
"""Wide mouth, symmetric corners, mouth well below nose."""
return [
0.35, 0.38,
0.65, 0.38,
0.50, 0.52,
0.30, 0.74, # wide mouth
0.70, 0.74,
]
def _lm_sad() -> list:
"""Compressed mouth, tight width."""
return [
0.35, 0.38,
0.65, 0.38,
0.50, 0.52,
0.44, 0.62, # tight, close to nose
0.56, 0.62,
]
def _lm_asymmetric() -> list:
"""Asymmetric mouth corners → disgust/sad signal."""
return [
0.35, 0.38,
0.65, 0.38,
0.50, 0.52,
0.38, 0.65,
0.62, 0.74, # right corner lower → asymmetric
]
# ── TestEmotionConstants ──────────────────────────────────────────────────────
class TestEmotionConstants:
    """Sanity checks on the EMOTIONS label list and N_CLASSES constant."""

    def test_emotions_length(self):
        assert len(EMOTIONS) == 7

    def test_n_classes(self):
        assert N_CLASSES == 7

    def test_emotions_labels(self):
        assert EMOTIONS == [
            "happy", "sad", "angry", "surprised",
            "fearful", "disgusted", "neutral",
        ]

    def test_happy_index(self):
        assert EMOTIONS.index("happy") == 0

    def test_neutral_index(self):
        assert EMOTIONS.index("neutral") == 6

    def test_all_lowercase(self):
        assert all(label == label.lower() for label in EMOTIONS)
# ── TestSoftmax ───────────────────────────────────────────────────────────────
class TestSoftmax:
    """Behavioural checks for the numerically stable softmax helper."""

    def test_sums_to_one(self):
        probs = _softmax([1.0, 2.0, 0.5, -1.0, 3.0, 0.0, 1.5])
        assert sum(probs) == pytest.approx(1.0, abs=1e-6)

    def test_all_positive(self):
        probs = _softmax([1.0, 2.0, 3.0, 0.0, -1.0, -2.0, 0.5])
        assert min(probs) > 0

    def test_max_logit_gives_max_prob(self):
        probs = _softmax([0.0, 0.0, 10.0, 0.0, 0.0, 0.0, 0.0])
        assert _argmax(probs) == 2

    def test_uniform_logits_uniform_probs(self):
        for p in _softmax([1.0] * 7):
            assert p == pytest.approx(1.0 / 7.0, abs=1e-6)

    def test_length_preserved(self):
        assert len(_softmax([0.0] * 7)) == 7

    def test_numerically_stable_large_values(self):
        probs = _softmax([1000.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
        assert probs[0] == pytest.approx(1.0, abs=1e-6)
# ── TestArgmax ────────────────────────────────────────────────────────────────
class TestArgmax:
    """Checks for the first-wins argmax helper."""

    def test_finds_max(self):
        assert _argmax([0.1, 0.2, 0.9, 0.3, 0.1, 0.1, 0.1]) == 2

    def test_single_element(self):
        assert _argmax([0.5]) == 0

    def test_first_when_all_equal(self):
        assert _argmax([0.5] * 7) == 0
# ── TestUniformScores ─────────────────────────────────────────────────────────
class TestUniformScores:
    """Checks for the flat-prior helper."""

    def test_length(self):
        assert len(_uniform_scores()) == N_CLASSES

    def test_sums_to_one(self):
        assert sum(_uniform_scores()) == pytest.approx(1.0, abs=1e-9)

    def test_all_equal(self):
        scores = _uniform_scores()
        assert max(scores) == pytest.approx(min(scores), abs=1e-9)
# ── TestRenorm ────────────────────────────────────────────────────────────────
class TestRenorm:
    """Checks for score re-normalisation."""

    def test_sums_to_one(self):
        renormed = _renorm([0.1, 0.2, 0.3, 0.1, 0.1, 0.1, 0.1])
        assert sum(renormed) == pytest.approx(1.0, abs=1e-9)

    def test_all_zero_returns_uniform(self):
        fallback = _renorm([0.0] * 7)
        assert len(fallback) == 7
        assert sum(fallback) == pytest.approx(1.0, abs=1e-9)

    def test_preserves_order(self):
        out = _renorm([0.5, 0.3, 0.0, 0.0, 0.0, 0.0, 0.2])
        assert out[0] > out[1] > out[6]
# ── TestPersonEmotionTracker ──────────────────────────────────────────────────
class TestPersonEmotionTracker:
    """Unit tests for per-person EMA smoothing in PersonEmotionTracker."""

    def _scores(self, idx: int, val: float = 0.8) -> list:
        """Build a normalized N_CLASSES score vector dominated by class *idx*.

        Every class gets a 0.02 floor except *idx*, which gets *val*; the
        vector is then renormalized to sum to 1.0.  (The previous version
        built the list twice and left an unused ``total`` local; this single
        comprehension is equivalent.)
        """
        s = [val if i == idx else 0.02 for i in range(N_CLASSES)]
        t = sum(s)
        return [x / t for x in s]

    def test_first_call_returns_input(self):
        """The first observation for a person is returned unsmoothed."""
        tracker = PersonEmotionTracker(alpha=0.5)
        scores = self._scores(0)
        result = tracker.smooth(1, scores)
        assert abs(result[0] - scores[0]) < 1e-9

    def test_ema_converges_toward_new_dominant(self):
        """Repeated 'sad' inputs drive the sad score monotonically upward.

        The old version never updated ``prev_sad`` inside the loop, so it
        only compared against the first smoothed value; this tracks the
        previous iterate and checks true monotone progress.
        """
        tracker = PersonEmotionTracker(alpha=0.5)
        happy = self._scores(0)
        sad = self._scores(1)
        tracker.smooth(1, happy)
        first = tracker.smooth(1, sad)[1]
        prev = first
        for _ in range(20):
            cur = tracker.smooth(1, sad)[1]
            # An EMA pulled toward a fixed target never decreases...
            assert cur >= prev
            prev = cur
        # ...and makes strict overall progress toward the sad score.
        assert prev > first

    def test_alpha_1_no_smoothing(self):
        """alpha=1.0 means the raw observation passes straight through."""
        tracker = PersonEmotionTracker(alpha=1.0)
        s1 = self._scores(0)
        s2 = self._scores(1)
        tracker.smooth(1, s1)
        result = tracker.smooth(1, s2)
        for a, b in zip(result, s2):
            assert abs(a - b) < 1e-9

    def test_alpha_0_frozen(self):
        """alpha=0.0 freezes the EMA at the first observation."""
        tracker = PersonEmotionTracker(alpha=0.0)
        s1 = self._scores(0)
        s2 = self._scores(1)
        tracker.smooth(1, s1)
        result = tracker.smooth(1, s2)
        for a, b in zip(result, s1):
            assert abs(a - b) < 1e-9

    def test_different_persons_independent(self):
        """Each person id keeps its own independent EMA state."""
        tracker = PersonEmotionTracker(alpha=0.5)
        happy = self._scores(0)
        sad = self._scores(1)
        tracker.smooth(1, happy)
        tracker.smooth(2, sad)
        r1 = tracker.smooth(1, happy)
        r2 = tracker.smooth(2, sad)
        assert r1[0] > r2[0]  # person 1 more happy
        assert r2[1] > r1[1]  # person 2 more sad

    def test_negative_person_id_no_tracking(self):
        """person_id == -1 (unidentified) is passed through, never stored."""
        tracker = PersonEmotionTracker()
        scores = self._scores(2)
        result = tracker.smooth(-1, scores)
        assert result == scores  # unchanged, not stored

    def test_reset_clears_ema(self):
        """reset(pid) drops the stored EMA for that person."""
        tracker = PersonEmotionTracker(alpha=0.5)
        tracker.smooth(1, self._scores(0))
        tracker.reset(1)
        assert 1 not in tracker.tracked_ids

    def test_reset_all_clears_all(self):
        """reset_all() drops every tracked person."""
        tracker = PersonEmotionTracker(alpha=0.5)
        for pid in range(5):
            tracker.smooth(pid, self._scores(pid % N_CLASSES))
        tracker.reset_all()
        assert tracker.tracked_ids == []

    def test_tracked_ids_populated(self):
        """tracked_ids lists every person with stored EMA state."""
        tracker = PersonEmotionTracker()
        tracker.smooth(10, self._scores(0))
        tracker.smooth(20, self._scores(1))
        assert set(tracker.tracked_ids) == {10, 20}

    def test_age_expiry_resets_ema(self):
        """Ticking past max_age expires the EMA; the next smooth restarts."""
        tracker = PersonEmotionTracker(alpha=0.5, max_age=3)
        tracker.smooth(1, self._scores(0))
        # Advance age beyond max_age.
        for _ in range(4):
            tracker.tick()
        fresh_scores = self._scores(1)
        result = tracker.smooth(1, fresh_scores)
        # After expiry the first smooth returns the input unchanged.
        for a, b in zip(result, fresh_scores):
            assert abs(a - b) < 1e-9
# ── TestOptOut ────────────────────────────────────────────────────────────────
class TestOptOut:
    """Unit tests for opt-out bookkeeping in PersonEmotionTracker."""

    def test_set_opt_out_true(self):
        """A person marked opted-out is reported as such."""
        tracker = PersonEmotionTracker()
        tracker.set_opt_out(42, True)
        assert tracker.is_opt_out(42)

    def test_set_opt_out_false(self):
        """Clearing the flag re-enables tracking for that person."""
        tracker = PersonEmotionTracker()
        tracker.set_opt_out(42, True)
        tracker.set_opt_out(42, False)
        assert not tracker.is_opt_out(42)

    def test_opt_out_clears_ema(self):
        """Opting out also purges any stored EMA state."""
        tracker = PersonEmotionTracker()
        tracker.smooth(42, [0.5, 0.1, 0.1, 0.1, 0.1, 0.0, 0.1])
        assert 42 in tracker.tracked_ids
        tracker.set_opt_out(42, True)
        assert 42 not in tracker.tracked_ids

    def test_unknown_person_not_opted_out(self):
        """Persons never seen before default to tracked."""
        assert not PersonEmotionTracker().is_opt_out(99)

    def test_multiple_opt_outs(self):
        """Several persons can be opted out independently."""
        tracker = PersonEmotionTracker()
        ids = [1, 2, 3]
        for pid in ids:
            tracker.set_opt_out(pid, True)
        assert all(tracker.is_opt_out(pid) for pid in ids)
# ── TestClassifyFromLandmarks ─────────────────────────────────────────────────
class TestClassifyFromLandmarks:
    """Unit tests for the landmark-heuristic emotion classifier."""

    def test_returns_classified_emotion(self):
        """Neutral landmarks yield a ClassifiedEmotion instance."""
        assert isinstance(classify_from_landmarks(_lm_neutral()), ClassifiedEmotion)

    def test_emotion_is_valid_label(self):
        """The predicted label is one of the known emotion labels."""
        assert classify_from_landmarks(_lm_neutral()).emotion in EMOTIONS

    def test_scores_length(self):
        """Exactly one score per emotion class."""
        assert len(classify_from_landmarks(_lm_neutral()).scores) == N_CLASSES

    def test_scores_sum_to_one(self):
        """Scores form a probability distribution, up to rounding."""
        # Scores are rounded to 4 dp; allow 1e-3 accumulation across 7 terms
        total = sum(classify_from_landmarks(_lm_neutral()).scores)
        assert abs(total - 1.0) < 1e-3

    def test_confidence_matches_top_score(self):
        """Confidence equals the top class score, up to rounding."""
        # confidence is round(score, 3) and max score is round(s, 4) → ≤0.5e-3 diff
        res = classify_from_landmarks(_lm_neutral())
        assert abs(res.confidence - max(res.scores)) < 5e-3

    def test_source_is_landmark_heuristic(self):
        """Heuristic results are tagged with the landmark source."""
        assert classify_from_landmarks(_lm_neutral()).source == "landmark_heuristic"

    def test_happy_landmarks_boost_happy(self):
        """Happy landmarks score class 0 at least as high as neutral ones."""
        happy_score = classify_from_landmarks(_lm_happy()).scores[0]
        neutral_score = classify_from_landmarks(_lm_neutral()).scores[0]
        assert happy_score >= neutral_score

    def test_sad_landmarks_suppress_happy(self):
        """Sad landmarks keep the happy score relatively low."""
        assert classify_from_landmarks(_lm_sad()).scores[0] < 0.5

    def test_asymmetric_mouth_non_neutral(self):
        """Mouth asymmetry elevates disgust (idx 5) or sad (idx 1)."""
        scores = classify_from_landmarks(_lm_asymmetric()).scores
        assert scores[5] > 0.10 or scores[1] > 0.10

    def test_empty_landmarks_returns_neutral(self):
        """No landmarks at all falls back to neutral."""
        assert classify_from_landmarks([]).emotion == "neutral"

    def test_short_landmarks_returns_neutral(self):
        """Too few landmark values falls back to neutral."""
        assert classify_from_landmarks([0.5, 0.5]).emotion == "neutral"

    def test_all_positive_scores(self):
        """No class ever receives a negative score."""
        assert min(classify_from_landmarks(_lm_happy()).scores) >= 0.0

    def test_confidence_in_range(self):
        """Confidence stays within [0, 1]."""
        conf = classify_from_landmarks(_lm_neutral()).confidence
        assert 0.0 <= conf <= 1.0
# ── TestEmotionClassifier ─────────────────────────────────────────────────────
class TestEmotionClassifier:
    """Unit tests for EmotionClassifier when no TRT engine is available."""

    def test_init_no_engine_not_ready(self):
        """An empty engine path leaves the classifier not ready."""
        clf = EmotionClassifier(engine_path="")
        assert not clf.ready

    def test_load_empty_path_returns_false(self):
        """load() reports failure when no engine path is configured."""
        clf = EmotionClassifier(engine_path="")
        assert clf.load() is False

    def test_load_nonexistent_path_returns_false(self):
        """load() reports failure when the engine file does not exist."""
        clf = EmotionClassifier(engine_path="/nonexistent/engine.trt")
        assert clf.load() is False

    def test_classify_crop_fallback_without_engine(self):
        """Without TRT engine, classify_crop should return a valid result
        with landmark heuristic source."""
        # Keep the try block minimal: only the import can raise ImportError;
        # an assertion failure below must not be swallowed by the skip.
        try:
            import numpy as np
        except ImportError:
            pytest.skip("numpy not available")
        clf = EmotionClassifier(engine_path="")
        # Build a minimal synthetic 48x48 BGR image
        fake_crop = np.zeros((48, 48, 3), dtype="uint8")
        result = clf.classify_crop(fake_crop, person_id=-1, tracker=None)
        assert isinstance(result, ClassifiedEmotion)
        assert result.emotion in EMOTIONS
        assert 0.0 <= result.confidence <= 1.0
        assert len(result.scores) == N_CLASSES

    def test_classify_from_landmarks_delegates(self):
        """classify_from_landmarks delegates to the heuristic path."""
        clf = EmotionClassifier(engine_path="")
        result = clf.classify_from_landmarks(_lm_happy())
        assert isinstance(result, ClassifiedEmotion)
        assert result.source == "landmark_heuristic"

    def test_classify_from_landmarks_with_tracker_smooths(self):
        """With a tracker, the EMA blends prior history into new results."""
        clf = EmotionClassifier(engine_path="", alpha=0.5)
        tracker = PersonEmotionTracker(alpha=0.5)
        # Seed the EMA with a happy observation (return value intentionally
        # unused — the old code bound it to a dead `r1` local).
        clf.classify_from_landmarks(_lm_happy(), person_id=1, tracker=tracker)
        r2 = clf.classify_from_landmarks(_lm_sad(), person_id=1, tracker=tracker)
        # After smoothing, r2's scores should not match the raw sad scores
        # (EMA blends the happy history into r2).
        raw_sad = classify_from_landmarks(_lm_sad())
        assert r2.scores != raw_sad.scores

    def test_classify_from_landmarks_no_tracker_no_smooth(self):
        """Without a tracker, identical input produces identical output."""
        clf = EmotionClassifier(engine_path="")
        r1 = clf.classify_from_landmarks(_lm_happy(), person_id=1, tracker=None)
        r2 = clf.classify_from_landmarks(_lm_happy(), person_id=1, tracker=None)
        assert r1.scores == r2.scores

    def test_source_fallback_when_no_engine(self):
        """Crops classified without an engine are tagged as heuristic."""
        try:
            import numpy as np
        except ImportError:
            pytest.skip("numpy not available")
        clf = EmotionClassifier(engine_path="")
        # Use the imported `np` binding (old code redundantly called
        # __import__("numpy") and left `np` unused).
        crop = np.zeros((48, 48, 3), dtype="uint8")
        result = clf.classify_crop(crop)
        assert result.source == "landmark_heuristic"

    def test_classifier_alpha_propagated_to_tracker(self):
        """The alpha constructor argument is stored on the classifier."""
        clf = EmotionClassifier(engine_path="", alpha=0.1)
        assert clf._alpha == 0.1
# ── TestClassifiedEmotionDataclass ───────────────────────────────────────────
class TestClassifiedEmotionDataclass:
    """Unit tests for the ClassifiedEmotion dataclass contract."""

    def test_fields(self):
        """All four fields round-trip through the constructor."""
        scores = [0.85, 0.02, 0.02, 0.03, 0.02, 0.02, 0.04]
        ce = ClassifiedEmotion(
            emotion="happy",
            confidence=0.85,
            scores=scores,
            source="cnn_trt",
        )
        assert ce.emotion == "happy"
        assert ce.confidence == 0.85
        assert len(ce.scores) == 7
        assert ce.source == "cnn_trt"

    def test_default_source(self):
        """source defaults to "cnn_trt" when omitted."""
        ce = ClassifiedEmotion("neutral", 0.6, [0.0] * 7)
        assert ce.source == "cnn_trt"

    def test_emotion_is_mutable(self):
        """The dataclass is not frozen; emotion may be reassigned."""
        ce = ClassifiedEmotion("neutral", 0.6, [0.0] * 7)
        ce.emotion = "happy"
        assert ce.emotion == "happy"
# ── TestEdgeCases ─────────────────────────────────────────────────────────────
class TestEdgeCases:
    """Edge-case and robustness tests across the emotion helpers."""

    def test_softmax_single_element(self):
        """A single logit softmaxes to probability 1.0."""
        result = _softmax([5.0])
        assert abs(result[0] - 1.0) < 1e-9

    def test_tracker_negative_id_no_stored_state(self):
        """Negative (unidentified) person ids are never stored."""
        tracker = PersonEmotionTracker()
        scores = [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.4]
        tracker.smooth(-1, scores)
        assert tracker.tracked_ids == []

    def test_tick_increments_age(self):
        """Ticking past max_age expires the EMA; next smooth resets."""
        tracker = PersonEmotionTracker(max_age=2)
        tracker.smooth(1, [0.1] * 7)
        tracker.tick()
        tracker.tick()
        tracker.tick()
        # Age should exceed max_age → next smooth resets
        fresh = [0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0]
        result = tracker.smooth(1, fresh)
        assert abs(result[2] - 1.0) < 1e-9

    def test_renorm_negative_scores_safe(self):
        """Negative scores shouldn't crash (though unusual in practice).

        The previous data contained no negative value, so the test never
        exercised what its name claims; -0.05 makes the scenario real
        while the total mass stays positive.
        """
        scores = [0.1, 0.2, -0.05, 0.0, 0.0, 0.0, 0.3]
        r = _renorm(scores)
        assert abs(sum(r) - 1.0) < 1e-9

    def test_landmark_heuristic_very_close_eye_mouth(self):
        """Degenerate face with all landmarks on one row must not crash."""
        lm = [0.3, 0.5, 0.7, 0.5, 0.5, 0.5, 0.4, 0.5, 0.6, 0.5]
        result = classify_from_landmarks(lm)
        assert result.emotion in EMOTIONS

    def test_opt_out_then_re_enable(self):
        """Opt-out is reversible, and smoothing resumes afterwards."""
        tracker = PersonEmotionTracker()
        tracker.smooth(5, [0.1] * 7)
        tracker.set_opt_out(5, True)
        assert tracker.is_opt_out(5)
        tracker.set_opt_out(5, False)
        assert not tracker.is_opt_out(5)
        # Should be able to smooth again after re-enable
        result = tracker.smooth(5, [0.2] * 7)
        assert len(result) == N_CLASSES

View File

@ -38,6 +38,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
# Issue #140 gesture recognition # Issue #140 gesture recognition
"msg/Gesture.msg" "msg/Gesture.msg"
"msg/GestureArray.msg" "msg/GestureArray.msg"
# Issue #161 emotion detection
"msg/Expression.msg"
"msg/ExpressionArray.msg"
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
) )

View File

@ -0,0 +1,17 @@
# Expression.msg — Detected facial expression for one person (Issue #161).
# Published by emotion_node on /social/faces/expressions
std_msgs/Header header
int32 person_id # -1 = unidentified; matches PersonState.person_id
int32 face_id # matches FaceDetection.face_id
string emotion # one of: happy sad angry surprised fearful disgusted neutral
float32 confidence # smoothed confidence of the top emotion (0.0-1.0)
float32[7] scores # per-class softmax scores, order:
# [0]=happy [1]=sad [2]=angry [3]=surprised
# [4]=fearful [5]=disgusted [6]=neutral
bool is_opt_out # true = this person opted out; no emotion data published
string source # "cnn_trt" | "landmark_heuristic" | "opt_out"

View File

@ -0,0 +1,5 @@
# ExpressionArray.msg — Batch of detected facial expressions (Issue #161).
# Published by emotion_node on /social/faces/expressions
std_msgs/Header header
Expression[] expressions