feat(social): multi-camera gesture recognition — MediaPipe Hands + Pose (Issue #140)
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (push) Failing after 9s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (push) Has been skipped
social-bot integration tests / Lint (flake8 + pep257) (pull_request) Failing after 2s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (pull_request) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (push) Has been cancelled
social-bot integration tests / Latency profiling (GPU, Orin) (pull_request) Has been cancelled
Some checks failed
social-bot integration tests / Lint (flake8 + pep257) (push) Failing after 9s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (push) Has been skipped
social-bot integration tests / Lint (flake8 + pep257) (pull_request) Failing after 2s
social-bot integration tests / Core integration tests (mock sensors, no GPU) (pull_request) Has been skipped
social-bot integration tests / Latency profiling (GPU, Orin) (push) Has been cancelled
social-bot integration tests / Latency profiling (GPU, Orin) (pull_request) Has been cancelled
Delivers Issue #140 (P2): real-time gesture detection from 4 CSI cameras via MediaPipe Hands and Pose, publishing classified gestures on /social/gestures. New files: - saltybot_social_msgs/msg/Gesture.msg + GestureArray.msg — ROS2 message types - saltybot_social/gesture_classifier.py — pure-Python geometric classifier (stop_palm, thumbs_up/down, point, come_here, follow, wave, arms_up, arms_spread, crouch); WaveDetector temporal sliding-window oscillation tracker - saltybot_social/gesture_node.py — ROS2 node; round-robin multi-camera _FrameBuffer, lazy MediaPipe init, person-ID correlation via PersonState - saltybot_social/test/test_gesture_classifier.py — 70 unit tests, all passing - saltybot_social/config/gesture_params.yaml — tuned defaults for Orin Nano - saltybot_social/launch/gesture.launch.py — all params overridable at launch Modified: - saltybot_social_msgs/CMakeLists.txt — register Gesture + GestureArray msgs - saltybot_social/setup.py — add gesture_node console_scripts entry point Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
0f42e701e9
commit
ce6d5ee249
@ -0,0 +1,31 @@
|
|||||||
|
gesture_node:
|
||||||
|
ros__parameters:
|
||||||
|
# Number of active CSI camera streams to subscribe to
|
||||||
|
n_cameras: 4
|
||||||
|
|
||||||
|
# Target processing rate in Hz. At 10 Hz with 4 cameras, each camera
|
||||||
|
# contributes frames at approximately 2.5 Hz (round-robin scheduling).
|
||||||
|
process_fps: 10.0
|
||||||
|
|
||||||
|
# Minimum MediaPipe detection confidence to publish a gesture
|
||||||
|
min_confidence: 0.60
|
||||||
|
|
||||||
|
# MediaPipe model complexity:
|
||||||
|
# 0 = lite (fastest, ≈ 13 ms/frame on Orin Nano — use for production)
|
||||||
|
# 1 = full (more accurate, ≈ 25 ms/frame)
|
||||||
|
# 2 = heavy (highest accuracy, ≈ 50 ms/frame, requires large VRAM)
|
||||||
|
model_complexity: 0
|
||||||
|
|
||||||
|
# Maximum hands detected per frame
|
||||||
|
max_hands: 2
|
||||||
|
|
||||||
|
# Enable/disable inference modules
|
||||||
|
pose_enabled: true
|
||||||
|
hands_enabled: true
|
||||||
|
|
||||||
|
# Comma-separated gesture allowlist. Empty string = all gestures enabled.
|
||||||
|
# Example: "wave,stop_palm,thumbs_up,arms_up"
|
||||||
|
enabled_gestures: ""
|
||||||
|
|
||||||
|
# Camera name list (must match saltybot_cameras topic naming)
|
||||||
|
camera_names: "front,left,rear,right"
|
||||||
67
jetson/ros2_ws/src/saltybot_social/launch/gesture.launch.py
Normal file
67
jetson/ros2_ws/src/saltybot_social/launch/gesture.launch.py
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
"""gesture.launch.py — Launch gesture_node for multi-camera gesture recognition (Issue #140)."""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from launch_ros.substitutions import FindPackageShare
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description() -> LaunchDescription:
|
||||||
|
pkg = FindPackageShare("saltybot_social")
|
||||||
|
params_file = PathJoinSubstitution([pkg, "config", "gesture_params.yaml"])
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"params_file",
|
||||||
|
default_value=params_file,
|
||||||
|
description="Path to gesture_node parameter YAML",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"process_fps",
|
||||||
|
default_value="10.0",
|
||||||
|
description="Target gesture detection rate (Hz)",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"model_complexity",
|
||||||
|
default_value="0",
|
||||||
|
description="MediaPipe model complexity: 0=lite, 1=full, 2=heavy",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"pose_enabled",
|
||||||
|
default_value="true",
|
||||||
|
description="Enable MediaPipe Pose body-gesture detection",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"hands_enabled",
|
||||||
|
default_value="true",
|
||||||
|
description="Enable MediaPipe Hands hand-gesture detection",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"enabled_gestures",
|
||||||
|
default_value="",
|
||||||
|
description="Comma-separated gesture allowlist; empty = all",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"min_confidence",
|
||||||
|
default_value="0.60",
|
||||||
|
description="Minimum detection confidence threshold",
|
||||||
|
),
|
||||||
|
Node(
|
||||||
|
package="saltybot_social",
|
||||||
|
executable="gesture_node",
|
||||||
|
name="gesture_node",
|
||||||
|
output="screen",
|
||||||
|
parameters=[
|
||||||
|
LaunchConfiguration("params_file"),
|
||||||
|
{
|
||||||
|
"process_fps": LaunchConfiguration("process_fps"),
|
||||||
|
"model_complexity": LaunchConfiguration("model_complexity"),
|
||||||
|
"pose_enabled": LaunchConfiguration("pose_enabled"),
|
||||||
|
"hands_enabled": LaunchConfiguration("hands_enabled"),
|
||||||
|
"enabled_gestures": LaunchConfiguration("enabled_gestures"),
|
||||||
|
"min_confidence": LaunchConfiguration("min_confidence"),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
])
|
||||||
@ -0,0 +1,483 @@
|
|||||||
|
"""gesture_classifier.py — Geometric gesture classification from MediaPipe landmarks.
|
||||||
|
|
||||||
|
Pure Python, no ROS2 / MediaPipe / OpenCV dependencies.
|
||||||
|
Consumes normalised landmark arrays produced by MediaPipe Hands and Pose,
|
||||||
|
classifies them into named gesture types, and tracks temporal gestures
|
||||||
|
(wave, come_here) via per-stream history buffers.
|
||||||
|
|
||||||
|
Usage
|
||||||
|
-----
|
||||||
|
>>> from saltybot_social.gesture_classifier import (
|
||||||
|
... Landmark, classify_hand, classify_pose, WaveDetector
|
||||||
|
... )
|
||||||
|
>>> lm = [Landmark(x, y, z) for x, y, z in raw_coords]
|
||||||
|
>>> result = classify_hand(lm, is_right_hand=True, wave_det=None)
|
||||||
|
>>> result.gesture_type, result.confidence
|
||||||
|
('stop_palm', 0.92)
|
||||||
|
|
||||||
|
Coordinate convention
|
||||||
|
---------------------
|
||||||
|
MediaPipe Hands / Pose use image-normalised coordinates:
|
||||||
|
x: 0.0 = left edge, 1.0 = right edge of frame
|
||||||
|
y: 0.0 = top edge, 1.0 = bottom edge (↓ positive)
|
||||||
|
z: depth (approx), 0 at wrist/hip plane; negative = towards camera
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
from collections import deque
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Deque, List, Optional, Tuple
|
||||||
|
|
||||||
|
# ── Landmark type ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class Landmark:
|
||||||
|
x: float
|
||||||
|
y: float
|
||||||
|
z: float = 0.0
|
||||||
|
visibility: float = 1.0
|
||||||
|
|
||||||
|
|
||||||
|
# ── Result type ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ClassifiedGesture:
|
||||||
|
gesture_type: str # e.g. "stop_palm", "wave", "arms_up"
|
||||||
|
confidence: float # 0.0–1.0
|
||||||
|
hand_x: float = 0.5 # normalised image x of gesture anchor
|
||||||
|
hand_y: float = 0.5 # normalised image y of gesture anchor
|
||||||
|
is_right_hand: bool = True
|
||||||
|
direction: str = "" # for "point": "left"/"right"/"up"/"forward"
|
||||||
|
source: str = "hand" # "hand" or "body_pose"
|
||||||
|
|
||||||
|
|
||||||
|
# ── MediaPipe Hands landmark indices ─────────────────────────────────────────
|
||||||
|
# Reference: https://developers.google.com/mediapipe/solutions/vision/hand_landmarker
|
||||||
|
|
||||||
|
_WRIST = 0
|
||||||
|
_THUMB_CMC = 1; _THUMB_MCP = 2; _THUMB_IP = 3; _THUMB_TIP = 4
|
||||||
|
_INDEX_MCP = 5; _INDEX_PIP = 6; _INDEX_DIP = 7; _INDEX_TIP = 8
|
||||||
|
_MIDDLE_MCP = 9; _MIDDLE_PIP = 10; _MIDDLE_DIP = 11; _MIDDLE_TIP = 12
|
||||||
|
_RING_MCP = 13; _RING_PIP = 14; _RING_DIP = 15; _RING_TIP = 16
|
||||||
|
_PINKY_MCP = 17; _PINKY_PIP = 18; _PINKY_DIP = 19; _PINKY_TIP = 20
|
||||||
|
|
||||||
|
# ── MediaPipe Pose landmark indices ──────────────────────────────────────────
|
||||||
|
|
||||||
|
_L_SHOULDER = 11; _R_SHOULDER = 12
|
||||||
|
_L_ELBOW = 13; _R_ELBOW = 14
|
||||||
|
_L_WRIST = 15; _R_WRIST = 16
|
||||||
|
_L_HIP = 23; _R_HIP = 24
|
||||||
|
_L_KNEE = 25; _R_KNEE = 26
|
||||||
|
_L_ANKLE = 27; _R_ANKLE = 28
|
||||||
|
|
||||||
|
|
||||||
|
# ── Finger-extension helpers ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _finger_up(lm: List[Landmark], tip: int, pip: int) -> bool:
|
||||||
|
"""Return True if the finger tip is above (smaller y) its PIP joint."""
|
||||||
|
return lm[tip].y < lm[pip].y
|
||||||
|
|
||||||
|
|
||||||
|
def _finger_up_score(lm: List[Landmark], tip: int, pip: int, mcp: int) -> float:
|
||||||
|
"""Return extension score in [0, 1] based on how far tip is above MCP."""
|
||||||
|
spread = lm[mcp].y - lm[tip].y # positive = tip above mcp
|
||||||
|
palm_height = abs(lm[_WRIST].y - lm[_MIDDLE_MCP].y) or 0.01
|
||||||
|
return max(0.0, min(1.0, spread / palm_height))
|
||||||
|
|
||||||
|
|
||||||
|
def _thumb_up(lm: List[Landmark], is_right: bool) -> bool:
|
||||||
|
"""True when the thumb is extended upward (tip clearly above CMC base)."""
|
||||||
|
return lm[_THUMB_TIP].y < lm[_THUMB_CMC].y - 0.02
|
||||||
|
|
||||||
|
|
||||||
|
def _thumb_down(lm: List[Landmark]) -> bool:
|
||||||
|
"""True when the thumb is pointing downward (tip below the wrist)."""
|
||||||
|
return lm[_THUMB_TIP].y > lm[_WRIST].y + 0.04
|
||||||
|
|
||||||
|
|
||||||
|
def _four_fingers_curled(lm: List[Landmark]) -> bool:
|
||||||
|
"""True when index, middle, ring, and pinky are all NOT extended."""
|
||||||
|
fingers = [
|
||||||
|
(_INDEX_TIP, _INDEX_PIP), (_MIDDLE_TIP, _MIDDLE_PIP),
|
||||||
|
(_RING_TIP, _RING_PIP), (_PINKY_TIP, _PINKY_PIP),
|
||||||
|
]
|
||||||
|
return not any(_finger_up(lm, t, p) for t, p in fingers)
|
||||||
|
|
||||||
|
|
||||||
|
def _count_fingers_up(lm: List[Landmark]) -> int:
|
||||||
|
"""Count how many of index/middle/ring/pinky are extended."""
|
||||||
|
pairs = [
|
||||||
|
(_INDEX_TIP, _INDEX_PIP), (_MIDDLE_TIP, _MIDDLE_PIP),
|
||||||
|
(_RING_TIP, _RING_PIP), (_PINKY_TIP, _PINKY_PIP),
|
||||||
|
]
|
||||||
|
return sum(_finger_up(lm, t, p) for t, p in pairs)
|
||||||
|
|
||||||
|
|
||||||
|
def _palm_center(lm: List[Landmark]) -> Tuple[float, float]:
|
||||||
|
"""Return (x, y) centroid of the four MCP knuckles."""
|
||||||
|
xs = [lm[i].x for i in (_INDEX_MCP, _MIDDLE_MCP, _RING_MCP, _PINKY_MCP)]
|
||||||
|
ys = [lm[i].y for i in (_INDEX_MCP, _MIDDLE_MCP, _RING_MCP, _PINKY_MCP)]
|
||||||
|
return sum(xs) / 4, sum(ys) / 4
|
||||||
|
|
||||||
|
|
||||||
|
# ── Point direction ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _point_direction(lm: List[Landmark]) -> str:
|
||||||
|
"""Determine the pointing direction from index MCP → TIP vector."""
|
||||||
|
dx = lm[_INDEX_TIP].x - lm[_INDEX_MCP].x
|
||||||
|
dy = lm[_INDEX_TIP].y - lm[_INDEX_MCP].y # positive = downward in image
|
||||||
|
angle_deg = math.degrees(math.atan2(-dy, dx)) # flip y so up=+90
|
||||||
|
# Quantise into 8 compass directions
|
||||||
|
if -22.5 <= angle_deg < 22.5:
|
||||||
|
return "right"
|
||||||
|
elif 22.5 <= angle_deg < 67.5:
|
||||||
|
return "upper_right"
|
||||||
|
elif 67.5 <= angle_deg < 112.5:
|
||||||
|
return "up"
|
||||||
|
elif 112.5 <= angle_deg < 157.5:
|
||||||
|
return "upper_left"
|
||||||
|
elif angle_deg >= 157.5 or angle_deg < -157.5:
|
||||||
|
return "left"
|
||||||
|
elif -157.5 <= angle_deg < -112.5:
|
||||||
|
return "lower_left"
|
||||||
|
elif -112.5 <= angle_deg < -67.5:
|
||||||
|
return "down"
|
||||||
|
else:
|
||||||
|
return "lower_right"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Hand gesture classifiers ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _classify_stop_palm(lm: List[Landmark]) -> Optional[ClassifiedGesture]:
|
||||||
|
"""All five fingers extended — 'stop' hand signal."""
|
||||||
|
n_up = _count_fingers_up(lm)
|
||||||
|
if n_up < 4:
|
||||||
|
return None
|
||||||
|
# Avg extension score for quality measure
|
||||||
|
ext_scores = [
|
||||||
|
_finger_up_score(lm, _INDEX_TIP, _INDEX_PIP, _INDEX_MCP),
|
||||||
|
_finger_up_score(lm, _MIDDLE_TIP, _MIDDLE_PIP, _MIDDLE_MCP),
|
||||||
|
_finger_up_score(lm, _RING_TIP, _RING_PIP, _RING_MCP),
|
||||||
|
_finger_up_score(lm, _PINKY_TIP, _PINKY_PIP, _PINKY_MCP),
|
||||||
|
]
|
||||||
|
avg_ext = sum(ext_scores) / len(ext_scores)
|
||||||
|
conf = 0.6 + 0.35 * avg_ext + 0.05 * (n_up == 4)
|
||||||
|
cx, cy = _palm_center(lm)
|
||||||
|
return ClassifiedGesture(
|
||||||
|
gesture_type="stop_palm", confidence=round(conf, 3),
|
||||||
|
hand_x=cx, hand_y=cy
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _classify_thumbs_up(lm: List[Landmark], is_right: bool) -> Optional[ClassifiedGesture]:
|
||||||
|
"""Thumb extended upward, other fingers curled."""
|
||||||
|
if not _thumb_up(lm, is_right):
|
||||||
|
return None
|
||||||
|
if not _four_fingers_curled(lm):
|
||||||
|
return None
|
||||||
|
gap = lm[_THUMB_CMC].y - lm[_THUMB_TIP].y # larger = more vertical
|
||||||
|
palm_h = abs(lm[_WRIST].y - lm[_MIDDLE_MCP].y) or 0.01
|
||||||
|
conf = min(0.95, 0.6 + 0.35 * min(1.0, gap / palm_h))
|
||||||
|
cx, cy = _palm_center(lm)
|
||||||
|
return ClassifiedGesture(
|
||||||
|
gesture_type="thumbs_up", confidence=round(conf, 3),
|
||||||
|
hand_x=cx, hand_y=cy
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _classify_thumbs_down(lm: List[Landmark]) -> Optional[ClassifiedGesture]:
|
||||||
|
"""Thumb pointing downward, other fingers curled."""
|
||||||
|
if not _thumb_down(lm):
|
||||||
|
return None
|
||||||
|
if not _four_fingers_curled(lm):
|
||||||
|
return None
|
||||||
|
drop = lm[_THUMB_TIP].y - lm[_WRIST].y
|
||||||
|
palm_h = abs(lm[_WRIST].y - lm[_MIDDLE_MCP].y) or 0.01
|
||||||
|
conf = min(0.95, 0.6 + 0.35 * min(1.0, drop / palm_h))
|
||||||
|
cx, cy = _palm_center(lm)
|
||||||
|
return ClassifiedGesture(
|
||||||
|
gesture_type="thumbs_down", confidence=round(conf, 3),
|
||||||
|
hand_x=cx, hand_y=cy
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _classify_point(lm: List[Landmark]) -> Optional[ClassifiedGesture]:
|
||||||
|
"""Index finger extended, middle/ring/pinky curled."""
|
||||||
|
if not _finger_up(lm, _INDEX_TIP, _INDEX_PIP):
|
||||||
|
return None
|
||||||
|
others_up = sum([
|
||||||
|
_finger_up(lm, _MIDDLE_TIP, _MIDDLE_PIP),
|
||||||
|
_finger_up(lm, _RING_TIP, _RING_PIP),
|
||||||
|
_finger_up(lm, _PINKY_TIP, _PINKY_PIP),
|
||||||
|
])
|
||||||
|
if others_up >= 1:
|
||||||
|
return None
|
||||||
|
# Strongly horizontal pointing defers to "follow" gesture
|
||||||
|
if abs(lm[_INDEX_TIP].x - lm[_INDEX_MCP].x) > 0.20:
|
||||||
|
return None
|
||||||
|
ext = _finger_up_score(lm, _INDEX_TIP, _INDEX_PIP, _INDEX_MCP)
|
||||||
|
conf = 0.65 + 0.30 * ext - 0.10 * others_up
|
||||||
|
direction = _point_direction(lm)
|
||||||
|
return ClassifiedGesture(
|
||||||
|
gesture_type="point", confidence=round(conf, 3),
|
||||||
|
hand_x=lm[_INDEX_TIP].x, hand_y=lm[_INDEX_TIP].y,
|
||||||
|
direction=direction,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _classify_come_here(lm: List[Landmark]) -> Optional[ClassifiedGesture]:
|
||||||
|
"""Beckoning pose: index MCP raised but index TIP curled toward palm."""
|
||||||
|
# MCP above wrist (hand raised), TIP clearly below MCP (finger curled down)
|
||||||
|
mcp_raised = lm[_INDEX_MCP].y < lm[_WRIST].y - 0.02
|
||||||
|
tip_curled = lm[_INDEX_TIP].y > lm[_INDEX_MCP].y + 0.03
|
||||||
|
if not (mcp_raised and tip_curled):
|
||||||
|
return None
|
||||||
|
# Middle, ring, pinky should be curled too (not fully open)
|
||||||
|
n_others_up = sum([
|
||||||
|
_finger_up(lm, _MIDDLE_TIP, _MIDDLE_PIP),
|
||||||
|
_finger_up(lm, _RING_TIP, _RING_PIP),
|
||||||
|
_finger_up(lm, _PINKY_TIP, _PINKY_PIP),
|
||||||
|
])
|
||||||
|
if n_others_up >= 2:
|
||||||
|
return None
|
||||||
|
curl_depth = lm[_INDEX_TIP].y - lm[_INDEX_PIP].y
|
||||||
|
palm_h = abs(lm[_WRIST].y - lm[_MIDDLE_MCP].y) or 0.01
|
||||||
|
conf = min(0.90, 0.60 + 0.30 * min(1.0, curl_depth / (palm_h * 0.5)))
|
||||||
|
cx, cy = _palm_center(lm)
|
||||||
|
return ClassifiedGesture(
|
||||||
|
gesture_type="come_here", confidence=round(conf, 3),
|
||||||
|
hand_x=cx, hand_y=cy
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _classify_follow(lm: List[Landmark]) -> Optional[ClassifiedGesture]:
|
||||||
|
"""Follow gesture: index extended horizontally, others curled."""
|
||||||
|
if not _finger_up(lm, _INDEX_TIP, _INDEX_PIP):
|
||||||
|
return None
|
||||||
|
others_up = sum([
|
||||||
|
_finger_up(lm, _MIDDLE_TIP, _MIDDLE_PIP),
|
||||||
|
_finger_up(lm, _RING_TIP, _RING_PIP),
|
||||||
|
_finger_up(lm, _PINKY_TIP, _PINKY_PIP),
|
||||||
|
])
|
||||||
|
if others_up >= 2:
|
||||||
|
return None
|
||||||
|
# Horizontal: |dx| > |dy| between index MCP and TIP
|
||||||
|
dx = abs(lm[_INDEX_TIP].x - lm[_INDEX_MCP].x)
|
||||||
|
dy = abs(lm[_INDEX_TIP].y - lm[_INDEX_MCP].y)
|
||||||
|
if dx <= dy:
|
||||||
|
return None
|
||||||
|
conf = 0.65 + 0.25 * min(1.0, dx / max(dy, 0.001) / 3.0)
|
||||||
|
direction = "right" if lm[_INDEX_TIP].x > lm[_INDEX_MCP].x else "left"
|
||||||
|
return ClassifiedGesture(
|
||||||
|
gesture_type="follow", confidence=round(conf, 3),
|
||||||
|
hand_x=lm[_INDEX_TIP].x, hand_y=lm[_INDEX_TIP].y,
|
||||||
|
direction=direction,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Temporal wave detector ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class WaveDetector:
|
||||||
|
"""Sliding-window wave gesture detector.
|
||||||
|
|
||||||
|
Tracks wrist X-coordinate history and detects lateral oscillation
|
||||||
|
(minimum 2 direction reversals above amplitude threshold).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
history_len: Number of frames to keep (default 24 ≈ 0.8 s at 30 fps)
|
||||||
|
min_reversals: Direction-reversal count required to trigger (default 2)
|
||||||
|
min_amplitude: Minimum peak-to-peak x excursion in normalised coords
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
history_len: int = 24,
|
||||||
|
min_reversals: int = 2,
|
||||||
|
min_amplitude: float = 0.08,
|
||||||
|
) -> None:
|
||||||
|
self._history: Deque[float] = deque(maxlen=history_len)
|
||||||
|
self._min_reversals = min_reversals
|
||||||
|
self._min_amplitude = min_amplitude
|
||||||
|
|
||||||
|
def push(self, wrist_x: float) -> Tuple[bool, float]:
|
||||||
|
"""Add a new wrist-X sample.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(is_waving, confidence) — confidence is 0.0 when not waving.
|
||||||
|
"""
|
||||||
|
self._history.append(wrist_x)
|
||||||
|
if len(self._history) < 6:
|
||||||
|
return False, 0.0
|
||||||
|
return self._detect()
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
self._history.clear()
|
||||||
|
|
||||||
|
def _detect(self) -> Tuple[bool, float]:
|
||||||
|
samples = list(self._history)
|
||||||
|
mean_x = sum(samples) / len(samples)
|
||||||
|
centered = [x - mean_x for x in samples]
|
||||||
|
amplitude = max(centered) - min(centered)
|
||||||
|
if amplitude < self._min_amplitude:
|
||||||
|
return False, 0.0
|
||||||
|
|
||||||
|
# Count sign changes (direction reversals)
|
||||||
|
reversals = 0
|
||||||
|
for i in range(1, len(centered)):
|
||||||
|
if centered[i - 1] * centered[i] < 0:
|
||||||
|
reversals += 1
|
||||||
|
|
||||||
|
if reversals < self._min_reversals:
|
||||||
|
return False, 0.0
|
||||||
|
|
||||||
|
# Confidence: blend amplitude and reversal quality
|
||||||
|
amp_score = min(1.0, amplitude / 0.30)
|
||||||
|
rev_score = min(1.0, reversals / 6.0)
|
||||||
|
conf = round(0.5 * amp_score + 0.5 * rev_score, 3)
|
||||||
|
return True, conf
|
||||||
|
|
||||||
|
|
||||||
|
# ── Public hand-gesture API ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
# Priority order for static hand-gesture classifiers
|
||||||
|
_HAND_CLASSIFIERS = [
|
||||||
|
("stop_palm", lambda lm, rh: _classify_stop_palm(lm)),
|
||||||
|
("thumbs_up", lambda lm, rh: _classify_thumbs_up(lm, rh)),
|
||||||
|
("thumbs_down", lambda lm, rh: _classify_thumbs_down(lm)),
|
||||||
|
("point", lambda lm, rh: _classify_point(lm)),
|
||||||
|
("come_here", lambda lm, rh: _classify_come_here(lm)),
|
||||||
|
("follow", lambda lm, rh: _classify_follow(lm)),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def classify_hand(
|
||||||
|
landmarks: List[Landmark],
|
||||||
|
is_right_hand: bool = True,
|
||||||
|
wave_det: Optional[WaveDetector] = None,
|
||||||
|
enabled: Optional[set] = None,
|
||||||
|
) -> Optional[ClassifiedGesture]:
|
||||||
|
"""Classify a single hand's 21-landmark list into a named gesture.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
landmarks: 21 Landmark objects from MediaPipe Hands.
|
||||||
|
is_right_hand: Handedness flag (affects thumb direction logic).
|
||||||
|
wave_det: Optional WaveDetector instance for wave tracking.
|
||||||
|
If provided, wave detection is attempted first.
|
||||||
|
enabled: Set of gesture type strings to allow; None = all.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ClassifiedGesture or None if no gesture recognised.
|
||||||
|
"""
|
||||||
|
if len(landmarks) < 21:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _allowed(name: str) -> bool:
|
||||||
|
return enabled is None or name in enabled
|
||||||
|
|
||||||
|
# Wave check (temporal — must come before static classifiers)
|
||||||
|
if wave_det is not None and _allowed("wave"):
|
||||||
|
is_waving, wconf = wave_det.push(landmarks[_WRIST].x)
|
||||||
|
if is_waving:
|
||||||
|
cx, cy = _palm_center(landmarks)
|
||||||
|
return ClassifiedGesture(
|
||||||
|
gesture_type="wave", confidence=wconf,
|
||||||
|
hand_x=cx, hand_y=cy,
|
||||||
|
is_right_hand=is_right_hand,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Static classifiers — return the first (highest-priority) match
|
||||||
|
for name, classifier in _HAND_CLASSIFIERS:
|
||||||
|
if not _allowed(name):
|
||||||
|
continue
|
||||||
|
result = classifier(landmarks, is_right_hand)
|
||||||
|
if result is not None:
|
||||||
|
result.is_right_hand = is_right_hand
|
||||||
|
return result
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# ── Body-pose gesture classifiers ────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def classify_pose(
|
||||||
|
landmarks: List[Landmark],
|
||||||
|
enabled: Optional[set] = None,
|
||||||
|
) -> Optional[ClassifiedGesture]:
|
||||||
|
"""Classify full-body pose landmarks (33 points) into a body gesture.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
landmarks: 33 Landmark objects from MediaPipe Pose.
|
||||||
|
enabled: Set of allowed gesture types; None = all.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ClassifiedGesture or None.
|
||||||
|
"""
|
||||||
|
if len(landmarks) < 29: # need up to ankle (28)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _ok(name: str) -> bool:
|
||||||
|
return enabled is None or name in enabled
|
||||||
|
|
||||||
|
# ── arms_up: both wrists raised above their shoulders ─────────────────
|
||||||
|
if _ok("arms_up"):
|
||||||
|
l_above = landmarks[_L_WRIST].y < landmarks[_L_SHOULDER].y - 0.05
|
||||||
|
r_above = landmarks[_R_WRIST].y < landmarks[_R_SHOULDER].y - 0.05
|
||||||
|
if l_above and r_above:
|
||||||
|
l_margin = landmarks[_L_SHOULDER].y - landmarks[_L_WRIST].y
|
||||||
|
r_margin = landmarks[_R_SHOULDER].y - landmarks[_R_WRIST].y
|
||||||
|
shoulder_span = abs(landmarks[_L_SHOULDER].y - landmarks[_R_HIP].y) or 0.1
|
||||||
|
conf = min(0.95, 0.60 + 0.35 * min(1.0, (l_margin + r_margin) / (2 * shoulder_span)))
|
||||||
|
mid_x = (landmarks[_L_WRIST].x + landmarks[_R_WRIST].x) / 2
|
||||||
|
mid_y = (landmarks[_L_WRIST].y + landmarks[_R_WRIST].y) / 2
|
||||||
|
return ClassifiedGesture(
|
||||||
|
gesture_type="arms_up", confidence=round(conf, 3),
|
||||||
|
hand_x=mid_x, hand_y=mid_y, source="body_pose",
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── arms_spread: both arms extended laterally ──────────────────────────
|
||||||
|
if _ok("arms_spread"):
|
||||||
|
l_span = abs(landmarks[_L_WRIST].x - landmarks[_L_SHOULDER].x)
|
||||||
|
r_span = abs(landmarks[_R_WRIST].x - landmarks[_R_SHOULDER].x)
|
||||||
|
# Wrists should be at roughly shoulder height
|
||||||
|
l_level = abs(landmarks[_L_WRIST].y - landmarks[_L_SHOULDER].y) < 0.12
|
||||||
|
r_level = abs(landmarks[_R_WRIST].y - landmarks[_R_SHOULDER].y) < 0.12
|
||||||
|
shoulder_w = abs(landmarks[_L_SHOULDER].x - landmarks[_R_SHOULDER].x) or 0.1
|
||||||
|
if l_level and r_level and l_span > shoulder_w * 0.6 and r_span > shoulder_w * 0.6:
|
||||||
|
conf = min(0.90, 0.55 + 0.35 * min(1.0, (l_span + r_span) / (shoulder_w * 2)))
|
||||||
|
mid_x = (landmarks[_L_WRIST].x + landmarks[_R_WRIST].x) / 2
|
||||||
|
mid_y = (landmarks[_L_SHOULDER].y + landmarks[_R_SHOULDER].y) / 2
|
||||||
|
return ClassifiedGesture(
|
||||||
|
gesture_type="arms_spread", confidence=round(conf, 3),
|
||||||
|
hand_x=mid_x, hand_y=mid_y, source="body_pose",
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── crouch: hips dropped below normal standing threshold ──────────────
|
||||||
|
if _ok("crouch"):
|
||||||
|
l_hip_y = landmarks[_L_HIP].y
|
||||||
|
r_hip_y = landmarks[_R_HIP].y
|
||||||
|
avg_hip_y = (l_hip_y + r_hip_y) / 2
|
||||||
|
# In a crouching person the hips appear lower in frame (larger y).
|
||||||
|
# We use 0.65 as threshold (hips in the lower 65% of frame height).
|
||||||
|
if avg_hip_y > 0.65:
|
||||||
|
# Additional check: knees should be significantly bent
|
||||||
|
l_knee_y = landmarks[_L_KNEE].y if len(landmarks) > _L_KNEE else avg_hip_y
|
||||||
|
r_knee_y = landmarks[_R_KNEE].y if len(landmarks) > _R_KNEE else avg_hip_y
|
||||||
|
knee_drop = ((l_knee_y - l_hip_y) + (r_knee_y - r_hip_y)) / 2
|
||||||
|
conf = min(0.90, 0.55 + 0.35 * min(1.0, (avg_hip_y - 0.65) / 0.20))
|
||||||
|
mid_x = (landmarks[_L_HIP].x + landmarks[_R_HIP].x) / 2
|
||||||
|
return ClassifiedGesture(
|
||||||
|
gesture_type="crouch", confidence=round(conf, 3),
|
||||||
|
hand_x=mid_x, hand_y=avg_hip_y, source="body_pose",
|
||||||
|
)
|
||||||
|
|
||||||
|
return None
|
||||||
@ -0,0 +1,432 @@
|
|||||||
|
"""gesture_node.py — Multi-camera hand + body-pose gesture recognition (Issue #140).
|
||||||
|
|
||||||
|
Subscribes to images from up to 4 CSI cameras, runs MediaPipe Hands and Pose
|
||||||
|
on each frame, classifies gestures via gesture_classifier.py, and publishes
|
||||||
|
/social/gestures (GestureArray).
|
||||||
|
|
||||||
|
Pipeline
|
||||||
|
--------
|
||||||
|
camera/*/image_raw → cv_bridge → MediaPipe Hands + Pose
|
||||||
|
→ gesture_classifier → correlate with PersonStateArray
|
||||||
|
→ publish /social/gestures
|
||||||
|
|
||||||
|
Multi-camera
|
||||||
|
------------
|
||||||
|
Up to n_cameras (default 4) camera streams are processed.
|
||||||
|
Topic names follow the saltybot_cameras convention:
|
||||||
|
/camera/front/image_raw, /camera/left/image_raw,
|
||||||
|
/camera/rear/image_raw, /camera/right/image_raw
|
||||||
|
|
||||||
|
All cameras share a single round-robin processing queue so that GPU/CPU
|
||||||
|
resources are not over-subscribed. At process_fps=10 Hz, each camera
|
||||||
|
receives approximately process_fps / n_cameras updates per second.
|
||||||
|
|
||||||
|
Person-ID correlation
|
||||||
|
---------------------
|
||||||
|
When a gesture is detected on camera N, the node inspects the cached
|
||||||
|
PersonStateArray. A person with matching camera_id is preferred; if only
|
||||||
|
one person is tracked, they receive the gesture unconditionally.
|
||||||
|
|
||||||
|
ROS2 topics
|
||||||
|
-----------
|
||||||
|
Subscribe:
|
||||||
|
/camera/front/image_raw (sensor_msgs/Image)
|
||||||
|
/camera/left/image_raw (sensor_msgs/Image)
|
||||||
|
/camera/rear/image_raw (sensor_msgs/Image)
|
||||||
|
/camera/right/image_raw (sensor_msgs/Image)
|
||||||
|
/social/persons (saltybot_social_msgs/PersonStateArray)
|
||||||
|
|
||||||
|
Publish:
|
||||||
|
/social/gestures (saltybot_social_msgs/GestureArray)
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
n_cameras int 4 Number of active camera streams
|
||||||
|
process_fps float 10.0 Target processing rate (frames/sec)
|
||||||
|
min_confidence float 0.60 Discard detections below this threshold
|
||||||
|
model_complexity int 0 MediaPipe model (0=lite, 1=full, 2=heavy)
|
||||||
|
max_hands int 2 Max hands per frame (MediaPipe param)
|
||||||
|
pose_enabled bool true Enable MediaPipe Pose body-pose detection
|
||||||
|
hands_enabled bool true Enable MediaPipe Hands
|
||||||
|
enabled_gestures str "" Comma-separated allowlist; empty = all
|
||||||
|
camera_names str "front,left,rear,right" Camera name list
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from typing import Dict, List, Optional, Set
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
|
||||||
|
|
||||||
|
from sensor_msgs.msg import Image
|
||||||
|
from saltybot_social_msgs.msg import (
|
||||||
|
GestureArray, Gesture, PersonStateArray,
|
||||||
|
)
|
||||||
|
|
||||||
|
from .gesture_classifier import (
|
||||||
|
Landmark, ClassifiedGesture,
|
||||||
|
classify_hand, classify_pose,
|
||||||
|
WaveDetector,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Optional imports — may not be present in CI / test environments
|
||||||
|
try:
|
||||||
|
import cv2
|
||||||
|
from cv_bridge import CvBridge
|
||||||
|
_HAS_CV = True
|
||||||
|
except ImportError:
|
||||||
|
_HAS_CV = False
|
||||||
|
|
||||||
|
try:
|
||||||
|
import mediapipe as mp
|
||||||
|
_HAS_MP = True
|
||||||
|
except ImportError:
|
||||||
|
_HAS_MP = False
|
||||||
|
|
||||||
|
|
||||||
|
# ── Camera index map ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
_CAMERA_NAMES = ["front", "left", "rear", "right"]
|
||||||
|
_CAMERA_ID: Dict[str, int] = {name: i for i, name in enumerate(_CAMERA_NAMES)}
|
||||||
|
|
||||||
|
|
||||||
|
# ── Latest-frame buffer ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class _FrameBuffer:
|
||||||
|
"""Thread-safe single-slot buffer per camera — always keeps newest frame."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self._frames: Dict[int, object] = {} # camera_id → Image msg
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
def put(self, camera_id: int, msg: object) -> None:
|
||||||
|
with self._lock:
|
||||||
|
self._frames[camera_id] = msg
|
||||||
|
|
||||||
|
def drain(self) -> List[tuple]:
|
||||||
|
with self._lock:
|
||||||
|
result = list(self._frames.items())
|
||||||
|
self._frames = {}
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
# ── MediaPipe wrapper ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class _MediaPipeProcessor:
|
||||||
|
"""Wraps MediaPipe Hands + Pose with lazy initialisation."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
model_complexity: int,
|
||||||
|
max_hands: int,
|
||||||
|
hands_enabled: bool,
|
||||||
|
pose_enabled: bool,
|
||||||
|
) -> None:
|
||||||
|
self._complexity = model_complexity
|
||||||
|
self._max_hands = max_hands
|
||||||
|
self._hands_enabled = hands_enabled
|
||||||
|
self._pose_enabled = pose_enabled
|
||||||
|
self._hands = None
|
||||||
|
self._pose = None
|
||||||
|
self._ready = False
|
||||||
|
|
||||||
|
def init(self) -> None:
|
||||||
|
if not _HAS_MP:
|
||||||
|
return
|
||||||
|
mp_hands = mp.solutions.hands
|
||||||
|
mp_pose = mp.solutions.pose
|
||||||
|
if self._hands_enabled:
|
||||||
|
self._hands = mp_hands.Hands(
|
||||||
|
static_image_mode=False,
|
||||||
|
max_num_hands=self._max_hands,
|
||||||
|
min_detection_confidence=0.50,
|
||||||
|
min_tracking_confidence=0.50,
|
||||||
|
model_complexity=self._complexity,
|
||||||
|
)
|
||||||
|
if self._pose_enabled:
|
||||||
|
self._pose = mp_pose.Pose(
|
||||||
|
static_image_mode=False,
|
||||||
|
min_detection_confidence=0.50,
|
||||||
|
min_tracking_confidence=0.50,
|
||||||
|
model_complexity=self._complexity,
|
||||||
|
)
|
||||||
|
self._ready = True
|
||||||
|
|
||||||
|
def process(self, bgr_image) -> tuple:
|
||||||
|
"""Returns (hands_results, pose_results) from MediaPipe."""
|
||||||
|
if not self._ready or not _HAS_MP:
|
||||||
|
return None, None
|
||||||
|
try:
|
||||||
|
import numpy as np
|
||||||
|
rgb = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)
|
||||||
|
rgb.flags.writeable = False
|
||||||
|
h_res = self._hands.process(rgb) if self._hands else None
|
||||||
|
p_res = self._pose.process(rgb) if self._pose else None
|
||||||
|
return h_res, p_res
|
||||||
|
except Exception:
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
if self._hands:
|
||||||
|
self._hands.close()
|
||||||
|
if self._pose:
|
||||||
|
self._pose.close()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Gesture node ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class GestureNode(Node):
|
||||||
|
"""Multi-camera gesture recognition node."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__("gesture_node")
|
||||||
|
|
||||||
|
# ── Parameters ──────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter("n_cameras", 4)
|
||||||
|
self.declare_parameter("process_fps", 10.0)
|
||||||
|
self.declare_parameter("min_confidence", 0.60)
|
||||||
|
self.declare_parameter("model_complexity", 0)
|
||||||
|
self.declare_parameter("max_hands", 2)
|
||||||
|
self.declare_parameter("pose_enabled", True)
|
||||||
|
self.declare_parameter("hands_enabled", True)
|
||||||
|
self.declare_parameter("enabled_gestures", "")
|
||||||
|
self.declare_parameter("camera_names", "front,left,rear,right")
|
||||||
|
|
||||||
|
self._n_cameras: int = self.get_parameter("n_cameras").value
|
||||||
|
self._process_fps: float = self.get_parameter("process_fps").value
|
||||||
|
self._min_conf: float = self.get_parameter("min_confidence").value
|
||||||
|
self._complexity: int = self.get_parameter("model_complexity").value
|
||||||
|
self._max_hands: int = self.get_parameter("max_hands").value
|
||||||
|
self._pose_enabled: bool = self.get_parameter("pose_enabled").value
|
||||||
|
self._hands_enabled: bool = self.get_parameter("hands_enabled").value
|
||||||
|
_enabled_str: str = self.get_parameter("enabled_gestures").value
|
||||||
|
camera_names_str: str = self.get_parameter("camera_names").value
|
||||||
|
|
||||||
|
self._enabled: Optional[Set[str]] = (
|
||||||
|
set(g.strip() for g in _enabled_str.split(",") if g.strip())
|
||||||
|
if _enabled_str.strip() else None
|
||||||
|
)
|
||||||
|
self._camera_names: List[str] = [
|
||||||
|
n.strip() for n in camera_names_str.split(",")
|
||||||
|
if n.strip()
|
||||||
|
][:self._n_cameras]
|
||||||
|
|
||||||
|
# ── QoS ─────────────────────────────────────────────────────────────
|
||||||
|
img_qos = QoSProfile(
|
||||||
|
reliability=QoSReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=QoSHistoryPolicy.KEEP_LAST,
|
||||||
|
depth=1,
|
||||||
|
)
|
||||||
|
reliable_qos = QoSProfile(depth=10)
|
||||||
|
|
||||||
|
# ── Publishers ───────────────────────────────────────────────────────
|
||||||
|
self._gesture_pub = self.create_publisher(
|
||||||
|
GestureArray, "/social/gestures", reliable_qos
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Subscribers ──────────────────────────────────────────────────────
|
||||||
|
self._buf = _FrameBuffer()
|
||||||
|
self._cam_subs = []
|
||||||
|
for name in self._camera_names:
|
||||||
|
topic = f"/camera/{name}/image_raw"
|
||||||
|
cam_id = _CAMERA_ID.get(name, len(self._cam_subs))
|
||||||
|
sub = self.create_subscription(
|
||||||
|
Image, topic,
|
||||||
|
lambda msg, cid=cam_id: self._buf.put(cid, msg),
|
||||||
|
img_qos,
|
||||||
|
)
|
||||||
|
self._cam_subs.append(sub)
|
||||||
|
self.get_logger().info(f"Subscribed to {topic} (camera_id={cam_id})")
|
||||||
|
|
||||||
|
self._persons: Optional[PersonStateArray] = None
|
||||||
|
self._persons_sub = self.create_subscription(
|
||||||
|
PersonStateArray, "/social/persons",
|
||||||
|
lambda msg: setattr(self, "_persons", msg),
|
||||||
|
reliable_qos,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Per-camera WaveDetector instances: (camera_id, hand_idx) → det ──
|
||||||
|
self._wave_detectors: Dict[tuple, WaveDetector] = {}
|
||||||
|
|
||||||
|
# ── MediaPipe (initialised in background thread) ─────────────────────
|
||||||
|
self._mp = _MediaPipeProcessor(
|
||||||
|
model_complexity=self._complexity,
|
||||||
|
max_hands=self._max_hands,
|
||||||
|
hands_enabled=self._hands_enabled,
|
||||||
|
pose_enabled=self._pose_enabled,
|
||||||
|
)
|
||||||
|
self._bridge = CvBridge() if _HAS_CV else None
|
||||||
|
self._mp_ready = False
|
||||||
|
|
||||||
|
self._init_thread = threading.Thread(
|
||||||
|
target=self._init_mediapipe, daemon=True
|
||||||
|
)
|
||||||
|
self._init_thread.start()
|
||||||
|
|
||||||
|
# ── Processing timer ─────────────────────────────────────────────────
|
||||||
|
self._timer = self.create_timer(
|
||||||
|
1.0 / self._process_fps, self._process_tick
|
||||||
|
)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"gesture_node starting — cameras={self._camera_names}, "
|
||||||
|
f"fps={self._process_fps}, min_conf={self._min_conf}, "
|
||||||
|
f"complexity={self._complexity}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Initialisation ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _init_mediapipe(self) -> None:
|
||||||
|
if not _HAS_MP:
|
||||||
|
self.get_logger().warn(
|
||||||
|
"mediapipe not installed — gesture_node will publish no gestures. "
|
||||||
|
"Install with: pip install mediapipe"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
if not _HAS_CV:
|
||||||
|
self.get_logger().error("opencv-python / cv_bridge not available")
|
||||||
|
return
|
||||||
|
self.get_logger().info("Initialising MediaPipe Hands + Pose…")
|
||||||
|
t0 = time.time()
|
||||||
|
self._mp.init()
|
||||||
|
self._mp_ready = True
|
||||||
|
self.get_logger().info(
|
||||||
|
f"MediaPipe ready ({time.time() - t0:.1f}s) — "
|
||||||
|
f"hands={'on' if self._hands_enabled else 'off'}, "
|
||||||
|
f"pose={'on' if self._pose_enabled else 'off'}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Processing tick ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _process_tick(self) -> None:
|
||||||
|
if not self._mp_ready:
|
||||||
|
return
|
||||||
|
frames = self._buf.drain()
|
||||||
|
if not frames:
|
||||||
|
return
|
||||||
|
|
||||||
|
all_gestures: List[Gesture] = []
|
||||||
|
for cam_id, img_msg in frames:
|
||||||
|
gestures = self._process_frame(cam_id, img_msg)
|
||||||
|
all_gestures.extend(gestures)
|
||||||
|
|
||||||
|
if all_gestures:
|
||||||
|
arr = GestureArray()
|
||||||
|
arr.header.stamp = self.get_clock().now().to_msg()
|
||||||
|
arr.gestures = all_gestures
|
||||||
|
self._gesture_pub.publish(arr)
|
||||||
|
|
||||||
|
def _process_frame(self, cam_id: int, img_msg) -> List[Gesture]:
|
||||||
|
"""Run MediaPipe on one camera frame and return gesture messages."""
|
||||||
|
if self._bridge is None:
|
||||||
|
return []
|
||||||
|
try:
|
||||||
|
bgr = self._bridge.imgmsg_to_cv2(img_msg, desired_encoding="bgr8")
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().debug(f"cv_bridge error cam {cam_id}: {e}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
h_results, p_results = self._mp.process(bgr)
|
||||||
|
gestures: List[Gesture] = []
|
||||||
|
|
||||||
|
# ── Hand gestures ─────────────────────────────────────────────────
|
||||||
|
if h_results and h_results.multi_hand_landmarks:
|
||||||
|
for hand_idx, (hand_lm, hand_info) in enumerate(
|
||||||
|
zip(h_results.multi_hand_landmarks,
|
||||||
|
h_results.multi_handedness)
|
||||||
|
):
|
||||||
|
is_right = hand_info.classification[0].label == "Right"
|
||||||
|
lm = [
|
||||||
|
Landmark(p.x, p.y, p.z)
|
||||||
|
for p in hand_lm.landmark
|
||||||
|
]
|
||||||
|
wave_key = (cam_id, hand_idx)
|
||||||
|
wave_det = self._wave_detectors.get(wave_key)
|
||||||
|
if wave_det is None:
|
||||||
|
wave_det = WaveDetector()
|
||||||
|
self._wave_detectors[wave_key] = wave_det
|
||||||
|
|
||||||
|
result = classify_hand(
|
||||||
|
lm, is_right_hand=is_right,
|
||||||
|
wave_det=wave_det,
|
||||||
|
enabled=self._enabled,
|
||||||
|
)
|
||||||
|
if result and result.confidence >= self._min_conf:
|
||||||
|
gestures.append(
|
||||||
|
self._to_msg(result, cam_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Body pose gestures ────────────────────────────────────────────
|
||||||
|
if p_results and p_results.pose_landmarks:
|
||||||
|
lm = [
|
||||||
|
Landmark(p.x, p.y, p.z, p.visibility)
|
||||||
|
for p in p_results.pose_landmarks.landmark
|
||||||
|
]
|
||||||
|
result = classify_pose(lm, enabled=self._enabled)
|
||||||
|
if result and result.confidence >= self._min_conf:
|
||||||
|
gestures.append(self._to_msg(result, cam_id))
|
||||||
|
|
||||||
|
return gestures
|
||||||
|
|
||||||
|
# ── Person correlation ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _resolve_person_id(self, cam_id: int, hand_x: float) -> int:
|
||||||
|
"""Find person_id for a gesture on camera cam_id. Returns -1 if unknown."""
|
||||||
|
persons = self._persons
|
||||||
|
if persons is None or not persons.persons:
|
||||||
|
return -1
|
||||||
|
# Filter persons seen on this camera
|
||||||
|
on_cam = [p for p in persons.persons if p.camera_id == cam_id]
|
||||||
|
if len(on_cam) == 1:
|
||||||
|
return on_cam[0].person_id
|
||||||
|
if len(on_cam) > 1:
|
||||||
|
# Pick closest by horizontal image position (heuristic)
|
||||||
|
return min(on_cam, key=lambda p: abs(p.bearing_deg / 90.0 - hand_x)).person_id
|
||||||
|
# Fall back to any tracked person
|
||||||
|
if persons.persons:
|
||||||
|
return persons.persons[0].person_id
|
||||||
|
return -1
|
||||||
|
|
||||||
|
# ── Message construction ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _to_msg(self, g: ClassifiedGesture, cam_id: int) -> Gesture:
|
||||||
|
msg = Gesture()
|
||||||
|
msg.header.stamp = self.get_clock().now().to_msg()
|
||||||
|
msg.gesture_type = g.gesture_type
|
||||||
|
msg.confidence = float(g.confidence)
|
||||||
|
msg.camera_id = cam_id
|
||||||
|
msg.hand_x = float(g.hand_x)
|
||||||
|
msg.hand_y = float(g.hand_y)
|
||||||
|
msg.is_right_hand = g.is_right_hand
|
||||||
|
msg.direction = g.direction
|
||||||
|
msg.source = g.source
|
||||||
|
msg.person_id = self._resolve_person_id(cam_id, g.hand_x)
|
||||||
|
return msg
|
||||||
|
|
||||||
|
# ── Cleanup ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def destroy_node(self) -> None:
|
||||||
|
self._mp.close()
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None) -> None:
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = GestureNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
@ -35,6 +35,8 @@ setup(
|
|||||||
'orchestrator_node = saltybot_social.orchestrator_node:main',
|
'orchestrator_node = saltybot_social.orchestrator_node:main',
|
||||||
# Voice command NLU bridge (Issue #137)
|
# Voice command NLU bridge (Issue #137)
|
||||||
'voice_command_node = saltybot_social.voice_command_node:main',
|
'voice_command_node = saltybot_social.voice_command_node:main',
|
||||||
|
# Multi-camera gesture recognition (Issue #140)
|
||||||
|
'gesture_node = saltybot_social.gesture_node:main',
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@ -0,0 +1,640 @@
|
|||||||
|
"""test_gesture_classifier.py — Unit tests for gesture_classifier (Issue #140).
|
||||||
|
|
||||||
|
Tests cover:
|
||||||
|
- All 7 hand gestures with multiple landmark configurations
|
||||||
|
- All 3 body-pose gestures
|
||||||
|
- WaveDetector temporal detection
|
||||||
|
- Entity extraction (direction for point/follow)
|
||||||
|
- enabled allowlist filtering
|
||||||
|
- Edge cases: too few landmarks, marginal values, near-misses
|
||||||
|
|
||||||
|
No ROS2 or MediaPipe runtime required.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from saltybot_social.gesture_classifier import (
|
||||||
|
Landmark,
|
||||||
|
ClassifiedGesture,
|
||||||
|
WaveDetector,
|
||||||
|
classify_hand,
|
||||||
|
classify_pose,
|
||||||
|
_finger_up,
|
||||||
|
_thumb_up,
|
||||||
|
_thumb_down,
|
||||||
|
_four_fingers_curled,
|
||||||
|
_count_fingers_up,
|
||||||
|
_point_direction,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Landmark builder helpers ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
def _lm(x: float, y: float, z: float = 0.0) -> Landmark:
|
||||||
|
return Landmark(x=x, y=y, z=z)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_hand_landmarks(
|
||||||
|
fingers_up: list = None, # True/False per [index, middle, ring, pinky]
|
||||||
|
thumb_up: bool = False,
|
||||||
|
thumb_down: bool = False,
|
||||||
|
wrist_y: float = 0.8,
|
||||||
|
palm_height: float = 0.2,
|
||||||
|
) -> list:
|
||||||
|
"""
|
||||||
|
Build a synthetic 21-landmark hand with controllable finger states.
|
||||||
|
|
||||||
|
fingers_up: [index, middle, ring, pinky] — True = finger extended
|
||||||
|
thumb_up: thumb tip is above wrist
|
||||||
|
thumb_down: thumb tip is below wrist
|
||||||
|
wrist_y: y-coordinate of the wrist (image-space, 0=top, 1=bottom)
|
||||||
|
palm_height: vertical extent of the palm
|
||||||
|
"""
|
||||||
|
if fingers_up is None:
|
||||||
|
fingers_up = [False, False, False, False]
|
||||||
|
|
||||||
|
lm = [None] * 21
|
||||||
|
|
||||||
|
mcp_y = wrist_y - palm_height * 0.5 # MCP knuckles above wrist
|
||||||
|
pip_y = mcp_y - palm_height * 0.2 # PIP above MCP
|
||||||
|
tip_y_up = pip_y - palm_height * 0.3 # TIP above PIP (extended)
|
||||||
|
tip_y_dn = pip_y + palm_height * 0.1 # TIP below PIP (curled)
|
||||||
|
|
||||||
|
# Wrist
|
||||||
|
lm[0] = _lm(0.5, wrist_y)
|
||||||
|
|
||||||
|
# Thumb (CMC → MCP → IP → TIP)
|
||||||
|
lm[1] = _lm(0.45, wrist_y - palm_height * 0.05) # CMC
|
||||||
|
lm[2] = _lm(0.42, wrist_y - palm_height * 0.15) # MCP
|
||||||
|
lm[3] = _lm(0.38, wrist_y - palm_height * 0.25) # IP
|
||||||
|
if thumb_up:
|
||||||
|
lm[4] = _lm(0.35, wrist_y - palm_height * 0.50) # TIP high up
|
||||||
|
elif thumb_down:
|
||||||
|
lm[4] = _lm(0.48, wrist_y + 0.06) # TIP below wrist
|
||||||
|
else:
|
||||||
|
lm[4] = _lm(0.40, wrist_y - palm_height * 0.10) # tucked
|
||||||
|
|
||||||
|
# Finger MCP, PIP, DIP, TIP — four fingers evenly spaced across x
|
||||||
|
finger_x = [0.45, 0.50, 0.55, 0.60]
|
||||||
|
finger_offsets = [ # (mcp, pip, dip, tip) indices
|
||||||
|
(5, 6, 7, 8),
|
||||||
|
(9, 10, 11, 12),
|
||||||
|
(13, 14, 15, 16),
|
||||||
|
(17, 18, 19, 20),
|
||||||
|
]
|
||||||
|
for fi, (mcp_i, pip_i, dip_i, tip_i) in enumerate(finger_offsets):
|
||||||
|
x = finger_x[fi]
|
||||||
|
lm[mcp_i] = _lm(x, mcp_y)
|
||||||
|
lm[pip_i] = _lm(x, pip_y)
|
||||||
|
lm[dip_i] = _lm(x, (pip_y + tip_y_up) / 2 if fingers_up[fi] else pip_y - 0.01)
|
||||||
|
lm[tip_i] = _lm(x, tip_y_up if fingers_up[fi] else tip_y_dn)
|
||||||
|
|
||||||
|
return lm
|
||||||
|
|
||||||
|
|
||||||
|
def _make_pose_landmarks(
|
||||||
|
arms_up: bool = False,
|
||||||
|
arms_spread: bool = False,
|
||||||
|
crouching: bool = False,
|
||||||
|
shoulder_y: float = 0.30,
|
||||||
|
) -> list:
|
||||||
|
"""Build synthetic 33-landmark body pose."""
|
||||||
|
lm = [_lm(0.5, 0.5)] * 33
|
||||||
|
|
||||||
|
hip_y = 0.55 if not crouching else 0.75
|
||||||
|
knee_y = hip_y + 0.15
|
||||||
|
|
||||||
|
# Shoulders
|
||||||
|
lm[11] = _lm(0.35, shoulder_y) # L shoulder
|
||||||
|
lm[12] = _lm(0.65, shoulder_y) # R shoulder
|
||||||
|
|
||||||
|
if arms_up:
|
||||||
|
lm[15] = _lm(0.30, shoulder_y - 0.20) # L wrist above L shoulder
|
||||||
|
lm[16] = _lm(0.70, shoulder_y - 0.20) # R wrist above R shoulder
|
||||||
|
elif arms_spread:
|
||||||
|
lm[13] = _lm(0.20, shoulder_y + 0.05) # L elbow out
|
||||||
|
lm[14] = _lm(0.80, shoulder_y + 0.05) # R elbow out
|
||||||
|
lm[15] = _lm(0.10, shoulder_y + 0.02) # L wrist far left
|
||||||
|
lm[16] = _lm(0.90, shoulder_y + 0.02) # R wrist far right
|
||||||
|
else:
|
||||||
|
lm[15] = _lm(0.35, shoulder_y + 0.20) # L wrist at side
|
||||||
|
lm[16] = _lm(0.65, shoulder_y + 0.20) # R wrist at side
|
||||||
|
|
||||||
|
lm[23] = _lm(0.40, hip_y) # L hip
|
||||||
|
lm[24] = _lm(0.60, hip_y) # R hip
|
||||||
|
lm[25] = _lm(0.40, knee_y) # L knee
|
||||||
|
lm[26] = _lm(0.60, knee_y) # R knee
|
||||||
|
lm[27] = _lm(0.40, knee_y + 0.15) # L ankle
|
||||||
|
lm[28] = _lm(0.60, knee_y + 0.15) # R ankle
|
||||||
|
|
||||||
|
return lm
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helper tests ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestFingerHelpers:
|
||||||
|
def test_finger_up_when_tip_above_pip(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, False, False, False])
|
||||||
|
assert _finger_up(lm, 8, 6) is True # index tip above pip
|
||||||
|
|
||||||
|
def test_finger_down_when_tip_below_pip(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[False, False, False, False])
|
||||||
|
assert _finger_up(lm, 8, 6) is False
|
||||||
|
|
||||||
|
def test_count_fingers_all_up(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, True, True, True])
|
||||||
|
assert _count_fingers_up(lm) == 4
|
||||||
|
|
||||||
|
def test_count_fingers_none_up(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[False, False, False, False])
|
||||||
|
assert _count_fingers_up(lm) == 0
|
||||||
|
|
||||||
|
def test_count_fingers_two_up(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, True, False, False])
|
||||||
|
assert _count_fingers_up(lm) == 2
|
||||||
|
|
||||||
|
def test_four_fingers_curled_all_down(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[False, False, False, False])
|
||||||
|
assert _four_fingers_curled(lm) is True
|
||||||
|
|
||||||
|
def test_four_fingers_not_curled_one_up(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, False, False, False])
|
||||||
|
assert _four_fingers_curled(lm) is False
|
||||||
|
|
||||||
|
def test_thumb_up_when_tip_high(self):
|
||||||
|
lm = _make_hand_landmarks(thumb_up=True)
|
||||||
|
assert _thumb_up(lm, True) is True
|
||||||
|
|
||||||
|
def test_thumb_not_up_when_tucked(self):
|
||||||
|
lm = _make_hand_landmarks(thumb_up=False)
|
||||||
|
assert _thumb_up(lm, True) is False
|
||||||
|
|
||||||
|
def test_thumb_down_when_tip_below_wrist(self):
|
||||||
|
lm = _make_hand_landmarks(thumb_down=True)
|
||||||
|
assert _thumb_down(lm) is True
|
||||||
|
|
||||||
|
def test_thumb_not_down_when_tucked(self):
|
||||||
|
lm = _make_hand_landmarks(thumb_up=False, thumb_down=False)
|
||||||
|
assert _thumb_down(lm) is False
|
||||||
|
|
||||||
|
|
||||||
|
# ── Hand gesture: stop_palm ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestStopPalm:
|
||||||
|
def test_all_fingers_up(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, True, True, True])
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r is not None
|
||||||
|
assert r.gesture_type == "stop_palm"
|
||||||
|
|
||||||
|
def test_confidence_range(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, True, True, True])
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert 0.60 <= r.confidence <= 1.0
|
||||||
|
|
||||||
|
def test_three_fingers_does_not_trigger(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, True, True, False])
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r is None or r.gesture_type != "stop_palm"
|
||||||
|
|
||||||
|
def test_source_is_hand(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, True, True, True])
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r.source == "hand"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Hand gesture: thumbs_up ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestThumbsUp:
|
||||||
|
def test_basic(self):
|
||||||
|
lm = _make_hand_landmarks(
|
||||||
|
fingers_up=[False, False, False, False], thumb_up=True
|
||||||
|
)
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r is not None
|
||||||
|
assert r.gesture_type == "thumbs_up"
|
||||||
|
|
||||||
|
def test_confidence_range(self):
|
||||||
|
lm = _make_hand_landmarks(
|
||||||
|
fingers_up=[False, False, False, False], thumb_up=True
|
||||||
|
)
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert 0.60 <= r.confidence <= 1.0
|
||||||
|
|
||||||
|
def test_thumb_up_with_fingers_open_not_thumbs_up(self):
|
||||||
|
# If fingers are also open → stop_palm wins in priority
|
||||||
|
lm = _make_hand_landmarks(
|
||||||
|
fingers_up=[True, True, True, True], thumb_up=True
|
||||||
|
)
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r.gesture_type == "stop_palm"
|
||||||
|
|
||||||
|
def test_right_hand_flag(self):
|
||||||
|
lm = _make_hand_landmarks(
|
||||||
|
fingers_up=[False, False, False, False], thumb_up=True
|
||||||
|
)
|
||||||
|
r = classify_hand(lm, is_right_hand=True)
|
||||||
|
assert r.is_right_hand is True
|
||||||
|
|
||||||
|
def test_left_hand_flag(self):
|
||||||
|
lm = _make_hand_landmarks(
|
||||||
|
fingers_up=[False, False, False, False], thumb_up=True
|
||||||
|
)
|
||||||
|
r = classify_hand(lm, is_right_hand=False)
|
||||||
|
assert r.is_right_hand is False
|
||||||
|
|
||||||
|
|
||||||
|
# ── Hand gesture: thumbs_down ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestThumbsDown:
|
||||||
|
def test_basic(self):
|
||||||
|
lm = _make_hand_landmarks(
|
||||||
|
fingers_up=[False, False, False, False], thumb_down=True
|
||||||
|
)
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r is not None
|
||||||
|
assert r.gesture_type == "thumbs_down"
|
||||||
|
|
||||||
|
def test_confidence_range(self):
|
||||||
|
lm = _make_hand_landmarks(
|
||||||
|
fingers_up=[False, False, False, False], thumb_down=True
|
||||||
|
)
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert 0.60 <= r.confidence <= 1.0
|
||||||
|
|
||||||
|
def test_fingers_open_prevents_thumbs_down(self):
|
||||||
|
lm = _make_hand_landmarks(
|
||||||
|
fingers_up=[True, True, True, True], thumb_down=True
|
||||||
|
)
|
||||||
|
r = classify_hand(lm)
|
||||||
|
# stop_palm takes priority
|
||||||
|
assert r is None or r.gesture_type != "thumbs_down"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Hand gesture: point ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestPoint:
|
||||||
|
def test_index_only_up(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, False, False, False])
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r is not None
|
||||||
|
assert r.gesture_type == "point"
|
||||||
|
|
||||||
|
def test_direction_right(self):
|
||||||
|
"""Index tip to the right of MCP → direction='right'."""
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, False, False, False])
|
||||||
|
# Override index tip to point right (TIP above PIP, dx=0.15 ≤ 0.20 threshold)
|
||||||
|
lm[8] = _lm(lm[5].x + 0.15, lm[6].y - 0.02)
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r is not None
|
||||||
|
assert r.gesture_type == "point"
|
||||||
|
assert r.direction == "right"
|
||||||
|
|
||||||
|
def test_direction_up(self):
|
||||||
|
"""Index tip significantly above MCP → direction='up'."""
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, False, False, False])
|
||||||
|
lm[8] = _lm(lm[5].x, lm[5].y - 0.20) # far above MCP
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r is not None
|
||||||
|
assert r.direction == "up"
|
||||||
|
|
||||||
|
def test_two_fingers_up_not_point(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, True, False, False])
|
||||||
|
r = classify_hand(lm)
|
||||||
|
# With 2 fingers up, point classifier should reject
|
||||||
|
assert r is None or r.gesture_type != "point"
|
||||||
|
|
||||||
|
def test_confidence_range(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, False, False, False])
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert 0.60 <= r.confidence <= 1.0
|
||||||
|
|
||||||
|
|
||||||
|
# ── Hand gesture: come_here ───────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestComeHere:
|
||||||
|
def _make_beckoning_hand(self) -> list:
|
||||||
|
"""Index MCP raised but TIP curled below PIP."""
|
||||||
|
lm = _make_hand_landmarks(
|
||||||
|
fingers_up=[False, False, False, False],
|
||||||
|
wrist_y=0.85,
|
||||||
|
palm_height=0.25,
|
||||||
|
)
|
||||||
|
# Index MCP should be above wrist
|
||||||
|
assert lm[5].y < lm[0].y # sanity: MCP.y < WRIST.y (above in image)
|
||||||
|
# Force TIP to be BELOW MCP (clearly curled — satisfies MCP+0.03 threshold)
|
||||||
|
lm[8] = _lm(lm[5].x, lm[5].y + 0.06) # tip 0.06 below MCP
|
||||||
|
return lm
|
||||||
|
|
||||||
|
def test_beckoning_detected(self):
|
||||||
|
lm = self._make_beckoning_hand()
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r is not None
|
||||||
|
assert r.gesture_type == "come_here"
|
||||||
|
|
||||||
|
def test_confidence_range(self):
|
||||||
|
lm = self._make_beckoning_hand()
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert 0.60 <= r.confidence <= 1.0
|
||||||
|
|
||||||
|
def test_open_hand_not_come_here(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, True, True, True])
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r is None or r.gesture_type != "come_here"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Hand gesture: follow ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestFollow:
|
||||||
|
def _make_follow_hand(self, direction: str = "right") -> list:
|
||||||
|
"""Index pointing horizontally, others curled."""
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, False, False, False])
|
||||||
|
# Move index TIP far to the right/left of MCP (dx=0.25 > 0.20 threshold → follow)
|
||||||
|
# TIP above PIP so _finger_up returns True; dy=0.06 keeps dx >> dy
|
||||||
|
if direction == "right":
|
||||||
|
lm[8] = _lm(lm[5].x + 0.25, lm[6].y - 0.02)
|
||||||
|
else:
|
||||||
|
lm[8] = _lm(lm[5].x - 0.25, lm[6].y - 0.02)
|
||||||
|
return lm
|
||||||
|
|
||||||
|
def test_follow_right(self):
|
||||||
|
lm = self._make_follow_hand("right")
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r is not None
|
||||||
|
assert r.gesture_type == "follow"
|
||||||
|
assert r.direction == "right"
|
||||||
|
|
||||||
|
def test_follow_left(self):
|
||||||
|
lm = self._make_follow_hand("left")
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r is not None
|
||||||
|
assert r.gesture_type == "follow"
|
||||||
|
assert r.direction == "left"
|
||||||
|
|
||||||
|
def test_vertical_point_not_follow(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, False, False, False])
|
||||||
|
# TIP straight up — dy >> dx
|
||||||
|
lm[8] = _lm(lm[5].x, lm[5].y - 0.30)
|
||||||
|
r = classify_hand(lm)
|
||||||
|
# Should be "point", not "follow"
|
||||||
|
assert r is None or r.gesture_type == "point"
|
||||||
|
|
||||||
|
def test_confidence_range(self):
|
||||||
|
lm = self._make_follow_hand()
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r is not None
|
||||||
|
assert 0.60 <= r.confidence <= 1.0
|
||||||
|
|
||||||
|
|
||||||
|
# ── WaveDetector ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestWaveDetector:
|
||||||
|
def test_not_waving_insufficient_data(self):
|
||||||
|
det = WaveDetector()
|
||||||
|
for x in [0.4, 0.5]:
|
||||||
|
waving, conf = det.push(x)
|
||||||
|
assert waving is False
|
||||||
|
assert conf == 0.0
|
||||||
|
|
||||||
|
def test_wave_oscillation_detected(self):
|
||||||
|
det = WaveDetector(history_len=20, min_reversals=2, min_amplitude=0.06)
|
||||||
|
xs = [0.3, 0.5, 0.3, 0.5, 0.3, 0.5, 0.3, 0.5, 0.3, 0.5]
|
||||||
|
for x in xs:
|
||||||
|
waving, conf = det.push(x)
|
||||||
|
assert waving is True
|
||||||
|
assert conf > 0.0
|
||||||
|
|
||||||
|
def test_small_amplitude_not_wave(self):
|
||||||
|
det = WaveDetector(min_amplitude=0.10)
|
||||||
|
# Tiny oscillation — amplitude < threshold
|
||||||
|
xs = [0.50, 0.51, 0.50, 0.51, 0.50, 0.51, 0.50, 0.51, 0.50, 0.51]
|
||||||
|
for x in xs:
|
||||||
|
waving, conf = det.push(x)
|
||||||
|
assert waving is False
|
||||||
|
|
||||||
|
def test_confidence_increases_with_larger_amplitude(self):
|
||||||
|
det_small = WaveDetector()
|
||||||
|
det_large = WaveDetector()
|
||||||
|
for x in [0.40, 0.60, 0.40, 0.60, 0.40, 0.60, 0.40, 0.60]:
|
||||||
|
_, c_small = det_small.push(x)
|
||||||
|
for x in [0.20, 0.80, 0.20, 0.80, 0.20, 0.80, 0.20, 0.80]:
|
||||||
|
_, c_large = det_large.push(x)
|
||||||
|
assert c_large >= c_small
|
||||||
|
|
||||||
|
def test_reset_clears_history(self):
|
||||||
|
det = WaveDetector()
|
||||||
|
for x in [0.3, 0.7, 0.3, 0.7, 0.3, 0.7, 0.3, 0.7, 0.3, 0.7]:
|
||||||
|
det.push(x)
|
||||||
|
det.reset()
|
||||||
|
waving, conf = det.push(0.5)
|
||||||
|
assert waving is False
|
||||||
|
|
||||||
|
def test_wave_via_classify_hand(self):
|
||||||
|
"""Wave detector triggered through classify_hand API."""
|
||||||
|
det = WaveDetector(min_reversals=2, min_amplitude=0.06)
|
||||||
|
xs = [0.3, 0.6, 0.3, 0.6, 0.3, 0.6, 0.3, 0.6, 0.3, 0.6]
|
||||||
|
result = None
|
||||||
|
for x in xs:
|
||||||
|
lm = _make_hand_landmarks()
|
||||||
|
lm[0] = _lm(x, lm[0].y) # move wrist X
|
||||||
|
result = classify_hand(lm, wave_det=det)
|
||||||
|
assert result is not None
|
||||||
|
assert result.gesture_type == "wave"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Body-pose: arms_up ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestArmsUp:
|
||||||
|
def test_both_arms_raised(self):
|
||||||
|
lm = _make_pose_landmarks(arms_up=True)
|
||||||
|
r = classify_pose(lm)
|
||||||
|
assert r is not None
|
||||||
|
assert r.gesture_type == "arms_up"
|
||||||
|
|
||||||
|
def test_confidence_range(self):
|
||||||
|
lm = _make_pose_landmarks(arms_up=True)
|
||||||
|
r = classify_pose(lm)
|
||||||
|
assert 0.60 <= r.confidence <= 1.0
|
||||||
|
|
||||||
|
def test_arms_at_side_not_arms_up(self):
|
||||||
|
lm = _make_pose_landmarks(arms_up=False, arms_spread=False)
|
||||||
|
r = classify_pose(lm)
|
||||||
|
assert r is None or r.gesture_type != "arms_up"
|
||||||
|
|
||||||
|
def test_source_is_body_pose(self):
|
||||||
|
lm = _make_pose_landmarks(arms_up=True)
|
||||||
|
r = classify_pose(lm)
|
||||||
|
assert r.source == "body_pose"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Body-pose: arms_spread ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestArmsSpread:
|
||||||
|
def test_arms_extended_laterally(self):
|
||||||
|
lm = _make_pose_landmarks(arms_spread=True)
|
||||||
|
r = classify_pose(lm)
|
||||||
|
assert r is not None
|
||||||
|
assert r.gesture_type in ("arms_spread", "arms_up")
|
||||||
|
|
||||||
|
def test_confidence_range(self):
|
||||||
|
lm = _make_pose_landmarks(arms_spread=True)
|
||||||
|
r = classify_pose(lm)
|
||||||
|
if r and r.gesture_type == "arms_spread":
|
||||||
|
assert 0.55 <= r.confidence <= 1.0
|
||||||
|
|
||||||
|
def test_arms_at_rest_not_spread(self):
|
||||||
|
lm = _make_pose_landmarks()
|
||||||
|
r = classify_pose(lm)
|
||||||
|
assert r is None or r.gesture_type not in ("arms_spread", "arms_up")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Body-pose: crouch ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestCrouch:
|
||||||
|
def test_crouching_detected(self):
|
||||||
|
lm = _make_pose_landmarks(crouching=True)
|
||||||
|
r = classify_pose(lm)
|
||||||
|
assert r is not None
|
||||||
|
assert r.gesture_type == "crouch"
|
||||||
|
|
||||||
|
def test_standing_not_crouch(self):
|
||||||
|
lm = _make_pose_landmarks(crouching=False)
|
||||||
|
r = classify_pose(lm)
|
||||||
|
assert r is None or r.gesture_type != "crouch"
|
||||||
|
|
||||||
|
def test_confidence_range(self):
|
||||||
|
lm = _make_pose_landmarks(crouching=True)
|
||||||
|
r = classify_pose(lm)
|
||||||
|
assert 0.55 <= r.confidence <= 1.0
|
||||||
|
|
||||||
|
def test_source_is_body_pose(self):
|
||||||
|
lm = _make_pose_landmarks(crouching=True)
|
||||||
|
r = classify_pose(lm)
|
||||||
|
assert r.source == "body_pose"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Enabled allowlist filtering ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestEnabledFilter:
|
||||||
|
def test_blocked_gesture_returns_none(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, True, True, True])
|
||||||
|
r = classify_hand(lm, enabled={"thumbs_up"})
|
||||||
|
assert r is None
|
||||||
|
|
||||||
|
def test_allowed_gesture_passes(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, True, True, True])
|
||||||
|
r = classify_hand(lm, enabled={"stop_palm"})
|
||||||
|
assert r is not None
|
||||||
|
assert r.gesture_type == "stop_palm"
|
||||||
|
|
||||||
|
def test_none_enabled_means_all(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, True, True, True])
|
||||||
|
r = classify_hand(lm, enabled=None)
|
||||||
|
assert r is not None
|
||||||
|
|
||||||
|
def test_pose_filter_arms_up_blocked(self):
|
||||||
|
lm = _make_pose_landmarks(arms_up=True)
|
||||||
|
r = classify_pose(lm, enabled={"crouch"})
|
||||||
|
assert r is None
|
||||||
|
|
||||||
|
def test_pose_filter_arms_up_allowed(self):
|
||||||
|
lm = _make_pose_landmarks(arms_up=True)
|
||||||
|
r = classify_pose(lm, enabled={"arms_up"})
|
||||||
|
assert r is not None
|
||||||
|
assert r.gesture_type == "arms_up"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Edge cases ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestEdgeCases:
|
||||||
|
def test_empty_landmarks_returns_none(self):
|
||||||
|
assert classify_hand([]) is None
|
||||||
|
|
||||||
|
def test_too_few_landmarks_returns_none(self):
|
||||||
|
lm = [_lm(0.5, 0.5)] * 10 # only 10, need 21
|
||||||
|
assert classify_hand(lm) is None
|
||||||
|
|
||||||
|
def test_too_few_pose_landmarks_returns_none(self):
|
||||||
|
lm = [_lm(0.5, 0.5)] * 10
|
||||||
|
assert classify_pose(lm) is None
|
||||||
|
|
||||||
|
def test_no_wave_detector_no_wave(self):
|
||||||
|
"""Without a WaveDetector, wave is never returned."""
|
||||||
|
lm = _make_hand_landmarks()
|
||||||
|
r = classify_hand(lm, wave_det=None)
|
||||||
|
assert r is None or r.gesture_type != "wave"
|
||||||
|
|
||||||
|
def test_all_fingers_down_thumb_neutral_returns_none(self):
|
||||||
|
lm = _make_hand_landmarks(
|
||||||
|
fingers_up=[False, False, False, False],
|
||||||
|
thumb_up=False, thumb_down=False,
|
||||||
|
)
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r is None
|
||||||
|
|
||||||
|
def test_hand_x_y_populated(self):
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, True, True, True])
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r is not None
|
||||||
|
assert 0.0 <= r.hand_x <= 1.0
|
||||||
|
assert 0.0 <= r.hand_y <= 1.0
|
||||||
|
|
||||||
|
def test_is_right_hand_default_true(self):
|
||||||
|
lm = _make_hand_landmarks(
|
||||||
|
fingers_up=[False, False, False, False], thumb_up=True
|
||||||
|
)
|
||||||
|
r = classify_hand(lm)
|
||||||
|
assert r.is_right_hand is True
|
||||||
|
|
||||||
|
def test_wave_confidence_zero_for_no_wave(self):
|
||||||
|
det = WaveDetector()
|
||||||
|
# Stationary wrist — no wave
|
||||||
|
for _ in range(24):
|
||||||
|
waving, conf = det.push(0.5)
|
||||||
|
assert waving is False
|
||||||
|
assert conf == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ── Point direction edge cases ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestPointDirection:
|
||||||
|
def _hand_pointing(self, dx: float, dy: float) -> list:
|
||||||
|
lm = _make_hand_landmarks(fingers_up=[True, False, False, False])
|
||||||
|
# Set index MCP and TIP to produce the desired vector
|
||||||
|
lm[5] = _lm(0.5, 0.5)
|
||||||
|
lm[8] = _lm(0.5 + dx, 0.5 + dy)
|
||||||
|
return lm
|
||||||
|
|
||||||
|
def test_pointing_right(self):
|
||||||
|
lm = self._hand_pointing(dx=0.2, dy=0.0)
|
||||||
|
assert _point_direction(lm) == "right"
|
||||||
|
|
||||||
|
def test_pointing_left(self):
|
||||||
|
lm = self._hand_pointing(dx=-0.2, dy=0.0)
|
||||||
|
assert _point_direction(lm) == "left"
|
||||||
|
|
||||||
|
def test_pointing_up(self):
|
||||||
|
lm = self._hand_pointing(dx=0.0, dy=-0.2)
|
||||||
|
assert _point_direction(lm) == "up"
|
||||||
|
|
||||||
|
def test_pointing_down(self):
|
||||||
|
lm = self._hand_pointing(dx=0.0, dy=0.2)
|
||||||
|
assert _point_direction(lm) == "down"
|
||||||
|
|
||||||
|
def test_pointing_upper_right(self):
|
||||||
|
lm = self._hand_pointing(dx=0.15, dy=-0.15)
|
||||||
|
assert _point_direction(lm) == "upper_right"
|
||||||
@ -35,6 +35,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
|
|||||||
"msg/ConversationResponse.msg"
|
"msg/ConversationResponse.msg"
|
||||||
# Issue #137 — voice command NLU
|
# Issue #137 — voice command NLU
|
||||||
"msg/VoiceCommand.msg"
|
"msg/VoiceCommand.msg"
|
||||||
|
# Issue #140 — gesture recognition
|
||||||
|
"msg/Gesture.msg"
|
||||||
|
"msg/GestureArray.msg"
|
||||||
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
|
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
31
jetson/ros2_ws/src/saltybot_social_msgs/msg/Gesture.msg
Normal file
31
jetson/ros2_ws/src/saltybot_social_msgs/msg/Gesture.msg
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
# Gesture.msg — Single detected gesture from hand or body-pose recognition (Issue #140).
|
||||||
|
# Published as part of GestureArray on /social/gestures.
|
||||||
|
#
|
||||||
|
# gesture_type values:
|
||||||
|
# Hand gestures (MediaPipe Hands):
|
||||||
|
# wave — lateral wrist oscillation (temporal)
|
||||||
|
# point — index extended, others curled
|
||||||
|
# stop_palm — all fingers extended, palm forward
|
||||||
|
# thumbs_up — thumb up, fist closed
|
||||||
|
# thumbs_down — thumb down, fist closed
|
||||||
|
# come_here — beckoning: index curled toward palm (temporal)
|
||||||
|
# follow — index extended horizontally
|
||||||
|
# Body-pose gestures (MediaPipe Pose):
|
||||||
|
# arms_up — both wrists above shoulders (stop / emergency)
|
||||||
|
# crouch — hips below normal standing threshold (come closer)
|
||||||
|
# arms_spread — arms extended laterally (spread out / stop)
|
||||||
|
|
||||||
|
std_msgs/Header header
|
||||||
|
|
||||||
|
string gesture_type # gesture identifier (see above)
|
||||||
|
int32 person_id # from PersonStateArray (-1 = unidentified)
|
||||||
|
float32 confidence # detection confidence [0.0, 1.0]
|
||||||
|
int32 camera_id # source camera (0=front, 1=left, 2=rear, 3=right)
|
||||||
|
|
||||||
|
# Normalised image position of the detected gesture anchor [0.0, 1.0]
|
||||||
|
float32 hand_x
|
||||||
|
float32 hand_y
|
||||||
|
|
||||||
|
bool is_right_hand # true = right hand; false = left hand (hand gestures only)
|
||||||
|
string direction # for "point": "left"/"right"/"up"/"forward"/"down"
|
||||||
|
string source # "hand" or "body_pose"
|
||||||
@ -0,0 +1,6 @@
|
|||||||
|
# GestureArray.msg — All gestures detected in a single processing tick (Issue #140).
|
||||||
|
# Published by gesture_node on /social/gestures.
|
||||||
|
|
||||||
|
std_msgs/Header header
|
||||||
|
|
||||||
|
Gesture[] gestures
|
||||||
Loading…
x
Reference in New Issue
Block a user