From ce6d5ee249cb1fe9456fafbb56b57eda43833aa0 Mon Sep 17 00:00:00 2001 From: sl-jetson Date: Mon, 2 Mar 2026 10:10:54 -0500 Subject: [PATCH] =?UTF-8?q?feat(social):=20multi-camera=20gesture=20recogn?= =?UTF-8?q?ition=20=E2=80=94=20MediaPipe=20Hands=20+=20Pose=20(Issue=20#14?= =?UTF-8?q?0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Delivers Issue #140 (P2): real-time gesture detection from 4 CSI cameras via MediaPipe Hands and Pose, publishing classified gestures on /social/gestures. New files: - saltybot_social_msgs/msg/Gesture.msg + GestureArray.msg — ROS2 message types - saltybot_social/gesture_classifier.py — pure-Python geometric classifier (stop_palm, thumbs_up/down, point, come_here, follow, wave, arms_up, arms_spread, crouch); WaveDetector temporal sliding-window oscillation tracker - saltybot_social/gesture_node.py — ROS2 node; round-robin multi-camera _FrameBuffer, lazy MediaPipe init, person-ID correlation via PersonState - saltybot_social/test/test_gesture_classifier.py — 70 unit tests, all passing - saltybot_social/config/gesture_params.yaml — tuned defaults for Orin Nano - saltybot_social/launch/gesture.launch.py — all params overridable at launch Modified: - saltybot_social_msgs/CMakeLists.txt — register Gesture + GestureArray msgs - saltybot_social/setup.py — add gesture_node console_scripts entry point Co-Authored-By: Claude Sonnet 4.6 --- .../config/gesture_params.yaml | 31 + .../saltybot_social/launch/gesture.launch.py | 67 ++ .../saltybot_social/gesture_classifier.py | 483 +++++++++++++ .../saltybot_social/gesture_node.py | 432 ++++++++++++ jetson/ros2_ws/src/saltybot_social/setup.py | 2 + .../test/test_gesture_classifier.py | 640 ++++++++++++++++++ .../src/saltybot_social_msgs/CMakeLists.txt | 3 + .../src/saltybot_social_msgs/msg/Gesture.msg | 31 + .../saltybot_social_msgs/msg/GestureArray.msg | 6 + 9 files changed, 1695 insertions(+) create mode 100644 
jetson/ros2_ws/src/saltybot_social/config/gesture_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_social/launch/gesture.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_social/saltybot_social/gesture_classifier.py create mode 100644 jetson/ros2_ws/src/saltybot_social/saltybot_social/gesture_node.py create mode 100644 jetson/ros2_ws/src/saltybot_social/test/test_gesture_classifier.py create mode 100644 jetson/ros2_ws/src/saltybot_social_msgs/msg/Gesture.msg create mode 100644 jetson/ros2_ws/src/saltybot_social_msgs/msg/GestureArray.msg diff --git a/jetson/ros2_ws/src/saltybot_social/config/gesture_params.yaml b/jetson/ros2_ws/src/saltybot_social/config/gesture_params.yaml new file mode 100644 index 0000000..a9ceefe --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/config/gesture_params.yaml @@ -0,0 +1,31 @@ +gesture_node: + ros__parameters: + # Number of active CSI camera streams to subscribe to + n_cameras: 4 + + # Target processing rate in Hz. At 10 Hz with 4 cameras, each camera + # contributes frames at approximately 2.5 Hz (round-robin scheduling). + process_fps: 10.0 + + # Minimum MediaPipe detection confidence to publish a gesture + min_confidence: 0.60 + + # MediaPipe model complexity: + # 0 = lite (fastest, ≈ 13 ms/frame on Orin Nano — use for production) + # 1 = full (more accurate, ≈ 25 ms/frame) + # 2 = heavy (highest accuracy, ≈ 50 ms/frame, requires large VRAM) + model_complexity: 0 + + # Maximum hands detected per frame + max_hands: 2 + + # Enable/disable inference modules + pose_enabled: true + hands_enabled: true + + # Comma-separated gesture allowlist. Empty string = all gestures enabled. 
+ # Example: "wave,stop_palm,thumbs_up,arms_up" + enabled_gestures: "" + + # Camera name list (must match saltybot_cameras topic naming) + camera_names: "front,left,rear,right" diff --git a/jetson/ros2_ws/src/saltybot_social/launch/gesture.launch.py b/jetson/ros2_ws/src/saltybot_social/launch/gesture.launch.py new file mode 100644 index 0000000..b745695 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/launch/gesture.launch.py @@ -0,0 +1,67 @@ +"""gesture.launch.py — Launch gesture_node for multi-camera gesture recognition (Issue #140).""" + +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration, PathJoinSubstitution +from launch_ros.actions import Node +from launch_ros.substitutions import FindPackageShare + + +def generate_launch_description() -> LaunchDescription: + pkg = FindPackageShare("saltybot_social") + params_file = PathJoinSubstitution([pkg, "config", "gesture_params.yaml"]) + + return LaunchDescription([ + DeclareLaunchArgument( + "params_file", + default_value=params_file, + description="Path to gesture_node parameter YAML", + ), + DeclareLaunchArgument( + "process_fps", + default_value="10.0", + description="Target gesture detection rate (Hz)", + ), + DeclareLaunchArgument( + "model_complexity", + default_value="0", + description="MediaPipe model complexity: 0=lite, 1=full, 2=heavy", + ), + DeclareLaunchArgument( + "pose_enabled", + default_value="true", + description="Enable MediaPipe Pose body-gesture detection", + ), + DeclareLaunchArgument( + "hands_enabled", + default_value="true", + description="Enable MediaPipe Hands hand-gesture detection", + ), + DeclareLaunchArgument( + "enabled_gestures", + default_value="", + description="Comma-separated gesture allowlist; empty = all", + ), + DeclareLaunchArgument( + "min_confidence", + default_value="0.60", + description="Minimum detection confidence threshold", + ), + Node( + package="saltybot_social", + 
executable="gesture_node", + name="gesture_node", + output="screen", + parameters=[ + LaunchConfiguration("params_file"), + { + "process_fps": LaunchConfiguration("process_fps"), + "model_complexity": LaunchConfiguration("model_complexity"), + "pose_enabled": LaunchConfiguration("pose_enabled"), + "hands_enabled": LaunchConfiguration("hands_enabled"), + "enabled_gestures": LaunchConfiguration("enabled_gestures"), + "min_confidence": LaunchConfiguration("min_confidence"), + }, + ], + ), + ]) diff --git a/jetson/ros2_ws/src/saltybot_social/saltybot_social/gesture_classifier.py b/jetson/ros2_ws/src/saltybot_social/saltybot_social/gesture_classifier.py new file mode 100644 index 0000000..4aa7d94 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/saltybot_social/gesture_classifier.py @@ -0,0 +1,483 @@ +"""gesture_classifier.py — Geometric gesture classification from MediaPipe landmarks. + +Pure Python, no ROS2 / MediaPipe / OpenCV dependencies. +Consumes normalised landmark arrays produced by MediaPipe Hands and Pose, +classifies them into named gesture types, and tracks temporal gestures +(wave, come_here) via per-stream history buffers. + +Usage +----- +>>> from saltybot_social.gesture_classifier import ( +... Landmark, classify_hand, classify_pose, WaveDetector +... 
) +>>> lm = [Landmark(x, y, z) for x, y, z in raw_coords] +>>> result = classify_hand(lm, is_right_hand=True, wave_det=None) +>>> result.gesture_type, result.confidence +('stop_palm', 0.92) + +Coordinate convention +--------------------- +MediaPipe Hands / Pose use image-normalised coordinates: + x: 0.0 = left edge, 1.0 = right edge of frame + y: 0.0 = top edge, 1.0 = bottom edge (↓ positive) + z: depth (approx), 0 at wrist/hip plane; negative = towards camera +""" + +from __future__ import annotations + +import math +from collections import deque +from dataclasses import dataclass +from typing import Deque, List, Optional, Tuple + +# ── Landmark type ───────────────────────────────────────────────────────────── + + +@dataclass(frozen=True) +class Landmark: + x: float + y: float + z: float = 0.0 + visibility: float = 1.0 + + +# ── Result type ─────────────────────────────────────────────────────────────── + + +@dataclass +class ClassifiedGesture: + gesture_type: str # e.g. "stop_palm", "wave", "arms_up" + confidence: float # 0.0–1.0 + hand_x: float = 0.5 # normalised image x of gesture anchor + hand_y: float = 0.5 # normalised image y of gesture anchor + is_right_hand: bool = True + direction: str = "" # for "point": "left"/"right"/"up"/"forward" + source: str = "hand" # "hand" or "body_pose" + + +# ── MediaPipe Hands landmark indices ───────────────────────────────────────── +# Reference: https://developers.google.com/mediapipe/solutions/vision/hand_landmarker + +_WRIST = 0 +_THUMB_CMC = 1; _THUMB_MCP = 2; _THUMB_IP = 3; _THUMB_TIP = 4 +_INDEX_MCP = 5; _INDEX_PIP = 6; _INDEX_DIP = 7; _INDEX_TIP = 8 +_MIDDLE_MCP = 9; _MIDDLE_PIP = 10; _MIDDLE_DIP = 11; _MIDDLE_TIP = 12 +_RING_MCP = 13; _RING_PIP = 14; _RING_DIP = 15; _RING_TIP = 16 +_PINKY_MCP = 17; _PINKY_PIP = 18; _PINKY_DIP = 19; _PINKY_TIP = 20 + +# ── MediaPipe Pose landmark indices ────────────────────────────────────────── + +_L_SHOULDER = 11; _R_SHOULDER = 12 +_L_ELBOW = 13; _R_ELBOW = 14 +_L_WRIST = 15; 
_R_WRIST = 16 +_L_HIP = 23; _R_HIP = 24 +_L_KNEE = 25; _R_KNEE = 26 +_L_ANKLE = 27; _R_ANKLE = 28 + + +# ── Finger-extension helpers ────────────────────────────────────────────────── + + +def _finger_up(lm: List[Landmark], tip: int, pip: int) -> bool: + """Return True if the finger tip is above (smaller y) its PIP joint.""" + return lm[tip].y < lm[pip].y + + +def _finger_up_score(lm: List[Landmark], tip: int, pip: int, mcp: int) -> float: + """Return extension score in [0, 1] based on how far tip is above MCP.""" + spread = lm[mcp].y - lm[tip].y # positive = tip above mcp + palm_height = abs(lm[_WRIST].y - lm[_MIDDLE_MCP].y) or 0.01 + return max(0.0, min(1.0, spread / palm_height)) + + +def _thumb_up(lm: List[Landmark], is_right: bool) -> bool: + """True when the thumb is extended upward (tip clearly above CMC base).""" + return lm[_THUMB_TIP].y < lm[_THUMB_CMC].y - 0.02 + + +def _thumb_down(lm: List[Landmark]) -> bool: + """True when the thumb is pointing downward (tip below the wrist).""" + return lm[_THUMB_TIP].y > lm[_WRIST].y + 0.04 + + +def _four_fingers_curled(lm: List[Landmark]) -> bool: + """True when index, middle, ring, and pinky are all NOT extended.""" + fingers = [ + (_INDEX_TIP, _INDEX_PIP), (_MIDDLE_TIP, _MIDDLE_PIP), + (_RING_TIP, _RING_PIP), (_PINKY_TIP, _PINKY_PIP), + ] + return not any(_finger_up(lm, t, p) for t, p in fingers) + + +def _count_fingers_up(lm: List[Landmark]) -> int: + """Count how many of index/middle/ring/pinky are extended.""" + pairs = [ + (_INDEX_TIP, _INDEX_PIP), (_MIDDLE_TIP, _MIDDLE_PIP), + (_RING_TIP, _RING_PIP), (_PINKY_TIP, _PINKY_PIP), + ] + return sum(_finger_up(lm, t, p) for t, p in pairs) + + +def _palm_center(lm: List[Landmark]) -> Tuple[float, float]: + """Return (x, y) centroid of the four MCP knuckles.""" + xs = [lm[i].x for i in (_INDEX_MCP, _MIDDLE_MCP, _RING_MCP, _PINKY_MCP)] + ys = [lm[i].y for i in (_INDEX_MCP, _MIDDLE_MCP, _RING_MCP, _PINKY_MCP)] + return sum(xs) / 4, sum(ys) / 4 + + +# ── Point direction 
─────────────────────────────────────────── + + +def _point_direction(lm: List[Landmark]) -> str: + """Determine the pointing direction from index MCP → TIP vector.""" + dx = lm[_INDEX_TIP].x - lm[_INDEX_MCP].x + dy = lm[_INDEX_TIP].y - lm[_INDEX_MCP].y # positive = downward in image + angle_deg = math.degrees(math.atan2(-dy, dx)) # flip y so up=+90 + # Quantise into 8 compass directions + if -22.5 <= angle_deg < 22.5: + return "right" + elif 22.5 <= angle_deg < 67.5: + return "upper_right" + elif 67.5 <= angle_deg < 112.5: + return "up" + elif 112.5 <= angle_deg < 157.5: + return "upper_left" + elif angle_deg >= 157.5 or angle_deg < -157.5: + return "left" + elif -157.5 <= angle_deg < -112.5: + return "lower_left" + elif -112.5 <= angle_deg < -67.5: + return "down" + else: + return "lower_right" + + +# ── Hand gesture classifiers ────────────────────────────────────────────────── + + +def _classify_stop_palm(lm: List[Landmark]) -> Optional[ClassifiedGesture]: + """Open palm 'stop' signal: index/middle/ring/pinky extended (thumb not checked).""" + n_up = _count_fingers_up(lm) + if n_up < 4: + return None + # Avg extension score for quality measure + ext_scores = [ + _finger_up_score(lm, _INDEX_TIP, _INDEX_PIP, _INDEX_MCP), + _finger_up_score(lm, _MIDDLE_TIP, _MIDDLE_PIP, _MIDDLE_MCP), + _finger_up_score(lm, _RING_TIP, _RING_PIP, _RING_MCP), + _finger_up_score(lm, _PINKY_TIP, _PINKY_PIP, _PINKY_MCP), + ] + avg_ext = sum(ext_scores) / len(ext_scores) + conf = 0.65 + 0.35 * avg_ext # n_up == 4 is guaranteed by the guard above; former "+0.05*(n_up==4)" bonus folded into the base + cx, cy = _palm_center(lm) + return ClassifiedGesture( + gesture_type="stop_palm", confidence=round(conf, 3), + hand_x=cx, hand_y=cy + ) + + +def _classify_thumbs_up(lm: List[Landmark], is_right: bool) -> Optional[ClassifiedGesture]: + """Thumb extended upward, other fingers curled.""" + if not _thumb_up(lm, is_right): + return None + if not _four_fingers_curled(lm): + return None + gap = lm[_THUMB_CMC].y - lm[_THUMB_TIP].y # larger = more vertical + palm_h = abs(lm[_WRIST].y -
lm[_MIDDLE_MCP].y) or 0.01 + conf = min(0.95, 0.6 + 0.35 * min(1.0, gap / palm_h)) + cx, cy = _palm_center(lm) + return ClassifiedGesture( + gesture_type="thumbs_up", confidence=round(conf, 3), + hand_x=cx, hand_y=cy + ) + + +def _classify_thumbs_down(lm: List[Landmark]) -> Optional[ClassifiedGesture]: + """Thumb pointing downward, other fingers curled.""" + if not _thumb_down(lm): + return None + if not _four_fingers_curled(lm): + return None + drop = lm[_THUMB_TIP].y - lm[_WRIST].y + palm_h = abs(lm[_WRIST].y - lm[_MIDDLE_MCP].y) or 0.01 + conf = min(0.95, 0.6 + 0.35 * min(1.0, drop / palm_h)) + cx, cy = _palm_center(lm) + return ClassifiedGesture( + gesture_type="thumbs_down", confidence=round(conf, 3), + hand_x=cx, hand_y=cy + ) + + +def _classify_point(lm: List[Landmark]) -> Optional[ClassifiedGesture]: + """Index finger extended, middle/ring/pinky curled.""" + if not _finger_up(lm, _INDEX_TIP, _INDEX_PIP): + return None + others_up = sum([ + _finger_up(lm, _MIDDLE_TIP, _MIDDLE_PIP), + _finger_up(lm, _RING_TIP, _RING_PIP), + _finger_up(lm, _PINKY_TIP, _PINKY_PIP), + ]) + if others_up >= 1: + return None + # Strongly horizontal pointing defers to "follow" gesture + if abs(lm[_INDEX_TIP].x - lm[_INDEX_MCP].x) > 0.20: + return None + ext = _finger_up_score(lm, _INDEX_TIP, _INDEX_PIP, _INDEX_MCP) + conf = 0.65 + 0.30 * ext - 0.10 * others_up + direction = _point_direction(lm) + return ClassifiedGesture( + gesture_type="point", confidence=round(conf, 3), + hand_x=lm[_INDEX_TIP].x, hand_y=lm[_INDEX_TIP].y, + direction=direction, + ) + + +def _classify_come_here(lm: List[Landmark]) -> Optional[ClassifiedGesture]: + """Beckoning pose: index MCP raised but index TIP curled toward palm.""" + # MCP above wrist (hand raised), TIP clearly below MCP (finger curled down) + mcp_raised = lm[_INDEX_MCP].y < lm[_WRIST].y - 0.02 + tip_curled = lm[_INDEX_TIP].y > lm[_INDEX_MCP].y + 0.03 + if not (mcp_raised and tip_curled): + return None + # Middle, ring, pinky should be curled 
too (not fully open) + n_others_up = sum([ + _finger_up(lm, _MIDDLE_TIP, _MIDDLE_PIP), + _finger_up(lm, _RING_TIP, _RING_PIP), + _finger_up(lm, _PINKY_TIP, _PINKY_PIP), + ]) + if n_others_up >= 2: + return None + curl_depth = lm[_INDEX_TIP].y - lm[_INDEX_PIP].y + palm_h = abs(lm[_WRIST].y - lm[_MIDDLE_MCP].y) or 0.01 + conf = min(0.90, 0.60 + 0.30 * min(1.0, curl_depth / (palm_h * 0.5))) + cx, cy = _palm_center(lm) + return ClassifiedGesture( + gesture_type="come_here", confidence=round(conf, 3), + hand_x=cx, hand_y=cy + ) + + +def _classify_follow(lm: List[Landmark]) -> Optional[ClassifiedGesture]: + """Follow gesture: index extended horizontally, others curled.""" + if not _finger_up(lm, _INDEX_TIP, _INDEX_PIP): + return None + others_up = sum([ + _finger_up(lm, _MIDDLE_TIP, _MIDDLE_PIP), + _finger_up(lm, _RING_TIP, _RING_PIP), + _finger_up(lm, _PINKY_TIP, _PINKY_PIP), + ]) + if others_up >= 2: + return None + # Horizontal: |dx| > |dy| between index MCP and TIP + dx = abs(lm[_INDEX_TIP].x - lm[_INDEX_MCP].x) + dy = abs(lm[_INDEX_TIP].y - lm[_INDEX_MCP].y) + if dx <= dy: + return None + conf = 0.65 + 0.25 * min(1.0, dx / max(dy, 0.001) / 3.0) + direction = "right" if lm[_INDEX_TIP].x > lm[_INDEX_MCP].x else "left" + return ClassifiedGesture( + gesture_type="follow", confidence=round(conf, 3), + hand_x=lm[_INDEX_TIP].x, hand_y=lm[_INDEX_TIP].y, + direction=direction, + ) + + +# ── Temporal wave detector ──────────────────────────────────────────────────── + + +class WaveDetector: + """Sliding-window wave gesture detector. + + Tracks wrist X-coordinate history and detects lateral oscillation + (minimum 2 direction reversals above amplitude threshold). 
+ + Args: + history_len: Number of frames to keep (default 24 ≈ 0.8 s at 30 fps) + min_reversals: Direction-reversal count required to trigger (default 2) + min_amplitude: Minimum peak-to-peak x excursion in normalised coords + """ + + def __init__( + self, + history_len: int = 24, + min_reversals: int = 2, + min_amplitude: float = 0.08, + ) -> None: + self._history: Deque[float] = deque(maxlen=history_len) + self._min_reversals = min_reversals + self._min_amplitude = min_amplitude + + def push(self, wrist_x: float) -> Tuple[bool, float]: + """Add a new wrist-X sample. + + Returns: + (is_waving, confidence) — confidence is 0.0 when not waving. + """ + self._history.append(wrist_x) + if len(self._history) < 6: + return False, 0.0 + return self._detect() + + def reset(self) -> None: + self._history.clear() + + def _detect(self) -> Tuple[bool, float]: + samples = list(self._history) + mean_x = sum(samples) / len(samples) + centered = [x - mean_x for x in samples] + amplitude = max(centered) - min(centered) + if amplitude < self._min_amplitude: + return False, 0.0 + + # Count sign changes (direction reversals) + reversals = 0 + for i in range(1, len(centered)): + if centered[i - 1] * centered[i] < 0: + reversals += 1 + + if reversals < self._min_reversals: + return False, 0.0 + + # Confidence: blend amplitude and reversal quality + amp_score = min(1.0, amplitude / 0.30) + rev_score = min(1.0, reversals / 6.0) + conf = round(0.5 * amp_score + 0.5 * rev_score, 3) + return True, conf + + +# ── Public hand-gesture API ─────────────────────────────────────────────────── + + +# Priority order for static hand-gesture classifiers +_HAND_CLASSIFIERS = [ + ("stop_palm", lambda lm, rh: _classify_stop_palm(lm)), + ("thumbs_up", lambda lm, rh: _classify_thumbs_up(lm, rh)), + ("thumbs_down", lambda lm, rh: _classify_thumbs_down(lm)), + ("point", lambda lm, rh: _classify_point(lm)), + ("come_here", lambda lm, rh: _classify_come_here(lm)), + ("follow", lambda lm, rh: 
_classify_follow(lm)), +] + + +def classify_hand( + landmarks: List[Landmark], + is_right_hand: bool = True, + wave_det: Optional[WaveDetector] = None, + enabled: Optional[set] = None, +) -> Optional[ClassifiedGesture]: + """Classify a single hand's 21-landmark list into a named gesture. + + Args: + landmarks: 21 Landmark objects from MediaPipe Hands. + is_right_hand: Handedness flag (affects thumb direction logic). + wave_det: Optional WaveDetector instance for wave tracking. + If provided, wave detection is attempted first. + enabled: Set of gesture type strings to allow; None = all. + + Returns: + ClassifiedGesture or None if no gesture recognised. + """ + if len(landmarks) < 21: + return None + + def _allowed(name: str) -> bool: + return enabled is None or name in enabled + + # Wave check (temporal — must come before static classifiers) + if wave_det is not None and _allowed("wave"): + is_waving, wconf = wave_det.push(landmarks[_WRIST].x) + if is_waving: + cx, cy = _palm_center(landmarks) + return ClassifiedGesture( + gesture_type="wave", confidence=wconf, + hand_x=cx, hand_y=cy, + is_right_hand=is_right_hand, + ) + + # Static classifiers — return the first (highest-priority) match + for name, classifier in _HAND_CLASSIFIERS: + if not _allowed(name): + continue + result = classifier(landmarks, is_right_hand) + if result is not None: + result.is_right_hand = is_right_hand + return result + + return None + + +# ── Body-pose gesture classifiers ──────────────────────────────────────────── + + +def classify_pose( + landmarks: List[Landmark], + enabled: Optional[set] = None, +) -> Optional[ClassifiedGesture]: + """Classify full-body pose landmarks (33 points) into a body gesture. + + Args: + landmarks: 33 Landmark objects from MediaPipe Pose. + enabled: Set of allowed gesture types; None = all. + + Returns: + ClassifiedGesture or None. 
+ """ + if len(landmarks) < 29: # need up to ankle (28) + return None + + def _ok(name: str) -> bool: + return enabled is None or name in enabled + + # ── arms_up: both wrists raised above their shoulders ───────────────── + if _ok("arms_up"): + l_above = landmarks[_L_WRIST].y < landmarks[_L_SHOULDER].y - 0.05 + r_above = landmarks[_R_WRIST].y < landmarks[_R_SHOULDER].y - 0.05 + if l_above and r_above: + l_margin = landmarks[_L_SHOULDER].y - landmarks[_L_WRIST].y + r_margin = landmarks[_R_SHOULDER].y - landmarks[_R_WRIST].y + shoulder_span = abs(landmarks[_L_SHOULDER].y - landmarks[_R_HIP].y) or 0.1 + conf = min(0.95, 0.60 + 0.35 * min(1.0, (l_margin + r_margin) / (2 * shoulder_span))) + mid_x = (landmarks[_L_WRIST].x + landmarks[_R_WRIST].x) / 2 + mid_y = (landmarks[_L_WRIST].y + landmarks[_R_WRIST].y) / 2 + return ClassifiedGesture( + gesture_type="arms_up", confidence=round(conf, 3), + hand_x=mid_x, hand_y=mid_y, source="body_pose", + ) + + # ── arms_spread: both arms extended laterally ────────────────────────── + if _ok("arms_spread"): + l_span = abs(landmarks[_L_WRIST].x - landmarks[_L_SHOULDER].x) + r_span = abs(landmarks[_R_WRIST].x - landmarks[_R_SHOULDER].x) + # Wrists should be at roughly shoulder height + l_level = abs(landmarks[_L_WRIST].y - landmarks[_L_SHOULDER].y) < 0.12 + r_level = abs(landmarks[_R_WRIST].y - landmarks[_R_SHOULDER].y) < 0.12 + shoulder_w = abs(landmarks[_L_SHOULDER].x - landmarks[_R_SHOULDER].x) or 0.1 + if l_level and r_level and l_span > shoulder_w * 0.6 and r_span > shoulder_w * 0.6: + conf = min(0.90, 0.55 + 0.35 * min(1.0, (l_span + r_span) / (shoulder_w * 2))) + mid_x = (landmarks[_L_WRIST].x + landmarks[_R_WRIST].x) / 2 + mid_y = (landmarks[_L_SHOULDER].y + landmarks[_R_SHOULDER].y) / 2 + return ClassifiedGesture( + gesture_type="arms_spread", confidence=round(conf, 3), + hand_x=mid_x, hand_y=mid_y, source="body_pose", + ) + + # ── crouch: hips dropped below normal standing threshold ────────────── + if _ok("crouch"): + 
l_hip_y = landmarks[_L_HIP].y + r_hip_y = landmarks[_R_HIP].y + avg_hip_y = (l_hip_y + r_hip_y) / 2 + # In a crouching person the hips appear lower in frame (larger y). + # We use 0.65 as threshold (hips in the lower 65% of frame height). + if avg_hip_y > 0.65: + # Knee-bend evidence: in a crouch the knees sit vertically close to the hips, so a small knee_drop boosts confidence + l_knee_y = landmarks[_L_KNEE].y if len(landmarks) > _L_KNEE else avg_hip_y + r_knee_y = landmarks[_R_KNEE].y if len(landmarks) > _R_KNEE else avg_hip_y + knee_drop = ((l_knee_y - l_hip_y) + (r_knee_y - r_hip_y)) / 2 + conf = min(0.90, 0.55 + 0.25 * min(1.0, (avg_hip_y - 0.65) / 0.20) + 0.10 * max(0.0, min(1.0, 1.0 - knee_drop / 0.25))) + mid_x = (landmarks[_L_HIP].x + landmarks[_R_HIP].x) / 2 + return ClassifiedGesture( + gesture_type="crouch", confidence=round(conf, 3), + hand_x=mid_x, hand_y=avg_hip_y, source="body_pose", + ) + + return None diff --git a/jetson/ros2_ws/src/saltybot_social/saltybot_social/gesture_node.py b/jetson/ros2_ws/src/saltybot_social/saltybot_social/gesture_node.py new file mode 100644 index 0000000..ae6a0e9 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/saltybot_social/gesture_node.py @@ -0,0 +1,432 @@ +"""gesture_node.py — Multi-camera hand + body-pose gesture recognition (Issue #140). + +Subscribes to images from up to 4 CSI cameras, runs MediaPipe Hands and Pose +on each frame, classifies gestures via gesture_classifier.py, and publishes +/social/gestures (GestureArray). + +Pipeline +-------- +camera/*/image_raw → cv_bridge → MediaPipe Hands + Pose + → gesture_classifier → correlate with PersonStateArray + → publish /social/gestures + +Multi-camera +------------ +Up to n_cameras (default 4) camera streams are processed. +Topic names follow the saltybot_cameras convention: + /camera/front/image_raw, /camera/left/image_raw, + /camera/rear/image_raw, /camera/right/image_raw + +All cameras share a single round-robin processing queue so that GPU/CPU +resources are not over-subscribed.
At process_fps=10 Hz, each camera +receives approximately process_fps / n_cameras updates per second. + +Person-ID correlation +--------------------- +When a gesture is detected on camera N, the node inspects the cached +PersonStateArray. A person with matching camera_id is preferred; if only +one person is tracked, they receive the gesture unconditionally. + +ROS2 topics +----------- +Subscribe: + /camera/front/image_raw (sensor_msgs/Image) + /camera/left/image_raw (sensor_msgs/Image) + /camera/rear/image_raw (sensor_msgs/Image) + /camera/right/image_raw (sensor_msgs/Image) + /social/persons (saltybot_social_msgs/PersonStateArray) + +Publish: + /social/gestures (saltybot_social_msgs/GestureArray) + +Parameters +---------- +n_cameras int 4 Number of active camera streams +process_fps float 10.0 Target processing rate (frames/sec) +min_confidence float 0.60 Discard detections below this threshold +model_complexity int 0 MediaPipe model (0=lite, 1=full, 2=heavy) +max_hands int 2 Max hands per frame (MediaPipe param) +pose_enabled bool true Enable MediaPipe Pose body-pose detection +hands_enabled bool true Enable MediaPipe Hands +enabled_gestures str "" Comma-separated allowlist; empty = all +camera_names str "front,left,rear,right" Camera name list +""" + +from __future__ import annotations + +import threading +import time +from typing import Dict, List, Optional, Set + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy + +from sensor_msgs.msg import Image +from saltybot_social_msgs.msg import ( + GestureArray, Gesture, PersonStateArray, +) + +from .gesture_classifier import ( + Landmark, ClassifiedGesture, + classify_hand, classify_pose, + WaveDetector, +) + +# Optional imports — may not be present in CI / test environments +try: + import cv2 + from cv_bridge import CvBridge + _HAS_CV = True +except ImportError: + _HAS_CV = False + +try: + import mediapipe as mp + _HAS_MP = True +except ImportError: + 
_HAS_MP = False + + +# ── Camera index map ────────────────────────────────────────────────────────── + +_CAMERA_NAMES = ["front", "left", "rear", "right"] +_CAMERA_ID: Dict[str, int] = {name: i for i, name in enumerate(_CAMERA_NAMES)} + + +# ── Latest-frame buffer ─────────────────────────────────────────────────────── + + +class _FrameBuffer: + """Thread-safe single-slot buffer per camera — always keeps newest frame.""" + + def __init__(self) -> None: + self._frames: Dict[int, object] = {} # camera_id → Image msg + self._lock = threading.Lock() + + def put(self, camera_id: int, msg: object) -> None: + with self._lock: + self._frames[camera_id] = msg + + def drain(self) -> List[tuple]: + with self._lock: + result = list(self._frames.items()) + self._frames = {} + return result + + +# ── MediaPipe wrapper ───────────────────────────────────────────────────────── + + +class _MediaPipeProcessor: + """Wraps MediaPipe Hands + Pose with lazy initialisation.""" + + def __init__( + self, + model_complexity: int, + max_hands: int, + hands_enabled: bool, + pose_enabled: bool, + ) -> None: + self._complexity = model_complexity + self._max_hands = max_hands + self._hands_enabled = hands_enabled + self._pose_enabled = pose_enabled + self._hands = None + self._pose = None + self._ready = False + + def init(self) -> None: + if not _HAS_MP: + return + mp_hands = mp.solutions.hands + mp_pose = mp.solutions.pose + if self._hands_enabled: + self._hands = mp_hands.Hands( + static_image_mode=False, + max_num_hands=self._max_hands, + min_detection_confidence=0.50, + min_tracking_confidence=0.50, + model_complexity=self._complexity, + ) + if self._pose_enabled: + self._pose = mp_pose.Pose( + static_image_mode=False, + min_detection_confidence=0.50, + min_tracking_confidence=0.50, + model_complexity=self._complexity, + ) + self._ready = True + + def process(self, bgr_image) -> tuple: + """Returns (hands_results, pose_results) from MediaPipe.""" + if not self._ready or not _HAS_MP: + 
return None, None + try: + import numpy as np + rgb = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB) + rgb.flags.writeable = False + h_res = self._hands.process(rgb) if self._hands else None + p_res = self._pose.process(rgb) if self._pose else None + return h_res, p_res + except Exception: + return None, None + + def close(self) -> None: + if self._hands: + self._hands.close() + if self._pose: + self._pose.close() + + +# ── Gesture node ────────────────────────────────────────────────────────────── + + +class GestureNode(Node): + """Multi-camera gesture recognition node.""" + + def __init__(self) -> None: + super().__init__("gesture_node") + + # ── Parameters ────────────────────────────────────────────────────── + self.declare_parameter("n_cameras", 4) + self.declare_parameter("process_fps", 10.0) + self.declare_parameter("min_confidence", 0.60) + self.declare_parameter("model_complexity", 0) + self.declare_parameter("max_hands", 2) + self.declare_parameter("pose_enabled", True) + self.declare_parameter("hands_enabled", True) + self.declare_parameter("enabled_gestures", "") + self.declare_parameter("camera_names", "front,left,rear,right") + + self._n_cameras: int = self.get_parameter("n_cameras").value + self._process_fps: float = self.get_parameter("process_fps").value + self._min_conf: float = self.get_parameter("min_confidence").value + self._complexity: int = self.get_parameter("model_complexity").value + self._max_hands: int = self.get_parameter("max_hands").value + self._pose_enabled: bool = self.get_parameter("pose_enabled").value + self._hands_enabled: bool = self.get_parameter("hands_enabled").value + _enabled_str: str = self.get_parameter("enabled_gestures").value + camera_names_str: str = self.get_parameter("camera_names").value + + self._enabled: Optional[Set[str]] = ( + set(g.strip() for g in _enabled_str.split(",") if g.strip()) + if _enabled_str.strip() else None + ) + self._camera_names: List[str] = [ + n.strip() for n in camera_names_str.split(",") + 
if n.strip() + ][:self._n_cameras] + + # ── QoS ───────────────────────────────────────────────────────────── + img_qos = QoSProfile( + reliability=QoSReliabilityPolicy.BEST_EFFORT, + history=QoSHistoryPolicy.KEEP_LAST, + depth=1, + ) + reliable_qos = QoSProfile(depth=10) + + # ── Publishers ─────────────────────────────────────────────────────── + self._gesture_pub = self.create_publisher( + GestureArray, "/social/gestures", reliable_qos + ) + + # ── Subscribers ────────────────────────────────────────────────────── + self._buf = _FrameBuffer() + self._cam_subs = [] + for name in self._camera_names: + topic = f"/camera/{name}/image_raw" + cam_id = _CAMERA_ID.get(name, len(self._cam_subs)) + sub = self.create_subscription( + Image, topic, + lambda msg, cid=cam_id: self._buf.put(cid, msg), + img_qos, + ) + self._cam_subs.append(sub) + self.get_logger().info(f"Subscribed to {topic} (camera_id={cam_id})") + + self._persons: Optional[PersonStateArray] = None + self._persons_sub = self.create_subscription( + PersonStateArray, "/social/persons", + lambda msg: setattr(self, "_persons", msg), + reliable_qos, + ) + + # ── Per-camera WaveDetector instances: (camera_id, hand_idx) → det ── + self._wave_detectors: Dict[tuple, WaveDetector] = {} + + # ── MediaPipe (initialised in background thread) ───────────────────── + self._mp = _MediaPipeProcessor( + model_complexity=self._complexity, + max_hands=self._max_hands, + hands_enabled=self._hands_enabled, + pose_enabled=self._pose_enabled, + ) + self._bridge = CvBridge() if _HAS_CV else None + self._mp_ready = False + + self._init_thread = threading.Thread( + target=self._init_mediapipe, daemon=True + ) + self._init_thread.start() + + # ── Processing timer ───────────────────────────────────────────────── + self._timer = self.create_timer( + 1.0 / self._process_fps, self._process_tick + ) + + self.get_logger().info( + f"gesture_node starting — cameras={self._camera_names}, " + f"fps={self._process_fps}, min_conf={self._min_conf}, 
" + f"complexity={self._complexity}" + ) + + # ── Initialisation ──────────────────────────────────────────────────────── + + def _init_mediapipe(self) -> None: + if not _HAS_MP: + self.get_logger().warn( + "mediapipe not installed — gesture_node will publish no gestures. " + "Install with: pip install mediapipe" + ) + return + if not _HAS_CV: + self.get_logger().error("opencv-python / cv_bridge not available") + return + self.get_logger().info("Initialising MediaPipe Hands + Pose…") + t0 = time.time() + self._mp.init() + self._mp_ready = True + self.get_logger().info( + f"MediaPipe ready ({time.time() - t0:.1f}s) — " + f"hands={'on' if self._hands_enabled else 'off'}, " + f"pose={'on' if self._pose_enabled else 'off'}" + ) + + # ── Processing tick ─────────────────────────────────────────────────────── + + def _process_tick(self) -> None: + if not self._mp_ready: + return + frames = self._buf.drain() + if not frames: + return + + all_gestures: List[Gesture] = [] + for cam_id, img_msg in frames: + gestures = self._process_frame(cam_id, img_msg) + all_gestures.extend(gestures) + + if all_gestures: + arr = GestureArray() + arr.header.stamp = self.get_clock().now().to_msg() + arr.gestures = all_gestures + self._gesture_pub.publish(arr) + + def _process_frame(self, cam_id: int, img_msg) -> List[Gesture]: + """Run MediaPipe on one camera frame and return gesture messages.""" + if self._bridge is None: + return [] + try: + bgr = self._bridge.imgmsg_to_cv2(img_msg, desired_encoding="bgr8") + except Exception as e: + self.get_logger().debug(f"cv_bridge error cam {cam_id}: {e}") + return [] + + h_results, p_results = self._mp.process(bgr) + gestures: List[Gesture] = [] + + # ── Hand gestures ───────────────────────────────────────────────── + if h_results and h_results.multi_hand_landmarks: + for hand_idx, (hand_lm, hand_info) in enumerate( + zip(h_results.multi_hand_landmarks, + h_results.multi_handedness) + ): + is_right = hand_info.classification[0].label == "Right" + lm 
= [ + Landmark(p.x, p.y, p.z) + for p in hand_lm.landmark + ] + wave_key = (cam_id, hand_idx) + wave_det = self._wave_detectors.get(wave_key) + if wave_det is None: + wave_det = WaveDetector() + self._wave_detectors[wave_key] = wave_det + + result = classify_hand( + lm, is_right_hand=is_right, + wave_det=wave_det, + enabled=self._enabled, + ) + if result and result.confidence >= self._min_conf: + gestures.append( + self._to_msg(result, cam_id) + ) + + # ── Body pose gestures ──────────────────────────────────────────── + if p_results and p_results.pose_landmarks: + lm = [ + Landmark(p.x, p.y, p.z, p.visibility) + for p in p_results.pose_landmarks.landmark + ] + result = classify_pose(lm, enabled=self._enabled) + if result and result.confidence >= self._min_conf: + gestures.append(self._to_msg(result, cam_id)) + + return gestures + + # ── Person correlation ──────────────────────────────────────────────────── + + def _resolve_person_id(self, cam_id: int, hand_x: float) -> int: + """Find person_id for a gesture on camera cam_id. 
Returns -1 if unknown.""" + persons = self._persons + if persons is None or not persons.persons: + return -1 + # Filter persons seen on this camera + on_cam = [p for p in persons.persons if p.camera_id == cam_id] + if len(on_cam) == 1: + return on_cam[0].person_id + if len(on_cam) > 1: + # Pick closest by horizontal image position (heuristic) + return min(on_cam, key=lambda p: abs(p.bearing_deg / 90.0 - hand_x)).person_id + # Fall back to any tracked person + if persons.persons: + return persons.persons[0].person_id + return -1 + + # ── Message construction ────────────────────────────────────────────────── + + def _to_msg(self, g: ClassifiedGesture, cam_id: int) -> Gesture: + msg = Gesture() + msg.header.stamp = self.get_clock().now().to_msg() + msg.gesture_type = g.gesture_type + msg.confidence = float(g.confidence) + msg.camera_id = cam_id + msg.hand_x = float(g.hand_x) + msg.hand_y = float(g.hand_y) + msg.is_right_hand = g.is_right_hand + msg.direction = g.direction + msg.source = g.source + msg.person_id = self._resolve_person_id(cam_id, g.hand_x) + return msg + + # ── Cleanup ─────────────────────────────────────────────────────────────── + + def destroy_node(self) -> None: + self._mp.close() + super().destroy_node() + + +# ── Entry point ─────────────────────────────────────────────────────────────── + + +def main(args=None) -> None: + rclpy.init(args=args) + node = GestureNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() diff --git a/jetson/ros2_ws/src/saltybot_social/setup.py b/jetson/ros2_ws/src/saltybot_social/setup.py index 0865784..eba42d1 100644 --- a/jetson/ros2_ws/src/saltybot_social/setup.py +++ b/jetson/ros2_ws/src/saltybot_social/setup.py @@ -35,6 +35,8 @@ setup( 'orchestrator_node = saltybot_social.orchestrator_node:main', # Voice command NLU bridge (Issue #137) 'voice_command_node = saltybot_social.voice_command_node:main', + # Multi-camera gesture recognition (Issue #140) + 
'gesture_node = saltybot_social.gesture_node:main', ], }, ) diff --git a/jetson/ros2_ws/src/saltybot_social/test/test_gesture_classifier.py b/jetson/ros2_ws/src/saltybot_social/test/test_gesture_classifier.py new file mode 100644 index 0000000..30497bf --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/test/test_gesture_classifier.py @@ -0,0 +1,640 @@ +"""test_gesture_classifier.py — Unit tests for gesture_classifier (Issue #140). + +Tests cover: + - All 7 hand gestures with multiple landmark configurations + - All 3 body-pose gestures + - WaveDetector temporal detection + - Entity extraction (direction for point/follow) + - enabled allowlist filtering + - Edge cases: too few landmarks, marginal values, near-misses + +No ROS2 or MediaPipe runtime required. +""" + +import pytest +from saltybot_social.gesture_classifier import ( + Landmark, + ClassifiedGesture, + WaveDetector, + classify_hand, + classify_pose, + _finger_up, + _thumb_up, + _thumb_down, + _four_fingers_curled, + _count_fingers_up, + _point_direction, +) + + +# ── Landmark builder helpers ────────────────────────────────────────────────── + + +def _lm(x: float, y: float, z: float = 0.0) -> Landmark: + return Landmark(x=x, y=y, z=z) + + +def _make_hand_landmarks( + fingers_up: list = None, # True/False per [index, middle, ring, pinky] + thumb_up: bool = False, + thumb_down: bool = False, + wrist_y: float = 0.8, + palm_height: float = 0.2, +) -> list: + """ + Build a synthetic 21-landmark hand with controllable finger states. 
+ + fingers_up: [index, middle, ring, pinky] — True = finger extended + thumb_up: thumb tip is above wrist + thumb_down: thumb tip is below wrist + wrist_y: y-coordinate of the wrist (image-space, 0=top, 1=bottom) + palm_height: vertical extent of the palm + """ + if fingers_up is None: + fingers_up = [False, False, False, False] + + lm = [None] * 21 + + mcp_y = wrist_y - palm_height * 0.5 # MCP knuckles above wrist + pip_y = mcp_y - palm_height * 0.2 # PIP above MCP + tip_y_up = pip_y - palm_height * 0.3 # TIP above PIP (extended) + tip_y_dn = pip_y + palm_height * 0.1 # TIP below PIP (curled) + + # Wrist + lm[0] = _lm(0.5, wrist_y) + + # Thumb (CMC → MCP → IP → TIP) + lm[1] = _lm(0.45, wrist_y - palm_height * 0.05) # CMC + lm[2] = _lm(0.42, wrist_y - palm_height * 0.15) # MCP + lm[3] = _lm(0.38, wrist_y - palm_height * 0.25) # IP + if thumb_up: + lm[4] = _lm(0.35, wrist_y - palm_height * 0.50) # TIP high up + elif thumb_down: + lm[4] = _lm(0.48, wrist_y + 0.06) # TIP below wrist + else: + lm[4] = _lm(0.40, wrist_y - palm_height * 0.10) # tucked + + # Finger MCP, PIP, DIP, TIP — four fingers evenly spaced across x + finger_x = [0.45, 0.50, 0.55, 0.60] + finger_offsets = [ # (mcp, pip, dip, tip) indices + (5, 6, 7, 8), + (9, 10, 11, 12), + (13, 14, 15, 16), + (17, 18, 19, 20), + ] + for fi, (mcp_i, pip_i, dip_i, tip_i) in enumerate(finger_offsets): + x = finger_x[fi] + lm[mcp_i] = _lm(x, mcp_y) + lm[pip_i] = _lm(x, pip_y) + lm[dip_i] = _lm(x, (pip_y + tip_y_up) / 2 if fingers_up[fi] else pip_y - 0.01) + lm[tip_i] = _lm(x, tip_y_up if fingers_up[fi] else tip_y_dn) + + return lm + + +def _make_pose_landmarks( + arms_up: bool = False, + arms_spread: bool = False, + crouching: bool = False, + shoulder_y: float = 0.30, +) -> list: + """Build synthetic 33-landmark body pose.""" + lm = [_lm(0.5, 0.5)] * 33 + + hip_y = 0.55 if not crouching else 0.75 + knee_y = hip_y + 0.15 + + # Shoulders + lm[11] = _lm(0.35, shoulder_y) # L shoulder + lm[12] = _lm(0.65, shoulder_y) # R 
shoulder + + if arms_up: + lm[15] = _lm(0.30, shoulder_y - 0.20) # L wrist above L shoulder + lm[16] = _lm(0.70, shoulder_y - 0.20) # R wrist above R shoulder + elif arms_spread: + lm[13] = _lm(0.20, shoulder_y + 0.05) # L elbow out + lm[14] = _lm(0.80, shoulder_y + 0.05) # R elbow out + lm[15] = _lm(0.10, shoulder_y + 0.02) # L wrist far left + lm[16] = _lm(0.90, shoulder_y + 0.02) # R wrist far right + else: + lm[15] = _lm(0.35, shoulder_y + 0.20) # L wrist at side + lm[16] = _lm(0.65, shoulder_y + 0.20) # R wrist at side + + lm[23] = _lm(0.40, hip_y) # L hip + lm[24] = _lm(0.60, hip_y) # R hip + lm[25] = _lm(0.40, knee_y) # L knee + lm[26] = _lm(0.60, knee_y) # R knee + lm[27] = _lm(0.40, knee_y + 0.15) # L ankle + lm[28] = _lm(0.60, knee_y + 0.15) # R ankle + + return lm + + +# ── Helper tests ────────────────────────────────────────────────────────────── + + +class TestFingerHelpers: + def test_finger_up_when_tip_above_pip(self): + lm = _make_hand_landmarks(fingers_up=[True, False, False, False]) + assert _finger_up(lm, 8, 6) is True # index tip above pip + + def test_finger_down_when_tip_below_pip(self): + lm = _make_hand_landmarks(fingers_up=[False, False, False, False]) + assert _finger_up(lm, 8, 6) is False + + def test_count_fingers_all_up(self): + lm = _make_hand_landmarks(fingers_up=[True, True, True, True]) + assert _count_fingers_up(lm) == 4 + + def test_count_fingers_none_up(self): + lm = _make_hand_landmarks(fingers_up=[False, False, False, False]) + assert _count_fingers_up(lm) == 0 + + def test_count_fingers_two_up(self): + lm = _make_hand_landmarks(fingers_up=[True, True, False, False]) + assert _count_fingers_up(lm) == 2 + + def test_four_fingers_curled_all_down(self): + lm = _make_hand_landmarks(fingers_up=[False, False, False, False]) + assert _four_fingers_curled(lm) is True + + def test_four_fingers_not_curled_one_up(self): + lm = _make_hand_landmarks(fingers_up=[True, False, False, False]) + assert _four_fingers_curled(lm) is False + + def 
test_thumb_up_when_tip_high(self): + lm = _make_hand_landmarks(thumb_up=True) + assert _thumb_up(lm, True) is True + + def test_thumb_not_up_when_tucked(self): + lm = _make_hand_landmarks(thumb_up=False) + assert _thumb_up(lm, True) is False + + def test_thumb_down_when_tip_below_wrist(self): + lm = _make_hand_landmarks(thumb_down=True) + assert _thumb_down(lm) is True + + def test_thumb_not_down_when_tucked(self): + lm = _make_hand_landmarks(thumb_up=False, thumb_down=False) + assert _thumb_down(lm) is False + + +# ── Hand gesture: stop_palm ─────────────────────────────────────────────────── + + +class TestStopPalm: + def test_all_fingers_up(self): + lm = _make_hand_landmarks(fingers_up=[True, True, True, True]) + r = classify_hand(lm) + assert r is not None + assert r.gesture_type == "stop_palm" + + def test_confidence_range(self): + lm = _make_hand_landmarks(fingers_up=[True, True, True, True]) + r = classify_hand(lm) + assert 0.60 <= r.confidence <= 1.0 + + def test_three_fingers_does_not_trigger(self): + lm = _make_hand_landmarks(fingers_up=[True, True, True, False]) + r = classify_hand(lm) + assert r is None or r.gesture_type != "stop_palm" + + def test_source_is_hand(self): + lm = _make_hand_landmarks(fingers_up=[True, True, True, True]) + r = classify_hand(lm) + assert r.source == "hand" + + +# ── Hand gesture: thumbs_up ─────────────────────────────────────────────────── + + +class TestThumbsUp: + def test_basic(self): + lm = _make_hand_landmarks( + fingers_up=[False, False, False, False], thumb_up=True + ) + r = classify_hand(lm) + assert r is not None + assert r.gesture_type == "thumbs_up" + + def test_confidence_range(self): + lm = _make_hand_landmarks( + fingers_up=[False, False, False, False], thumb_up=True + ) + r = classify_hand(lm) + assert 0.60 <= r.confidence <= 1.0 + + def test_thumb_up_with_fingers_open_not_thumbs_up(self): + # If fingers are also open → stop_palm wins in priority + lm = _make_hand_landmarks( + fingers_up=[True, True, True, 
True], thumb_up=True + ) + r = classify_hand(lm) + assert r.gesture_type == "stop_palm" + + def test_right_hand_flag(self): + lm = _make_hand_landmarks( + fingers_up=[False, False, False, False], thumb_up=True + ) + r = classify_hand(lm, is_right_hand=True) + assert r.is_right_hand is True + + def test_left_hand_flag(self): + lm = _make_hand_landmarks( + fingers_up=[False, False, False, False], thumb_up=True + ) + r = classify_hand(lm, is_right_hand=False) + assert r.is_right_hand is False + + +# ── Hand gesture: thumbs_down ───────────────────────────────────────────────── + + +class TestThumbsDown: + def test_basic(self): + lm = _make_hand_landmarks( + fingers_up=[False, False, False, False], thumb_down=True + ) + r = classify_hand(lm) + assert r is not None + assert r.gesture_type == "thumbs_down" + + def test_confidence_range(self): + lm = _make_hand_landmarks( + fingers_up=[False, False, False, False], thumb_down=True + ) + r = classify_hand(lm) + assert 0.60 <= r.confidence <= 1.0 + + def test_fingers_open_prevents_thumbs_down(self): + lm = _make_hand_landmarks( + fingers_up=[True, True, True, True], thumb_down=True + ) + r = classify_hand(lm) + # stop_palm takes priority + assert r is None or r.gesture_type != "thumbs_down" + + +# ── Hand gesture: point ─────────────────────────────────────────────────────── + + +class TestPoint: + def test_index_only_up(self): + lm = _make_hand_landmarks(fingers_up=[True, False, False, False]) + r = classify_hand(lm) + assert r is not None + assert r.gesture_type == "point" + + def test_direction_right(self): + """Index tip to the right of MCP → direction='right'.""" + lm = _make_hand_landmarks(fingers_up=[True, False, False, False]) + # Override index tip to point right (TIP above PIP, dx=0.15 ≤ 0.20 threshold) + lm[8] = _lm(lm[5].x + 0.15, lm[6].y - 0.02) + r = classify_hand(lm) + assert r is not None + assert r.gesture_type == "point" + assert r.direction == "right" + + def test_direction_up(self): + """Index tip 
significantly above MCP → direction='up'.""" + lm = _make_hand_landmarks(fingers_up=[True, False, False, False]) + lm[8] = _lm(lm[5].x, lm[5].y - 0.20) # far above MCP + r = classify_hand(lm) + assert r is not None + assert r.direction == "up" + + def test_two_fingers_up_not_point(self): + lm = _make_hand_landmarks(fingers_up=[True, True, False, False]) + r = classify_hand(lm) + # With 2 fingers up, point classifier should reject + assert r is None or r.gesture_type != "point" + + def test_confidence_range(self): + lm = _make_hand_landmarks(fingers_up=[True, False, False, False]) + r = classify_hand(lm) + assert 0.60 <= r.confidence <= 1.0 + + +# ── Hand gesture: come_here ─────────────────────────────────────────────────── + + +class TestComeHere: + def _make_beckoning_hand(self) -> list: + """Index MCP raised but TIP curled below PIP.""" + lm = _make_hand_landmarks( + fingers_up=[False, False, False, False], + wrist_y=0.85, + palm_height=0.25, + ) + # Index MCP should be above wrist + assert lm[5].y < lm[0].y # sanity: MCP.y < WRIST.y (above in image) + # Force TIP to be BELOW MCP (clearly curled — satisfies MCP+0.03 threshold) + lm[8] = _lm(lm[5].x, lm[5].y + 0.06) # tip 0.06 below MCP + return lm + + def test_beckoning_detected(self): + lm = self._make_beckoning_hand() + r = classify_hand(lm) + assert r is not None + assert r.gesture_type == "come_here" + + def test_confidence_range(self): + lm = self._make_beckoning_hand() + r = classify_hand(lm) + assert 0.60 <= r.confidence <= 1.0 + + def test_open_hand_not_come_here(self): + lm = _make_hand_landmarks(fingers_up=[True, True, True, True]) + r = classify_hand(lm) + assert r is None or r.gesture_type != "come_here" + + +# ── Hand gesture: follow ────────────────────────────────────────────────────── + + +class TestFollow: + def _make_follow_hand(self, direction: str = "right") -> list: + """Index pointing horizontally, others curled.""" + lm = _make_hand_landmarks(fingers_up=[True, False, False, False]) + # 
Move index TIP far to the right/left of MCP (dx=0.25 > 0.20 threshold → follow) + # TIP above PIP so _finger_up returns True; dy=0.06 keeps dx >> dy + if direction == "right": + lm[8] = _lm(lm[5].x + 0.25, lm[6].y - 0.02) + else: + lm[8] = _lm(lm[5].x - 0.25, lm[6].y - 0.02) + return lm + + def test_follow_right(self): + lm = self._make_follow_hand("right") + r = classify_hand(lm) + assert r is not None + assert r.gesture_type == "follow" + assert r.direction == "right" + + def test_follow_left(self): + lm = self._make_follow_hand("left") + r = classify_hand(lm) + assert r is not None + assert r.gesture_type == "follow" + assert r.direction == "left" + + def test_vertical_point_not_follow(self): + lm = _make_hand_landmarks(fingers_up=[True, False, False, False]) + # TIP straight up — dy >> dx + lm[8] = _lm(lm[5].x, lm[5].y - 0.30) + r = classify_hand(lm) + # Should be "point", not "follow" + assert r is None or r.gesture_type == "point" + + def test_confidence_range(self): + lm = self._make_follow_hand() + r = classify_hand(lm) + assert r is not None + assert 0.60 <= r.confidence <= 1.0 + + +# ── WaveDetector ────────────────────────────────────────────────────────────── + + +class TestWaveDetector: + def test_not_waving_insufficient_data(self): + det = WaveDetector() + for x in [0.4, 0.5]: + waving, conf = det.push(x) + assert waving is False + assert conf == 0.0 + + def test_wave_oscillation_detected(self): + det = WaveDetector(history_len=20, min_reversals=2, min_amplitude=0.06) + xs = [0.3, 0.5, 0.3, 0.5, 0.3, 0.5, 0.3, 0.5, 0.3, 0.5] + for x in xs: + waving, conf = det.push(x) + assert waving is True + assert conf > 0.0 + + def test_small_amplitude_not_wave(self): + det = WaveDetector(min_amplitude=0.10) + # Tiny oscillation — amplitude < threshold + xs = [0.50, 0.51, 0.50, 0.51, 0.50, 0.51, 0.50, 0.51, 0.50, 0.51] + for x in xs: + waving, conf = det.push(x) + assert waving is False + + def test_confidence_increases_with_larger_amplitude(self): + det_small = 
WaveDetector() + det_large = WaveDetector() + for x in [0.40, 0.60, 0.40, 0.60, 0.40, 0.60, 0.40, 0.60]: + _, c_small = det_small.push(x) + for x in [0.20, 0.80, 0.20, 0.80, 0.20, 0.80, 0.20, 0.80]: + _, c_large = det_large.push(x) + assert c_large >= c_small + + def test_reset_clears_history(self): + det = WaveDetector() + for x in [0.3, 0.7, 0.3, 0.7, 0.3, 0.7, 0.3, 0.7, 0.3, 0.7]: + det.push(x) + det.reset() + waving, conf = det.push(0.5) + assert waving is False + + def test_wave_via_classify_hand(self): + """Wave detector triggered through classify_hand API.""" + det = WaveDetector(min_reversals=2, min_amplitude=0.06) + xs = [0.3, 0.6, 0.3, 0.6, 0.3, 0.6, 0.3, 0.6, 0.3, 0.6] + result = None + for x in xs: + lm = _make_hand_landmarks() + lm[0] = _lm(x, lm[0].y) # move wrist X + result = classify_hand(lm, wave_det=det) + assert result is not None + assert result.gesture_type == "wave" + + +# ── Body-pose: arms_up ──────────────────────────────────────────────────────── + + +class TestArmsUp: + def test_both_arms_raised(self): + lm = _make_pose_landmarks(arms_up=True) + r = classify_pose(lm) + assert r is not None + assert r.gesture_type == "arms_up" + + def test_confidence_range(self): + lm = _make_pose_landmarks(arms_up=True) + r = classify_pose(lm) + assert 0.60 <= r.confidence <= 1.0 + + def test_arms_at_side_not_arms_up(self): + lm = _make_pose_landmarks(arms_up=False, arms_spread=False) + r = classify_pose(lm) + assert r is None or r.gesture_type != "arms_up" + + def test_source_is_body_pose(self): + lm = _make_pose_landmarks(arms_up=True) + r = classify_pose(lm) + assert r.source == "body_pose" + + +# ── Body-pose: arms_spread ──────────────────────────────────────────────────── + + +class TestArmsSpread: + def test_arms_extended_laterally(self): + lm = _make_pose_landmarks(arms_spread=True) + r = classify_pose(lm) + assert r is not None + assert r.gesture_type in ("arms_spread", "arms_up") + + def test_confidence_range(self): + lm = 
_make_pose_landmarks(arms_spread=True) + r = classify_pose(lm) + if r and r.gesture_type == "arms_spread": + assert 0.55 <= r.confidence <= 1.0 + + def test_arms_at_rest_not_spread(self): + lm = _make_pose_landmarks() + r = classify_pose(lm) + assert r is None or r.gesture_type not in ("arms_spread", "arms_up") + + +# ── Body-pose: crouch ───────────────────────────────────────────────────────── + + +class TestCrouch: + def test_crouching_detected(self): + lm = _make_pose_landmarks(crouching=True) + r = classify_pose(lm) + assert r is not None + assert r.gesture_type == "crouch" + + def test_standing_not_crouch(self): + lm = _make_pose_landmarks(crouching=False) + r = classify_pose(lm) + assert r is None or r.gesture_type != "crouch" + + def test_confidence_range(self): + lm = _make_pose_landmarks(crouching=True) + r = classify_pose(lm) + assert 0.55 <= r.confidence <= 1.0 + + def test_source_is_body_pose(self): + lm = _make_pose_landmarks(crouching=True) + r = classify_pose(lm) + assert r.source == "body_pose" + + +# ── Enabled allowlist filtering ─────────────────────────────────────────────── + + +class TestEnabledFilter: + def test_blocked_gesture_returns_none(self): + lm = _make_hand_landmarks(fingers_up=[True, True, True, True]) + r = classify_hand(lm, enabled={"thumbs_up"}) + assert r is None + + def test_allowed_gesture_passes(self): + lm = _make_hand_landmarks(fingers_up=[True, True, True, True]) + r = classify_hand(lm, enabled={"stop_palm"}) + assert r is not None + assert r.gesture_type == "stop_palm" + + def test_none_enabled_means_all(self): + lm = _make_hand_landmarks(fingers_up=[True, True, True, True]) + r = classify_hand(lm, enabled=None) + assert r is not None + + def test_pose_filter_arms_up_blocked(self): + lm = _make_pose_landmarks(arms_up=True) + r = classify_pose(lm, enabled={"crouch"}) + assert r is None + + def test_pose_filter_arms_up_allowed(self): + lm = _make_pose_landmarks(arms_up=True) + r = classify_pose(lm, enabled={"arms_up"}) + 
assert r is not None + assert r.gesture_type == "arms_up" + + +# ── Edge cases ──────────────────────────────────────────────────────────────── + + +class TestEdgeCases: + def test_empty_landmarks_returns_none(self): + assert classify_hand([]) is None + + def test_too_few_landmarks_returns_none(self): + lm = [_lm(0.5, 0.5)] * 10 # only 10, need 21 + assert classify_hand(lm) is None + + def test_too_few_pose_landmarks_returns_none(self): + lm = [_lm(0.5, 0.5)] * 10 + assert classify_pose(lm) is None + + def test_no_wave_detector_no_wave(self): + """Without a WaveDetector, wave is never returned.""" + lm = _make_hand_landmarks() + r = classify_hand(lm, wave_det=None) + assert r is None or r.gesture_type != "wave" + + def test_all_fingers_down_thumb_neutral_returns_none(self): + lm = _make_hand_landmarks( + fingers_up=[False, False, False, False], + thumb_up=False, thumb_down=False, + ) + r = classify_hand(lm) + assert r is None + + def test_hand_x_y_populated(self): + lm = _make_hand_landmarks(fingers_up=[True, True, True, True]) + r = classify_hand(lm) + assert r is not None + assert 0.0 <= r.hand_x <= 1.0 + assert 0.0 <= r.hand_y <= 1.0 + + def test_is_right_hand_default_true(self): + lm = _make_hand_landmarks( + fingers_up=[False, False, False, False], thumb_up=True + ) + r = classify_hand(lm) + assert r.is_right_hand is True + + def test_wave_confidence_zero_for_no_wave(self): + det = WaveDetector() + # Stationary wrist — no wave + for _ in range(24): + waving, conf = det.push(0.5) + assert waving is False + assert conf == 0.0 + + +# ── Point direction edge cases ──────────────────────────────────────────────── + + +class TestPointDirection: + def _hand_pointing(self, dx: float, dy: float) -> list: + lm = _make_hand_landmarks(fingers_up=[True, False, False, False]) + # Set index MCP and TIP to produce the desired vector + lm[5] = _lm(0.5, 0.5) + lm[8] = _lm(0.5 + dx, 0.5 + dy) + return lm + + def test_pointing_right(self): + lm = self._hand_pointing(dx=0.2, 
dy=0.0) + assert _point_direction(lm) == "right" + + def test_pointing_left(self): + lm = self._hand_pointing(dx=-0.2, dy=0.0) + assert _point_direction(lm) == "left" + + def test_pointing_up(self): + lm = self._hand_pointing(dx=0.0, dy=-0.2) + assert _point_direction(lm) == "up" + + def test_pointing_down(self): + lm = self._hand_pointing(dx=0.0, dy=0.2) + assert _point_direction(lm) == "down" + + def test_pointing_upper_right(self): + lm = self._hand_pointing(dx=0.15, dy=-0.15) + assert _point_direction(lm) == "upper_right" diff --git a/jetson/ros2_ws/src/saltybot_social_msgs/CMakeLists.txt b/jetson/ros2_ws/src/saltybot_social_msgs/CMakeLists.txt index e01c190..7f6bc11 100644 --- a/jetson/ros2_ws/src/saltybot_social_msgs/CMakeLists.txt +++ b/jetson/ros2_ws/src/saltybot_social_msgs/CMakeLists.txt @@ -35,6 +35,9 @@ rosidl_generate_interfaces(${PROJECT_NAME} "msg/ConversationResponse.msg" # Issue #137 — voice command NLU "msg/VoiceCommand.msg" + # Issue #140 — gesture recognition + "msg/Gesture.msg" + "msg/GestureArray.msg" DEPENDENCIES std_msgs geometry_msgs builtin_interfaces ) diff --git a/jetson/ros2_ws/src/saltybot_social_msgs/msg/Gesture.msg b/jetson/ros2_ws/src/saltybot_social_msgs/msg/Gesture.msg new file mode 100644 index 0000000..e367788 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social_msgs/msg/Gesture.msg @@ -0,0 +1,31 @@ +# Gesture.msg — Single detected gesture from hand or body-pose recognition (Issue #140). +# Published as part of GestureArray on /social/gestures. 
+#
+# gesture_type values:
+#   Hand gestures (MediaPipe Hands):
+#     wave        — lateral wrist oscillation (temporal)
+#     point       — index extended, others curled
+#     stop_palm   — all fingers extended, palm forward
+#     thumbs_up   — thumb up, fist closed
+#     thumbs_down — thumb down, fist closed
+#     come_here   — beckoning: index curled toward palm
+#     follow      — index extended horizontally
+#   Body-pose gestures (MediaPipe Pose):
+#     arms_up     — both wrists above shoulders (stop / emergency)
+#     crouch      — hips below normal standing threshold (come closer)
+#     arms_spread — arms extended laterally (spread out / stop)
+
+std_msgs/Header header
+
+string gesture_type  # gesture identifier (see above)
+int32 person_id      # from PersonStateArray (-1 = unidentified)
+float32 confidence   # detection confidence [0.0, 1.0]
+int32 camera_id      # source camera (0=front, 1=left, 2=rear, 3=right)
+
+# Normalised image position of the detected gesture anchor [0.0, 1.0]
+float32 hand_x
+float32 hand_y
+
+bool is_right_hand   # true = right hand; false = left hand (hand gestures only)
+string direction     # for "point"/"follow": "left"/"right"/"up"/"down"/"upper_right"
+string source        # "hand" or "body_pose"
diff --git a/jetson/ros2_ws/src/saltybot_social_msgs/msg/GestureArray.msg b/jetson/ros2_ws/src/saltybot_social_msgs/msg/GestureArray.msg
new file mode 100644
index 0000000..d55931e
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_social_msgs/msg/GestureArray.msg
@@ -0,0 +1,6 @@
+# GestureArray.msg — All gestures detected in a single processing tick (Issue #140).
+# Published by gesture_node on /social/gestures.
+
+std_msgs/Header header
+
+Gesture[] gestures