Merge pull request 'feat(social): multi-camera gesture recognition — MediaPipe Hands + Pose (Issue #140)' (#162) from sl-jetson/issue-140-gestures into main
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (push) Failing after 13s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (push) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (push) Has been cancelled

This commit is contained in:
sl-jetson 2026-03-02 10:26:03 -05:00
commit ecb95c738b
9 changed files with 1695 additions and 0 deletions

View File

@ -0,0 +1,31 @@
gesture_node:
  ros__parameters:
    # Number of active CSI camera streams to subscribe to
    n_cameras: 4
    # Target processing rate in Hz. At 10 Hz with 4 cameras, each camera
    # contributes frames at approximately 2.5 Hz (round-robin scheduling).
    process_fps: 10.0
    # Minimum MediaPipe detection confidence (0.0-1.0) to publish a gesture
    min_confidence: 0.60
    # MediaPipe model complexity:
    #   0 = lite (fastest, ≈ 13 ms/frame on Orin Nano — use for production)
    #   1 = full (more accurate, ≈ 25 ms/frame)
    #   2 = heavy (highest accuracy, ≈ 50 ms/frame, requires large VRAM)
    model_complexity: 0
    # Maximum hands detected per frame (passed to MediaPipe max_num_hands)
    max_hands: 2
    # Enable/disable inference modules independently
    pose_enabled: true
    hands_enabled: true
    # Comma-separated gesture allowlist. Empty string = all gestures enabled.
    # Example: "wave,stop_palm,thumbs_up,arms_up"
    enabled_gestures: ""
    # Camera name list (must match saltybot_cameras topic naming); only the
    # first n_cameras entries are used.
    camera_names: "front,left,rear,right"

View File

@ -0,0 +1,67 @@
"""gesture.launch.py — Launch gesture_node for multi-camera gesture recognition (Issue #140)."""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
def generate_launch_description() -> LaunchDescription:
    """Assemble launch arguments and the gesture_node process.

    The most frequently tuned gesture_node parameters are exposed as launch
    arguments; everything else comes from the params YAML file. Argument
    overrides are applied after the YAML file, so they win.
    """
    pkg_share = FindPackageShare("saltybot_social")
    default_params = PathJoinSubstitution(
        [pkg_share, "config", "gesture_params.yaml"]
    )
    # (name, default, description) for every exposed launch argument.
    arg_specs = (
        ("params_file", default_params, "Path to gesture_node parameter YAML"),
        ("process_fps", "10.0", "Target gesture detection rate (Hz)"),
        ("model_complexity", "0",
         "MediaPipe model complexity: 0=lite, 1=full, 2=heavy"),
        ("pose_enabled", "true", "Enable MediaPipe Pose body-gesture detection"),
        ("hands_enabled", "true", "Enable MediaPipe Hands hand-gesture detection"),
        ("enabled_gestures", "", "Comma-separated gesture allowlist; empty = all"),
        ("min_confidence", "0.60", "Minimum detection confidence threshold"),
    )
    declared_args = [
        DeclareLaunchArgument(name, default_value=default, description=desc)
        for name, default, desc in arg_specs
    ]
    # Per-parameter overrides (everything except params_file itself).
    overrides = {
        name: LaunchConfiguration(name)
        for name, _, _ in arg_specs[1:]
    }
    gesture_node = Node(
        package="saltybot_social",
        executable="gesture_node",
        name="gesture_node",
        output="screen",
        parameters=[
            LaunchConfiguration("params_file"),
            overrides,
        ],
    )
    return LaunchDescription(declared_args + [gesture_node])

View File

@ -0,0 +1,483 @@
"""gesture_classifier.py — Geometric gesture classification from MediaPipe landmarks.
Pure Python, no ROS2 / MediaPipe / OpenCV dependencies.
Consumes normalised landmark arrays produced by MediaPipe Hands and Pose,
classifies them into named gesture types, and tracks temporal gestures
(wave, come_here) via per-stream history buffers.
Usage
-----
>>> from saltybot_social.gesture_classifier import (
... Landmark, classify_hand, classify_pose, WaveDetector
... )
>>> lm = [Landmark(x, y, z) for x, y, z in raw_coords]
>>> result = classify_hand(lm, is_right_hand=True, wave_det=None)
>>> result.gesture_type, result.confidence
('stop_palm', 0.92)
Coordinate convention
---------------------
MediaPipe Hands / Pose use image-normalised coordinates:
x: 0.0 = left edge, 1.0 = right edge of frame
    y: 0.0 = top edge, 1.0 = bottom edge (down is positive)
z: depth (approx), 0 at wrist/hip plane; negative = towards camera
"""
from __future__ import annotations
import math
from collections import deque
from dataclasses import dataclass
from typing import Deque, List, Optional, Tuple
# ── Landmark type ─────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class Landmark:
    """One image-normalised landmark point (see module docstring for axes)."""

    x: float                 # 0.0 = left edge, 1.0 = right edge of frame
    y: float                 # 0.0 = top edge, 1.0 = bottom edge of frame
    z: float = 0.0           # approximate depth; negative = towards camera
    visibility: float = 1.0  # set from Pose landmarks; hand landmarks keep the default
# ── Result type ───────────────────────────────────────────────────────────────
@dataclass
class ClassifiedGesture:
    """A single recognised gesture plus anchor/metadata for correlation."""

    gesture_type: str           # e.g. "stop_palm", "wave", "arms_up"
    confidence: float           # 0.0 – 1.0
    hand_x: float = 0.5         # normalised image x of gesture anchor
    hand_y: float = 0.5         # normalised image y of gesture anchor
    is_right_hand: bool = True  # handedness of the source hand (hands only)
    direction: str = ""         # for "point"/"follow": compass direction, e.g. "left", "upper_right"
    source: str = "hand"        # "hand" or "body_pose"
# ── MediaPipe Hands landmark indices ─────────────────────────────────────────
# Reference: https://developers.google.com/mediapipe/solutions/vision/hand_landmarker
_WRIST = 0
_THUMB_CMC = 1; _THUMB_MCP = 2; _THUMB_IP = 3; _THUMB_TIP = 4
_INDEX_MCP = 5; _INDEX_PIP = 6; _INDEX_DIP = 7; _INDEX_TIP = 8
_MIDDLE_MCP = 9; _MIDDLE_PIP = 10; _MIDDLE_DIP = 11; _MIDDLE_TIP = 12
_RING_MCP = 13; _RING_PIP = 14; _RING_DIP = 15; _RING_TIP = 16
_PINKY_MCP = 17; _PINKY_PIP = 18; _PINKY_DIP = 19; _PINKY_TIP = 20
# ── MediaPipe Pose landmark indices ──────────────────────────────────────────
_L_SHOULDER = 11; _R_SHOULDER = 12
_L_ELBOW = 13; _R_ELBOW = 14
_L_WRIST = 15; _R_WRIST = 16
_L_HIP = 23; _R_HIP = 24
_L_KNEE = 25; _R_KNEE = 26
_L_ANKLE = 27; _R_ANKLE = 28
# ── Finger-extension helpers ──────────────────────────────────────────────────
def _finger_up(lm: List[Landmark], tip: int, pip: int) -> bool:
"""Return True if the finger tip is above (smaller y) its PIP joint."""
return lm[tip].y < lm[pip].y
def _finger_up_score(lm: List[Landmark], tip: int, pip: int, mcp: int) -> float:
"""Return extension score in [0, 1] based on how far tip is above MCP."""
spread = lm[mcp].y - lm[tip].y # positive = tip above mcp
palm_height = abs(lm[_WRIST].y - lm[_MIDDLE_MCP].y) or 0.01
return max(0.0, min(1.0, spread / palm_height))
def _thumb_up(lm: List[Landmark], is_right: bool) -> bool:
"""True when the thumb is extended upward (tip clearly above CMC base)."""
return lm[_THUMB_TIP].y < lm[_THUMB_CMC].y - 0.02
def _thumb_down(lm: List[Landmark]) -> bool:
"""True when the thumb is pointing downward (tip below the wrist)."""
return lm[_THUMB_TIP].y > lm[_WRIST].y + 0.04
def _four_fingers_curled(lm: List[Landmark]) -> bool:
"""True when index, middle, ring, and pinky are all NOT extended."""
fingers = [
(_INDEX_TIP, _INDEX_PIP), (_MIDDLE_TIP, _MIDDLE_PIP),
(_RING_TIP, _RING_PIP), (_PINKY_TIP, _PINKY_PIP),
]
return not any(_finger_up(lm, t, p) for t, p in fingers)
def _count_fingers_up(lm: List[Landmark]) -> int:
"""Count how many of index/middle/ring/pinky are extended."""
pairs = [
(_INDEX_TIP, _INDEX_PIP), (_MIDDLE_TIP, _MIDDLE_PIP),
(_RING_TIP, _RING_PIP), (_PINKY_TIP, _PINKY_PIP),
]
return sum(_finger_up(lm, t, p) for t, p in pairs)
def _palm_center(lm: List[Landmark]) -> Tuple[float, float]:
"""Return (x, y) centroid of the four MCP knuckles."""
xs = [lm[i].x for i in (_INDEX_MCP, _MIDDLE_MCP, _RING_MCP, _PINKY_MCP)]
ys = [lm[i].y for i in (_INDEX_MCP, _MIDDLE_MCP, _RING_MCP, _PINKY_MCP)]
return sum(xs) / 4, sum(ys) / 4
# ── Point direction ───────────────────────────────────────────────────────────
def _point_direction(lm: List[Landmark]) -> str:
    """Map the index MCP→TIP vector onto one of eight compass directions."""
    names = (
        "right", "upper_right", "up", "upper_left",
        "left", "lower_left", "down", "lower_right",
    )
    dx = lm[_INDEX_TIP].x - lm[_INDEX_MCP].x
    dy = lm[_INDEX_TIP].y - lm[_INDEX_MCP].y  # image y grows downward
    angle_deg = math.degrees(math.atan2(-dy, dx))  # flip y so up = +90°
    # Each 45° sector is centred on a compass direction; offset by half a
    # sector so that [-22.5°, 22.5°) lands in bucket 0 ("right").
    sector = int(math.floor((angle_deg + 22.5) / 45.0)) % 8
    return names[sector]
# ── Hand gesture classifiers ──────────────────────────────────────────────────
def _classify_stop_palm(lm: List[Landmark]) -> Optional[ClassifiedGesture]:
    """Open hand ('stop' signal): at least four non-thumb fingers extended."""
    n_up = _count_fingers_up(lm)
    if n_up < 4:
        return None
    # Quality measure: mean extension score across the four fingers.
    finger_triples = (
        (_INDEX_TIP, _INDEX_PIP, _INDEX_MCP),
        (_MIDDLE_TIP, _MIDDLE_PIP, _MIDDLE_MCP),
        (_RING_TIP, _RING_PIP, _RING_MCP),
        (_PINKY_TIP, _PINKY_PIP, _PINKY_MCP),
    )
    scores = [_finger_up_score(lm, t, p, m) for t, p, m in finger_triples]
    avg_ext = sum(scores) / len(scores)
    conf = 0.6 + 0.35 * avg_ext + (0.05 if n_up == 4 else 0.0)
    anchor_x, anchor_y = _palm_center(lm)
    return ClassifiedGesture(
        gesture_type="stop_palm",
        confidence=round(conf, 3),
        hand_x=anchor_x,
        hand_y=anchor_y,
    )
def _classify_thumbs_up(lm: List[Landmark], is_right: bool) -> Optional[ClassifiedGesture]:
    """Thumb extended upward while the other four fingers are curled."""
    if not (_thumb_up(lm, is_right) and _four_fingers_curled(lm)):
        return None
    palm_h = abs(lm[_WRIST].y - lm[_MIDDLE_MCP].y) or 0.01
    vertical_gap = lm[_THUMB_CMC].y - lm[_THUMB_TIP].y  # larger = more vertical
    conf = min(0.95, 0.6 + 0.35 * min(1.0, vertical_gap / palm_h))
    anchor_x, anchor_y = _palm_center(lm)
    return ClassifiedGesture(
        gesture_type="thumbs_up",
        confidence=round(conf, 3),
        hand_x=anchor_x,
        hand_y=anchor_y,
    )
def _classify_thumbs_down(lm: List[Landmark]) -> Optional[ClassifiedGesture]:
    """Thumb pointing downward while the other four fingers are curled."""
    if not (_thumb_down(lm) and _four_fingers_curled(lm)):
        return None
    palm_h = abs(lm[_WRIST].y - lm[_MIDDLE_MCP].y) or 0.01
    drop = lm[_THUMB_TIP].y - lm[_WRIST].y  # how far the tip sits below the wrist
    conf = min(0.95, 0.6 + 0.35 * min(1.0, drop / palm_h))
    anchor_x, anchor_y = _palm_center(lm)
    return ClassifiedGesture(
        gesture_type="thumbs_down",
        confidence=round(conf, 3),
        hand_x=anchor_x,
        hand_y=anchor_y,
    )
def _classify_point(lm: List[Landmark]) -> Optional[ClassifiedGesture]:
    """Index finger extended (mostly vertical) with middle/ring/pinky curled."""
    if not _finger_up(lm, _INDEX_TIP, _INDEX_PIP):
        return None
    other_pairs = (
        (_MIDDLE_TIP, _MIDDLE_PIP),
        (_RING_TIP, _RING_PIP),
        (_PINKY_TIP, _PINKY_PIP),
    )
    others_up = sum(_finger_up(lm, t, p) for t, p in other_pairs)
    if others_up >= 1:
        return None
    # A strongly horizontal index finger defers to the "follow" gesture.
    if abs(lm[_INDEX_TIP].x - lm[_INDEX_MCP].x) > 0.20:
        return None
    ext = _finger_up_score(lm, _INDEX_TIP, _INDEX_PIP, _INDEX_MCP)
    conf = 0.65 + 0.30 * ext - 0.10 * others_up  # others_up is always 0 here
    return ClassifiedGesture(
        gesture_type="point",
        confidence=round(conf, 3),
        hand_x=lm[_INDEX_TIP].x,
        hand_y=lm[_INDEX_TIP].y,
        direction=_point_direction(lm),
    )
def _classify_come_here(lm: List[Landmark]) -> Optional[ClassifiedGesture]:
    """Beckoning pose: index knuckle raised but the fingertip curled down."""
    # Guard 1: hand raised — index MCP clearly above the wrist.
    if lm[_INDEX_MCP].y >= lm[_WRIST].y - 0.02:
        return None
    # Guard 2: index tip curled — clearly below its own knuckle.
    if lm[_INDEX_TIP].y <= lm[_INDEX_MCP].y + 0.03:
        return None
    # Middle/ring/pinky must be mostly curled too (hand not fully open).
    other_pairs = (
        (_MIDDLE_TIP, _MIDDLE_PIP),
        (_RING_TIP, _RING_PIP),
        (_PINKY_TIP, _PINKY_PIP),
    )
    if sum(_finger_up(lm, t, p) for t, p in other_pairs) >= 2:
        return None
    palm_h = abs(lm[_WRIST].y - lm[_MIDDLE_MCP].y) or 0.01
    curl_depth = lm[_INDEX_TIP].y - lm[_INDEX_PIP].y
    conf = min(0.90, 0.60 + 0.30 * min(1.0, curl_depth / (palm_h * 0.5)))
    anchor_x, anchor_y = _palm_center(lm)
    return ClassifiedGesture(
        gesture_type="come_here",
        confidence=round(conf, 3),
        hand_x=anchor_x,
        hand_y=anchor_y,
    )
def _classify_follow(lm: List[Landmark]) -> Optional[ClassifiedGesture]:
    """Follow gesture: index finger extended roughly horizontally."""
    if not _finger_up(lm, _INDEX_TIP, _INDEX_PIP):
        return None
    other_pairs = (
        (_MIDDLE_TIP, _MIDDLE_PIP),
        (_RING_TIP, _RING_PIP),
        (_PINKY_TIP, _PINKY_PIP),
    )
    if sum(_finger_up(lm, t, p) for t, p in other_pairs) >= 2:
        return None
    # Horizontal test: |dx| must dominate |dy| between index MCP and TIP.
    dx = abs(lm[_INDEX_TIP].x - lm[_INDEX_MCP].x)
    dy = abs(lm[_INDEX_TIP].y - lm[_INDEX_MCP].y)
    if dx <= dy:
        return None
    conf = 0.65 + 0.25 * min(1.0, dx / max(dy, 0.001) / 3.0)
    heading = "right" if lm[_INDEX_TIP].x > lm[_INDEX_MCP].x else "left"
    return ClassifiedGesture(
        gesture_type="follow",
        confidence=round(conf, 3),
        hand_x=lm[_INDEX_TIP].x,
        hand_y=lm[_INDEX_TIP].y,
        direction=heading,
    )
# ── Temporal wave detector ────────────────────────────────────────────────────
class WaveDetector:
    """Sliding-window wave detector over a wrist X-coordinate history.

    A wave is lateral oscillation: enough direction reversals with a
    peak-to-peak x excursion above a minimum amplitude.

    Args:
        history_len: Frames of history to keep (default 24 ≈ 0.8 s at 30 fps).
        min_reversals: Direction reversals required to trigger (default 2).
        min_amplitude: Minimum peak-to-peak x excursion in normalised coords.
    """

    # Fewer samples than this and oscillation cannot be judged reliably.
    _MIN_SAMPLES = 6

    def __init__(
        self,
        history_len: int = 24,
        min_reversals: int = 2,
        min_amplitude: float = 0.08,
    ) -> None:
        self._min_reversals = min_reversals
        self._min_amplitude = min_amplitude
        self._history: Deque[float] = deque(maxlen=history_len)

    def push(self, wrist_x: float) -> Tuple[bool, float]:
        """Record one wrist-X sample and re-evaluate.

        Returns:
            (is_waving, confidence); confidence is 0.0 when not waving.
        """
        self._history.append(wrist_x)
        if len(self._history) < self._MIN_SAMPLES:
            return False, 0.0
        return self._detect()

    def reset(self) -> None:
        """Forget all history (e.g. when the tracked hand disappears)."""
        self._history.clear()

    def _detect(self) -> Tuple[bool, float]:
        mean_x = sum(self._history) / len(self._history)
        centered = [s - mean_x for s in self._history]
        amplitude = max(centered) - min(centered)
        if amplitude < self._min_amplitude:
            return False, 0.0
        # A reversal is a sign change between consecutive centred samples.
        reversals = sum(1 for a, b in zip(centered, centered[1:]) if a * b < 0)
        if reversals < self._min_reversals:
            return False, 0.0
        # Blend amplitude quality with reversal count into one confidence.
        amp_score = min(1.0, amplitude / 0.30)
        rev_score = min(1.0, reversals / 6.0)
        return True, round(0.5 * amp_score + 0.5 * rev_score, 3)
# ── Public hand-gesture API ───────────────────────────────────────────────────
# Priority order for static hand-gesture classifiers — the first entry whose
# classifier returns a result wins (temporal "wave" is handled separately).
# Each entry is (gesture_name, fn(landmarks, is_right) -> ClassifiedGesture|None);
# the lambdas normalise the helpers' differing signatures to one call shape.
_HAND_CLASSIFIERS = [
    ("stop_palm", lambda lm, rh: _classify_stop_palm(lm)),
    ("thumbs_up", lambda lm, rh: _classify_thumbs_up(lm, rh)),
    ("thumbs_down", lambda lm, rh: _classify_thumbs_down(lm)),
    ("point", lambda lm, rh: _classify_point(lm)),
    ("come_here", lambda lm, rh: _classify_come_here(lm)),
    ("follow", lambda lm, rh: _classify_follow(lm)),
]
def classify_hand(
    landmarks: List[Landmark],
    is_right_hand: bool = True,
    wave_det: Optional[WaveDetector] = None,
    enabled: Optional[set] = None,
) -> Optional[ClassifiedGesture]:
    """Classify a 21-landmark MediaPipe hand into a named gesture.

    Args:
        landmarks: 21 Landmark objects from MediaPipe Hands.
        is_right_hand: Handedness flag (affects thumb direction logic).
        wave_det: Optional WaveDetector; when given, the temporal wave
            check runs before any static classifier.
        enabled: Allowlist of gesture type names; None allows everything.

    Returns:
        The recognised ClassifiedGesture, or None.
    """
    if len(landmarks) < 21:
        return None

    def _allowed(name: str) -> bool:
        return enabled is None or name in enabled

    # Temporal wave detection first — the detector must see every frame's
    # wrist X to accumulate history, so this cannot follow the static pass.
    if wave_det is not None and _allowed("wave"):
        waving, wave_conf = wave_det.push(landmarks[_WRIST].x)
        if waving:
            anchor_x, anchor_y = _palm_center(landmarks)
            return ClassifiedGesture(
                gesture_type="wave",
                confidence=wave_conf,
                hand_x=anchor_x,
                hand_y=anchor_y,
                is_right_hand=is_right_hand,
            )

    # Static classifiers in priority order: first hit wins.
    for name, run in _HAND_CLASSIFIERS:
        if not _allowed(name):
            continue
        hit = run(landmarks, is_right_hand)
        if hit is not None:
            hit.is_right_hand = is_right_hand
            return hit
    return None
# ── Body-pose gesture classifiers ────────────────────────────────────────────
def classify_pose(
    landmarks: List[Landmark],
    enabled: Optional[set] = None,
) -> Optional[ClassifiedGesture]:
    """Classify full-body pose landmarks (33 points) into a body gesture.

    Candidate gestures, checked in priority order:
    "arms_up", "arms_spread", "crouch".

    Args:
        landmarks: 33 Landmark objects from MediaPipe Pose.
        enabled: Set of allowed gesture types; None = all.

    Returns:
        ClassifiedGesture with source="body_pose", or None.
    """
    if len(landmarks) < 29:  # need indices up to the ankles (28)
        return None

    def _ok(name: str) -> bool:
        return enabled is None or name in enabled

    # ── arms_up: both wrists raised above their shoulders ─────────────────
    if _ok("arms_up"):
        l_above = landmarks[_L_WRIST].y < landmarks[_L_SHOULDER].y - 0.05
        r_above = landmarks[_R_WRIST].y < landmarks[_R_SHOULDER].y - 0.05
        if l_above and r_above:
            l_margin = landmarks[_L_SHOULDER].y - landmarks[_L_WRIST].y
            r_margin = landmarks[_R_SHOULDER].y - landmarks[_R_WRIST].y
            # NOTE(review): this mixes the LEFT shoulder with the RIGHT hip,
            # so it measures vertical torso extent rather than a shoulder
            # span — confirm whether _R_HIP is intended here.
            shoulder_span = abs(landmarks[_L_SHOULDER].y - landmarks[_R_HIP].y) or 0.1
            conf = min(0.95, 0.60 + 0.35 * min(1.0, (l_margin + r_margin) / (2 * shoulder_span)))
            mid_x = (landmarks[_L_WRIST].x + landmarks[_R_WRIST].x) / 2
            mid_y = (landmarks[_L_WRIST].y + landmarks[_R_WRIST].y) / 2
            return ClassifiedGesture(
                gesture_type="arms_up", confidence=round(conf, 3),
                hand_x=mid_x, hand_y=mid_y, source="body_pose",
            )

    # ── arms_spread: both arms extended laterally at shoulder height ──────
    if _ok("arms_spread"):
        l_span = abs(landmarks[_L_WRIST].x - landmarks[_L_SHOULDER].x)
        r_span = abs(landmarks[_R_WRIST].x - landmarks[_R_SHOULDER].x)
        # Wrists should be at roughly shoulder height.
        l_level = abs(landmarks[_L_WRIST].y - landmarks[_L_SHOULDER].y) < 0.12
        r_level = abs(landmarks[_R_WRIST].y - landmarks[_R_SHOULDER].y) < 0.12
        shoulder_w = abs(landmarks[_L_SHOULDER].x - landmarks[_R_SHOULDER].x) or 0.1
        if l_level and r_level and l_span > shoulder_w * 0.6 and r_span > shoulder_w * 0.6:
            conf = min(0.90, 0.55 + 0.35 * min(1.0, (l_span + r_span) / (shoulder_w * 2)))
            mid_x = (landmarks[_L_WRIST].x + landmarks[_R_WRIST].x) / 2
            mid_y = (landmarks[_L_SHOULDER].y + landmarks[_R_SHOULDER].y) / 2
            return ClassifiedGesture(
                gesture_type="arms_spread", confidence=round(conf, 3),
                hand_x=mid_x, hand_y=mid_y, source="body_pose",
            )

    # ── crouch: hips dropped low in the frame ─────────────────────────────
    if _ok("crouch"):
        avg_hip_y = (landmarks[_L_HIP].y + landmarks[_R_HIP].y) / 2
        # Crouching hips sit lower in the image (larger y). The 0.65
        # threshold puts them in the lower 35% of the frame height.
        if avg_hip_y > 0.65:
            # Fixed: a previous revision computed knee positions here but
            # never used them — the "knee bend" check was dead code. The
            # confidence depends on hip height alone; a real knee-angle
            # check remains a TODO.
            conf = min(0.90, 0.55 + 0.35 * min(1.0, (avg_hip_y - 0.65) / 0.20))
            mid_x = (landmarks[_L_HIP].x + landmarks[_R_HIP].x) / 2
            return ClassifiedGesture(
                gesture_type="crouch", confidence=round(conf, 3),
                hand_x=mid_x, hand_y=avg_hip_y, source="body_pose",
            )
    return None

View File

@ -0,0 +1,432 @@
"""gesture_node.py — Multi-camera hand + body-pose gesture recognition (Issue #140).
Subscribes to images from up to 4 CSI cameras, runs MediaPipe Hands and Pose
on each frame, classifies gestures via gesture_classifier.py, and publishes
/social/gestures (GestureArray).
Pipeline
--------
    camera/*/image_raw -> cv_bridge -> MediaPipe Hands + Pose
        -> gesture_classifier -> correlate with PersonStateArray
        -> publish /social/gestures
Multi-camera
------------
Up to n_cameras (default 4) camera streams are processed.
Topic names follow the saltybot_cameras convention:
/camera/front/image_raw, /camera/left/image_raw,
/camera/rear/image_raw, /camera/right/image_raw
All cameras share a single round-robin processing queue so that GPU/CPU
resources are not over-subscribed. At process_fps=10 Hz, each camera
receives approximately process_fps / n_cameras updates per second.
Person-ID correlation
---------------------
When a gesture is detected on camera N, the node inspects the cached
PersonStateArray. A person with matching camera_id is preferred; if only
one person is tracked, they receive the gesture unconditionally.
ROS2 topics
-----------
Subscribe:
/camera/front/image_raw (sensor_msgs/Image)
/camera/left/image_raw (sensor_msgs/Image)
/camera/rear/image_raw (sensor_msgs/Image)
/camera/right/image_raw (sensor_msgs/Image)
/social/persons (saltybot_social_msgs/PersonStateArray)
Publish:
/social/gestures (saltybot_social_msgs/GestureArray)
Parameters
----------
n_cameras int 4 Number of active camera streams
process_fps float 10.0 Target processing rate (frames/sec)
min_confidence float 0.60 Discard detections below this threshold
model_complexity int 0 MediaPipe model (0=lite, 1=full, 2=heavy)
max_hands int 2 Max hands per frame (MediaPipe param)
pose_enabled bool true Enable MediaPipe Pose body-pose detection
hands_enabled bool true Enable MediaPipe Hands
enabled_gestures str "" Comma-separated allowlist; empty = all
camera_names str "front,left,rear,right" Camera name list
"""
from __future__ import annotations
import threading
import time
from typing import Dict, List, Optional, Set
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
from sensor_msgs.msg import Image
from saltybot_social_msgs.msg import (
GestureArray, Gesture, PersonStateArray,
)
from .gesture_classifier import (
Landmark, ClassifiedGesture,
classify_hand, classify_pose,
WaveDetector,
)
# Optional imports — may not be present in CI / test environments
try:
import cv2
from cv_bridge import CvBridge
_HAS_CV = True
except ImportError:
_HAS_CV = False
try:
import mediapipe as mp
_HAS_MP = True
except ImportError:
_HAS_MP = False
# ── Camera index map ──────────────────────────────────────────────────────────
# Canonical camera order; the index in this list is the wire-format camera_id.
_CAMERA_NAMES = ["front", "left", "rear", "right"]
_CAMERA_ID: Dict[str, int] = {name: i for i, name in enumerate(_CAMERA_NAMES)}
# ── Latest-frame buffer ───────────────────────────────────────────────────────
class _FrameBuffer:
"""Thread-safe single-slot buffer per camera — always keeps newest frame."""
def __init__(self) -> None:
self._frames: Dict[int, object] = {} # camera_id → Image msg
self._lock = threading.Lock()
def put(self, camera_id: int, msg: object) -> None:
with self._lock:
self._frames[camera_id] = msg
def drain(self) -> List[tuple]:
with self._lock:
result = list(self._frames.items())
self._frames = {}
return result
# ── MediaPipe wrapper ─────────────────────────────────────────────────────────
class _MediaPipeProcessor:
"""Wraps MediaPipe Hands + Pose with lazy initialisation."""
def __init__(
self,
model_complexity: int,
max_hands: int,
hands_enabled: bool,
pose_enabled: bool,
) -> None:
self._complexity = model_complexity
self._max_hands = max_hands
self._hands_enabled = hands_enabled
self._pose_enabled = pose_enabled
self._hands = None
self._pose = None
self._ready = False
def init(self) -> None:
if not _HAS_MP:
return
mp_hands = mp.solutions.hands
mp_pose = mp.solutions.pose
if self._hands_enabled:
self._hands = mp_hands.Hands(
static_image_mode=False,
max_num_hands=self._max_hands,
min_detection_confidence=0.50,
min_tracking_confidence=0.50,
model_complexity=self._complexity,
)
if self._pose_enabled:
self._pose = mp_pose.Pose(
static_image_mode=False,
min_detection_confidence=0.50,
min_tracking_confidence=0.50,
model_complexity=self._complexity,
)
self._ready = True
def process(self, bgr_image) -> tuple:
"""Returns (hands_results, pose_results) from MediaPipe."""
if not self._ready or not _HAS_MP:
return None, None
try:
import numpy as np
rgb = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)
rgb.flags.writeable = False
h_res = self._hands.process(rgb) if self._hands else None
p_res = self._pose.process(rgb) if self._pose else None
return h_res, p_res
except Exception:
return None, None
def close(self) -> None:
if self._hands:
self._hands.close()
if self._pose:
self._pose.close()
# ── Gesture node ──────────────────────────────────────────────────────────────
class GestureNode(Node):
    """Multi-camera gesture recognition node.

    Keeps only the newest frame per camera in a _FrameBuffer; a timer at
    ``process_fps`` drains the buffer, runs MediaPipe on each frame, and
    publishes all detections in one GestureArray on /social/gestures.
    """
    def __init__(self) -> None:
        super().__init__("gesture_node")
        # ── Parameters ──────────────────────────────────────────────────────
        self.declare_parameter("n_cameras", 4)
        self.declare_parameter("process_fps", 10.0)
        self.declare_parameter("min_confidence", 0.60)
        self.declare_parameter("model_complexity", 0)
        self.declare_parameter("max_hands", 2)
        self.declare_parameter("pose_enabled", True)
        self.declare_parameter("hands_enabled", True)
        self.declare_parameter("enabled_gestures", "")
        self.declare_parameter("camera_names", "front,left,rear,right")
        self._n_cameras: int = self.get_parameter("n_cameras").value
        self._process_fps: float = self.get_parameter("process_fps").value
        self._min_conf: float = self.get_parameter("min_confidence").value
        self._complexity: int = self.get_parameter("model_complexity").value
        self._max_hands: int = self.get_parameter("max_hands").value
        self._pose_enabled: bool = self.get_parameter("pose_enabled").value
        self._hands_enabled: bool = self.get_parameter("hands_enabled").value
        _enabled_str: str = self.get_parameter("enabled_gestures").value
        camera_names_str: str = self.get_parameter("camera_names").value
        # Empty allowlist string means "no filtering" (None = all gestures).
        self._enabled: Optional[Set[str]] = (
            set(g.strip() for g in _enabled_str.split(",") if g.strip())
            if _enabled_str.strip() else None
        )
        # Keep at most n_cameras names so the two parameters stay consistent.
        self._camera_names: List[str] = [
            n.strip() for n in camera_names_str.split(",")
            if n.strip()
        ][:self._n_cameras]
        # ── QoS ─────────────────────────────────────────────────────────────
        # Images: best-effort, depth 1 — a stale frame is worthless, always
        # prefer the newest one.
        img_qos = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=1,
        )
        reliable_qos = QoSProfile(depth=10)
        # ── Publishers ───────────────────────────────────────────────────────
        self._gesture_pub = self.create_publisher(
            GestureArray, "/social/gestures", reliable_qos
        )
        # ── Subscribers ──────────────────────────────────────────────────────
        self._buf = _FrameBuffer()
        self._cam_subs = []
        for name in self._camera_names:
            topic = f"/camera/{name}/image_raw"
            # Unknown camera names get sequential ids after the known ones.
            cam_id = _CAMERA_ID.get(name, len(self._cam_subs))
            # cid=cam_id binds the current value (avoids the late-binding
            # closure pitfall inside this loop).
            sub = self.create_subscription(
                Image, topic,
                lambda msg, cid=cam_id: self._buf.put(cid, msg),
                img_qos,
            )
            self._cam_subs.append(sub)
            self.get_logger().info(f"Subscribed to {topic} (camera_id={cam_id})")
        # Latest person tracks, cached for gesture→person correlation.
        self._persons: Optional[PersonStateArray] = None
        self._persons_sub = self.create_subscription(
            PersonStateArray, "/social/persons",
            lambda msg: setattr(self, "_persons", msg),
            reliable_qos,
        )
        # ── Per-camera WaveDetector instances: (camera_id, hand_idx) → det ──
        # NOTE(review): entries are never removed or reset when a hand leaves
        # the frame, so one detector's history can span different physical
        # hands occupying the same index — consider clearing stale keys.
        self._wave_detectors: Dict[tuple, WaveDetector] = {}
        # ── MediaPipe (initialised in background thread) ─────────────────────
        self._mp = _MediaPipeProcessor(
            model_complexity=self._complexity,
            max_hands=self._max_hands,
            hands_enabled=self._hands_enabled,
            pose_enabled=self._pose_enabled,
        )
        self._bridge = CvBridge() if _HAS_CV else None
        self._mp_ready = False
        # Daemon thread: slow MediaPipe graph build must not block spin-up.
        self._init_thread = threading.Thread(
            target=self._init_mediapipe, daemon=True
        )
        self._init_thread.start()
        # ── Processing timer ─────────────────────────────────────────────────
        self._timer = self.create_timer(
            1.0 / self._process_fps, self._process_tick
        )
        self.get_logger().info(
            f"gesture_node starting — cameras={self._camera_names}, "
            f"fps={self._process_fps}, min_conf={self._min_conf}, "
            f"complexity={self._complexity}"
        )
    # ── Initialisation ────────────────────────────────────────────────────────
    def _init_mediapipe(self) -> None:
        """Background-thread init: build MediaPipe graphs, then flip _mp_ready."""
        if not _HAS_MP:
            self.get_logger().warn(
                "mediapipe not installed — gesture_node will publish no gestures. "
                "Install with: pip install mediapipe"
            )
            return
        if not _HAS_CV:
            self.get_logger().error("opencv-python / cv_bridge not available")
            return
        self.get_logger().info("Initialising MediaPipe Hands + Pose…")
        t0 = time.time()
        self._mp.init()
        self._mp_ready = True
        self.get_logger().info(
            f"MediaPipe ready ({time.time() - t0:.1f}s) — "
            f"hands={'on' if self._hands_enabled else 'off'}, "
            f"pose={'on' if self._pose_enabled else 'off'}"
        )
    # ── Processing tick ───────────────────────────────────────────────────────
    def _process_tick(self) -> None:
        """Timer callback: process every buffered frame, publish one array."""
        if not self._mp_ready:
            return
        frames = self._buf.drain()
        if not frames:
            return
        all_gestures: List[Gesture] = []
        for cam_id, img_msg in frames:
            gestures = self._process_frame(cam_id, img_msg)
            all_gestures.extend(gestures)
        # Publish only when something was detected — no empty arrays.
        if all_gestures:
            arr = GestureArray()
            arr.header.stamp = self.get_clock().now().to_msg()
            arr.gestures = all_gestures
            self._gesture_pub.publish(arr)
    def _process_frame(self, cam_id: int, img_msg) -> List[Gesture]:
        """Run MediaPipe on one camera frame and return gesture messages."""
        if self._bridge is None:
            return []
        try:
            bgr = self._bridge.imgmsg_to_cv2(img_msg, desired_encoding="bgr8")
        except Exception as e:
            # Debug level: a single bad frame is not worth log spam.
            self.get_logger().debug(f"cv_bridge error cam {cam_id}: {e}")
            return []
        h_results, p_results = self._mp.process(bgr)
        gestures: List[Gesture] = []
        # ── Hand gestures ─────────────────────────────────────────────────
        if h_results and h_results.multi_hand_landmarks:
            for hand_idx, (hand_lm, hand_info) in enumerate(
                zip(h_results.multi_hand_landmarks,
                    h_results.multi_handedness)
            ):
                is_right = hand_info.classification[0].label == "Right"
                lm = [
                    Landmark(p.x, p.y, p.z)
                    for p in hand_lm.landmark
                ]
                # One WaveDetector per (camera, hand slot), created lazily.
                wave_key = (cam_id, hand_idx)
                wave_det = self._wave_detectors.get(wave_key)
                if wave_det is None:
                    wave_det = WaveDetector()
                    self._wave_detectors[wave_key] = wave_det
                result = classify_hand(
                    lm, is_right_hand=is_right,
                    wave_det=wave_det,
                    enabled=self._enabled,
                )
                if result and result.confidence >= self._min_conf:
                    gestures.append(
                        self._to_msg(result, cam_id)
                    )
        # ── Body pose gestures ────────────────────────────────────────────
        if p_results and p_results.pose_landmarks:
            lm = [
                Landmark(p.x, p.y, p.z, p.visibility)
                for p in p_results.pose_landmarks.landmark
            ]
            result = classify_pose(lm, enabled=self._enabled)
            if result and result.confidence >= self._min_conf:
                gestures.append(self._to_msg(result, cam_id))
        return gestures
    # ── Person correlation ────────────────────────────────────────────────────
    def _resolve_person_id(self, cam_id: int, hand_x: float) -> int:
        """Find person_id for a gesture on camera cam_id. Returns -1 if unknown."""
        persons = self._persons
        if persons is None or not persons.persons:
            return -1
        # Filter persons seen on this camera
        on_cam = [p for p in persons.persons if p.camera_id == cam_id]
        if len(on_cam) == 1:
            return on_cam[0].person_id
        if len(on_cam) > 1:
            # Pick closest by horizontal image position (heuristic)
            # NOTE(review): assumes bearing_deg / 90.0 is comparable to a
            # 0–1 normalised image x — confirm the bearing convention used
            # by the PersonStateArray producer.
            return min(on_cam, key=lambda p: abs(p.bearing_deg / 90.0 - hand_x)).person_id
        # Fall back to any tracked person
        # NOTE(review): this picks the FIRST tracked person even when several
        # are tracked on other cameras — looser than the module docstring's
        # "only one person is tracked" wording; confirm intended.
        if persons.persons:
            return persons.persons[0].person_id
        return -1
    # ── Message construction ──────────────────────────────────────────────────
    def _to_msg(self, g: ClassifiedGesture, cam_id: int) -> Gesture:
        """Convert a ClassifiedGesture into a Gesture message for cam_id."""
        msg = Gesture()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.gesture_type = g.gesture_type
        msg.confidence = float(g.confidence)
        msg.camera_id = cam_id
        msg.hand_x = float(g.hand_x)
        msg.hand_y = float(g.hand_y)
        msg.is_right_hand = g.is_right_hand
        msg.direction = g.direction
        msg.source = g.source
        msg.person_id = self._resolve_person_id(cam_id, g.hand_x)
        return msg
    # ── Cleanup ───────────────────────────────────────────────────────────────
    def destroy_node(self) -> None:
        """Release MediaPipe resources before normal node teardown."""
        self._mp.close()
        super().destroy_node()
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None) -> None:
    """CLI entry point: spin GestureNode until interrupted, then clean up."""
    rclpy.init(args=args)
    node = GestureNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass  # Ctrl-C is a normal shutdown path, not an error
    finally:
        # Always release MediaPipe resources and the ROS context.
        node.destroy_node()
        rclpy.shutdown()

View File

@ -35,6 +35,8 @@ setup(
 'orchestrator_node = saltybot_social.orchestrator_node:main',
 # Voice command NLU bridge (Issue #137)
 'voice_command_node = saltybot_social.voice_command_node:main',
 # Multi-camera gesture recognition (Issue #140)
 'gesture_node = saltybot_social.gesture_node:main',
 ],
 },
 )

View File

@ -0,0 +1,640 @@
"""test_gesture_classifier.py — Unit tests for gesture_classifier (Issue #140).
Tests cover:
- All 7 hand gestures with multiple landmark configurations
- All 3 body-pose gestures
- WaveDetector temporal detection
- Entity extraction (direction for point/follow)
- enabled allowlist filtering
- Edge cases: too few landmarks, marginal values, near-misses
No ROS2 or MediaPipe runtime required.
"""
import pytest
from saltybot_social.gesture_classifier import (
Landmark,
ClassifiedGesture,
WaveDetector,
classify_hand,
classify_pose,
_finger_up,
_thumb_up,
_thumb_down,
_four_fingers_curled,
_count_fingers_up,
_point_direction,
)
# ── Landmark builder helpers ──────────────────────────────────────────────────
def _lm(x: float, y: float, z: float = 0.0) -> Landmark:
    """Shorthand: construct a Landmark at normalised image position (x, y, z)."""
    return Landmark(x=x, y=y, z=z)
def _make_hand_landmarks(
    fingers_up: list = None,  # True/False per [index, middle, ring, pinky]
    thumb_up: bool = False,
    thumb_down: bool = False,
    wrist_y: float = 0.8,
    palm_height: float = 0.2,
) -> list:
    """Build a synthetic 21-landmark MediaPipe hand with controllable fingers.

    Args:
        fingers_up: extension flags for [index, middle, ring, pinky].
        thumb_up: place the thumb tip well above the wrist.
        thumb_down: place the thumb tip below the wrist.
        wrist_y: wrist y-coordinate (image space: 0=top, 1=bottom).
        palm_height: vertical extent of the palm.
    """
    if fingers_up is None:
        fingers_up = [False, False, False, False]
    landmarks = [None] * 21
    # Joint rows, stacked upward (smaller y) from the wrist.
    mcp_row = wrist_y - palm_height * 0.5        # knuckles
    pip_row = mcp_row - palm_height * 0.2        # first finger joints
    tip_extended = pip_row - palm_height * 0.3   # tip when finger is up
    tip_curled = pip_row + palm_height * 0.1     # tip when finger is curled
    # Wrist.
    landmarks[0] = _lm(0.5, wrist_y)
    # Thumb chain: CMC → MCP → IP → TIP.
    landmarks[1] = _lm(0.45, wrist_y - palm_height * 0.05)
    landmarks[2] = _lm(0.42, wrist_y - palm_height * 0.15)
    landmarks[3] = _lm(0.38, wrist_y - palm_height * 0.25)
    if thumb_up:
        landmarks[4] = _lm(0.35, wrist_y - palm_height * 0.50)  # raised high
    elif thumb_down:
        landmarks[4] = _lm(0.48, wrist_y + 0.06)                # below wrist
    else:
        landmarks[4] = _lm(0.40, wrist_y - palm_height * 0.10)  # tucked in
    # Four fingers, evenly spaced in x; each has MCP/PIP/DIP/TIP joints.
    finger_columns = [0.45, 0.50, 0.55, 0.60]
    joint_quads = [(5, 6, 7, 8), (9, 10, 11, 12), (13, 14, 15, 16), (17, 18, 19, 20)]
    for fi, (mcp_i, pip_i, dip_i, tip_i) in enumerate(joint_quads):
        col = finger_columns[fi]
        extended = fingers_up[fi]
        landmarks[mcp_i] = _lm(col, mcp_row)
        landmarks[pip_i] = _lm(col, pip_row)
        landmarks[dip_i] = _lm(col, (pip_row + tip_extended) / 2 if extended else pip_row - 0.01)
        landmarks[tip_i] = _lm(col, tip_extended if extended else tip_curled)
    return landmarks
def _make_pose_landmarks(
    arms_up: bool = False,
    arms_spread: bool = False,
    crouching: bool = False,
    shoulder_y: float = 0.30,
) -> list:
    """Build a synthetic 33-landmark MediaPipe body pose.

    Landmarks not set explicitly all share a neutral (0.5, 0.5) placeholder.
    """
    pose = [_lm(0.5, 0.5)] * 33
    hip_y = 0.75 if crouching else 0.55
    knee_y = hip_y + 0.15
    ankle_y = knee_y + 0.15
    # Shoulders anchor every arm configuration below.
    pose[11] = _lm(0.35, shoulder_y)  # left shoulder
    pose[12] = _lm(0.65, shoulder_y)  # right shoulder
    if arms_up:
        # Both wrists raised well above the shoulder line.
        pose[15] = _lm(0.30, shoulder_y - 0.20)
        pose[16] = _lm(0.70, shoulder_y - 0.20)
    elif arms_spread:
        # Elbows and wrists pushed out laterally, roughly at shoulder height.
        pose[13] = _lm(0.20, shoulder_y + 0.05)  # left elbow out
        pose[14] = _lm(0.80, shoulder_y + 0.05)  # right elbow out
        pose[15] = _lm(0.10, shoulder_y + 0.02)  # left wrist far left
        pose[16] = _lm(0.90, shoulder_y + 0.02)  # right wrist far right
    else:
        # Arms resting at the sides.
        pose[15] = _lm(0.35, shoulder_y + 0.20)
        pose[16] = _lm(0.65, shoulder_y + 0.20)
    # Hips, knees and ankles (left, then right).
    pose[23] = _lm(0.40, hip_y)
    pose[24] = _lm(0.60, hip_y)
    pose[25] = _lm(0.40, knee_y)
    pose[26] = _lm(0.60, knee_y)
    pose[27] = _lm(0.40, ankle_y)
    pose[28] = _lm(0.60, ankle_y)
    return pose
# ── Helper tests ──────────────────────────────────────────────────────────────
class TestFingerHelpers:
    """Low-level finger/thumb geometry predicates."""

    def test_finger_up_when_tip_above_pip(self):
        hand = _make_hand_landmarks(fingers_up=[True, False, False, False])
        assert _finger_up(hand, 8, 6) is True  # index TIP sits above its PIP

    def test_finger_down_when_tip_below_pip(self):
        hand = _make_hand_landmarks(fingers_up=[False, False, False, False])
        assert _finger_up(hand, 8, 6) is False

    def test_count_fingers_all_up(self):
        hand = _make_hand_landmarks(fingers_up=[True, True, True, True])
        assert _count_fingers_up(hand) == 4

    def test_count_fingers_none_up(self):
        hand = _make_hand_landmarks(fingers_up=[False, False, False, False])
        assert _count_fingers_up(hand) == 0

    def test_count_fingers_two_up(self):
        hand = _make_hand_landmarks(fingers_up=[True, True, False, False])
        assert _count_fingers_up(hand) == 2

    def test_four_fingers_curled_all_down(self):
        hand = _make_hand_landmarks(fingers_up=[False, False, False, False])
        assert _four_fingers_curled(hand) is True

    def test_four_fingers_not_curled_one_up(self):
        hand = _make_hand_landmarks(fingers_up=[True, False, False, False])
        assert _four_fingers_curled(hand) is False

    def test_thumb_up_when_tip_high(self):
        hand = _make_hand_landmarks(thumb_up=True)
        assert _thumb_up(hand, True) is True

    def test_thumb_not_up_when_tucked(self):
        hand = _make_hand_landmarks(thumb_up=False)
        assert _thumb_up(hand, True) is False

    def test_thumb_down_when_tip_below_wrist(self):
        hand = _make_hand_landmarks(thumb_down=True)
        assert _thumb_down(hand) is True

    def test_thumb_not_down_when_tucked(self):
        hand = _make_hand_landmarks(thumb_up=False, thumb_down=False)
        assert _thumb_down(hand) is False
# ── Hand gesture: stop_palm ───────────────────────────────────────────────────
class TestStopPalm:
    """stop_palm: an open palm with all four fingers extended."""

    @staticmethod
    def _open_palm() -> list:
        return _make_hand_landmarks(fingers_up=[True, True, True, True])

    def test_all_fingers_up(self):
        result = classify_hand(self._open_palm())
        assert result is not None
        assert result.gesture_type == "stop_palm"

    def test_confidence_range(self):
        result = classify_hand(self._open_palm())
        assert 0.60 <= result.confidence <= 1.0

    def test_three_fingers_does_not_trigger(self):
        # Pinky curled — not a full open palm.
        result = classify_hand(_make_hand_landmarks(fingers_up=[True, True, True, False]))
        assert result is None or result.gesture_type != "stop_palm"

    def test_source_is_hand(self):
        result = classify_hand(self._open_palm())
        assert result.source == "hand"
# ── Hand gesture: thumbs_up ───────────────────────────────────────────────────
class TestThumbsUp:
    """thumbs_up: closed fist with the thumb raised."""

    @staticmethod
    def _fist_thumb_up() -> list:
        return _make_hand_landmarks(fingers_up=[False, False, False, False], thumb_up=True)

    def test_basic(self):
        result = classify_hand(self._fist_thumb_up())
        assert result is not None
        assert result.gesture_type == "thumbs_up"

    def test_confidence_range(self):
        result = classify_hand(self._fist_thumb_up())
        assert 0.60 <= result.confidence <= 1.0

    def test_thumb_up_with_fingers_open_not_thumbs_up(self):
        # Open fingers plus a raised thumb: stop_palm outranks thumbs_up.
        hand = _make_hand_landmarks(fingers_up=[True, True, True, True], thumb_up=True)
        result = classify_hand(hand)
        assert result.gesture_type == "stop_palm"

    def test_right_hand_flag(self):
        result = classify_hand(self._fist_thumb_up(), is_right_hand=True)
        assert result.is_right_hand is True

    def test_left_hand_flag(self):
        result = classify_hand(self._fist_thumb_up(), is_right_hand=False)
        assert result.is_right_hand is False
# ── Hand gesture: thumbs_down ─────────────────────────────────────────────────
class TestThumbsDown:
    """thumbs_down: closed fist with the thumb dropped below the wrist."""

    @staticmethod
    def _fist_thumb_down() -> list:
        return _make_hand_landmarks(fingers_up=[False, False, False, False], thumb_down=True)

    def test_basic(self):
        result = classify_hand(self._fist_thumb_down())
        assert result is not None
        assert result.gesture_type == "thumbs_down"

    def test_confidence_range(self):
        result = classify_hand(self._fist_thumb_down())
        assert 0.60 <= result.confidence <= 1.0

    def test_fingers_open_prevents_thumbs_down(self):
        hand = _make_hand_landmarks(fingers_up=[True, True, True, True], thumb_down=True)
        result = classify_hand(hand)
        # stop_palm takes priority over the thumb signal.
        assert result is None or result.gesture_type != "thumbs_down"
# ── Hand gesture: point ───────────────────────────────────────────────────────
class TestPoint:
    """point: index finger extended while the other fingers stay curled."""

    def test_index_only_up(self):
        result = classify_hand(_make_hand_landmarks(fingers_up=[True, False, False, False]))
        assert result is not None
        assert result.gesture_type == "point"

    def test_direction_right(self):
        """Index tip to the right of MCP → direction='right'."""
        hand = _make_hand_landmarks(fingers_up=[True, False, False, False])
        # Re-aim the index tip rightwards (TIP above PIP, dx=0.15 ≤ 0.20 threshold).
        hand[8] = _lm(hand[5].x + 0.15, hand[6].y - 0.02)
        result = classify_hand(hand)
        assert result is not None
        assert result.gesture_type == "point"
        assert result.direction == "right"

    def test_direction_up(self):
        """Index tip significantly above MCP → direction='up'."""
        hand = _make_hand_landmarks(fingers_up=[True, False, False, False])
        hand[8] = _lm(hand[5].x, hand[5].y - 0.20)  # well above the MCP knuckle
        result = classify_hand(hand)
        assert result is not None
        assert result.direction == "up"

    def test_two_fingers_up_not_point(self):
        result = classify_hand(_make_hand_landmarks(fingers_up=[True, True, False, False]))
        # Two raised fingers must be rejected by the point classifier.
        assert result is None or result.gesture_type != "point"

    def test_confidence_range(self):
        result = classify_hand(_make_hand_landmarks(fingers_up=[True, False, False, False]))
        assert 0.60 <= result.confidence <= 1.0
# ── Hand gesture: come_here ───────────────────────────────────────────────────
class TestComeHere:
    """come_here: beckoning — index knuckle raised with the tip curled in."""

    def _make_beckoning_hand(self) -> list:
        """Index MCP raised but TIP curled below PIP."""
        hand = _make_hand_landmarks(
            fingers_up=[False, False, False, False],
            wrist_y=0.85,
            palm_height=0.25,
        )
        # Sanity: the index MCP must sit above the wrist in image space.
        assert hand[5].y < hand[0].y
        # Curl the TIP clearly below the MCP (satisfies the MCP+0.03 threshold).
        hand[8] = _lm(hand[5].x, hand[5].y + 0.06)
        return hand

    def test_beckoning_detected(self):
        result = classify_hand(self._make_beckoning_hand())
        assert result is not None
        assert result.gesture_type == "come_here"

    def test_confidence_range(self):
        result = classify_hand(self._make_beckoning_hand())
        assert 0.60 <= result.confidence <= 1.0

    def test_open_hand_not_come_here(self):
        result = classify_hand(_make_hand_landmarks(fingers_up=[True, True, True, True]))
        assert result is None or result.gesture_type != "come_here"
# ── Hand gesture: follow ──────────────────────────────────────────────────────
class TestFollow:
    """follow: index extended horizontally past the 0.20 dx threshold."""

    def _make_follow_hand(self, direction: str = "right") -> list:
        """Index pointing horizontally, others curled."""
        hand = _make_hand_landmarks(fingers_up=[True, False, False, False])
        # Push the index TIP 0.25 to the side of the MCP (> 0.20 → follow),
        # keeping the TIP above the PIP so _finger_up holds, with dx >> dy.
        dx = 0.25 if direction == "right" else -0.25
        hand[8] = _lm(hand[5].x + dx, hand[6].y - 0.02)
        return hand

    def test_follow_right(self):
        result = classify_hand(self._make_follow_hand("right"))
        assert result is not None
        assert result.gesture_type == "follow"
        assert result.direction == "right"

    def test_follow_left(self):
        result = classify_hand(self._make_follow_hand("left"))
        assert result is not None
        assert result.gesture_type == "follow"
        assert result.direction == "left"

    def test_vertical_point_not_follow(self):
        hand = _make_hand_landmarks(fingers_up=[True, False, False, False])
        hand[8] = _lm(hand[5].x, hand[5].y - 0.30)  # TIP straight up: dy >> dx
        result = classify_hand(hand)
        # A vertical index is "point", never "follow".
        assert result is None or result.gesture_type == "point"

    def test_confidence_range(self):
        result = classify_hand(self._make_follow_hand())
        assert result is not None
        assert 0.60 <= result.confidence <= 1.0
# ── WaveDetector ──────────────────────────────────────────────────────────────
class TestWaveDetector:
    """Temporal wave detection from a stream of wrist x-positions."""

    def test_not_waving_insufficient_data(self):
        detector = WaveDetector()
        for x in (0.4, 0.5):
            waving, confidence = detector.push(x)
            assert waving is False
            assert confidence == 0.0

    def test_wave_oscillation_detected(self):
        detector = WaveDetector(history_len=20, min_reversals=2, min_amplitude=0.06)
        for x in (0.3, 0.5) * 5:
            waving, confidence = detector.push(x)
        # After ten alternating samples the oscillation must register.
        assert waving is True
        assert confidence > 0.0

    def test_small_amplitude_not_wave(self):
        detector = WaveDetector(min_amplitude=0.10)
        # 0.01 oscillation amplitude — well under the 0.10 threshold.
        for x in (0.50, 0.51) * 5:
            waving, _ = detector.push(x)
            assert waving is False

    def test_confidence_increases_with_larger_amplitude(self):
        det_small, det_large = WaveDetector(), WaveDetector()
        for x in (0.40, 0.60) * 4:
            _, c_small = det_small.push(x)
        for x in (0.20, 0.80) * 4:
            _, c_large = det_large.push(x)
        assert c_large >= c_small

    def test_reset_clears_history(self):
        detector = WaveDetector()
        for x in (0.3, 0.7) * 5:
            detector.push(x)
        detector.reset()
        waving, _ = detector.push(0.5)
        assert waving is False

    def test_wave_via_classify_hand(self):
        """Wave detection driven through the classify_hand API."""
        detector = WaveDetector(min_reversals=2, min_amplitude=0.06)
        result = None
        for x in (0.3, 0.6) * 5:
            hand = _make_hand_landmarks()
            hand[0] = _lm(x, hand[0].y)  # slide the wrist laterally
            result = classify_hand(hand, wave_det=detector)
        assert result is not None
        assert result.gesture_type == "wave"
# ── Body-pose: arms_up ────────────────────────────────────────────────────────
class TestArmsUp:
    """arms_up: both wrists raised above the shoulder line."""

    def test_both_arms_raised(self):
        result = classify_pose(_make_pose_landmarks(arms_up=True))
        assert result is not None
        assert result.gesture_type == "arms_up"

    def test_confidence_range(self):
        result = classify_pose(_make_pose_landmarks(arms_up=True))
        assert 0.60 <= result.confidence <= 1.0

    def test_arms_at_side_not_arms_up(self):
        result = classify_pose(_make_pose_landmarks(arms_up=False, arms_spread=False))
        assert result is None or result.gesture_type != "arms_up"

    def test_source_is_body_pose(self):
        result = classify_pose(_make_pose_landmarks(arms_up=True))
        assert result.source == "body_pose"
# ── Body-pose: arms_spread ────────────────────────────────────────────────────
class TestArmsSpread:
    """arms_spread: both arms extended laterally at shoulder height."""

    def test_arms_extended_laterally(self):
        result = classify_pose(_make_pose_landmarks(arms_spread=True))
        assert result is not None
        # Depending on thresholds this pose may also register as arms_up.
        assert result.gesture_type in ("arms_spread", "arms_up")

    def test_confidence_range(self):
        result = classify_pose(_make_pose_landmarks(arms_spread=True))
        if result and result.gesture_type == "arms_spread":
            assert 0.55 <= result.confidence <= 1.0

    def test_arms_at_rest_not_spread(self):
        result = classify_pose(_make_pose_landmarks())
        assert result is None or result.gesture_type not in ("arms_spread", "arms_up")
# ── Body-pose: crouch ─────────────────────────────────────────────────────────
class TestCrouch:
    """crouch: hips dropped below the normal standing height."""

    def test_crouching_detected(self):
        result = classify_pose(_make_pose_landmarks(crouching=True))
        assert result is not None
        assert result.gesture_type == "crouch"

    def test_standing_not_crouch(self):
        result = classify_pose(_make_pose_landmarks(crouching=False))
        assert result is None or result.gesture_type != "crouch"

    def test_confidence_range(self):
        result = classify_pose(_make_pose_landmarks(crouching=True))
        assert 0.55 <= result.confidence <= 1.0

    def test_source_is_body_pose(self):
        result = classify_pose(_make_pose_landmarks(crouching=True))
        assert result.source == "body_pose"
# ── Enabled allowlist filtering ───────────────────────────────────────────────
class TestEnabledFilter:
    """The ``enabled`` allowlist gates which gesture types may be returned."""

    def test_blocked_gesture_returns_none(self):
        hand = _make_hand_landmarks(fingers_up=[True, True, True, True])
        assert classify_hand(hand, enabled={"thumbs_up"}) is None

    def test_allowed_gesture_passes(self):
        hand = _make_hand_landmarks(fingers_up=[True, True, True, True])
        result = classify_hand(hand, enabled={"stop_palm"})
        assert result is not None
        assert result.gesture_type == "stop_palm"

    def test_none_enabled_means_all(self):
        hand = _make_hand_landmarks(fingers_up=[True, True, True, True])
        assert classify_hand(hand, enabled=None) is not None

    def test_pose_filter_arms_up_blocked(self):
        pose = _make_pose_landmarks(arms_up=True)
        assert classify_pose(pose, enabled={"crouch"}) is None

    def test_pose_filter_arms_up_allowed(self):
        pose = _make_pose_landmarks(arms_up=True)
        result = classify_pose(pose, enabled={"arms_up"})
        assert result is not None
        assert result.gesture_type == "arms_up"
# ── Edge cases ────────────────────────────────────────────────────────────────
class TestEdgeCases:
    """Degenerate landmark lists and neutral hands must never misclassify."""

    def test_empty_landmarks_returns_none(self):
        assert classify_hand([]) is None

    def test_too_few_landmarks_returns_none(self):
        assert classify_hand([_lm(0.5, 0.5)] * 10) is None  # hands need 21

    def test_too_few_pose_landmarks_returns_none(self):
        assert classify_pose([_lm(0.5, 0.5)] * 10) is None

    def test_no_wave_detector_no_wave(self):
        """Without a WaveDetector, wave is never returned."""
        result = classify_hand(_make_hand_landmarks(), wave_det=None)
        assert result is None or result.gesture_type != "wave"

    def test_all_fingers_down_thumb_neutral_returns_none(self):
        hand = _make_hand_landmarks(
            fingers_up=[False, False, False, False],
            thumb_up=False,
            thumb_down=False,
        )
        assert classify_hand(hand) is None

    def test_hand_x_y_populated(self):
        result = classify_hand(_make_hand_landmarks(fingers_up=[True, True, True, True]))
        assert result is not None
        assert 0.0 <= result.hand_x <= 1.0
        assert 0.0 <= result.hand_y <= 1.0

    def test_is_right_hand_default_true(self):
        hand = _make_hand_landmarks(fingers_up=[False, False, False, False], thumb_up=True)
        assert classify_hand(hand).is_right_hand is True

    def test_wave_confidence_zero_for_no_wave(self):
        detector = WaveDetector()
        # A stationary wrist must never produce a wave.
        for _ in range(24):
            waving, confidence = detector.push(0.5)
            assert waving is False
            assert confidence == 0.0
# ── Point direction edge cases ────────────────────────────────────────────────
class TestPointDirection:
    """_point_direction maps the MCP→TIP vector onto a named direction."""

    def _hand_pointing(self, dx: float, dy: float) -> list:
        hand = _make_hand_landmarks(fingers_up=[True, False, False, False])
        # Place the index MCP at the centre and the TIP at the requested offset.
        hand[5] = _lm(0.5, 0.5)
        hand[8] = _lm(0.5 + dx, 0.5 + dy)
        return hand

    def test_pointing_right(self):
        assert _point_direction(self._hand_pointing(dx=0.2, dy=0.0)) == "right"

    def test_pointing_left(self):
        assert _point_direction(self._hand_pointing(dx=-0.2, dy=0.0)) == "left"

    def test_pointing_up(self):
        assert _point_direction(self._hand_pointing(dx=0.0, dy=-0.2)) == "up"

    def test_pointing_down(self):
        assert _point_direction(self._hand_pointing(dx=0.0, dy=0.2)) == "down"

    def test_pointing_upper_right(self):
        assert _point_direction(self._hand_pointing(dx=0.15, dy=-0.15)) == "upper_right"

View File

@ -35,6 +35,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"msg/ConversationResponse.msg" "msg/ConversationResponse.msg"
# Issue #137 voice command NLU # Issue #137 voice command NLU
"msg/VoiceCommand.msg" "msg/VoiceCommand.msg"
# Issue #140 gesture recognition
"msg/Gesture.msg"
"msg/GestureArray.msg"
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
) )

View File

@ -0,0 +1,31 @@
# Gesture.msg — Single detected gesture from hand or body-pose recognition (Issue #140).
# Published as part of GestureArray on /social/gestures.
#
# gesture_type values:
# Hand gestures (MediaPipe Hands):
# wave — lateral wrist oscillation (temporal)
# point — index extended, others curled
# stop_palm — all fingers extended, palm forward
# thumbs_up — thumb up, fist closed
# thumbs_down — thumb down, fist closed
#     come_here   — beckoning: index knuckle raised, tip curled toward palm
# follow — index extended horizontally
# Body-pose gestures (MediaPipe Pose):
# arms_up — both wrists above shoulders (stop / emergency)
# crouch — hips below normal standing threshold (come closer)
# arms_spread — arms extended laterally (spread out / stop)
std_msgs/Header header
string gesture_type # gesture identifier (see above)
int32 person_id # from PersonStateArray (-1 = unidentified)
float32 confidence # detection confidence [0.0, 1.0]
int32 camera_id # source camera (0=front, 1=left, 2=rear, 3=right)
# Normalised image position of the detected gesture anchor [0.0, 1.0]
float32 hand_x
float32 hand_y
bool is_right_hand # true = right hand; false = left hand (hand gestures only)
string direction # for "point"/"follow": "left", "right", "up", "down", or a diagonal such as "upper_right"; empty otherwise
string source # "hand" or "body_pose"

View File

@ -0,0 +1,6 @@
# GestureArray.msg — All gestures detected in a single processing tick (Issue #140).
# Published by gesture_node on /social/gestures.
std_msgs/Header header
Gesture[] gestures